Comments 9
Большой прогресс по сравнению с предыдущей статьёй: на этот раз всё написано самостоятельно. Вопрос с намёком: а что более осмысленное, чем строка "Key", имело бы смысл в этом конкретном сервисе использовать в качестве ключа?
Можно подробнее про 2 основных кейса.
1) Хочу быстрее обрабатывать (чтобы очередь быстрее рассасывалась) => добавляю консьюмеров. Что будет, если просто добавить, без групп, при одной партиции и при нескольких? А если с группами? Что, если партиций три, а в 3 группах по 1, 2 и 3 консьюмера, как они будут назначаться партициям?
2) Хочу, чтобы одно и то же сообщение обрабатывалось несколькими разными сервисами. Что делать?
Теперь стало понятно, что для поддержки перезапусков сервисов есть специальная техническая очередь. Но, как оффсеты сопоставляются с консьюмерами, мы же не задаем никаких id на стороне консьюмера. А что если состав консьюмеров изменился?
Например, единственный консьюмер упал, после перезапуска Кафка не смогла подобрать ему оффсет, а в очередь за время простоя набежало сообщений. Если брать latest, то не будут обработаны некоторые сообщения. С этой точки зрения всегда удивлялся, как в Кафке выкручиваются, не храня состояние на стороны консьюмеров.
Ну поехали :)
Если вы добавляете консьюмеров без групп на топик, то партиции не распределяются. Каждый консьюмер читает все сообщения. Если же с группами, то в рамках одной группы будет распределение партиций.
Теперь давайте рассмотрим ваш кейс с 3 партициями:Если 1 консьюмер, то он читает все партиции.
Если 2 консьюмера, то один читает две партиции, другой — одну.
Если 3 консьюмера, то каждый читает по одной.
Ещё предлагаю для большего понимания рассмотреть случай, когда консьюмеров в группе больше, чем партиций. В этом случае "лишние" консьюмеры будут простаивать.
Если хотите, чтобы разными сервисами обрабатывались одни и те же сообщения, надо все инстансы в рамках одного сервиса засунуть в одну группу. Так надо сделать для каждого интересующего вас сервиса.
Вторая часть вопроса несколько более сложная. Попытаюсь адекватно объяснить:Оффсеты напрямую не сопоставляются с консьюмерами. Видите ли, какая ситуация, в топике, ответственном за оффсеты, хранятся сообщения следующего вида:
Ключ —
<group_id, topic, partition>Значение —
<commited_offset, metadata>
Это нам даёт возможности для перебалансировки, так как нет жётской связи между оффсетом и конкретным инстансом.
В вашем кейсе с упавшим единственным консьюмером будет примерно следующее:
Kafka переведёт группу в состояние перебалансировки.
Контроллер кластера смаппит сообщение из offser-топика на текущий активный консьюмер.
Как следствие, активный консьюмер начнёт читать с последнего закоммиченного оффсета. То есть никаких потерь не будет, так как мёртвый брокер не мог закоммитить оффсеты для новых сообщений. На всякий случай отмечу, что коммит в Kafka — это некий процесс сохранения оффсета консьюмером.
И в этом случае настройка
latestприменена не будет, так как кафка смаппит оффсет на консьюмера, а эта настройка срабатывает, как вы, надеюсь, помните, когда Kafka не находит оффсет для консьюмера.
Надеюсь, нигде не накосячил в объяснениях :)
Надеюсь не у меня одного проблемы с kafka?
failed to resolve reference "docker.io/bitnami/kafka:4.0.0": docker.io/bitnami/kafka:4.0.0: not found
На докер хабе нету ни одного тега на странице bitnami/kafka
Обновил статью.
Используем confluentinc/cp-kafka вместо Bitnami (который переехал в legacy без обновлений).
Kafka для начинающих: работа с брокером сообщений на практике