Всем привет! Меня зовут Даша, я python-разработчик в команде Big Data «Группы Лента» (Lenta tech). За свой восьмилетний опыт работы я успела поучаствовать в большом количестве проектов. Инструменты и фреймворки менялись, но оставалось одно — Kafka. Сначала этот инструмент использовался с опаской и на небольших проектах, а сейчас стал стандартом при проектировании любых систем.
Я собрала набор практических приемов, которые помогают настраивать её быстро и без лишних сложностей. В статье делюсь своими «рецептами», которые не всегда можно найти в документации или «нагуглить».
Рецепт 1. Локальное развертывание с помощью docker-compose
Думаю, большинство справятся с базовой настройкой контейнера или возьмут готовые примеры. Я бы хотела сосредоточиться на дополнительной настройке с помощью переменных окружения.
Здесь и далее буду использовать image от confluentinc.
KAFKA_BROKER_ID — уникальный идентификатор брокера. Используется для координации в кластере. Этот параметр обязателен для локального развертывания в том числе, хотя чаще всего не имеет пользы при разворачивании уникальной пары zookeeper-kafka.
KAFKA_ZOOKEEPER_CONNECT — строка подключения к zookeeper. Думаю, тут все понятно.
KAFKA_ADVERTISED_LISTENERS — один из самых важных параметров, адреса и порты для подключения клиентов. Каждый элемент состоит из наименования, хоста и порта. Таким образом, можно регулировать, откуда клиент сможет подключаться к кафке.
PLAINTEXT://localhost:9092 — подключение только локально.
PLAINTEXT://kafka:9092 — подключение только из докера.
PLAINTEXT://localhost:9092 — kafka в докере, доступ локально, если порт 9092 проброшен.
Если необходимо сохранить доступы и внутри сети докера, и снаружи, можно определить несколько слушателей, например
INTERNAL://kafka:29092,EXTERNAL://localhost:9092
где INTERNAL://kafka:29092 — слушатель внутри сети докера, а EXTERNAL://localhost:9092 — извне
Kafka будет сообщать клиентам:
"если ты внутри контейнера — используй kafka:29092"
"если ты снаружи — используй localhost:9092"
Важно, что наименование может быть абсолютно любым. Главное указать его в параметрах KAFKA_LISTENER_SECURITY_PROTOCOL_MAP и KAFKA_INTER_BROKER_LISTENER_NAME.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP — здесь указывается сопоставление имен и привязывает его к конкретному протоколу.
Например,KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
Для обоих имен в нашем примере указывается протокол PLAINTEXT (без шифрования).
Kafka разделяет понятие имени слушателя и используемого протокола, чтобы дать больше гибкости. Это особенно полезно, когда вы используете несколько интерфейсов. Например, один для внутреннего взаимодействия между брокерами, а другой — для внешних клиентов.
Для корректного внутреннего взаимодействия также необходимо указать наименование слушателя для внутреннего взаимодействия в параметре KAFKA_INTER_BROKER_LISTENER_NAME. В нашем примере это INTERNAL. Несмотря на то, что локально чаще всего нет необходимости поднимать кластер и несколько брокеров, этот параметр также остается обязательным.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR — параметр для определения фактора репликации. Этот параметр важен для корректной работы групп консумеров. Количество брокеров и фактор репликации должны совпадать, либо фактор репликации должен быть меньше. В локальной разработке нет необходимости использовать несколько копий, поэтому заострять внимание на этом не буду.
KAFKA_MESSAGE_MAX_BYTES — параметр, определяющий максимальный размер сообщения в байтах, включая тело, ключ, заголовки и мета информацию. По умолчанию используется 1мб, но в моей практике часто встречалась необходимость использования большего количества.
Итак, базовая настройка локальной Кафки:
kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.6.0
restart: unless-stopped
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: 10485880
Для локальной разработки и тестирования больше чем достаточно.
Конечно, есть еще много интересных настроек для локального разворачивания, но по опыту такой настройки вполне достаточно для комфортной работы.
Рецепт 2. Удобный UI
По запросу “ui for kafka” Google сразу выдает Kafka ui. Это хороший базовый инструмент для простого чтения сообщений. Но у него есть проблемы - некорректный поиск, нет возможности читать закодированные сообщения, некорректная работа пагинации. После недолгих поисков я нашла более подходящий моим задачам инструмент - AKHQ. Для локального развертывания я использую image tchiotludo/akhq.

Здесь используется только одна переменная окружения — AKHQ_CONFIGURATION, где в формате yaml описывается конфигурация для подключения к кафке.
Самый простой базовый пример:
akhq:
container_name: akhq
image: tchiotludo/akhq
restart: unless-stopped
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
kafka:
properties:
bootstrap.servers: "kafka:29092"
ports:
- 8080:8080
Рецепт 3. А если сообщение закодировано в protobuf?
Когда я разрабатывала один из проектов с закодированным сообщением в protobuf, для проверки я использовала сторонние сервисы для рас��одирования сообщения. Но в таком подходе есть большие проблемы — это рутинная операция, которая занимает время, а ещё большей проблемой стала невозможность организовать поиск по сообщениям. Настал день, когда ручной поиск нужных сообщений стал просто невыносим и я наконец пошла разбираться с UI.
Для работы с закодированным сообщением в protobuf необходимо добавить параметр deserialization в блоке с настройкой подключения к кафке. В коде я работала с файлами с расширением .proto и сгенерированными .py для python. Но здесь используются файлы формата *.desc. Их можно сгенерировать с помощью утилиты protoc в терминале:
protoc \
--descriptor_set_out=your_file.desc \
--include_imports \
--proto_path=./protos \
your_file.proto
Тогда конфигурация ui будет выглядеть следующим образом:
akhq:
image: tchiotludo/akhq
restart: unless-stopped
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
kafka:
properties:
bootstrap.servers: "kafka:29092"
deserialization:
protobuf:
descriptors-folder: “/desc”
topics-mapping:
- topic-regex: “in_topic.*”
descriptor-file: "in.desc"
value-message-type: “InItem”
- topic-regex: “out_topic*”
descriptor-file: "out.desc"
value-message-type: “OutItem”
ports:
- 8080:8080
volumes:
- ./descriptors/in.desc:/desc/in.desc
- ./descriptors/out.desc:/desc/out.desc
Поподробнее про новые переменные:
descriptors-folder — папка внутри контейнера, в которой лежат файлы конфигурации.
topic-regex — регулярное выражение с названием топика/группы топиков, для которого будет применено расшифрование с помощью desc файла.
descriptor-file — наименование файла desc.
value-message-type — внутри proto файла обычно описано наименование сообщения, здесь его нужно указать.
Рецепт 4. Библиотека Confluent Kafka и ошибки
А теперь немного про Python. Возможно и в других языках программирования есть такая же проблема. Самая популярная библиотека для подключения к кафке работает через потоки. Соответственно, для корректной работы exception она явно не бросает, а возвращает объект ошибки KafkaError.
Со своей стороны, клиент может обработать эту ошибку и вызвать явно exception.
Большинство из критических ошибок при работе с Кафкой не роняет процесс, а просто логирует ошибки. Кажется логичным, чтобы такие критические ошибки не замалчивались. Для producer, например, можно описать явную обработку любой ошибки:
def error_callback(err):
logger.exception(f"Error in kafka client: {err}", exc_info=True)
raise KafkaException(err)
producer_configuration: dict[str, Any] = {
"bootstrap.servers": bootstrap_servers,
"error_cb": error_callback,
}
Рецепт 5. KAFKA_ADVERTISED_LISTENERS или как работать с кафкой корректно и в докере и локально
Выше мы уже говорили об этом запутанном параметре и как понять, каким образом описывать его правильно. Здесь же расскажу проблему, с которой я долгое время не могла разобраться.
Обычный сценарий: Кафка и akhq подняты в docker-compose в одной сети, проект запускается локально. Как описать переменные окружения для проекта? Как корректно указать конфигурацию для akhq?
Начнем с описания параметра для Кафки:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
Для внутреннего использования - INTERNAL, для внешнего EXTERNAL.
Конфигурация для AKHQ:
AKHQ_CONFIGURATION: |
akhq:
connections:
local:
properties:
bootstrap.servers: "kafka:29092"
Важно использовать внутренний listener. Для локального подключения также важно использовать внешний listener localhost:9092.
Надеюсь, что эти рецепты помогли уложить в голове знания по работе с Кафкой локально и вы сможете с удовольствием использовать их в вашей повседневной разработке. Буду рада любым комментариям и дополнениям!
А какие приемы или инструменты помогают вам упростить работу с Kafka и сделать её более предсказуемой?