Pull to refresh

Comments 14

Еще есть интересный вариант - хранить offset-ы Kafka в БД, который позволяет не делать poller. Но наверное имеет проблемы с масштабированием, т.к. вы замыкаетесь на одну БД.

Я сходу не понял, что нам даёт значение смещения в Kafka... Можно поподробнее описать Ваш вариант?

Имел ввиду, что организация распределенных транзакций через Кафка имеет много болерплейта и сложностей (вы их разобрали)

Хочется что-то вроде гарантии exactly ones для событий + единая транзакция для бизнес моделей в БД. Наткнулся на интересную статью, которая решает ряд проблем за счёт хранения offsets-ов Kafka вне брокера прям в БД

https://medium.com/p/91042e81c095

Прочитал по диагонали. Насколько я вижу, речь идёт о получении сообщений из Kafka и отражении полученной информации в Postgres. В моём тексте это первый кейс, покрываемый за счёт ключей идемпотентности. На medium, если я правильно понимаю, в качестве ключа идемпотентности используется смещение и топик. Вполне допустимо в общем случае, как мне кажется.

Poller же требуется для кейса выгрузки данных из PostgreSQL в Kafka, то есть в обратную сторону.

В своих проектах тоже использовал ключи идемпотентности, но всегда хотелось как-то более бесшовно интегрировать Kafka в бизнес-процессы. Например, можно было "забить" на poller + таблицу очередь и сразу отправлять события, если использовать гарантии доставки exactly once - в этом случае события, кот. отправляет продюсер маркируются транзакцией Kafka и будут доступны всем коньюмерам уже после фиксации офсетов консьюмеров. А если добавить офсеты в базу, то ключи идемпотентности вообще не нужны чисто теоретически.

Мне кажется, какая-то путаница возникла.

Ключи идемпотентности нужны при взаимодействии Kafka->DB. Смещение в этом кейсе – это и есть ключ.
Poller нужен при взаимодействии DB->Kafka.
У Вас же всё смешалось в кучу...

Пардон за поток сознания - что-то и хочу в этом роде - смешать базу и Kafka и получить все из "коробки": расширить возможности ACID-транзакций базы транзакциями Kafka (как-то заюзать https://habr.com/ru/company/badoo/blog/333046/).

Скажите, что вы думаете по поводу следующего подхода?

Для всех сообщений в брокере затребовать, чтобы указывался id сообщения из кода приложения (это важный момент). Для всех потребителей сообщений сделать так, чтобы они реализовывали механизм идемпотентности относительно id сообщения.

При этом обработка любого сообщения будет следующей: выполнение всех бизнес действий под транзакцией БД, сохранение в той же БД для входящего сообщения для его ID все другие сообщения, которые необходимо отправить с фиксированным ID. Затем фиксация транзакции в БД. Затем отправка сообщений. Затем отправка acknowledge для исходного сообщения запустившего процесс.

При этом, если в самом начале для сообщения с заданным ID уже есть информация об обработке в БД, то действия для БД пропускаются, из БД извлекаются сообщения, которые должны были быть отправлены в брокер и они отправляются. Затем отправка acknowledge для исходного сообщения запустившего процесс.

Фокус тут в том, что если критерием идемпотентности является генерированный ID из кода, который сохраняется в БД, то в брокер могут попасть более одного сообщения с одинаковым ID, но т.к. каждый консьюмер умеет откидывать уже обработанные сообщения, то в случае попадание дубликата выполнение повторных действий не будет.

При этом обеспечивается гарантия, что если произошла фиксация данных в БД, то отправка из приложения точно когда-то произойдёт, возможно более одной.

Разница с использования ID сообщения из брокера или offset-а в качестве ключа идемпотентности в том, что для них повторная успешная отправка одного и того же сообщения со стороны producer-а не обладает идемпотентностью, т.к. для каждого успешно отправленного сообщения будет присвоен уникальный ID из брокера и offset.

Как вы считаете можно ли рассматривать данное решение как альтернативу CDC описанного вами второго случая?

Вы описали как раз первый сценарий: приём сообщений из Kafka. У меня в тексте это описано так:

...на основании данных из обрабатываемого сообщения мы получаем или однозначно вычисляем значение, которое сохраняем вместе с теми изменениями, которые производим на стороне БД. Таким образом, если мы видим, что такое значение ключа идемпотентности уже ассоциировано с изменяемым объектом в БД, то это изменение уже было выполнено. Это указывает на то, что сообщение обрабатывается не первый раз, а предыдущий раз по какой-то причине его обработка была прервана. То есть её можно пропустить в штатном режиме и перейти к следующем сообщению.

Вы просто предложили использовать в качестве ключа идемпотентности идентификатор сообщения. В комментариях выше и статье на medium, на которую ссылался автор, также шла речь о таком же подходе. Только в качестве ключа там предлагалось использовать не только идентификатор сообщения, но и наименование топика. Вариант вполне допустимый на мой взгляд.

Навскидку, что тут может пойти не так: например, идентификатор сообщения может измениться:

in case of error, commit offset, but enqueue the message again at the end of your Kafka Topic (maybe store how many times this operation was tried)

Это не проблема, но должно учитываться.

Теперь касательно CDC. Во втором случае говорится об отправке сообщения в Kafka. Это другой кейс. Он никак не связан с первым:

Существует также обратная ситуация, когда открывается транзакция в СУБД, во время которой требуется отправить сообщение в Kafka.

Ключи идемпотентности, используемые в предыдущем случае, не сильно помогают, если вложенная транзакция – в Kafka, так как мы не можем искать в очереди старое сообщение и анализировать его ключ. Очередь постоянно изменяется, сообщения могут из неё вытесняться. Кроме того, действие, вызвавшее транзакцию в СУБД, может быть пользовательским и не повториться в конечном счёте.
Если отправлять сообщение без учёта того, как оно будет обрабатываться при получении, в том числе потенциальных дублей, то будут возникать проблемы. Или принимающая сторона должна учитывать возможность дублей.

Альтернативы просты: либо принимающая сторона должна учитывать возможность появления дублей сообщений, либо нужно строить очередь в БД. Это построение очереди в моём тексте и описывается.

Ну вот получается, что тут я больше ссылался именно на второй кейс. Что отправка сообщений в kafka можно делать без CDC, используя всюду ID идемпотентности.

//метод getId возвращает ID, не который установлен брокером,
//а который установлен в коде, когда сообщение отправлялось.
var consumedMessage = consumer.consume();

//допустим есть таблица consumed_message c колонками
//id - id строки в БД (ни на что не влияет)
//consumed_message_id: UNIQUE INDEX - id обрабатываемого сообщения

//и есть таблица produced_message с колонками
//id - id строки в БД (ни на что не влияет)
//consumed_message_id - id обрабатываемого сообщения
//produced_message_id - id отправляемого сообщения
//produced_message_body: JSON - тело отправляемого сообщения

var messageWasConsumed = 
  select count(*) > 0
  	from consumed_message
		where consumed_message_id = consumedMessage.getId();

//здесь сообщения которые необходимо отправить
List producedMessage = new ArrayList<>();
if (!messageWasConsumed) {
  TRANSACTION BEGIN;
  
  //здесь любая бизнесовая логика с сохранениями в БД
  //и например в рамках этой логики необходимо отправить 
  //несколько сообщений, поэтому здесь происходит наполнение
  //массива producedMessages
  //с генерированными в коде id - producedMessage.getId()
  //и с телом - producedMessage.getBody()
  
  insert into consumed_message
    (consumed_message_id)
    values (consumedMessageId);
  
  for(var producedMessage in producedMessages) {
  	insert into produced_message 
    	(consmed_message_id, produced_message_id, produced_message_body)
    	values (consmedMessage.getId(), producedMessage.getId(), producedMessage.getBody());
  }
  
  TRANSACTOIN COMMIT;
} else {
  producedMessages =
		select produced_message_id, produced_message_body
    	from produced_message
    	where consumed_message_id = consumedMessage.getId();
}
  
for(var producedMessage in producedMessages) {
  producer.produce(producedMessage);
}
consumer.ack(message);

Тут важно, что каждый консьюмер имеет точно такую же структуру. Разница только внутри блока с бизнес-логикой.

Кажется, что данная структура позволяет решить оба кейса. И прием сообщения из kafka с записью в БД (первый кейс), который у вас описан, и одновременно отправку в kafka и запись в БД (второй кейс) без использования CDC.

var consumedMessage = consumer.consume();

Ещё раз. Я говорю о CDC и очереди в БД в сценарии, когда нет никакого входящего сообщения из Kafka. Например, пользователь жмёт на сайте на кнопку "Отправить сообщение", далее средний слой обрабатывает http запрос, пишет что-то в БД и возвращает пользователю ответ "Отправлено". А уже потом должна появиться Kafka, очереди, обработка этого сохранённого в БД сообщения и пр.

Ага, я вас понял. А если на любой HTTP запрос отправлять сообщение сразу в kafka и уже полностью асинхронно производить обработку. То есть заведомо приводить систему к кейсу, когда одновременная обработка базой и очередью возможна только тогда, когда исходным запросом тоже было сообщение

Да, это у меня тоже описано :)

Сразу нужно оговориться, что есть третий, самый приоритетный с точки зрения Kafka вариант, – это использование её в качестве источника событий или реализация принципа CQRS. При таком подходе типичная транзакция затрагивает только топики Kafka и выглядит так...

Теперь я вас полностью понял, спасибо)

Sign up to leave a comment.

Articles