Обновить

Комментарии 3

Выглядит слишком энтерпрайзно. Если вы в вашей новой реализации начали подтверждать каждое входящее сообщение индивидуально, вместо того, чтобы подтверждать целыми пачками, и вам это норм по производительности, то можно сделать проще. Сохранять обрабатываемые сообщения в LinkedHashMap и подтверждать в непрерывно возрастающем порядке по мере того, как обработка происходит. Это заодно поможет управлять уровнем паралеллизма, а то сейчас у вас этот уровень стал очень неявным: размер пула потоков, который выполняет flatMap().

Соглашусь, что решение может выглядеть усложнённым. В том числе цель публикации в том, чтобы выйти из "информационного пузыря проекта" и собрать другие мнения.

Вариант, схожий предложенному вами, с in-memory структурой и последовательным commit также рассматривался. Проблема, которая нас пока останавливает при таком подходе, связана с характером нагрузки. У нас определённый процент операций может стабильно «тормозить» до ~60 секунд и есть опасения, что количество реально обработанных сообщений может существенно разойтись с закомиченным offset. Возможно это и надуманная проблема. Теоретически это не так и критично критично,т.к. есть защита от дублей. Но при большом и нестабильном лаге commit’а появляется ощущение меньшей управляемости процесса.
Да, дубли возможны и здесь, но дополнительно ведётся внешний реестр сообщений, что даёт более явную точку контроля при сбоях и рестартах.
Возможно, проблема действительно частично надуманная и мы это не исключаем, что в будущем пересмотрим подход в сторону ordered commit или parallel consumer. Пока же текущая модель несколько месяцев работает в PROD и выдерживает заявленный уровень нагрузки.

По поводу flatMap
Если вопрос был в том, что в качестве concurrency используется та же настройка maxPollRecords, что и в batch-подходе, то согласен, семантически название не идеально. Логически это скорее processingConcurrency, а не размер батча. Я просто решил не вводить еще один параметр в демонстрационном проекте.
При этом сам параллелизм на мой взгляд вполне явно:
flatMap(mapper, concurrency) ограничивает количество обработчиков в один момент времени и обеспечивает backpressure. В обработке одновременно находится не более maxPollRecords сообщений. Как только один обработчик завершается, то запрашивается следующий элемент на обработку, если он есть.

Был неправ насчёт паралеллизма, проглядел, что передаётся параметр. Синтаксис Котлина мне не очень привычен.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации