Комментарии 21
А чем вас готовый коннектор от Elastic не устроил?
Судя по коммитам какой-то мертворожденный проект и он не решает задачу.
Отвечу за автора так как делал такую же схему несколько лет назад и она всё еще работает. Плюсы репликации через кафку:
Источник данных может быть любой, в моем случае, это были 2 разных реляционки.
Можно наполнять несколько кластеров без влияния на источник. Например, эластик плохо переносит потерю связи между нодами, в такой схеме можно создать по отдельному кластеру в ДЦ.
Можно пересоздать индекс без влияния. Один консьюмер читает в старый индекс, другой в новый. Актуально, если как написано в статье наполнение индекса занимает больше часа, например 3 месяца. Индексы пересоздавать придется, потому что создать сразу правильные мапинги еще никому не удавалось.
Еще, механизмами кафки можно восстанавливать данные - остановить консьюмеры, сдвинуть оффсет и перепролить за период, регулировать нагрузку от индексации - количеством консьюмеров/партиций, следить за отставанием репликации по лагу.
Относительно сомнительно скрипта на питоне, минусов в такой схеме наверно и нет. Самая главная проблема сам эластик и то что строгой консистентности не будет, особенно при нескольких кластерах.
Почему нельзя было все события одной сущности держать в одной партиции?
А если у нас все в таблицу записано, то что мешает пары create-delete в разрезе идентификатора объекта убрать из этой таблицы до отправки в elastic? Все равно по итогу ничено не останется в elastic.
А почему нельзя используя транзакцию PostgreSQL сделать запись в эластик (как в самом первом варианте) и если она не прошла, повторить? Resilient вариант, так сказать. Если он не пройдёт ещё несколько раз, вернуть ошибку клиенту, т.к. если что-то с эластиком случилось, то клиент всё равно не получит из него данных, если ему вернуть ОК.
Дело в том, что Эластик не подчиняется транзакциям из Постгрес
Ну понятно, но транзакция позволит не появиться фантомной записи (без индекса в эластике), а resilient позволит или убедиться, что они всегда парой или вернуть ошибку. Прямо как первый вариант. Просто не пойму почему нельзя было повторить запросы эластика, если они не сработали в первый раз. Было какое-то бизнес требование?
Предположим, у нас есть три топика
item_created
,item_updated
,item_deleted
в которых приходитID
сущности.
Почему не сделать в одном топике все события по item? Так вы добьетесь упорядочивания
Топики разделены по смыслу.
А для чего это нужно? Какой-то практический смысл есть в этом?
Конечно, чтобы клиент получил только то, что ему нужно
Ладно, предположим это требование, на которое вы повлиять не можете. Тогда еще такой вопрос.
Можно будет сохранять и «настаивать» события перед их обработкой. Например, получив
deleted
ждать сколько‑то секунд в надежде что придетcreated
. Такой вариант, подойдет, только он создаст еще большую задержку обработки.
Такое событие можно направить на retry‑очередь. Там оно будет пробовать себя до тех пор, пока не придет
created
, после чего произойдет удаление и будет достигнут eventual consistency.
Правильно понимаю, что отправляя запрос на retry-очередь, вы по сути делаете ровно тоже самое "настаивание", только долбя при этом БД запросами на проверку наличия записи?
И еще, как вы определяете сколько ждать eventual consistency. Грубо говоря, вам пришло событие удаления, вам нечего удалять, вы ждете события создания, а оно потерялось по сети и не пришло.
Штатных потерь не может быть при таком дизайне
В ретрай очереди можно сколько угодно долго находится сообщение
Штатных потерь не может быть при таком дизайне
В общем случае - легко, особенно когда у вас три топика отдельных и вероятно три разных продьюсера, которые могут работать асинхронно. Если же у вас один продьюсер льет в три топика строго последовательно, то зачем нужна kafka? Можно же лить напрямую в elastic сразу, вместо кафки.
В ретрай очереди можно сколько угодно долго находится сообщение
Можно-то можно, но такое сообщение каждый раз будет инициировать запрос к эластику с проверкой наличия записи в индексе. Если целостность нарушится и то сообщение, которое вы ждете, не придет никогда, то ситуация будет не очень хорошая.
Не проще создать таблицу itemId, status, isProcessed, version?
В статус пишем create/update/delete в рамках транзакции совместно с изменением основной таблицы, isProcessed ставим в false.
Теперь можем смело сразу за записью в БД писать в es и обновлять запись в таблице при успехе.
Дальше в фоне бегаем по этой таблице и обновляем isProcessed после изменения в es на строку с указанным статусом, версией и id, чтобы обеспечить консистентость в случае сбоев. Если affected_rows будет 0, значит статус элемента изменился и нужно перезапросить текущий статус и обновить данные уже согласно нему.
Таким образом и проблема реиндекса при сбоях будет легко решаться, просто повторить обход таблицы для уже обработанных результатов кроме deleted.
Debezium прекрасно решает задачу доставки данных из постгрес в кафку
Льем из Postgres в Elastic консистентно