Как стать автором
Обновить

Комментарии 25

Мы можем смело использовать механизм auto-commit, но этот параметр лучше всего явно прописывать в конфигурации.

Может все таки не надо?

at least once на уровне вашего приложения с ним не гарантируется. Прочитали - закомитили - упали. Данные потеряны. И никто не виноват.

Почему не гарантируется? Все зависит от того как мы работаем с исключительными ситуациями , если используем обработчики ошибок по-умолчанию - то да, вы правы.

Тут важно отметить один момент про который я писал в первой статье: при включённой настройке enable.auto.commit=true коммит не происходит сам по себе, а жестко связан с получением сообщения.

Т.е. последовательность в цикле обработки сообщений у нас получается такая:

  1. Коммитим предыдущее смещение

  2. Получаем новые сообщения

  3. Обрабатываем

Обработка данных бывает мультипоточной. В своем пуле потоков. Когда поток данных достаточно большой это типовая схема.

Коммит произойдет независимо от того закончилась обработка или нет. Это взрывоопасная схема by design.

Тут опять же все зависит от того как вы спроектируете обработку. В данном случае consumer и listener выполняются в одном потоке. Если вы отделяете обработку в отдельный поток, то и менеджмент коммитов остаётся на вашей совести. Собственно так и поступает spring-kafka при enable.auto.commit=false.

У вас проблема именно в архитектуре. События "данные обработаны" и "commit" происходят независимо друг от друга.

Есть вариант когда они будут связаны и зависимы, но это однозначно из кода не видно. И любой рефакторинг может эту связь поломать. Это даже тестами не поймать. Вероятность просыпания событий небольшая.

Практика говорит что чем меньше таких косяков в архитектуре приложения, тем оно лучше и дольше живет в дикой среде.

События "данные обработаны" и "commit" происходят независимо друг от друга.

Почему вы так решили? В случае если мы не берём управление коммитом на себя и оставляем enable.auto.commit=true делегируя тем самым коммит KafkaConsumer последовательность вполне детерминирована :

  1. Закоммитили предыдущий оффсет

  2. Получили новые сообщения

  3. Обработали

Если мы сознательно не выносим последний шаг в отдельный поток

Представьте себя на месте разработчика которому прилетел тикет на ускорение обработчика. Один поток это откровенно медленно. Да и он может висеть на io, и недоиспользовать даже это одно ядро. Вероятность такого тикета далеко ненулевая.

Вы спокойно выносите обработку в свой пул, обвешиваете футурами или как у вас принято. И закрываете тикет. По коду не видно что надо сделать еще что-то. Тесты и тестирование проходят успешно.

А через пару месяцев в проде все взрывается. От обычного рестарта вашего приложения. Данные потеряны навсегда. Что делать непонятно. Даже причину потери данных установить очень нетривиально.

При наличии явно прописанной в коде связи событий "обработали -> закомитили" вероятность такой ошибки гораздо меньше. Тот же разработчик ее увидит и поправит соответвенно. Ошибки с большей вероятностью поймаются тестами.

Да, тут соглашусь, такое может быть. Это пожалуй одна из причин для чего я начал этот цикл статей - немного развеять туман в связке spring + kafka (в том числе и для себя)

Но ведь единицей масштабирования в kafka принято считать партицию.

При необходимости масштабирования - всегда можно выделить больше партиций с соответствующим числом обработчиков (где свои смещения)

А расспараллеливать обработку из одной очереди/партиции выглядит некорректным (by design), т.к обеспечить атомарное выполнение операции проблематично.

Когда повидаешь всякого начинаешь подстилать соломки везде. Скорость записи в одну партицию достаточно велика чтобы это было проблематично обработать одним потом в некоторых случаях.

Проблем в атомарности нет. Немного больше кода и аккуратная обработка коммитов. И все.

Ну зависит все же от количества партиций, если запись слишком быстрая - их можно и разнести.

А как нет проблем с атомарностью? Если вы вынесете в асинхронную обработку, то у вас сообщения пришедшие позже могут быть обработаны раньше и успешно, а те что пришли раньше - упасть. Как контролируете коммиты в таких случаях?

Вот пришло 3 сообщения с следующими оффсетами:
35: успешно
36: ошибка
37: успешно (раньше 36)

Просто масштабировать "минимальную единицу масштабирования" - выглядит не очень корректно.

Мне просто действительно интересен ваш опыт :)

Корректный способ, разве что, отдельная очередь самой обработки (но синхронно обрабатываемая). Т.е параллелится сама вычитка и обработка, но это в целом подразумевает ручной коммит оффсета.

Писать код так что сообщение не можешь быть необработано. Если что-то залипло дольше чем это разумно роняем все приложение и уходим в рестарт.

Есть опасность циклического рестарта и неразгребания очереди. Делаем на это мониторинг. Звоним по телефону, будим разработчиков. После пары звонков в три ночи все чиниться.

Ну т.е. сама обработка, как я упомянул - синхронно идет (в параллель само считывание и обработка, но не обработка отдельно взятых сообщений)?
Нет гонки между обработкой сообщений одной партиции?
(в ином случае, ронять приложение по некому таймауту не избавит от "неудачи по середине" и при следующем поднятии либо утеряется сообщение, либо повторно считаются уже обработанные)

Но spring-kafka нам обеспечивает схожее поведение при использовании ConcurrentKafkaListenerContainerFactory, с встроенной очередью сообщений на обработку и кэлбеком об успехе/неудаче.

Ручной коммит вполне себе хорошо контролируемый и явный процесс.

Ну и в целом повторное чтение можно не боятся при обеспечении идемпотентности, но все же оптимальный ли это путь - остается вопросом профиля использования.

Просто ждём пока все нынешнее и все предыдущее не обработается. Комитить с дыркой посередине нельзя бай дизайн. Обрабатывающий пул принимает вычитанное в порядке очереди вычитывания. Порядок нарушаться может и надо быть к этому готовым.

Обработка параллельная, сообщения нужны достаточно равномерные по времени обработки.

Падать если что-то не обработалось и непонятно почему это по сути единственный вариант. Работать дальше нельзя никак. Состояние приложения неопределенное получается.

В этом случае без всяких доработок в коде попросят у devops поднять ещё один контейнер с этим приложением и добавить партиций на топик и обработка пойдёт пропорционально быстрее.

Ну это можно организовать и в одном приложении, при желании (1 поток на 1 партицию)

Это сильно сложнее и может принести регресс. Кроме этого количество партиций может увеличиваться настройкой кафки. Если использовать devops, то просто подрастёт количество контейнеров с приложением.

Кроме этого может оказаться, что узкое место процессор например или память и тогда добавление потоков на одной машине задачу в какой-то момент перестанет решать. А контейнеры могут подниматься на разных машинах и не имеют такого ограничения.

Управлять контейнерами действительно будет проще.

Но на деле, одной машиной читать несколько партиций не сильно сложнее.

Конечно, bottleneck может быть в другом месте, потому и решения нужно выбирать соответствующие профилю использования :)

После какого-то числа контейнеров это начинает влиять на деньги. Контейнеры они денег стоят. А их сотни бывают. И приходит мысль что дешевле поправить код.

Тут вопрос умного devops. Не нужно всегда держать рабочими много контейнеров, при уменьшении нагрузки можно лишние потушить.

Например, если количество не обработанных сообщений в топике стало падать на какой-то процент в единицу времени, значит пик нагрузки прошёл и можно освобождать ресурсы.

Можно. Только это медленно. Сколько там типовое время реакции? Минуты? Дневные волны так обрабатывать ок. Более мелкие уже не ок.

И как обычно не все облака умеют не тратить деньги за выключенные контейнеры. Бывает так что укажи свой максимум. И все. Не используешь - сам виноват. Никому они ночью не нужны.

Мы упираемся в один поток. Который имеет привычку висеть на io. Добавляем инмемори кеш с чем угодно и вот вам странный контейнер Одно ненагруженное ядро и 10гб рам. Если это надо разложить по железкам то девопс к вам первый с угрозами придёт. Это по железкам плохо раскладывается.

Я как-то предпочитаю строить софт без таких ограничений. Это на самом деле не очень сложно.

Ну тут и не нужна мгновенная реакция, минуты и десятки минут, потому что пики могут периодически повторяться, а поднимать и тушить контейнер это тоже время.

Ну и мы тут не любой гипотетисчкий кейс смотрим, а кейс с читаем из kafka. Совсем универсальное решения, оптимального по всем параметрам для всех вариантов использования понятное дело нет.

Конечно. Без конкретики это все чистая теория.

а какое значение по-умолчанию для pollTimeout?

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории