Как стать автором
Обновить

Брокер сообщений своими руками

Уровень сложностиСредний
Время на прочтение5 мин
Количество просмотров2.3K

В эрланге (и эликсире) мне всегда недоставало способа организовать «потоковый» обмен сообщениями, наподобие того, который обеспечивает какой-нибудь Message Broker. Нормальные разработчики смиряются с ограничениями, которые им задают их фреймворки: в Финиксе есть PubSub, в OTP — :gen_event, в эликсире — депрекейтнутый еще до рождения GenEvent.

Ни один из них меня не устраивал. PubSub — штука мощная, но асинхронность прибита там гвоздями, а иногда все-таки надо вызывать подписчиков синхронно (я понимаю, что можно высылать вместе с сообщением свой pid и дожидаться ответов, но этот ad-hoc, к сожалению, не поможет в ситуации, когда количество подписчиков неизвестно). :gen_event — почти то, что нужно, но из его дизайна прямо торчат уши сайд-эффектных обработчиков, из-за чего удобная фильтрация входящего потока сообщений превращается в спагетти. В общем, всё как всегда: сам не сделаешь — никто не сделает.

Так родилась библиотека Antenna, которая предоставляет все те возможности, которые обычно обеспечиваются посредством вкрячивания дополнительной зависимости от брокера сообщений. Требования, которые я к ней предъявлял, были следующими:

  • готовность к back pressure из коробки (миллион ивентов, отосланных одномоментно, не должны повалить обработчики, а добавление новых нод в кластер должно их прозрачно разгружать)

  • каналы (топики, теги), куда можно отослать ивент, который будет доставлен всем зарегистрированным обработчикам

  • обработчик (помимо подписок на каналы) должен иметь возможность прозрачно фильтровать входящий поток сообщений, используя всю мощь паттерн-матчинга эрланга

  • как обработчики, так и подписки, — должны успешно восстанавливаться после краша

  • отправитель сообщения может затребовать синхронное выполнение обработчиков, блокируя вызывающий процесс до получения всех ответов (аналог rpc)

  • библиотека должна предоставлять удобные примитивы для тестирования её использования

Те, кому доводилось использовать RabbitMQ, без труда заметят, откуда я воровал идеи.

Псевдокод

С давних пор я использую некое подобие TDD с запахом DDD, или наоборот, я не силен в аббревиатурной баззвордистике. В общем, я начинаю проектирование любого куска кода с вызова его (пока несуществующего) API. Первым делом добиваюсь того, чтобы вызывать мой код было легко и приятно, а с реализацией я уж как-нибудь разберусь потом. Поэтому я набросал что-то вот такое:

{:ok, pid} = match({:tag_answer, _}, self())
subscribe(:chan, pid)

event(:chan, {:tag_answer, 42})

receive do
  {:event, :chan, {:tag_answer, 42}} -> :ok
after
  1_000 -> :error
end

Первые две строки — регистрация обработчика и подписка. Следующая — отсылка сообщения. Заключительный блок — проверка, что сообщение получено. Эти три куска обычно будут находиться в разных местах кода, никак не связанных между собой.

Первое, что я заметил, — первые две строки почти всегда ходят парой, поэтому имеет смысл принимать список каналов для подписки прямо в определении «матчера» (не отменяя, разумеется, функцию subscribe/2, если кому-то захочется присосаться к каналу позже).

Еще я заметил, что обработчиков может быть тоже много, почему нет.

Таким образом, определяющим уникальным идентификатором для матчера — остаётся собственно матч. Он может быть довольно заковыристым, поэтому в качестве id я выбрал его текстовое представление (строка "{:tag_answer, _}" для примера выше). Это решение мне не очень нравится, но лучшего у меня (пока) нет. Как минимум, инспектировать матчеры так гораздо проще, чем, например, если использовать что-то хешеподобное.

Итак, у нас получается отношение много ко многим: много обработчиков на один матч, которые могут получать сообщения из многих каналов. Например, мы можем подписаться на все сообщения вида {:error, _}, прилетевшие из всех каналов, и приспособить два обработчика: логгер сплюнет в консоль, телеметрия отошлет что-нибудь в графану, или куда там принято всё отсылать.

Основная архитектура

Back pressure в эликсире подразумевает использование библиотеки GenStage. Я уже реализовывал на ней свой Throttler, теперь настал черед брокера. Мы помним про горизонтальное масштабирование — а значит, — несколько консьюмеров, хотя бы по одному на ноде. Каждый консьюмер будет высылать сообщения в указанные каналы, матчеры — проверять, надо ли вообще суетиться (матчится ли сообщение), и если да — вызывать обработчики. Вроде звучит адекватно.

Консьюмеры я размазал по нодам при помощи безымянных процессов под управлением DistributedSupervisor, броадкастер — тоже (только этот процесс именован, а значит — один на кластер, и будет перезапущен на другой ноде, если текущая скроется в тумане).

Каждый матчер — тоже процесс, который в своём стейте хранит список обработчиков и, собственно, матч. Тут меня поджидала первая архитектурная дилемма: как хранить матч? {:foo, _} — просто так не сохранишь, такой код допустим только как LHO в собственно прямых вызовах паттерн-матчинга, а хранить его AST — не вариант, потому что его тогда не вставить в матч. В общем, я в результате решил генерировать функцию-матчер (всё равно реализация match/4 возможна только в виде макроса):

quote generated: true, location: :keep do
  matcher = fn
    unquote(match) -> true
    _ -> false
  end
  …
end

Ну, отлично. Теперь в самом процессе матчера появляется колбэк наподобие вот такого:

  @impl GenServer
  @doc false
  def handle_cast({:handle_event, channel, event}, state) do
    if state.matcher.(event) do
      Enum.each(state.handlers, fn
        handler when is_function(handler, 1) -> handler.(event)
        handler when is_function(handler, 2) -> handler.(channel, event)
        process -> send(process, {:antenna_event, channel, event})
      end)
    end

    {:noreply, state}
  end

И мы готовы слать ему сообщения из консьюмеров (матчеры тоже находятся под управлением DistributedSupervisor, то есть, равномерно размазаны по кластеру.

В этот момент я сказал git init, потому что MVP уже вырисовывался.

Синхронный вызов

Да этого момента всё было довольно-таки тривиально. Но как организовать синхронный вызов, когда все сообщения проходят через семь кругов ада (броадкаст, консьюмеры, матчеры — и все на хрен пойми каких нодах в кластере)?

Вы когда-нибудь задумывались, почему сигнатура асинхронного колбэка GenServer’a — арности 2, а синхронного — 3? Кстати, это один из моих любимых вопросов на собеседовании: сразу становится понятно, мечтательный формошлёп перед тобой, или низкоуровневый фрик-социопат.

Второй аргумент в колбэке handle_call/3 — это идентификатор процесса (без потери общности), дожидающегося синхронного ответа. И вместо туплы {:reply, result, state} — из этого колбэка можно вернуть {:noreply, new_state}, а когда-нибудь потом уже выслать синхронный ответ напрямую вызывавшему процессу при помощи GenServer.reply/2. Если вы этого не знали, выпейте за меня ковшичек виски: это тот молоток, который всё вокруг вас сделает гвоздями.

GenStage, в свою очередь, тоже экспортирует reply/2. Поэтому теперь мне просто надо пробросить from через все консьюмеры и матчеры, а потом глубоко внутри написать что-то вроде:

Enum.each(results, fn
  {nil, _} -> :ok
  {from, results} -> GenStage.reply(from, results)
end)

И если это был асинхронный вызов — никакого from нет, и мы ничего не делаем. А если он есть — мы высылаем ему назад аккумулированные результаты вызова всех хендлеров (и пущай он, гад, подавится).

Вот, кажется, и всё, что я хотел рассказать сегодня. Ссылка на библиотеку, её исходный код и тесты — выше.

Удачного брокеринга!

Теги:
Хабы:
+8
Комментарии2

Публикации

Ближайшие события