Привет! В этой статье мы развернем dev-cluster kafka последней версии (3.7 на момент написания статьи), без использования zookeeper. Также в нашей сборке будет web-ui и все для мониторинга. В представленной конфигурации настроены SASL, SSL, ACL.
В чем полезность статьи? В статье представлен готовый docker-compose.yml для использования, который будет вам полезен если вы не сильно искушены в вопросах администрирования kafka и docker-compose, но уже ��отелось бы начать заниматься разработкой, используя кластер kafka. Беглый поиск в интернет не дал результата в виде готового docker-compose.yml для конкретно такой конфигурации, которая потребовалась мне, поэтому было решено выложить полученный результат.
Я не являюсь узкоспециализированным администратором kafka, поэтому если видите явные ошибки, то буду рад исправить.
Сначала будет быстрое пошаговое руководство как запустить эту сборку и начать использовать. А далее будет представлено полное объяснение переменных, используемых для настройки наших компонентов и объяснение некоторых моментов.
Быстрый старт
Начнем с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose
git clone https://github.com/yubazh/kafka-compose
Для начала нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh (скрипт не мой, в конце будут ссылки на материалы, откуда "позаимствованы" некоторые автоматизации)
./generate.sh
Данный скрипт генерирует все необходимые файлы и размещает их в директориях certificate-authority, keystore, pem, truststore. После этого необходимо заменить ip адрес 192.168.0.188 на свой в файле docker-compose.yml в трех местах:
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29093
Для запуска контейнеров используем:
docker-compose up -d
Ждем пока все успешно поднимется и переходим в http://localhost:8080 для доступа в kafka-ui. Используем admin/admin для входа. Для доступа к grafana переходим на http://localhost:3000.
Для остановки контейнеров без удаления volume используем:
docker-compose down
Для остановки контейнеров с удалением всех volume используем:
docker-compose down -v
Для работы с кластером присутствуют файлы admin.properties и client.properties.da/client.properties.va, содержащие необходимые настройки.
Более подробное разъяснение
Начнем также с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose
git clone https://github.com/yubazh/kafka-compose
Нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh.
./generate.sh
Данный скрипт генерирует все необходимые файлы для работы ssl и размещает их в директориях certificate-authority, keystore, pem, truststore.
Далее рассмотрим docker-compose.yml посервисно.
kafka-0:
kafka-0: # название контейнера по которому к нему можно обращаться container_name: kafka-0 # hostname контейнера hostname: kafka-0 # образ который мы будем использовать, а именно bitnami kafka v 3.7 image: docker.io/bitnami/kafka:3.7 ports: # для подключению к каждому из брокеров будем использовать вывешенный наружу порт 29092/29093/29094 - "29092:29092" # также вывешиваем порты 29096-29097 которые ведут к порту 9404. на этот порт вывешиваются метрики jmx-exporter'a - "29095:9404" restart: always networks: - kafka environment: # KAFKA CLUSTER - в данном разделе содержатся переменные для кластеризации кафки # у каждой из нод должен быть свой уникальный ID внутри кластера - KAFKA_CFG_NODE_ID=0 # указываем ID кластера. должен быть идентичен внутри кластера у всех нод - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv # указываем, что каждый брокер будет исполнять роль broker'a и controller'a. контроллер заменяет отсутствующий у нас zookeeper - KAFKA_CFG_PROCESS_ROLES=broker,controller # перечисляем список нод, которые исполняют роль controller'a и будут собраны вместе для кворума - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093 # указываем какой пользователь будет обладать правами superuser'a - KAFKA_CFG_SUPER_USERS=User:sa # LISTENERS - в данном разделе содержатся переменные для настройка способов подключения к брокеру # указываем название самой сущности "listener" и порты, на которых брокер будет принимать запросы - KAFKA_CFG_LISTENERS=INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093 # указываем адрес и порт, по которым клиенты смогут подключаться к кластеру. контроллер здесь не указывается - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29092 # указываем протоколы, по которым будут работать описанные выше сущности "listener" - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,CONTROLLER:SASL_SSL # BROKER SETTINGS - настройки именно broker'a (process_role=broker) # указываем, что описанный ранее listener с названием INTERNAL будет использоватся для брокеров - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # указываем пользователя, которого будет использовать broker - KAFKA_INTER_BROKER_USER=sa # указываем пароль от указанного пользователя для broker'a - KAFKA_INTER_BROKER_PASSWORD=000000 # указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512) - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN # CONTROLLER SETTINGS - настройки для controller'a (process_role=controller) # указываем, что описанный ранее listener с названием CONTROLLER будет использоватся для контроллеров - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512) - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN # указываем пользователя, которого будет использовать controller - KAFKA_CONTROLLER_USER=sa # указываем пароль от указанного пользователя для controller'a - KAFKA_CONTROLLER_PASSWORD=000000 # CLIENT SETTINGS - настройки клиентов, которые будут подключаться из вне # указываем, что описанный ранее listener с названием EXTERNAL будет использоватся для клиентов - KAFKA_CLIENT_LISTENER_NAME=EXTERNAL # перечисляем через запятую создаваемых пользователей при запуске кластера - KAFKA_CLIENT_USERS=sa,da,va # перечисляем через запятую пароли пользователей, которых указали выше - KAFKA_CLIENT_PASSWORDS=000000,111111,222222 # ACL - список управления доступом # для работы ACL нужно обязательно указать authorizer_class_name - KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer # запрещаем создание топиков любыми пользователями, если ACL отсутствуют - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false # SSL - настройки сертификатов # выключаем проверку hostname - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= # пароль от сертификата (зашит по умолчанию в generate.sh) - KAFKA_CERTIFICATE_PASSWORD=supersecret # указываем тип сертификата (jks или pem) - KAFKA_TLS_TYPE=JKS # SASL - настройки аутентификации # указываем механизм SASL - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN # COMMON SETTINGS - общие настройки # запрещаем автоматическое создание топиков - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false # дополнительные аргументы, указываемые при запуске. мы используем их для вывешивания метрик jmx-exporter'a - EXTRA_ARGS=-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.19.0.jar=9404:/opt/jmx-exporter/kafka-3_0_0.yml volumes: # volume с данными kafka - kafka_0_data:/bitnami/kafka # пробрасываем в контейнер скачанный заранее jmx-exporter - ./jmx-exporter:/opt/jmx-exporter # пробрасываем в контейнер сгенерированные ранее сертификаты - ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro - ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro healthcheck: # проверка состояния контейнера. проверка происходит по готовности порта 9091 test: "bash -c 'printf \"\" > /dev/tcp/127.0.0.1/9091; exit $$?;'" interval: 5s timeout: 10s retries: 3 start_period: 30s
Отдельно стоит остановиться на следующих вещах:
мы используем Apache Kafka Raft (KRaft). т.е. мы не используем zookeeper совсем. для использования KRaft необходимо указать process_role=controller, а также заполнить все переменные, связанные с этой ролью. обратите внимание на разделы KAFKA CLUSTER и CONTROLLER SETTINGS в docker-compose.yml. (в документации сказано, что KRaft начиная с версии kafka 3.3 является production-ready.)
в документации к bitnami kafka указано, что при запуске кластера в "KRaft mode", KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL поддерживает только механизм PLAIN. поэтому в нашей конфигурации мы используем механизм SASL PLAIN.
так как мы используем механизм PLAIN, то нам необходимо создать пользователей при старте кластера. в отличие от SCRAM, в запущенный брокер добавить юзеров мы уже не сможем. для добавления пользователей используем соответствующие переменные в docker-compose.
для использования ACL, нужно обязательно указать KAFKA_CFG_AUTHORIZER_CLASS_NAME. нашел единственный верный класс: org.apache.kafka.metadata.authorizer.StandardAuthorizer, который необходимо использовать в данной конфигурации.
при описании "внешнего" listener'a (EXTERNAL) обязательно укажите свой внешний ip адрес, по которому будете подключаться к брокерам.
Рассмотрим переменные kafka-ui (это web-ui):
environment: # CLUSTER SETTINGS # название кластера в самом веб-интерфейсе. переменная влияет только на отображаемое название - KAFKA_CLUSTERS_0_NAME=dev-cluster # перечисляем брокеры, из которых состоит наш кластер. также указываем порт для коннекта - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9091,kafka-1:9091,kafka-2:9091 # указываем по какому протоколу будет происходить подключение, к перечисленным брокерам - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL # указываем механизм SASL - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN # указываем использование SASL - KAFKA_CLUSTERS_0_PROPERTIES_PROTOCOL=SASL # описываем конфигурацию SASL, а также логин\пароль юзера для коннекта к брокерам - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000"; # указываем, что при изменении файла конфигурации kafka-ui, он будет автоматически сразу "перечитан" и применен - DYNAMIC_CONFIG_ENABLED=true # AUTH # указываем что необходимо использовать форму для логина (зайти без логин\пасс уже не получится) - AUTH_TYPE=LOGIN_FORM # login от kafka-ui - SPRING_SECURITY_USER_NAME=admin # pass от kafka-ui - SPRING_SECURITY_USER_PASSWORD=admin # SSL - указываем truststore, который будет использоваться при подключении к брокерам, а также пароль от truststore - KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION=/kafka.truststore.jks - KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD=supersecret
Рассмотрим kafka-exporter. Это экспортер который вывешивает метрики kafka. Нас интересует только раздел command:
# перечисляем брокеры: --kafka.server=kafka-0:9091 --kafka.server=kafka-1:9091 --kafka.server=kafka-2:9091 # указываем, что используем SASL, указываем конкретный механизм, а также логин и пароль --sasl.enabled --sasl.mechanism=PLAIN --sasl.username=sa --sasl.password=000000 # указываем использование trustore при подключении к брокерам, а также отключаем проверку hostname --tls.enabled --tls.insecure-skip-tls-verify --tls.ca-file=/kafka.truststore.jks # уровень логирования --log.level=debug
В контейнере prometheus нас интересует только передача содержимого директории prometheus в /etc/prometheus. Она содержит prometheus.yml, который в свою очередь содержит scrape_config. В нем указано откуда нужно собирать метрики.
Рассмотрим контейнер с графаной:
environment: # AUTH # указываем логин и пароль от пользователя admin - GF_SECURITY_ADMIN_USER=admin - GF_SECURITY_ADMIN_PASSWORD=admin # также указываем, что можно работать с графаной без аутентификации - GF_AUTH_ANONYMOUS_ENABLED=true volumes: # здесь расположены файлы с настройкой графаны, а именно конкретного datasource (указываем на поднятный ранее prometheus), а также указываем автоматическую загрузку дашбордов из файлов - ./grafana/provisioning:/etc/grafana/provisioning # передаем в контейнер json'ы дашбордов - ./grafana/dashboards:/var/lib/grafana/dashboards
Для запуска контейнеров используем:
docker-compose up -d
Ждем пока все компоненты запустятся. В логах kafka-[0:2] мы можем посмотреть загруженные параметры в разделе "INFO KafkaConfig values:". Вы его не пропустите в логе.
Далее можем перейти на http://localhost:8080 и визуально оценить работает ли всё корректно. На что обратить внимание: на главной странице должно быть корректно отображено название кластера, которое мы передали в переменной в kafka-ui. Также должна быть корректна отражена версия кластера (в нашем случае 3.7-IV4). Далее, если ACL корректно включён, то в списке слева обязательно должна быть ссылка на "ACL"

Также можно посетить графану на http://localhost:3000. Выбираем "3 палочки" (как его в некоторых меню называют - гамбургер) в верхнем левом углу и переходим в Dashboards:

Выбираем нужный дашборд и смотрим. Данные появятся после начала работы с кафкой (создания топиков, записи в них и так далее). Пока что всё пусто.
Для проверки работы кластера можно использовать kafka в запущенных контейнерах, но мне удобнее было установить утилиты на свою машину. Воспользуемся руководством quickstart на оф сайте kafka https://kafka.apache.org/quickstart:
wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz tar -xzf kafka_2.13-3.7.0.tgz rm kafka_2.13-3.7.0.tgz cd kafka_2.13-3.7.0/bin
Для удобства поместите в данную директорию (kafka_2.13-3.7.0/bin) файлы admin.properties и client.properties.da из репозитория. Эти файлы содержат необходимые для подключения параметры. Пройдемся по содержимум admin.properties (обратите внимание на 7ую строчку, вам нужно путь к своему сгенерированному ранее truststore файлу):
# укажем механизм SASL sasl.mechanism=PLAIN # укажем настройки jaas.config, которые используются при подключении, а также логин и пароль юзера, который будет подключаться sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000"; # используем протокол SASL_SSL security.protocol=SASL_SSL # обязательно укажите truststore ssl.truststore.location=/home/user/kafka/kafka-compose/truststore/kafka.truststore.jks # пароль от kafka.truststore.jks ssl.truststore.password=supersecret # этот параметр отключает сравнение hostname, указанного в сертификате, с хостнеймом брокера. # если коннектитесь через ip адрес своей машины, # то без указания этой переменной могут сыпаться ошибки. обратите на это внимание ssl.endpoint.identification.algorithm=
Далее выполним некоторые команды для проверки работы кластера (замените айпи адрес на свой). Обратите внимание что используем для работы admin.properties (админский конфиг):
# создаем топик test-topic с тремя партициями и фактором репликации равным трем ./kafka-topics.sh --bootstrap-server 192.168.0.188:29092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config admin.properties # добавляем юзеру "da" возможность работать с топиком test-topic, выполнять любые операции ./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --add --allow-principal User:da --operation All --group '*' --topic test-topic --command-config admin.properties # проверяем ACL для топика test-topic ./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --list --topic test-topic --command-config admin.properties
Вывод будет такой:

Далее откроем 2 консоли, в одной запустим producer, а во второй consumer, и увидим что все работает верно. И producer и consumer будем запускать от пользователя da (т.е. с конфигом client.properties.da)
# стартуем producer ./kafka-console-producer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --producer.config client.properties.da # после этого можете в командной строке ввести сообщения # стартуем consumer ./kafka-console-consumer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --consumer.config client.properties.da --from-beginning # выведем все введенные ранее сообщения
Должно получиться так:

После этого можно будет проверить дашборды еще раз. Графики должны обновиться.
Для хранения информации kafka и prometheus использует docker volume. Поэтому если вы хотите остановить контейнеры без потери информации, используйте следующую команду в директории с docker-compose.yml:
docker-compose down
Если же вы захотите остановить все контейнеры и удалить volume с информацией, то используйте команду:
docker-compose down -v
При подготовке статьи использовались следующие материалы:
https://jaehyeon.me/blog/2023-07-20-kafka-development-with-docker-part-11/
это блог дата инженера. блог очень интересный. именно данный цикл статей содержит информацию по поднятию кластера кафки через docker-compose. все исходники выложены на github автора. именно отсюда взят generate.sh скрипт. минус сборки автра - очень старая версия кафки. произошли некоторые изменения, как в kafka, так и именно в bitnami контейнерах. в данном блоге есть код producer и consumer на pythonhttps://medium.com/@penkov.vladimir/kafka-cluster-with-ui-and-metrics-easy-setup-d12d1b94eccf
описание сборки кафки с мониторингом, используя docker-compose @papiroskohttps://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md
официальный репо bitnami kafka image в README.md которого есть хорошая документация к образам
