Комментарии 3
Выглядит слишком энтерпрайзно. Если вы в вашей новой реализации начали подтверждать каждое входящее сообщение индивидуально, вместо того, чтобы подтверждать целыми пачками, и вам это норм по производительности, то можно сделать проще. Сохранять обрабатываемые сообщения в LinkedHashMap и подтверждать в непрерывно возрастающем порядке по мере того, как обработка происходит. Это заодно поможет управлять уровнем паралеллизма, а то сейчас у вас этот уровень стал очень неявным: размер пула потоков, который выполняет flatMap().
Соглашусь, что решение может выглядеть усложнённым. В том числе цель публикации в том, чтобы выйти из "информационного пузыря проекта" и собрать другие мнения.
Вариант, схожий предложенному вами, с in-memory структурой и последовательным commit также рассматривался. Проблема, которая нас пока останавливает при таком подходе, связана с характером нагрузки. У нас определённый процент операций может стабильно «тормозить» до ~60 секунд и есть опасения, что количество реально обработанных сообщений может существенно разойтись с закомиченным offset. Возможно это и надуманная проблема. Теоретически это не так и критично критично,т.к. есть защита от дублей. Но при большом и нестабильном лаге commit’а появляется ощущение меньшей управляемости процесса.
Да, дубли возможны и здесь, но дополнительно ведётся внешний реестр сообщений, что даёт более явную точку контроля при сбоях и рестартах.
Возможно, проблема действительно частично надуманная и мы это не исключаем, что в будущем пересмотрим подход в сторону ordered commit или parallel consumer. Пока же текущая модель несколько месяцев работает в PROD и выдерживает заявленный уровень нагрузки.
По поводу flatMap
Если вопрос был в том, что в качестве concurrency используется та же настройка maxPollRecords, что и в batch-подходе, то согласен, семантически название не идеально. Логически это скорее processingConcurrency, а не размер батча. Я просто решил не вводить еще один параметр в демонстрационном проекте.
При этом сам параллелизм на мой взгляд вполне явно:
flatMap(mapper, concurrency) ограничивает количество обработчиков в один момент времени и обеспечивает backpressure. В обработке одновременно находится не более maxPollRecords сообщений. Как только один обработчик завершается, то запрашивается следующий элемент на обработку, если он есть.

Параллельная обработка Kafka сообщений с гарантией at-least-once в условиях медленного внешнего сервиса