В эрланге (и эликсире) мне всегда недоставало способа организовать «потоковый» обмен сообщениями, наподобие того, который обеспечивает какой-нибудь Message Broker. Нормальные разработчики смиряются с ограничениями, которые им задают их фреймворки: в Финиксе есть PubSub, в OTP — :gen_event, в эликсире — депрекейтнутый еще до рождения GenEvent.
Ни один из них меня не устраивал. PubSub
— штука мощная, но асинхронность прибита там гвоздями, а иногда все-таки надо вызывать подписчиков синхронно (я понимаю, что можно высылать вместе с сообщением свой pid
и дожидаться ответов, но этот ad-hoc, к сожалению, не поможет в ситуации, когда количество подписчиков неизвестно). :gen_event
— почти то, что нужно, но из его дизайна прямо торчат уши сайд-эффектных обработчиков, из-за чего удобная фильтрация входящего потока сообщений превращается в спагетти. В общем, всё как всегда: сам не сделаешь — никто не сделает.
Так родилась библиотека Antenna, которая предоставляет все те возможности, которые обычно обеспечиваются посредством вкрячивания дополнительной зависимости от брокера сообщений. Требования, которые я к ней предъявлял, были следующими:
готовность к back pressure из коробки (миллион ивентов, отосланных одномоментно, не должны повалить обработчики, а добавление новых нод в кластер должно их прозрачно разгружать)
каналы (топики, теги), куда можно отослать ивент, который будет доставлен всем зарегистрированным обработчикам
обработчик (помимо подписок на каналы) должен иметь возможность прозрачно фильтровать входящий поток сообщений, используя всю мощь паттерн-матчинга эрланга
как обработчики, так и подписки, — должны успешно восстанавливаться после краша
отправитель сообщения может затребовать синхронное выполнение обработчиков, блокируя вызывающий процесс до получения всех ответов (аналог rpc)
библиотека должна предоставлять удобные примитивы для тестирования её использования
Те, кому доводилось использовать RabbitMQ, без труда заметят, откуда я воровал идеи.
Псевдокод
С давних пор я использую некое подобие TDD с запахом DDD, или наоборот, я не силен в аббревиатурной баззвордистике. В общем, я начинаю проектирование любого куска кода с вызова его (пока несуществующего) API. Первым делом добиваюсь того, чтобы вызывать мой код было легко и приятно, а с реализацией я уж как-нибудь разберусь потом. Поэтому я набросал что-то вот такое:
{:ok, pid} = match({:tag_answer, _}, self())
subscribe(:chan, pid)
event(:chan, {:tag_answer, 42})
receive do
{:event, :chan, {:tag_answer, 42}} -> :ok
after
1_000 -> :error
end
Первые две строки — регистрация обработчика и подписка. Следующая — отсылка сообщения. Заключительный блок — проверка, что сообщение получено. Эти три куска обычно будут находиться в разных местах кода, никак не связанных между собой.
Первое, что я заметил, — первые две строки почти всегда ходят парой, поэтому имеет смысл принимать список каналов для подписки прямо в определении «матчера» (не отменяя, разумеется, функцию subscribe/2
, если кому-то захочется присосаться к каналу позже).
Еще я заметил, что обработчиков может быть тоже много, почему нет.
Таким образом, определяющим уникальным идентификатором для матчера — остаётся собственно матч. Он может быть довольно заковыристым, поэтому в качестве id
я выбрал его текстовое представление (строка "{:tag_answer, _}"
для примера выше). Это решение мне не очень нравится, но лучшего у меня (пока) нет. Как минимум, инспектировать матчеры так гораздо проще, чем, например, если использовать что-то хешеподобное.
Итак, у нас получается отношение много ко многим: много обработчиков на один матч, которые могут получать сообщения из многих каналов. Например, мы можем подписаться на все сообщения вида {:error, _}
, прилетевшие из всех каналов, и приспособить два обработчика: логгер сплюнет в консоль, телеметрия отошлет что-нибудь в графану, или куда там принято всё отсылать.
Основная архитектура
Back pressure в эликсире подразумевает использование библиотеки GenStage. Я уже реализовывал на ней свой Throttler, теперь настал черед брокера. Мы помним про горизонтальное масштабирование — а значит, — несколько консьюмеров, хотя бы по одному на ноде. Каждый консьюмер будет высылать сообщения в указанные каналы, матчеры — проверять, надо ли вообще суетиться (матчится ли сообщение), и если да — вызывать обработчики. Вроде звучит адекватно.
Консьюмеры я размазал по нодам при помощи безымянных процессов под управлением DistributedSupervisor, броадкастер — тоже (только этот процесс именован, а значит — один на кластер, и будет перезапущен на другой ноде, если текущая скроется в тумане).
Каждый матчер — тоже процесс, который в своём стейте хранит список обработчиков и, собственно, матч. Тут меня поджидала первая архитектурная дилемма: как хранить матч? {:foo, _}
— просто так не сохранишь, такой код допустим только как LHO в собственно прямых вызовах паттерн-матчинга, а хранить его AST — не вариант, потому что его тогда не вставить в матч. В общем, я в результате решил генерировать функцию-матчер (всё равно реализация match/4
возможна только в виде макроса):
quote generated: true, location: :keep do
matcher = fn
unquote(match) -> true
_ -> false
end
…
end
Ну, отлично. Теперь в самом процессе матчера появляется колбэк наподобие вот такого:
@impl GenServer
@doc false
def handle_cast({:handle_event, channel, event}, state) do
if state.matcher.(event) do
Enum.each(state.handlers, fn
handler when is_function(handler, 1) -> handler.(event)
handler when is_function(handler, 2) -> handler.(channel, event)
process -> send(process, {:antenna_event, channel, event})
end)
end
{:noreply, state}
end
И мы готовы слать ему сообщения из консьюмеров (матчеры тоже находятся под управлением DistributedSupervisor
, то есть, равномерно размазаны по кластеру.
В этот момент я сказал git init
, потому что MVP уже вырисовывался.
Синхронный вызов
Да этого момента всё было довольно-таки тривиально. Но как организовать синхронный вызов, когда все сообщения проходят через семь кругов ада (броадкаст, консьюмеры, матчеры — и все на хрен пойми каких нодах в кластере)?
Вы когда-нибудь задумывались, почему сигнатура асинхронного колбэка GenServer
’a — арности 2
, а синхронного — 3
? Кстати, это один из моих любимых вопросов на собеседовании: сразу становится понятно, мечтательный формошлёп перед тобой, или низкоуровневый фрик-социопат.
Второй аргумент в колбэке handle_call/3 — это идентификатор процесса (без потери общности), дожидающегося синхронного ответа. И вместо туплы {:reply, result, state}
— из этого колбэка можно вернуть {:noreply, new_state}
, а когда-нибудь потом уже выслать синхронный ответ напрямую вызывавшему процессу при помощи GenServer.reply/2. Если вы этого не знали, выпейте за меня ковшичек виски: это тот молоток, который всё вокруг вас сделает гвоздями.
GenStage
, в свою очередь, тоже экспортирует reply/2
. Поэтому теперь мне просто надо пробросить from
через все консьюмеры и матчеры, а потом глубоко внутри написать что-то вроде:
Enum.each(results, fn
{nil, _} -> :ok
{from, results} -> GenStage.reply(from, results)
end)
И если это был асинхронный вызов — никакого from
нет, и мы ничего не делаем. А если он есть — мы высылаем ему назад аккумулированные результаты вызова всех хендлеров (и пущай он, гад, подавится).
Вот, кажется, и всё, что я хотел рассказать сегодня. Ссылка на библиотеку, её исходный код и тесты — выше.
Удачного брокеринга!