Как стать автором
Обновить

Комментарии 21

Судя по коммитам какой-то мертворожденный проект и он не решает задачу.

Отвечу за автора так как делал такую же схему несколько лет назад и она всё еще работает. Плюсы репликации через кафку:

  • Источник данных может быть любой, в моем случае, это были 2 разных реляционки.

  • Можно наполнять несколько кластеров без влияния на источник. Например, эластик плохо переносит потерю связи между нодами, в такой схеме можно создать по отдельному кластеру в ДЦ.

  • Можно пересоздать индекс без влияния. Один консьюмер читает в старый индекс, другой в новый. Актуально, если как написано в статье наполнение индекса занимает больше часа, например 3 месяца. Индексы пересоздавать придется, потому что создать сразу правильные мапинги еще никому не удавалось.

  • Еще, механизмами кафки можно восстанавливать данные - остановить консьюмеры, сдвинуть оффсет и перепролить за период, регулировать нагрузку от индексации - количеством консьюмеров/партиций, следить за отставанием репликации по лагу.

Относительно сомнительно скрипта на питоне, минусов в такой схеме наверно и нет. Самая главная проблема сам эластик и то что строгой консистентности не будет, особенно при нескольких кластерах.

Почему нельзя было все события одной сущности держать в одной партиции?

Топики разделены по смыслу.

Если начали упрощать реальные требования тогда можно и с Эластиком не заморачиваться

А если у нас все в таблицу записано, то что мешает пары create-delete в разрезе идентификатора объекта убрать из этой таблицы до отправки в elastic? Все равно по итогу ничено не останется в elastic.

Ну в таблицу PG оно пишется в правильном порядке. Это после прокачки через брокер порядок может нарушится.

А почему нельзя используя транзакцию PostgreSQL сделать запись в эластик (как в самом первом варианте) и если она не прошла, повторить? Resilient вариант, так сказать. Если он не пройдёт ещё несколько раз, вернуть ошибку клиенту, т.к. если что-то с эластиком случилось, то клиент всё равно не получит из него данных, если ему вернуть ОК.

Дело в том, что Эластик не подчиняется транзакциям из Постгрес

Ну понятно, но транзакция позволит не появиться фантомной записи (без индекса в эластике), а resilient позволит или убедиться, что они всегда парой или вернуть ошибку. Прямо как первый вариант. Просто не пойму почему нельзя было повторить запросы эластика, если они не сработали в первый раз. Было какое-то бизнес требование?

Да, дело в том, что построение индекса Эластика это не обязанность сервиса item

Предположим, у нас есть три топика item_createditem_updateditem_deleted в которых приходит ID сущности.

Почему не сделать в одном топике все события по item? Так вы добьетесь упорядочивания

Топики разделены по смыслу.

А для чего это нужно? Какой-то практический смысл есть в этом?

Конечно, чтобы клиент получил только то, что ему нужно

Ладно, предположим это требование, на которое вы повлиять не можете. Тогда еще такой вопрос.

Можно будет сохранять и «настаивать» события перед их обработкой. Например, получив deleted ждать сколько‑то секунд в надежде что придет created. Такой вариант, подойдет, только он создаст еще большую задержку обработки.

Такое событие можно направить на retry‑очередь. Там оно будет пробовать себя до тех пор, пока не придет created, после чего произойдет удаление и будет достигнут eventual consistency.

Правильно понимаю, что отправляя запрос на retry-очередь, вы по сути делаете ровно тоже самое "настаивание", только долбя при этом БД запросами на проверку наличия записи?

И еще, как вы определяете сколько ждать eventual consistency. Грубо говоря, вам пришло событие удаления, вам нечего удалять, вы ждете события создания, а оно потерялось по сети и не пришло.

Штатных потерь не может быть при таком дизайне
В ретрай очереди можно сколько угодно долго находится сообщение

Штатных потерь не может быть при таком дизайне

В общем случае - легко, особенно когда у вас три топика отдельных и вероятно три разных продьюсера, которые могут работать асинхронно. Если же у вас один продьюсер льет в три топика строго последовательно, то зачем нужна kafka? Можно же лить напрямую в elastic сразу, вместо кафки.

В ретрай очереди можно сколько угодно долго находится сообщение

Можно-то можно, но такое сообщение каждый раз будет инициировать запрос к эластику с проверкой наличия записи в индексе. Если целостность нарушится и то сообщение, которое вы ждете, не придет никогда, то ситуация будет не очень хорошая.

Не проще создать таблицу itemId, status, isProcessed, version?

В статус пишем create/update/delete в рамках транзакции совместно с изменением основной таблицы, isProcessed ставим в false.

Теперь можем смело сразу за записью в БД писать в es и обновлять запись в таблице при успехе.

Дальше в фоне бегаем по этой таблице и обновляем isProcessed после изменения в es на строку с указанным статусом, версией и id, чтобы обеспечить консистентость в случае сбоев. Если affected_rows будет 0, значит статус элемента изменился и нужно перезапросить текущий статус и обновить данные уже согласно нему.

Таким образом и проблема реиндекса при сбоях будет легко решаться, просто повторить обход таблицы для уже обработанных результатов кроме deleted.

Debezium прекрасно решает задачу доставки данных из постгрес в кафку

дебезиум не может магически сделать констистеное обновление сущности

Он просто копирует, максимум им можно заменить аутбокс

А дальше все равно нужно думать как разрешить гонку при параллельном чтении из разных топиков

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории