С версии 3.0 в руби появились неблокирующие файберы, с помощью которых писать код с асинхронным вводом-выводом стало заметно удобнее. На мой взгляд у реализации асинхронного ввода-вывода в руби есть две отличительные особенности:
Синхронный и асинхронный код никак не отличаются друг от друга - нет никаких дополнительных конструкций типа async/await, код написанный с расчётом на синхронный ввод-вывод также будет работать и в случае запуска его в неблокирующем файбере.
Нет стандартной реализации планировщика. Надо использовать сторонние библиотеки, например, socketry/async, в которых реализуется планировщик.
Недавно для увеличения производительности одного сервиса мне пришлось переписать его на асинхронный ввод-вывод, в связи с чем мне стало интересно хотя бы в общих чертах понять, как работают планировщики, поэтому я и решил написать свой. Сразу скажу, что я не буду останавливаться на организации цикла событий, поэтому если вы ни разу не имели дела с одним из перечисленных системных вызовов: select, poll, epoll, kqueue, - то рекомендую найти информацию по любому из них и попробовать написать небольшой пример.
Для начала разберёмся, как вообще использовать планировщик и запускать неблокирующие файберы:
# Устанавливаем текущий планировщик. Fiber.set_scheduler(FiberScheduler.new) # Запускаем первый файбер. Fiber.schedule do puts Net::HTTP.get(URI('https://motherfuckingwebsite.com')) end # Запускаем второй файбер. Fiber.schedule do puts Net::HTTP.get(URI('https://example.com')) end
Опишу на пальцах, как данный код работает. В любом планировщике должны быть реализованы определённые спецификацией методы. При достижении блокирующего вызова внутри неблокирующего файбера вызывается один из методов планировщика, после чего планировщик делает нужную работу, например, регистрирует, что такой-то файбер ожидает возможности записать данные в такой-то сокет, и вытесняет текущий файбер с помощью вызова Fiber.yield.
Теперь откроем спецификацию планировщика https://docs.ruby-lang.org/en/3.2/Fiber/Scheduler.html и посмотрим, какие методы он должен реализовывать. Из написанного следует, что класс может реализовывать довольно большое количество методов, но не все из них обязательные. В рамках данной статьи мы реализуем следующие методы:
fiber(&block). Данный метод будет вызываться при вызове Fiber.schedule. Он должен создавать неблокирующий файбер и передавать ему управление.io_wait(io, events, timeout). Данный метод будет вызываться при вызове метода wait у io объекта. io - это io объект, events - битовая маска ожидаемых событий (чтение, запись или и то и другое), timeout - максимальное время ожидания. Данный метод должен "запомнить", что текущий файбер ожидает доступности для записи и/или чтения такого-то io и вытеснить его, вызвав Fiber.yield.block(blocker, timeout = nil). Данный метод вызывается при блокировках, связанных с синхронизацией: ожидание освобождения мьютекса, вызов Thread.join и т. п. blocker содержит отладочную информацию о том, чего мы ожидаем. timeout - максимальное время ожидания.unblock(blocker, fiber). Данный метод вызывается для разблокировки файбера, заблокированного ранее с помощью block. Такое происходит, например, при освобождении мьютекса, который мы хотели захватить, или при завершении потока, завершения которого мы ожидали, вызвав join.kernel_sleep(duration = nil). Данный метод вызывается при вызове sleep.close. Данный метод вызывается после завершения основного потока. В нашем примере после того, как второй файбер дойдёт до блокирующего вызова. Как мы помним, после этого файбер будет вытеснен и исполнение продолжится с места вызова schedule, а другого кода после него у нас нет.
Начнём со вспомогательного кода. Для реализации таймеров мы будем использовать двоичную пирамиду, код реализации приведён под спойлером, на нём мы останавливаться не будем. Реализация пирамиды требует, чтобы всякий элемент реализовывал три метода: priority, который возвращает приоритет, bheap_idx, который возвращает индекс в двоичной пирамиде и bheap_idx=, который позволяет устанавливать данный индекс.
lib/bheap.rb
# frozen_string_literal: true class BHeap def initialize @heap = [] end def peek_min @heap[0] end def pop_min return if @heap.empty? top = @heap[0] if @heap.length == 1 @heap.pop else @heap[0] = @heap.pop move_down(0) end top end def push(elem) @heap << elem move_up(@heap.size - 1) elem end def delete(elem) tmp = @heap.pop return if @heap.empty? || elem.bheap_idx >= @heap.length @heap[elem.bheap_idx] = tmp move_down(elem.bheap_idx) nil end def increase(elem) move_down(elem.bheap_idx) end def decrease(elem) move_up(elem.bheap_idx) end def size @heap.length end def empty? @heap.empty? end private def move_up(i) until i.zero? p = (i - 1) / 2 break if @heap[p].priority <= @heap[i].priority @heap[p], @heap[i] = @heap[i], @heap[p] @heap[i].bheap_idx = i i = p end @heap[i].bheap_idx = i end def move_down(i) while i < @heap.length l = 2 * i + 1 break if l >= @heap.length min_i = i r = l + 1 min_i = l if @heap[l].priority < @heap[i].priority min_i = r if r < @heap.length && @heap[r].priority < @heap[min_i].priority break if min_i == i @heap[min_i], @heap[i] = @heap[i], @heap[min_i] @heap[i].bheap_idx = i i = min_i end @heap[i].bheap_idx = i end end
Теперь же перейдём к реализации класса FiberScheduler. Для мультиплексирования ввода-вывода мы будем использовать библиотеку nio4r, потому что в стандартной библиотеке руби есть поддержка только системного вызова select, который плохо себя показывает при большом количестве файловых дескрипторов. Итак начнём с методов initialize и fiber:
require 'nio4r' require_relative 'bheap' class FiberScheduler # Структура для файбера, который чего-то ожидает. # events - битовая маска или nil, если файбер ожидает таймера. # timer - объект типа Timer или nil. WaitFiber = Struct.new(:fiber, :events, :timer) # Структура таймера. Будет использоваться для вставки в # двоичную пирамиду. # io - io объект или файбер, если файбер ожидает только таймера; # priority он же scheduled_at - время, когда таймер должен сработать; # bheap_idx - индекс в двоичной пирамиде. Timer = Struct.new(:io, :priority, :bheap_idx) do alias_method :scheduled_at, :priority end def initialize @timers = BHeap.new @selector = NIO::Selector.new # В этой хэш-таблице будут храниться файберы, # которые в данный момент чего-то ожидают. Ключами будут служить # io объекты и сами файберы в случае вызова sleep. Поэтому вызываем # compare_by_identity, чтобы любой объект считался равным только # самому себе. @fibers = {}.compare_by_identity # Данный мьютекс нужен для метода unblock, так как разблокировка # может происходить из другого потока. Например, файбер пытается # захватить мьютекс, который уже захвачен в другом потоке, после того, # как поток освободит мьютекс, из него будет вызван метод unblock. @mutex = Mutex.new # Массив файберов для выполнения. Нужен для того, чтобы # запланировать выполнение асинхронных задач на следующую итерацию # цикла событий. @ready = [] # Массив разблокированных файберов (для которых был вызван unblock). @unblocked = [] end def fiber(&) # Создаём неблокирующий файбер и передаём ему управление. fiber = Fiber.new(blocking: false, &) fiber.resume fiber end end
Далее перейдём к реализации io_wait, основная задача которого - зарегистрировать факт, что файбер ожидает доступа на запись и/или чтение такого-то io объекта и вернуть управление в главный поток:
def io_wait(io, events, timeout) # Если передан timeout, создаём таймер и добавляем его в пирамиду. timer = if timeout cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @timers.push(Timer.new(io, cur_time + timeout)) end # Переводим битовую маску в один из символов :r, :w или :rw. mode = if events.anybits?(IO::READABLE) events.anybits?(IO::WRITABLE) ? :rw : :r elsif events.anybits?(IO::WRITABLE) :w else raise 'Wrong events mask' end # Регистрируем io для последующего мультиплексирования. @selector.register(io, mode) # Добавляем ассоциацию данного io с текущим файбером. @fibers[io] = WaitFiber.new(Fiber.current, events, timer) # Вытесняем текущий файбер. Fiber.yield end
Теперь реализуем три вспомогательных метода. Первый - resume_fiber, который возобновляет выполнение переданного файбера. Второй метод - min_timeout, который будет доставать таймер с наименьшим временем срабатывания и преобразовывать его в максимальное время, которое наш поток может спать, путём вычитания из него текущего времени. Третий - process_timers, данный метод обрабатывает все таймеры, которые должны сработать на данный момент.
def resume_fiber(io, mode) # Пытаемся найти файбер, ожидающий io, # если не находим, ничего не делаем. wait_fiber = @fibers.delete(io) return unless wait_fiber events = nil # Если у файбера задана маска, мы ожидаем io, а не таймер. if wait_fiber.events events = case mode when :r then IO::READABLE when :w then IO::WRITABLE when nil then 0 else IO::READABLE | IO::WRITABLE end # В таком случае при возобновлении исполнения файбера # мы должны передать маску, соответсвующую пересечению доступных # событий и ожидаемых. events = events & wait_fiber.events # Также мы должны перестать отслеживать данный io объект. @selector.deregister(io) end # Если есть ассоциированный с файбером таймер, # удаляем его из пирамиды. @timers.delete(wait_fiber.timer) if wait_fiber.timer # Если файбер ещё жив, возобновляем его исполнение. wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive? end def min_timeout # Если таймеров нет, возвращаем nil. timer = @timers.peek_min return unless timer # В ином случае возвращаем разницу между временем, на которое был # запланирован таймер, и текущим временем. cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timer.scheduled_at - cur_time end def process_timers loop do # Если таймеров нет, завершаем цикл. timer = @timers.peek_min break unless timer # Если время исполнения ближайшего таймера ещё не наступило, # завершаем цикл. cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timeout = timer.scheduled_at - cur_time break if timeout.positive? # В ином случае возобновляем работу соответствующего файбера. resume_fiber(timer.io, nil) end end
Теперь перейдём к реализации событийного цикла:
def close # Цикл работает, пока есть хотя бы один файбер, # ожидающий io, или хотя бы один запланированный файбер. while @fibers.any? || @ready.any? # Сначала запускаем все запланированные файберы. running, @ready = @ready, [] running.each do |fiber| fiber.resume if fiber.alive? end # Затем запускаем файберы, которые были разблокированы. Мы используем # resume_fiber, чтобы удалить файбер из списка ожидающих, а также, # чтобы удалить соответсвующий ему таймер в случае наличия оного. running, @unblocked = @unblocked, [] running.each do |fiber| resume_fiber(fiber, nil) end # После получаем максимальное время сна. # Если оно меньше нуля, значит у нас есть хотя бы один таймер, # который должен сработать, а значит мы не можем уходить в сон, поэтому # выставляем timeout в 0. Если у нас есть хотя бы один запланированный # файбер, мы тоже не можем уходить в сон, поэтому так же обнуляем timeout. timeout = min_timeout timeout = 0 if timeout&.negative? || @ready.any? if @selector.empty? # Если у нас нет ни одного ожидающего файбера и timeout положительный, # уходим в сон. sleep(timeout) if @ready.empty? && timeout&.positive? else # Если же есть файберы, ожидающие io, ждём пока хотя бы один io объект # будет готов. Если timeout = nil, то есть у нас нет ни таймеров, ни # запланированных задач, время ожидания не ограничено. # Если же timeout = 0, то в случае, если ни один io объект не готов, # поток не уйдёт в сон и продолжит выполненин. Если timeout является # положительным числом, то поток будет ожидать готовности io в течение # данного времени. @selector.select(timeout) do |monitor| resume_fiber(monitor.io, monitor.readiness) end end # Обрабатываем все таймеры, которые должны сработать в данный момент. process_timers end ensure @selector.close end
Далее мы реализуем методы block, unblock и kernel_sleep.
def block(_blocker, timeout = nil) # Так как файбер в случае вызова block не ожидает # никакого io объекта мы используем сам файбер в качестве ключа. wait_fiber = WaitFiber.new(Fiber.current, nil, nil) @fibers[wait_fiber.fiber] = wait_fiber # Если timeout не задан, вытесняем текущий файбер. return Fiber.yield unless timeout # В ином случае создаём таймер и делаем то же самое. cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout)) wait_fiber.timer = timer Fiber.yield end def unblock(_blocked, fiber) # Добавляем файбер в список разблокированных, выводим # поток из сна, если он находится в процесее ожидания io. @mutex.synchronize do @unblocked << fiber @selector.wakeup end end def kernel_sleep(duration = nil) # Здесь просто вызываем метод block с заданным таймаутом. block(:sleep, duration) end
Далее добавим в планировщик пару полезных методов, которые не определены спецификацией, но будут нам полезны.
# Мы можем вызывать данный метод из любого файбера, # для планирования задачи на следующую итерацию цикла. # Делется это следующим образом: Fiber.scheduler.schedule(fiber). def schedule(fiber) @ready << fiber end # Данный метод нужен для того, чтобы остановить исполнение текущего # файбера и запланировать возобновление его работы на следующую итерацию # цикла событий. Вызов метода осуществляется аналогично вызову schedule: # Fiber.scheduler.yield. def yield @ready << Fiber.current Fiber.yield end
На этом реализация планировщика завершена, полный код находится под спойлером:
lib/fiber_scheduler.rb
# frozen_string_literal: true require 'nio4r' require_relative 'bheap' class FiberScheduler WaitFiber = Struct.new(:fiber, :events, :timer) Timer = Struct.new(:io, :priority, :bheap_idx) do alias_method :scheduled_at, :priority end def initialize @timers = BHeap.new @selector = NIO::Selector.new @fibers = {}.compare_by_identity @mutex = Mutex.new @ready = [] @unblocked = [] end def fiber(&) fiber = Fiber.new(blocking: false, &) fiber.resume fiber end def io_wait(io, events, timeout) timer = if timeout cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @timers.push(Timer.new(io, cur_time + timeout)) end mode = if events.anybits?(IO::READABLE) events.anybits?(IO::WRITABLE) ? :rw : :r elsif events.anybits?(IO::WRITABLE) :w else raise 'Wrong events mask' end @selector.register(io, mode) @fibers[io] = WaitFiber.new(Fiber.current, events, timer) Fiber.yield end def block(_blocker, timeout = nil) wait_fiber = WaitFiber.new(Fiber.current, nil, nil) @fibers[wait_fiber.fiber] = wait_fiber return Fiber.yield unless timeout cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout)) wait_fiber.timer = timer Fiber.yield end def unblock(_blocked, fiber) @mutex.synchronize do @unblocked << fiber @selector.wakeup end end def kernel_sleep(duration = nil) block(:sleep, duration) end def close while @fibers.any? || @ready.any? running, @ready = @ready, [] running.each do |fiber| fiber.resume if fiber.alive? end running, @unblocked = @unblocked, [] running.each do |fiber| resume_fiber(fiber, nil) end timeout = min_timeout timeout = 0 if timeout&.negative? || @ready.any? if @selector.empty? sleep(timeout) if @ready.empty? && timeout&.positive? else @selector.select(timeout) do |monitor| resume_fiber(monitor.io, monitor.readiness) end end process_timers end ensure @selector.close end def schedule(fiber) @ready << fiber end def yield @ready << Fiber.current Fiber.yield end private def min_timeout timer = @timers.peek_min return unless timer cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timer.scheduled_at - cur_time end def process_timers loop do timer = @timers.peek_min break unless timer cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) timeout = timer.scheduled_at - cur_time break if timeout.positive? resume_fiber(timer.io, nil) end end def resume_fiber(io, mode) wait_fiber = @fibers.delete(io) return unless wait_fiber events = nil if wait_fiber.events events = case mode when :r then IO::READABLE when :w then IO::WRITABLE when nil then 0 else IO::READABLE | IO::WRITABLE end events &= wait_fiber.events @selector.deregister(io) end @timers.delete(wait_fiber.timer) if wait_fiber.timer wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive? end end
Далее мы напишем пару полезных примитивов - условную переменную и ограничитель количества файберов. Ограничитель нужен для обработки большого числа задач - вместо того, чтобы сразу создавать файбер под каждую задачу, ограничитель создаёт максимум n файберов и при попытке создать n + 1 ожидает, пока хотя бы один из существующих файберов завершится.
Начнём с условной переменной:
# frozen_string_literal: true class Notification def initialize @fiber = Fiber.current end def wait # При вызове wait, возвращаем управление. @wait = true Fiber.yield end def notify return unless @wait # При вызове notify, планируем файбер, из которого был вызван wait, # к выполнению на следующей итерации цикла событий. @wait = false Fiber.scheduler.schedule(@fiber) end end
Далее следует код ограничителя:
# frozen_string_literal: true require_relative 'notification' class TaskLimiter def initialize(limit) @limit = limit @count = 0 @notification = Notification.new end def schedule(&block) # Если количество созданных файберов превышает лимит, # ожидаем, пока хотя бы один из них завершится. @notification.wait if @count >= @limit # Иначе создаём файбер, в конце исполнения которого вызываем notify. @count += 1 Fiber.schedule do block.call ensure @count -= 1 @notification.notify end end end
На этом статья подошла к концу. В моём репозитории на github можно посмотреть различные примеры использования данного планировщика. Также рекомендую к прочтению две статьи от Bruno Sutic: Ruby Fiber Scheduler и Async Ruby, а также чтение кода в репозиториях: https://github.com/bruno-/fiber_scheduler и https://github.com/socketry/async/tree/stable-v1 (я смотрел именно ветку stable-v1, так как код там значительно проще, чем в актуальных версиях).
