Как стать автором
Обновить

Пишем простой планировщик файберов на ruby

Уровень сложностиПростой
Время на прочтение11 мин
Количество просмотров897

С версии 3.0 в руби появились неблокирующие файберы, с помощью которых писать код с асинхронным вводом-выводом стало заметно удобнее. На мой взгляд у реализации асинхронного ввода-вывода в руби есть две отличительные особенности:

  1. Синхронный и асинхронный код никак не отличаются друг от друга - нет никаких дополнительных конструкций типа async/await, код написанный с расчётом на синхронный ввод-вывод также будет работать и в случае запуска его в неблокирующем файбере.

  2. Нет стандартной реализации планировщика. Надо использовать сторонние библиотеки, например, socketry/async, в которых реализуется планировщик.

Недавно для увеличения производительности одного сервиса мне пришлось переписать его на асинхронный ввод-вывод, в связи с чем мне стало интересно хотя бы в общих чертах понять, как работают планировщики, поэтому я и решил написать свой. Сразу скажу, что я не буду останавливаться на организации цикла событий, поэтому если вы ни разу не имели дела с одним из перечисленных системных вызовов: select, poll, epoll, kqueue, - то рекомендую найти информацию по любому из них и попробовать написать небольшой пример.

Для начала разберёмся, как вообще использовать планировщик и запускать неблокирующие файберы:

# Устанавливаем текущий планировщик.
Fiber.set_scheduler(FiberScheduler.new)

# Запускаем первый файбер.
Fiber.schedule do
  puts Net::HTTP.get(URI('https://motherfuckingwebsite.com'))
end

# Запускаем второй файбер.
Fiber.schedule do
  puts Net::HTTP.get(URI('https://example.com'))
end

Опишу на пальцах, как данный код работает. В любом планировщике должны быть реализованы определённые спецификацией методы. При достижении блокирующего вызова внутри неблокирующего файбера вызывается один из методов планировщика, после чего планировщик делает нужную работу, например, регистрирует, что такой-то файбер ожидает возможности записать данные в такой-то сокет, и вытесняет текущий файбер с помощью вызова Fiber.yield.

Теперь откроем спецификацию планировщика https://docs.ruby-lang.org/en/3.2/Fiber/Scheduler.html и посмотрим, какие методы он должен реализовывать. Из написанного следует, что класс может реализовывать довольно большое количество методов, но не все из них обязательные. В рамках данной статьи мы реализуем следующие методы:

  • fiber(&block). Данный метод будет вызываться при вызове Fiber.schedule. Он должен создавать неблокирующий файбер и передавать ему управление.

  • io_wait(io, events, timeout). Данный метод будет вызываться при вызове метода wait у io объекта. io - это io объект, events - битовая маска ожидаемых событий (чтение, запись или и то и другое), timeout - максимальное время ожидания. Данный метод должен "запомнить", что текущий файбер ожидает доступности для записи и/или чтения такого-то io и вытеснить его, вызвав Fiber.yield.

  • block(blocker, timeout = nil). Данный метод вызывается при блокировках, связанных с синхронизацией: ожидание освобождения мьютекса, вызов Thread.join и т. п. blocker содержит отладочную информацию о том, чего мы ожидаем. timeout - максимальное время ожидания.

  • unblock(blocker, fiber). Данный метод вызывается для разблокировки файбера, заблокированного ранее с помощью block. Такое происходит, например, при освобождении мьютекса, который мы хотели захватить, или при завершении потока, завершения которого мы ожидали, вызвав join.

  • kernel_sleep(duration = nil). Данный метод вызывается при вызове sleep.

  • close. Данный метод вызывается после завершения основного потока. В нашем примере после того, как второй файбер дойдёт до блокирующего вызова. Как мы помним, после этого файбер будет вытеснен и исполнение продолжится с места вызова schedule, а другого кода после него у нас нет.

Начнём со вспомогательного кода. Для реализации таймеров мы будем использовать двоичную пирамиду, код реализации приведён под спойлером, на нём мы останавливаться не будем. Реализация пирамиды требует, чтобы всякий элемент реализовывал три метода: priority, который возвращает приоритет, bheap_idx, который возвращает индекс в двоичной пирамиде и bheap_idx=, который позволяет устанавливать данный индекс.

lib/bheap.rb
# frozen_string_literal: true

class BHeap
  def initialize
    @heap = []
  end

  def peek_min
    @heap[0]
  end

  def pop_min
    return if @heap.empty?

    top = @heap[0]

    if @heap.length == 1
      @heap.pop
    else
      @heap[0] = @heap.pop
      move_down(0)
    end

    top
  end

  def push(elem)
    @heap << elem
    move_up(@heap.size - 1)
    elem
  end

  def delete(elem)
    tmp = @heap.pop
    return if @heap.empty? || elem.bheap_idx >= @heap.length

    @heap[elem.bheap_idx] = tmp
    move_down(elem.bheap_idx)

    nil
  end

  def increase(elem)
    move_down(elem.bheap_idx)
  end

  def decrease(elem)
    move_up(elem.bheap_idx)
  end

  def size
    @heap.length
  end

  def empty?
    @heap.empty?
  end

  private

  def move_up(i)
    until i.zero?
      p = (i - 1) / 2
      break if @heap[p].priority <= @heap[i].priority

      @heap[p], @heap[i] = @heap[i], @heap[p]
      @heap[i].bheap_idx = i
      i = p
    end

    @heap[i].bheap_idx = i
  end

  def move_down(i)
    while i < @heap.length
      l = 2 * i + 1
      break if l >= @heap.length

      min_i = i
      r = l + 1

      min_i = l if @heap[l].priority < @heap[i].priority
      min_i = r if r < @heap.length && @heap[r].priority < @heap[min_i].priority
      break if min_i == i

      @heap[min_i], @heap[i] = @heap[i], @heap[min_i]
      @heap[i].bheap_idx = i
      i = min_i
    end

    @heap[i].bheap_idx = i
  end
end

Теперь же перейдём к реализации класса FiberScheduler. Для мультиплексирования ввода-вывода мы будем использовать библиотеку nio4r, потому что в стандартной библиотеке руби есть поддержка только системного вызова select, который плохо себя показывает при большом количестве файловых дескрипторов. Итак начнём с методов initialize и fiber:

require 'nio4r'
require_relative 'bheap'

class FiberScheduler
  # Структура для файбера, который чего-то ожидает.
  # events - битовая маска или nil, если файбер ожидает таймера.
  # timer - объект типа Timer или nil.
  WaitFiber = Struct.new(:fiber, :events, :timer)

  # Структура таймера. Будет использоваться для вставки в
  # двоичную пирамиду.
  # io - io объект или файбер, если файбер ожидает только таймера;
  # priority он же scheduled_at - время, когда таймер должен сработать;
  # bheap_idx - индекс в двоичной пирамиде.
  Timer = Struct.new(:io, :priority, :bheap_idx) do
    alias_method :scheduled_at, :priority
  end

  def initialize
    @timers = BHeap.new
    @selector = NIO::Selector.new

    # В этой хэш-таблице будут храниться файберы,
    # которые в данный момент чего-то ожидают. Ключами будут служить
    # io объекты и сами файберы в случае вызова sleep. Поэтому вызываем
    # compare_by_identity, чтобы любой объект считался равным только
    # самому себе.
    @fibers = {}.compare_by_identity

    # Данный мьютекс нужен для метода unblock, так как разблокировка
    # может происходить из другого потока. Например, файбер пытается
    # захватить мьютекс, который уже захвачен в другом потоке, после того,
    # как поток освободит мьютекс, из него будет вызван метод unblock.
    @mutex = Mutex.new

    # Массив файберов для выполнения. Нужен для того, чтобы
    # запланировать выполнение асинхронных задач на следующую итерацию
    # цикла событий.
    @ready = []

    # Массив разблокированных файберов (для которых был вызван unblock).
    @unblocked = []
  end

  def fiber(&)
    # Создаём неблокирующий файбер и передаём ему управление.
    fiber = Fiber.new(blocking: false, &)
    fiber.resume
    fiber
  end
end

Далее перейдём к реализации io_wait, основная задача которого - зарегистрировать факт, что файбер ожидает доступа на запись и/или чтение такого-то io объекта и вернуть управление в главный поток:

def io_wait(io, events, timeout)
  # Если передан timeout, создаём таймер и добавляем его в пирамиду.
  timer =
    if timeout
      cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @timers.push(Timer.new(io, cur_time + timeout))
    end

  # Переводим битовую маску в один из символов :r, :w или :rw.
  mode =
    if events.anybits?(IO::READABLE)
      events.anybits?(IO::WRITABLE) ? :rw : :r
    elsif events.anybits?(IO::WRITABLE)
      :w
    else
      raise 'Wrong events mask'
    end

  # Регистрируем io для последующего мультиплексирования.
  @selector.register(io, mode)
  # Добавляем ассоциацию данного io с текущим файбером.
  @fibers[io] = WaitFiber.new(Fiber.current, events, timer)

  # Вытесняем текущий файбер.
  Fiber.yield
end

Теперь реализуем три вспомогательных метода. Первый - resume_fiber, который возобновляет выполнение переданного файбера. Второй метод - min_timeout, который будет доставать таймер с наименьшим временем срабатывания и преобразовывать его в максимальное время, которое наш поток может спать, путём вычитания из него текущего времени. Третий - process_timers, данный метод обрабатывает все таймеры, которые должны сработать на данный момент.

def resume_fiber(io, mode)
  # Пытаемся найти файбер, ожидающий io,
  # если не находим, ничего не делаем.
  wait_fiber = @fibers.delete(io)
  return unless wait_fiber

  events = nil
  # Если у файбера задана маска, мы ожидаем io, а не таймер.
  if wait_fiber.events
    events =
      case mode
      when :r then IO::READABLE
      when :w then IO::WRITABLE
      when nil then 0
      else IO::READABLE | IO::WRITABLE
      end
    # В таком случае при возобновлении исполнения файбера
    # мы должны передать маску, соответсвующую пересечению доступных
    # событий и ожидаемых.
    events = events & wait_fiber.events
    # Также мы должны перестать отслеживать данный io объект.
    @selector.deregister(io)
  end
  # Если есть ассоциированный с файбером таймер,
  # удаляем его из пирамиды.
  @timers.delete(wait_fiber.timer) if wait_fiber.timer
  # Если файбер ещё жив, возобновляем его исполнение.
  wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive?
end

def min_timeout
  # Если таймеров нет, возвращаем nil.
  timer = @timers.peek_min
  return unless timer

  # В ином случае возвращаем разницу между временем, на которое был
  # запланирован таймер, и текущим временем.
  cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  timer.scheduled_at - cur_time
end

def process_timers
  loop do
    # Если таймеров нет, завершаем цикл.
    timer = @timers.peek_min
    break unless timer

    # Если время исполнения ближайшего таймера ещё не наступило,
    # завершаем цикл.
    cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    timeout = timer.scheduled_at - cur_time
    break if timeout.positive?

    # В ином случае возобновляем работу соответствующего файбера.
    resume_fiber(timer.io, nil)
  end
end

Теперь перейдём к реализации событийного цикла:

def close
  # Цикл работает, пока есть хотя бы один файбер,
  # ожидающий io, или хотя бы один запланированный файбер.
  while @fibers.any? || @ready.any?
    # Сначала запускаем все запланированные файберы.
    running, @ready = @ready, []
    running.each do |fiber|
      fiber.resume if fiber.alive?
    end

    # Затем запускаем файберы, которые были разблокированы. Мы используем
    # resume_fiber, чтобы удалить файбер из списка ожидающих, а также,
    # чтобы удалить соответсвующий ему таймер в случае наличия оного.
    running, @unblocked = @unblocked, []
    running.each do |fiber|
      resume_fiber(fiber, nil)
    end
    
    # После получаем максимальное время сна.
    # Если оно меньше нуля, значит у нас есть хотя бы один таймер,
    # который должен сработать, а значит мы не можем уходить в сон, поэтому
    # выставляем timeout в 0. Если у нас есть хотя бы один запланированный
    # файбер, мы тоже не можем уходить в сон, поэтому так же обнуляем timeout.
    timeout = min_timeout
    timeout = 0 if timeout&.negative? || @ready.any?

    if @selector.empty?
      # Если у нас нет ни одного ожидающего файбера и timeout положительный,
      # уходим в сон.
      sleep(timeout) if @ready.empty? && timeout&.positive?
    else
      # Если же есть файберы, ожидающие io, ждём пока хотя бы один io объект
      # будет готов. Если timeout = nil, то есть у нас нет ни таймеров, ни
      # запланированных задач, время ожидания не ограничено.
      # Если же timeout = 0, то в случае, если ни один io объект не готов,
      # поток не уйдёт в сон и продолжит выполненин. Если timeout является
      # положительным числом, то поток будет ожидать готовности io в течение
      # данного времени.
      @selector.select(timeout) do |monitor|
        resume_fiber(monitor.io, monitor.readiness)
      end
    end

    # Обрабатываем все таймеры, которые должны сработать в данный момент.
    process_timers
  end
ensure
  @selector.close
end

Далее мы реализуем методы block, unblock и kernel_sleep.

def block(_blocker, timeout = nil)
  # Так как файбер в случае вызова block не ожидает
  # никакого io объекта мы используем сам файбер в качестве ключа.
  wait_fiber = WaitFiber.new(Fiber.current, nil, nil)
  @fibers[wait_fiber.fiber] = wait_fiber
  # Если timeout не задан, вытесняем текущий файбер.
  return Fiber.yield unless timeout

  # В ином случае создаём таймер и делаем то же самое.
  cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout))
  wait_fiber.timer = timer

  Fiber.yield
end

def unblock(_blocked, fiber)
  # Добавляем файбер в список разблокированных, выводим
  # поток из сна, если он находится в процесее ожидания io.
  @mutex.synchronize do
    @unblocked << fiber
    @selector.wakeup
  end
end

def kernel_sleep(duration = nil)
  # Здесь просто вызываем метод block с заданным таймаутом.
  block(:sleep, duration)
end

Далее добавим в планировщик пару полезных методов, которые не определены спецификацией, но будут нам полезны.

# Мы можем вызывать данный метод из любого файбера,
# для планирования задачи на следующую итерацию цикла.
# Делется это следующим образом: Fiber.scheduler.schedule(fiber).
def schedule(fiber)
  @ready << fiber
end

# Данный метод нужен для того, чтобы остановить исполнение текущего
# файбера и запланировать возобновление его работы на следующую итерацию
# цикла событий. Вызов метода осуществляется аналогично вызову schedule:
# Fiber.scheduler.yield.
def yield
  @ready << Fiber.current
  Fiber.yield
end

На этом реализация планировщика завершена, полный код находится под спойлером:

lib/fiber_scheduler.rb
# frozen_string_literal: true

require 'nio4r'
require_relative 'bheap'

class FiberScheduler
  WaitFiber = Struct.new(:fiber, :events, :timer)

  Timer = Struct.new(:io, :priority, :bheap_idx) do
    alias_method :scheduled_at, :priority
  end

  def initialize
    @timers = BHeap.new
    @selector = NIO::Selector.new
    @fibers = {}.compare_by_identity
    @mutex = Mutex.new
    @ready = []
    @unblocked = []
  end

  def fiber(&)
    fiber = Fiber.new(blocking: false, &)
    fiber.resume
    fiber
  end

  def io_wait(io, events, timeout)
    timer =
      if timeout
        cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        @timers.push(Timer.new(io, cur_time + timeout))
      end

    mode =
      if events.anybits?(IO::READABLE)
        events.anybits?(IO::WRITABLE) ? :rw : :r
      elsif events.anybits?(IO::WRITABLE)
        :w
      else
        raise 'Wrong events mask'
      end
    @selector.register(io, mode)
    @fibers[io] = WaitFiber.new(Fiber.current, events, timer)

    Fiber.yield
  end

  def block(_blocker, timeout = nil)
    wait_fiber = WaitFiber.new(Fiber.current, nil, nil)
    @fibers[wait_fiber.fiber] = wait_fiber
    return Fiber.yield unless timeout

    cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout))
    wait_fiber.timer = timer

    Fiber.yield
  end

  def unblock(_blocked, fiber)
    @mutex.synchronize do
      @unblocked << fiber
      @selector.wakeup
    end
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
  end

  def close
    while @fibers.any? || @ready.any?
      running, @ready = @ready, []
      running.each do |fiber|
        fiber.resume if fiber.alive?
      end

      running, @unblocked = @unblocked, []
      running.each do |fiber|
        resume_fiber(fiber, nil)
      end

      timeout = min_timeout
      timeout = 0 if timeout&.negative? || @ready.any?

      if @selector.empty?
        sleep(timeout) if @ready.empty? && timeout&.positive?
      else
        @selector.select(timeout) do |monitor|
          resume_fiber(monitor.io, monitor.readiness)
        end
      end

      process_timers
    end
  ensure
    @selector.close
  end

  def schedule(fiber)
    @ready << fiber
  end

  def yield
    @ready << Fiber.current
    Fiber.yield
  end

  private

  def min_timeout
    timer = @timers.peek_min
    return unless timer

    cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    timer.scheduled_at - cur_time
  end

  def process_timers
    loop do
      timer = @timers.peek_min
      break unless timer

      cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      timeout = timer.scheduled_at - cur_time
      break if timeout.positive?

      resume_fiber(timer.io, nil)
    end
  end

  def resume_fiber(io, mode)
    wait_fiber = @fibers.delete(io)
    return unless wait_fiber

    events = nil
    if wait_fiber.events
      events =
        case mode
        when :r then IO::READABLE
        when :w then IO::WRITABLE
        when nil then 0
        else IO::READABLE | IO::WRITABLE
        end
      events &= wait_fiber.events
      @selector.deregister(io)
    end
    @timers.delete(wait_fiber.timer) if wait_fiber.timer
    wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive?
  end
end

Далее мы напишем пару полезных примитивов - условную переменную и ограничитель количества файберов. Ограничитель нужен для обработки большого числа задач - вместо того, чтобы сразу создавать файбер под каждую задачу, ограничитель создаёт максимум n файберов и при попытке создать n + 1 ожидает, пока хотя бы один из существующих файберов завершится.

Начнём с условной переменной:

# frozen_string_literal: true

class Notification
  def initialize
    @fiber = Fiber.current
  end

  def wait
    # При вызове wait, возвращаем управление.
    @wait = true
    Fiber.yield
  end

  def notify
    return unless @wait

    # При вызове notify, планируем файбер, из которого был вызван wait,
    # к выполнению на следующей итерации цикла событий.
    @wait = false
    Fiber.scheduler.schedule(@fiber)
  end
end

Далее следует код ограничителя:

# frozen_string_literal: true

require_relative 'notification'

class TaskLimiter
  def initialize(limit)
    @limit = limit
    @count = 0
    @notification = Notification.new
  end

  def schedule(&block)
    # Если количество созданных файберов превышает лимит,
    # ожидаем, пока хотя бы один из них завершится.
    @notification.wait if @count >= @limit

    # Иначе создаём файбер, в конце исполнения которого вызываем notify.
    @count += 1
    Fiber.schedule do
      block.call
    ensure
      @count -= 1
      @notification.notify
    end
  end
end

На этом статья подошла к концу. В моём репозитории на github можно посмотреть различные примеры использования данного планировщика. Также рекомендую к прочтению две статьи от Bruno Sutic: Ruby Fiber Scheduler и Async Ruby, а также чтение кода в репозиториях: https://github.com/bruno-/fiber_scheduler и https://github.com/socketry/async/tree/stable-v1 (я смотрел именно ветку stable-v1, так как код там значительно проще, чем в актуальных версиях).

Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+2
Комментарии0

Публикации

Работа

Ruby on Rails
2 вакансии
Программист Ruby
2 вакансии

Ближайшие события