Как стать автором
Обновить

Комментарии 20

Отличная статья, спасибо! Мы делаем нечто похожее, только с activemq

Я только одного момента не понял, когда слушатель получает событие, он сохраняет его в монго? А дальше что с ним происходит?

И как вообще выглядит событие у вас? Если нетрудно, можете дать конкретный пример названия и обработчика.

У меня есть репозиторий с игрушечным демо. Вот, например, модель данных события OrderCreated:

class OrderCreated(Event):
    topic: Literal["booking"] = "booking"
    content_schema: Literal["OrderCreated"] = "OrderCreated"
    order_id: str
    client_id: str

Когда слушатель (consumer) получает событие, он не сразу подтверждает (commit offset) обработку в Kafka. Событие кладется в коллекцию MongoDB вот в таком виде:

{
  "_id": {
    "topic": "booking",
    "content_schema": "OrderCreated",
    "idempotency_key": "acff8c3352d547d68f1c25a172c031d7"
  },
  "handled": false
}

Т.е. просто составной ключ и флаг, что событие еще не обрабатывалось.

Далее идет обработка. Функцию обработчика можете посмотреть в исходном коде демо.

Когда транзакция зафиксировалась (commit), то обработка события подтверждается (commit offset) в Kafka. Можете посмотреть это в исходном коде event-outbox.

Можете поделиться своим опытом? К сожалению, почти ничего не знаю про ActiveMQ. Почему вы выбрали именно его? Есть ли какие-то механизмы восстановления после сбоев в кластере? Буду признателен, если опишете ваш подход в небольшом комментарии или дадите ссылки на статьи.

Мы на single instance. Всё, что добралось до сервера, восстанавливать не нужно. У нас похожий подход при публикации с транзакциями:

  • евент пишется в БД одновременно с основной транзакцией

  • после окончания транзации, евент отправляется в activemq асинхронно

  • если отправка успешная, то евент удаляется из БД

  • если отправка не успешная, то периодически система отправляет все евенты из БД, которые там есть (и удаляет их после отправки)

Почему activemq:

  • очень простой в обслуживании и настройке

  • retry/DLQ из коробки

  • количество подписчиков более динамическое, чем в Kafka (нет привязки к partition). Можно параллельно получать несколько евентов, нет никаких оффсетов

  • для кафки я находил статьи, где говорится о потере сообщений, это прям no go для нас

Из его минусов:

  • он медленный, если говорить о десятках и сотнях тысяч сообщений в минуту

  • нельзя смотреть "лог" (то, что было в прошлом)

  • он не заточен под кластеризацию. Можно, но не очень эффективно

В нашем случае, кстати, нет промежуточного сохранения события после его получения: обработчик получает сообщение и сразу его обрабатывает. Если обработка успешна, то делается message acknowledge и оно считается доставленным. Если обработка неуспешная, то делается ретрай. На этом этапе сообщение потеряться уже не может

Мне все ещё неизвестен

  • Перед обработкой задачи вставляется небольшая задержка на синхронизацию базы.

  • Падение обработчика приводит к его перезапуску и продолжению обработки задачи.

  • Если задача не обработана за определённый срок, то любой обработчик может её перехватить на себя

Второй пункт это и есть at least once. Если обработчик повторяет процедуру, это значит, что он взял задачу дважды. Если обработчик умный и умеет продолжать выполнение, а не начинать с нуля, это особенности его реализации.

Первый и третий пункт это вообще гарантия того, что в случае задержек или быстрого исполнения, одну и ту же задачу могут схватить разные обработчики.

Это цитата из High Availability решения, которое допускает дублирование с некоторой регулируемой вероятностью. Я вам дал ссылку на следующий раздел - там вместо дублирования страдает доступность.

А, извините, эту "документацию" очень трудно читать.

В Prevent Doubling:
> Падение обработчика или отсутствие связи с ним приводит к остановке обработки лишь его задач.

из описания мне не понятно, что случится, если обработчик начал обрабатывать задачу, а потом упал. Или он начал обрабатывать задачу, и с ним пропала связь. И тут кто-то решил делать ребалансировку.

ПС я правильно понимаю, что эта чудо-БД написана вами?

Поднимется и продолжит её выполнять.

Ребалансировка повлияет лишь на новые задачи. Старые надо будет пересоздать при необходимости.

Агась, но она ещё в бете.

Мне тоже интересен случай, когда обработчик сделал работу, зафиксировал изменения в базе данных, а затем упал. Если другому обработчику придет то же самое событие, то семантика доставки все же "хотя бы раз" (at least once).

Такая система мне нужна в том числе и для сценария, в котором базы данных разных сервисов изолированны. В том смысле, что я не могу положить событие в базу данных одного сервиса, а затем в другом сервисе прочитать его из той же базы для обработки.

Скажите пожалуйста, используется ли в приведенном Вами решении какая-то система доставки сообщений? Или передача событий происходит через базу данных?

В том-то и дело, что тут нет никаких событий. Есть только состояние, которое реплицируется между всеми узлами. Каждый узел напрямую работает со своей локальной копией и ничего не знает про другие. Соответственно, если обработчик упал, то его надо поднять, и он продолжит работу исходя из своего текущего состояния.

Вы меня немного запутали. Я пишу про механизм доставки и обработки событий.

В идеальном мире, от механизма доставки и обработки событий требуется:

Гарантия публикации события ровно один раз (exactly once).

Гарантия доставки события ровно один раз (exactly once).

Гарантия обработки события ровно один раз (exactly once).

Мне не известен способ реализации этих гарантий в настоящем мире.

Но при этом, вы пишете, что:

В том-то и дело, что тут нет никаких событий.

В связи с этим у меня вопрос: как мы можем говорить о гарантиях механизма доставки и обработки событий, если нет никаких событий?

Если вас не затруднит, можете описать вот этот процесс обработки запроса в предложенном подходе?

Обозначим порядок обработки:

Первый сервис получает запрос (request) от клиента.

Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

Первый сервис отправляет ответ (response) клиенту.

Второй сервис получает событие и выполняет второе действие.

Вот именно, что вы думаете событиями, а задача ваша - не однократное выполнение обработчика события, а однократное выполнение неидемпотентной задачи. Тут нужно немного подняться над возьнёй с событиями, чтобы увидеть их не нужность для этой задачи. Собственно, чем Кафка и занимается - превращает событие в состояние, чтобы потом превратить состояние обратно в событие. Это лишнее звено, которое только всё усложняет, по сравнению с работой с состоянием изначально.

  1. Клиент создаёт задачу.

  2. Увидев новую задачу, первый сервис отмечается в ней.

  3. Первый сервис делает свои дела и обновляет задачу.

  4. Увидев задачу в нужном статусе, второй сервис отмечается в ней.

  5. Второй сервис делает свои дела и обновляет задачу.

  6. Всё это время клиент наблюдает за статусом задачи в реальном времени.

Спасибо, что поделились!Рассмотренный в статье подход позволяет свести задачу однократного выполнения неидемпотентной операции к задаче неоднократного выполнения идемпотентной операции. Возня с событиями, как мне кажется, это больше вопрос подхода. Как ни крути, нужно сделать вторую операцию. Будет ли это для этого поставлена некоторая задача (job/task) или отправлено событие (event), инициирующее действие, в целом, не так важно.

Получается, клиент, который создаёт задачу, записывает ее в общую базу данных. Обработчики тоже смотрят на эту базу данных и выполняют из нее задачи. При этом есть механизм разделения работы, чтобы они не делали одно и то же. Ваше решение предоставляет базу данных + этот механизм.

У меня другой стек. MongoDB не занимается оркестрацией задач. Собственно, для этого здесь Kafka, протокол ребалансировки которой мне не нужно изобретать.

Писать свою оркестрацию это всё-таки большая работа, мое Вам уважение.

Однако, вернёмся к вопросу терминологии. Думаю, ваши доводы меня не убедили. Я всё-таки не согласен с утверждением, что это семантика "ровно один раз" (exactly once). Падения обработчиков могут оставить какой-то сайд-эффект во внешней системе до изменения статуса задачи. Например, отправка письма клиенту. В таких условиях, все равно необходимо делать запрос к внешней системе с ключом идемпотентности, если не хочется случайно отправить письмо дважды. Думаю, семантика обработки здесь тоже "хотя бы один раз" (at least once).

Ещё как важно. На событие надо вовремя подписаться, иначе ты его пропустишь. Его нельзя повторять, иначе обработка задублируется. Его очень легко потерять, просто упав во время его обработки или передачи. Именно поэтому вокруг событий накручены всякие кафки, да рафты, и всё равно вокруг них нужны доп костыли.

А с состояниями всё просто: актуальное значение всегда доступно, его пересылка идемпотентна, а потерять его сложно, ибо реплицируется на несколько узлов. Это на порядок более простая и надёжная архитектура.

Я говорю про более сложную проблему - exactly once для неидемпотентной задачи. Для идемпотентной задачи достаточно и at least once - тут нет дилеммы между доступностью и однократностью.

Можете привести чуть более конкретный пример задачи/проблемы/состояния/обработчика. Т.е. конкретный юз кейс из реального мира.

Вы не верите в существование неидемпотентных API? Да практически в любом REST/GQL/etc API создание сущностей не идемпотентно.

Как вы сделали такой вывод из моего сообщения? Я просто попросил привести реальный пример, чтобы понять, как ваша схема работает. К сожалению на том уровне абстракции, на котором вы описываете, детали совершенно не ясны.

ПС не очень понимаю как API связан с идемпотентностью. Вторая это про реализацию, можно любой тип АПИ сделать идемпонетным или любой сделать неидемпотентным.

Непонятно откуда столько агрессии, при игнорировании основного вопроса. Ну БД и дело ваше, удачи

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории