Pull to refresh
2
0
Юрий Кехтер @return_nullptr

backend-разработчик на Python

Send message

Скажите пожалуйста, рассматривали ли вы вариант, когда Kafka не фиксирует смещение (commit offset) сразу при получении сообщения из топика, а делает это после обработки?

Ведь тогда можно объединить консюмера и Inbox-демона. К тому же, Kafka самостоятельно разделяет работу между консюмерами. Этот момент теряется, если складывать все в инбокс-таблицу, а потом запускать воркеров. Что думаете?

Буду благодарен, если ознакомитесь с моей публикацией:

https://habr.com/p/820867/

Это решение ещё не было в продакшне. Там разделения нет, а меня терзает мысль, что я упускаю что-то важное. Будет здорово, если разбирающийся человек укажет мне на неочевидные недочёты.

Спасибо, что поделились!Рассмотренный в статье подход позволяет свести задачу однократного выполнения неидемпотентной операции к задаче неоднократного выполнения идемпотентной операции. Возня с событиями, как мне кажется, это больше вопрос подхода. Как ни крути, нужно сделать вторую операцию. Будет ли это для этого поставлена некоторая задача (job/task) или отправлено событие (event), инициирующее действие, в целом, не так важно.

Получается, клиент, который создаёт задачу, записывает ее в общую базу данных. Обработчики тоже смотрят на эту базу данных и выполняют из нее задачи. При этом есть механизм разделения работы, чтобы они не делали одно и то же. Ваше решение предоставляет базу данных + этот механизм.

У меня другой стек. MongoDB не занимается оркестрацией задач. Собственно, для этого здесь Kafka, протокол ребалансировки которой мне не нужно изобретать.

Писать свою оркестрацию это всё-таки большая работа, мое Вам уважение.

Однако, вернёмся к вопросу терминологии. Думаю, ваши доводы меня не убедили. Я всё-таки не согласен с утверждением, что это семантика "ровно один раз" (exactly once). Падения обработчиков могут оставить какой-то сайд-эффект во внешней системе до изменения статуса задачи. Например, отправка письма клиенту. В таких условиях, все равно необходимо делать запрос к внешней системе с ключом идемпотентности, если не хочется случайно отправить письмо дважды. Думаю, семантика обработки здесь тоже "хотя бы один раз" (at least once).

Большое спасибо Вам за статью! Вы, кстати, вдохновили меня опубликоваться на Хабре со своей реализацией, правда, на Python. Если честно, я бы хотел сопоставить решения. Вероятно, из обсуждения каждый из нас может вынести что-то полезное для своего стека.

В Вашей архитектуре есть разделение:

Важно отметить, что в нашей архитектуре получение (потребление) сообщений концептуально отделено от их фактической обработки.

Можете объяснить, почему их пришлось разделять?

Вы меня немного запутали. Я пишу про механизм доставки и обработки событий.

В идеальном мире, от механизма доставки и обработки событий требуется:

Гарантия публикации события ровно один раз (exactly once).

Гарантия доставки события ровно один раз (exactly once).

Гарантия обработки события ровно один раз (exactly once).

Мне не известен способ реализации этих гарантий в настоящем мире.

Но при этом, вы пишете, что:

В том-то и дело, что тут нет никаких событий.

В связи с этим у меня вопрос: как мы можем говорить о гарантиях механизма доставки и обработки событий, если нет никаких событий?

Если вас не затруднит, можете описать вот этот процесс обработки запроса в предложенном подходе?

Обозначим порядок обработки:

Первый сервис получает запрос (request) от клиента.

Первый сервис выполняет первое действие и публикует событие, инициирующее выполнение второго действия на втором сервисе.

Первый сервис отправляет ответ (response) клиенту.

Второй сервис получает событие и выполняет второе действие.

Можете поделиться своим опытом? К сожалению, почти ничего не знаю про ActiveMQ. Почему вы выбрали именно его? Есть ли какие-то механизмы восстановления после сбоев в кластере? Буду признателен, если опишете ваш подход в небольшом комментарии или дадите ссылки на статьи.

Мне тоже интересен случай, когда обработчик сделал работу, зафиксировал изменения в базе данных, а затем упал. Если другому обработчику придет то же самое событие, то семантика доставки все же "хотя бы раз" (at least once).

Такая система мне нужна в том числе и для сценария, в котором базы данных разных сервисов изолированны. В том смысле, что я не могу положить событие в базу данных одного сервиса, а затем в другом сервисе прочитать его из той же базы для обработки.

Скажите пожалуйста, используется ли в приведенном Вами решении какая-то система доставки сообщений? Или передача событий происходит через базу данных?

У меня есть репозиторий с игрушечным демо. Вот, например, модель данных события OrderCreated:

class OrderCreated(Event):
    topic: Literal["booking"] = "booking"
    content_schema: Literal["OrderCreated"] = "OrderCreated"
    order_id: str
    client_id: str

Когда слушатель (consumer) получает событие, он не сразу подтверждает (commit offset) обработку в Kafka. Событие кладется в коллекцию MongoDB вот в таком виде:

{
  "_id": {
    "topic": "booking",
    "content_schema": "OrderCreated",
    "idempotency_key": "acff8c3352d547d68f1c25a172c031d7"
  },
  "handled": false
}

Т.е. просто составной ключ и флаг, что событие еще не обрабатывалось.

Далее идет обработка. Функцию обработчика можете посмотреть в исходном коде демо.

Когда транзакция зафиксировалась (commit), то обработка события подтверждается (commit offset) в Kafka. Можете посмотреть это в исходном коде event-outbox.

Мне вот интересно, а почему считается, что параметры вселенной константы? Насколько понимаю, все измерения статистические. Грубо говоря, измерили кучу однотипных частиц, усреднили, вот и результат. Почему масса элементарной частицы не может быть каким-нибудь распределением с отклонениями?

Может, всё-таки, "тонкая настройка вселенной" это не факт, а процесс, который происходит постоянно? Со временем нестабильное разрушается. Помножить такой отсев на миллиарды лет, и вот и масса частицы - самый стабильный вариант.

Information

Rating
Does not participate
Location
Тбилиси, Грузия, Грузия
Date of birth
Registered
Activity