С версии 3.0 в руби появились неблокирующие файберы, с помощью которых писать код с асинхронным вводом-выводом стало заметно удобнее. На мой взгляд у реализации асинхронного ввода-вывода в руби есть две отличительные особенности:
Синхронный и асинхронный код никак не отличаются друг от друга - нет никаких дополнительных конструкций типа async/await, код написанный с расчётом на синхронный ввод-вывод также будет работать и в случае запуска его в неблокирующем файбере.
Нет стандартной реализации планировщика. Надо использовать сторонние библиотеки, например, socketry/async, в которых реализуется планировщик.
Недавно для увеличения производительности одного сервиса мне пришлось переписать его на асинхронный ввод-вывод, в связи с чем мне стало интересно хотя бы в общих чертах понять, как работают планировщики, поэтому я и решил написать свой. Сразу скажу, что я не буду останавливаться на организации цикла событий, поэтому если вы ни разу не имели дела с одним из перечисленных системных вызовов: select, poll, epoll, kqueue, - то рекомендую найти информацию по любому из них и попробовать написать небольшой пример.
Для начала разберёмся, как вообще использовать планировщик и запускать неблокирующие файберы:
# Устанавливаем текущий планировщик.
Fiber.set_scheduler(FiberScheduler.new)
# Запускаем первый файбер.
Fiber.schedule do
puts Net::HTTP.get(URI('https://motherfuckingwebsite.com'))
end
# Запускаем второй файбер.
Fiber.schedule do
puts Net::HTTP.get(URI('https://example.com'))
end
Опишу на пальцах, как данный код работает. В любом планировщике должны быть реализованы определённые спецификацией методы. При достижении блокирующего вызова внутри неблокирующего файбера вызывается один из методов планировщика, после чего планировщик делает нужную работу, например, регистрирует, что такой-то файбер ожидает возможности записать данные в такой-то сокет, и вытесняет текущий файбер с помощью вызова Fiber.yield.
Теперь откроем спецификацию планировщика https://docs.ruby-lang.org/en/3.2/Fiber/Scheduler.html и посмотрим, какие методы он должен реализовывать. Из написанного следует, что класс может реализовывать довольно большое количество методов, но не все из них обязательные. В рамках данной статьи мы реализуем следующие методы:
fiber(&block)
. Данный метод будет вызываться при вызове Fiber.schedule. Он должен создавать неблокирующий файбер и передавать ему управление.io_wait(io, events, timeout)
. Данный метод будет вызываться при вызове метода wait у io объекта. io - это io объект, events - битовая маска ожидаемых событий (чтение, запись или и то и другое), timeout - максимальное время ожидания. Данный метод должен "запомнить", что текущий файбер ожидает доступности для записи и/или чтения такого-то io и вытеснить его, вызвав Fiber.yield.block(blocker, timeout = nil)
. Данный метод вызывается при блокировках, связанных с синхронизацией: ожидание освобождения мьютекса, вызов Thread.join и т. п. blocker содержит отладочную информацию о том, чего мы ожидаем. timeout - максимальное время ожидания.unblock(blocker, fiber)
. Данный метод вызывается для разблокировки файбера, заблокированного ранее с помощью block. Такое происходит, например, при освобождении мьютекса, который мы хотели захватить, или при завершении потока, завершения которого мы ожидали, вызвав join.kernel_sleep(duration = nil)
. Данный метод вызывается при вызове sleep.close
. Данный метод вызывается после завершения основного потока. В нашем примере после того, как второй файбер дойдёт до блокирующего вызова. Как мы помним, после этого файбер будет вытеснен и исполнение продолжится с места вызова schedule, а другого кода после него у нас нет.
Начнём со вспомогательного кода. Для реализации таймеров мы будем использовать двоичную пирамиду, код реализации приведён под спойлером, на нём мы останавливаться не будем. Реализация пирамиды требует, чтобы всякий элемент реализовывал три метода: priority, который возвращает приоритет, bheap_idx, который возвращает индекс в двоичной пирамиде и bheap_idx=, который позволяет устанавливать данный индекс.
lib/bheap.rb
# frozen_string_literal: true
class BHeap
def initialize
@heap = []
end
def peek_min
@heap[0]
end
def pop_min
return if @heap.empty?
top = @heap[0]
if @heap.length == 1
@heap.pop
else
@heap[0] = @heap.pop
move_down(0)
end
top
end
def push(elem)
@heap << elem
move_up(@heap.size - 1)
elem
end
def delete(elem)
tmp = @heap.pop
return if @heap.empty? || elem.bheap_idx >= @heap.length
@heap[elem.bheap_idx] = tmp
move_down(elem.bheap_idx)
nil
end
def increase(elem)
move_down(elem.bheap_idx)
end
def decrease(elem)
move_up(elem.bheap_idx)
end
def size
@heap.length
end
def empty?
@heap.empty?
end
private
def move_up(i)
until i.zero?
p = (i - 1) / 2
break if @heap[p].priority <= @heap[i].priority
@heap[p], @heap[i] = @heap[i], @heap[p]
@heap[i].bheap_idx = i
i = p
end
@heap[i].bheap_idx = i
end
def move_down(i)
while i < @heap.length
l = 2 * i + 1
break if l >= @heap.length
min_i = i
r = l + 1
min_i = l if @heap[l].priority < @heap[i].priority
min_i = r if r < @heap.length && @heap[r].priority < @heap[min_i].priority
break if min_i == i
@heap[min_i], @heap[i] = @heap[i], @heap[min_i]
@heap[i].bheap_idx = i
i = min_i
end
@heap[i].bheap_idx = i
end
end
Теперь же перейдём к реализации класса FiberScheduler. Для мультиплексирования ввода-вывода мы будем использовать библиотеку nio4r, потому что в стандартной библиотеке руби есть поддержка только системного вызова select, который плохо себя показывает при большом количестве файловых дескрипторов. Итак начнём с методов initialize и fiber:
require 'nio4r'
require_relative 'bheap'
class FiberScheduler
# Структура для файбера, который чего-то ожидает.
# events - битовая маска или nil, если файбер ожидает таймера.
# timer - объект типа Timer или nil.
WaitFiber = Struct.new(:fiber, :events, :timer)
# Структура таймера. Будет использоваться для вставки в
# двоичную пирамиду.
# io - io объект или файбер, если файбер ожидает только таймера;
# priority он же scheduled_at - время, когда таймер должен сработать;
# bheap_idx - индекс в двоичной пирамиде.
Timer = Struct.new(:io, :priority, :bheap_idx) do
alias_method :scheduled_at, :priority
end
def initialize
@timers = BHeap.new
@selector = NIO::Selector.new
# В этой хэш-таблице будут храниться файберы,
# которые в данный момент чего-то ожидают. Ключами будут служить
# io объекты и сами файберы в случае вызова sleep. Поэтому вызываем
# compare_by_identity, чтобы любой объект считался равным только
# самому себе.
@fibers = {}.compare_by_identity
# Данный мьютекс нужен для метода unblock, так как разблокировка
# может происходить из другого потока. Например, файбер пытается
# захватить мьютекс, который уже захвачен в другом потоке, после того,
# как поток освободит мьютекс, из него будет вызван метод unblock.
@mutex = Mutex.new
# Массив файберов для выполнения. Нужен для того, чтобы
# запланировать выполнение асинхронных задач на следующую итерацию
# цикла событий.
@ready = []
# Массив разблокированных файберов (для которых был вызван unblock).
@unblocked = []
end
def fiber(&)
# Создаём неблокирующий файбер и передаём ему управление.
fiber = Fiber.new(blocking: false, &)
fiber.resume
fiber
end
end
Далее перейдём к реализации io_wait, основная задача которого - зарегистрировать факт, что файбер ожидает доступа на запись и/или чтение такого-то io объекта и вернуть управление в главный поток:
def io_wait(io, events, timeout)
# Если передан timeout, создаём таймер и добавляем его в пирамиду.
timer =
if timeout
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@timers.push(Timer.new(io, cur_time + timeout))
end
# Переводим битовую маску в один из символов :r, :w или :rw.
mode =
if events.anybits?(IO::READABLE)
events.anybits?(IO::WRITABLE) ? :rw : :r
elsif events.anybits?(IO::WRITABLE)
:w
else
raise 'Wrong events mask'
end
# Регистрируем io для последующего мультиплексирования.
@selector.register(io, mode)
# Добавляем ассоциацию данного io с текущим файбером.
@fibers[io] = WaitFiber.new(Fiber.current, events, timer)
# Вытесняем текущий файбер.
Fiber.yield
end
Теперь реализуем три вспомогательных метода. Первый - resume_fiber, который возобновляет выполнение переданного файбера. Второй метод - min_timeout, который будет доставать таймер с наименьшим временем срабатывания и преобразовывать его в максимальное время, которое наш поток может спать, путём вычитания из него текущего времени. Третий - process_timers, данный метод обрабатывает все таймеры, которые должны сработать на данный момент.
def resume_fiber(io, mode)
# Пытаемся найти файбер, ожидающий io,
# если не находим, ничего не делаем.
wait_fiber = @fibers.delete(io)
return unless wait_fiber
events = nil
# Если у файбера задана маска, мы ожидаем io, а не таймер.
if wait_fiber.events
events =
case mode
when :r then IO::READABLE
when :w then IO::WRITABLE
when nil then 0
else IO::READABLE | IO::WRITABLE
end
# В таком случае при возобновлении исполнения файбера
# мы должны передать маску, соответсвующую пересечению доступных
# событий и ожидаемых.
events = events & wait_fiber.events
# Также мы должны перестать отслеживать данный io объект.
@selector.deregister(io)
end
# Если есть ассоциированный с файбером таймер,
# удаляем его из пирамиды.
@timers.delete(wait_fiber.timer) if wait_fiber.timer
# Если файбер ещё жив, возобновляем его исполнение.
wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive?
end
def min_timeout
# Если таймеров нет, возвращаем nil.
timer = @timers.peek_min
return unless timer
# В ином случае возвращаем разницу между временем, на которое был
# запланирован таймер, и текущим временем.
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timer.scheduled_at - cur_time
end
def process_timers
loop do
# Если таймеров нет, завершаем цикл.
timer = @timers.peek_min
break unless timer
# Если время исполнения ближайшего таймера ещё не наступило,
# завершаем цикл.
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timeout = timer.scheduled_at - cur_time
break if timeout.positive?
# В ином случае возобновляем работу соответствующего файбера.
resume_fiber(timer.io, nil)
end
end
Теперь перейдём к реализации событийного цикла:
def close
# Цикл работает, пока есть хотя бы один файбер,
# ожидающий io, или хотя бы один запланированный файбер.
while @fibers.any? || @ready.any?
# Сначала запускаем все запланированные файберы.
running, @ready = @ready, []
running.each do |fiber|
fiber.resume if fiber.alive?
end
# Затем запускаем файберы, которые были разблокированы. Мы используем
# resume_fiber, чтобы удалить файбер из списка ожидающих, а также,
# чтобы удалить соответсвующий ему таймер в случае наличия оного.
running, @unblocked = @unblocked, []
running.each do |fiber|
resume_fiber(fiber, nil)
end
# После получаем максимальное время сна.
# Если оно меньше нуля, значит у нас есть хотя бы один таймер,
# который должен сработать, а значит мы не можем уходить в сон, поэтому
# выставляем timeout в 0. Если у нас есть хотя бы один запланированный
# файбер, мы тоже не можем уходить в сон, поэтому так же обнуляем timeout.
timeout = min_timeout
timeout = 0 if timeout&.negative? || @ready.any?
if @selector.empty?
# Если у нас нет ни одного ожидающего файбера и timeout положительный,
# уходим в сон.
sleep(timeout) if @ready.empty? && timeout&.positive?
else
# Если же есть файберы, ожидающие io, ждём пока хотя бы один io объект
# будет готов. Если timeout = nil, то есть у нас нет ни таймеров, ни
# запланированных задач, время ожидания не ограничено.
# Если же timeout = 0, то в случае, если ни один io объект не готов,
# поток не уйдёт в сон и продолжит выполненин. Если timeout является
# положительным числом, то поток будет ожидать готовности io в течение
# данного времени.
@selector.select(timeout) do |monitor|
resume_fiber(monitor.io, monitor.readiness)
end
end
# Обрабатываем все таймеры, которые должны сработать в данный момент.
process_timers
end
ensure
@selector.close
end
Далее мы реализуем методы block, unblock и kernel_sleep.
def block(_blocker, timeout = nil)
# Так как файбер в случае вызова block не ожидает
# никакого io объекта мы используем сам файбер в качестве ключа.
wait_fiber = WaitFiber.new(Fiber.current, nil, nil)
@fibers[wait_fiber.fiber] = wait_fiber
# Если timeout не задан, вытесняем текущий файбер.
return Fiber.yield unless timeout
# В ином случае создаём таймер и делаем то же самое.
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout))
wait_fiber.timer = timer
Fiber.yield
end
def unblock(_blocked, fiber)
# Добавляем файбер в список разблокированных, выводим
# поток из сна, если он находится в процесее ожидания io.
@mutex.synchronize do
@unblocked << fiber
@selector.wakeup
end
end
def kernel_sleep(duration = nil)
# Здесь просто вызываем метод block с заданным таймаутом.
block(:sleep, duration)
end
Далее добавим в планировщик пару полезных методов, которые не определены спецификацией, но будут нам полезны.
# Мы можем вызывать данный метод из любого файбера,
# для планирования задачи на следующую итерацию цикла.
# Делется это следующим образом: Fiber.scheduler.schedule(fiber).
def schedule(fiber)
@ready << fiber
end
# Данный метод нужен для того, чтобы остановить исполнение текущего
# файбера и запланировать возобновление его работы на следующую итерацию
# цикла событий. Вызов метода осуществляется аналогично вызову schedule:
# Fiber.scheduler.yield.
def yield
@ready << Fiber.current
Fiber.yield
end
На этом реализация планировщика завершена, полный код находится под спойлером:
lib/fiber_scheduler.rb
# frozen_string_literal: true
require 'nio4r'
require_relative 'bheap'
class FiberScheduler
WaitFiber = Struct.new(:fiber, :events, :timer)
Timer = Struct.new(:io, :priority, :bheap_idx) do
alias_method :scheduled_at, :priority
end
def initialize
@timers = BHeap.new
@selector = NIO::Selector.new
@fibers = {}.compare_by_identity
@mutex = Mutex.new
@ready = []
@unblocked = []
end
def fiber(&)
fiber = Fiber.new(blocking: false, &)
fiber.resume
fiber
end
def io_wait(io, events, timeout)
timer =
if timeout
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@timers.push(Timer.new(io, cur_time + timeout))
end
mode =
if events.anybits?(IO::READABLE)
events.anybits?(IO::WRITABLE) ? :rw : :r
elsif events.anybits?(IO::WRITABLE)
:w
else
raise 'Wrong events mask'
end
@selector.register(io, mode)
@fibers[io] = WaitFiber.new(Fiber.current, events, timer)
Fiber.yield
end
def block(_blocker, timeout = nil)
wait_fiber = WaitFiber.new(Fiber.current, nil, nil)
@fibers[wait_fiber.fiber] = wait_fiber
return Fiber.yield unless timeout
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timer = @timers.push(Timer.new(wait_fiber.fiber, cur_time + timeout))
wait_fiber.timer = timer
Fiber.yield
end
def unblock(_blocked, fiber)
@mutex.synchronize do
@unblocked << fiber
@selector.wakeup
end
end
def kernel_sleep(duration = nil)
block(:sleep, duration)
end
def close
while @fibers.any? || @ready.any?
running, @ready = @ready, []
running.each do |fiber|
fiber.resume if fiber.alive?
end
running, @unblocked = @unblocked, []
running.each do |fiber|
resume_fiber(fiber, nil)
end
timeout = min_timeout
timeout = 0 if timeout&.negative? || @ready.any?
if @selector.empty?
sleep(timeout) if @ready.empty? && timeout&.positive?
else
@selector.select(timeout) do |monitor|
resume_fiber(monitor.io, monitor.readiness)
end
end
process_timers
end
ensure
@selector.close
end
def schedule(fiber)
@ready << fiber
end
def yield
@ready << Fiber.current
Fiber.yield
end
private
def min_timeout
timer = @timers.peek_min
return unless timer
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timer.scheduled_at - cur_time
end
def process_timers
loop do
timer = @timers.peek_min
break unless timer
cur_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timeout = timer.scheduled_at - cur_time
break if timeout.positive?
resume_fiber(timer.io, nil)
end
end
def resume_fiber(io, mode)
wait_fiber = @fibers.delete(io)
return unless wait_fiber
events = nil
if wait_fiber.events
events =
case mode
when :r then IO::READABLE
when :w then IO::WRITABLE
when nil then 0
else IO::READABLE | IO::WRITABLE
end
events &= wait_fiber.events
@selector.deregister(io)
end
@timers.delete(wait_fiber.timer) if wait_fiber.timer
wait_fiber.fiber.resume(events) if wait_fiber.fiber.alive?
end
end
Далее мы напишем пару полезных примитивов - условную переменную и ограничитель количества файберов. Ограничитель нужен для обработки большого числа задач - вместо того, чтобы сразу создавать файбер под каждую задачу, ограничитель создаёт максимум n файберов и при попытке создать n + 1 ожидает, пока хотя бы один из существующих файберов завершится.
Начнём с условной переменной:
# frozen_string_literal: true
class Notification
def initialize
@fiber = Fiber.current
end
def wait
# При вызове wait, возвращаем управление.
@wait = true
Fiber.yield
end
def notify
return unless @wait
# При вызове notify, планируем файбер, из которого был вызван wait,
# к выполнению на следующей итерации цикла событий.
@wait = false
Fiber.scheduler.schedule(@fiber)
end
end
Далее следует код ограничителя:
# frozen_string_literal: true
require_relative 'notification'
class TaskLimiter
def initialize(limit)
@limit = limit
@count = 0
@notification = Notification.new
end
def schedule(&block)
# Если количество созданных файберов превышает лимит,
# ожидаем, пока хотя бы один из них завершится.
@notification.wait if @count >= @limit
# Иначе создаём файбер, в конце исполнения которого вызываем notify.
@count += 1
Fiber.schedule do
block.call
ensure
@count -= 1
@notification.notify
end
end
end
На этом статья подошла к концу. В моём репозитории на github можно посмотреть различные примеры использования данного планировщика. Также рекомендую к прочтению две статьи от Bruno Sutic: Ruby Fiber Scheduler и Async Ruby, а также чтение кода в репозиториях: https://github.com/bruno-/fiber_scheduler и https://github.com/socketry/async/tree/stable-v1 (я смотрел именно ветку stable-v1, так как код там значительно проще, чем в актуальных версиях).