Комментарии 36
Зукипер сам по себе является очень важным элементом Кафки, так как если он упадет, то перестанет работать вообще вся Кафка.
Тут можно упомянуть , что кафка уже давно умеет работать по KRAFT без зукипера.
и еще про зукипер, в новых версиях Кафки его выпилят(https://strimzi.io/blog/2024/03/21/kraft-migration/#:~:text=Conclusion,to using KRaft-based clusters.).
Автору статьи спасибо очень хорошая статья скинул коллегам датаинженерам)
Идемпотентный продюсер работает только для одного и того же инстанса продюсера. Если у вас, к примеру, перезапустится сервис после отправки, но без отметки, что сообщение отправлено, то новый инстанс все равно пошлет это сообщение. В итоге на консумер, даже если он помечен как идемпотентный, придет 2 одинаковых сообщения.
когда у нас данные, для которых нужно обеспечить гарантию доставки, и при этом их считывание не идемпотентно. Например операции с платежными транзакциями.
Пожалуйста, не делайте обработку платежных транзакций без идемпотентности на уровне бизнес-логики :)
С рекомендацией насчет идемпотентности в платежных транзакциях согласен)
Да вообще не бывает никакого exactly once нигде и никогда.
В Кафка есть механизм частичного уменьшение дублей который назвали exactly once. Но это не про семантику взаимодействия, повторы на consumer все равно возможны.
Поэтому смысла включать exactly once нет, только если нужны внутрикафковские транзакции (там, кажется, режим exactly once необходим для включения транзакций)
Всё несколько хитрее. Для обеспечения семантики exactly once необходимо коммитить чтение консюмером, только вместе с коммитом записи продюсером. У клиента для этого есть специальный метод. А если используете какую-то оболочку (spring kafka например), то всё доступно из коробки.
Для обеспечения семантики exactly once необходимо коммитить чтение консюмером, только вместе с коммитом записи продюсером.
Что вы имеете в виду? Как продюсер с консумером могут синхронизировать запись?
Через метод Producer.sendOffsetsToTransaction. Вот здесь почедовечески написано как делать через родной клиент https://www.baeldung.com/kafka-exactly-once.
Не знал о таком, спасибо!
Но выглядит как довольно узкий юзкейс: продюсер должен знать о консумер-группе, и поддерживается только одна группа. Возможно, проще будет какой-нибудь синхронный RPC использовать.
Нормально работает, неузко. В рамках, например spring kafka прозрачно и незаметно для разработчика. Ничего другого (синхронного rpc в асинхронной обработке) делать не нужно.
Да нет, как раз узкий кейс. Чаще продьюсер не знает ничего ни о количестве консьюмер групп, ни об из названиях.
Но всё равно интересно, спасибо. Не знал, что кафка так умеет
Честно говоря, у вас здесь столько неточностей, что пройдусь только по крупным.
Kafka Connect API - это не программный интерфейс к Кафке, и никакого отношения к языкам программирования он не имеет. Kafka Connect - позволяет лить данные из внешних систем в Кафку, или из Кафки во внешние системы(базы данных, эластик, даталейк) без программирования, только плагин к системе и конфигурация.
Партиции - это вовсе не подмножества топиков. Топик делится на партиции, партиция - это единица репликации и избыточности(redundancy).
Емкий небольшой формат - это не json, а Avro или protobuf.
Ключ не используется для сегментации сообщений внутри партиции. По умолчанию сообщения с одинаковым ключом попадают в одну и ту же партицию (но все можно поменять). Смысл в том, что в партиции гарантируется очередность сообщения, и если очередность важна, то сообщения должны иметь одинаковый ключ.
Если хочется указать консьюмеру, как десериализовать сообщения, надо использовать не ключ, а headers.
Сообщение не будет удалено из переполненного топика. Сообщения удаляются по времени (старше, чем Х дней) или, (если сконфигурировано), когда размер топика будет больше, чем заданный размер. Если у вас просто кончится место на диске, то Кафка упадет.
min.insync.replicas нельзя ставить больше, чем количество реплик, это в принципе невозможно. Обычно это количество реплик - 1.
Кафка никогда не удаляет отдельные сообщения, а только целые сегменты.
max.poll.interval.ms проверяется на клиенте. Не Кафка считает клиента неработоспособным, а он сам себя считает неработоспособным и посылает leave request.
я вот пожалел времени, чтобы сиё творение коментить )
это просто какой-то поток сознания
Благодарю за большой комментарий! Но по нему есть несколько нюансов, которые нужно прояснить, чтобы не вводить в заблуждение других читателей:
Да, про Kafka Connect API верно, вместо этого имелось в виду Kafka API для написания клиента, который взаимодействует с Kafka. В принципе они у себя в документации его так и называют, уточняя роль клиента для Kafka (https://kafka.apache.org/documentation/#api)
Про min.insync.replicas — да, в тексте должно быть словом "меньше", а не "больше". Спасибо
max.poll.interval.ms — да, это настройка именно клиента Консюмера, а не брокера. Если достигнут таймаут, то Консюмер считается упавшим и группа выполнит повторную балансировку, чтобы переназначить разделы другому Консюмеру из группы
Партиции являются репликами по отношению к друг другу, которые вместе являются составными частями топика, подробнее об этом можно прочитать в официальной документации (https://kafka.apache.org/documentation/#introduction)
Про емкость json сказано в контексте сравнения с такими форматами, как mp4. А то, что бинарник будет еще меньше – да, это очевидный факт
Про ключи вы, видимо, запутались: ассиметричность заполнения партиций проявляется в рамках настройки репликации между master-slave партициям, причем тут ключи не совсем понятно
Сообщения удаляются из Kafka при достижении заданного лимита объема retention.bytes
Я ни в чем не запуталась, я работаю datastreaming инженером больше 10 лет. Вы пишите, что "ключ используется для сегментации сообщений внутри партиции". Это в принципе неверно. Партицию, в которую отправить сообщение, определяет продюсер, и по умолчанию она определяется так: murmur2hash(key)%partition_count. То есть берется хэш от ключа, делится на число партиций в топике и остаток - это номер партиции. Это единственный смысл ключа.
Например, при отправке сообщений с данными клиента в топик, имеет смысл использовать идентификатор клиента в качестве ключа. Это обеспечит попадание всех сообщений, связанных с конкретным клиентом, в одну и ту же партицию, сохраняя их в правильном порядке.
Остальное, кому надо в документации проверит, но с ключами - это фундаментальная вещь.
Эээ, репликами чего являются партиции? В разных партициях живут разные сообщения, откуда там реплики?
При получении сообщения от Продюсера Кафка конвертирует его через сериализатор в бинарный вид, а далее при передаче консюмеру возвращает сообщение в исходный формат через десиреализатор.
Разве это так? Мне казалось что сама Кафка как раз работает с байтами, поэтому в продюсере указываем сериализотор в байты, а в консумере десириализотор, чтобы он знал как байты преобразовать в нужную структуру.
Мне не хватило раздела про автомасшабирование.
Например у меня в AZURE есть woker который автомасштабируется, в kafka 2 партиции. В обычном состоянии в 1w вливается 2 партиции.
Смотрю worker расширился до 6 экземпляров, и получается что сейчас 4 экземпляра простаивают.
Есть вообще какие механизцы автоскалинга партиций под количество сервисов? Спасибо.
Нет, в Apache Kafka никакого автомасштабирования партиций нет. Количество партиций можно увеличить, запустив скрипт со специальными параметрами. При этом данные, которые в Кафке уже есть, останутся в старых партициях, и в новой конфигурации распределение по ключам будет уже другое. Уменьшить количество партиций в принципе нельзя (только пересоздать топик).
Автоматического увеличения/сокращения партиций из коробки в Кафке нет. Но можно посмотреть, что сторонние инструменты предлагают, например, такие как Confluent Control Center. Сам подобное не реализовывал, поэтому больше подсказать тут не могу
Можно сразу сделать больше партиций, если у вас в целом топиков-партиций на кластере не очень много, то хоть 50 партиций сразу ставьте.
Пара вопросов:
1) Подскажите как правильно монитрить кафку
2) Часто бывает когда перезапускается консьюмер, то он не сразу начинает получать сообщения, а после процесса ребалансировки, который может длиться некоторое время. Как этого избежать или сократить время ожидания?
Метрики Кафки выставляются через JMX, поэтому используйте стек для мониторинга, с которым вы умеете работать. Prometheus-Grafana, ELK, zabbix, что угодно. У Confluent хороший репозиторий https://github.com/confluentinc/jmx-monitoring-stacks, я его использую с минимальными изменениями.
Можете использовать статическую принадлежность к группе (static group membership), дать консьюмеру group.instance.id, тогда если перезапуск будет короткий (меньше установленного таймаута), то консьюмер получит свои партиции назад.
Спасибо большое за классную статью!
Уточняющий вопрос:
"""...Exactly once...их считывание не идемпотентно... Например операции с платежными транзакциями."""
Операции с банковскими транзакциями не идемпотентны? Вероятно, не самый удачный пример?
Спасибо!
Да, тут в самом деле стоит более четко пояснить пример.
В статье постарался привести описание кейса, когда необходимо придерживаться exactly once: "Когда нужно использовать: когда у нас данные, для которых нужно обеспечить гарантию доставки, и при этом их считывание не идемпотентно. Например операции с платежными транзакциями".
Имелось в виду, что на стороне системы платежная операция не идемпотентна (то есть если ее повторить, то деньги спишутся повторно, а не будут обработаны как дубли), но при этом допускать ее повтора (как в at least once) или вообще ее потерять (как в at most once) категорически нельзя, потому что те же платежные транзакции это весьма щекотливая тема. Поэтому здесь строго нужна exactly once семантика, ибо без ее гарантии обрабатывать платежные транзакции в асинхронном формате (через Кафку) нельзя.
Если выходить за рамки примеры, то те же платежные транзакции стоит выполнять в синхронном формате (end-to-end), чтобы была гарантия разовой обработки как для системы-потребителя, так или для системы-производителя.
У Вас в статье очень красивые схемы: ёмкие и аккуратные. Поделитесь, пожалуйста, в чём Вы их рисовали?
Спасибо!
Для рисунков использовал powerpoint, а для схем eraser.io и plantuml.com
Kafka. Лучшие практики применения. Настройки Producer & Consumer