Привет! В этой статье мы развернем dev-cluster kafka последней версии (3.7 на момент написания статьи), без использования zookeeper. Также в нашей сборке будет web-ui и все для мониторинга. В представленной конфигурации настроены SASL, SSL, ACL.
В чем полезность статьи? В статье представлен готовый docker-compose.yml для использования, который будет вам полезен если вы не сильно искушены в вопросах администрирования kafka и docker-compose, но уже хотелось бы начать заниматься разработкой, используя кластер kafka. Беглый поиск в интернет не дал результата в виде готового docker-compose.yml для конкретно такой конфигурации, которая потребовалась мне, поэтому было решено выложить полученный результат.
Я не являюсь узкоспециализированным администратором kafka, поэтому если видите явные ошибки, то буду рад исправить.
Сначала будет быстрое пошаговое руководство как запустить эту сборку и начать использовать. А далее будет представлено полное объяснение переменных, используемых для настройки наших компонентов и объяснение некоторых моментов.
Быстрый старт
Начнем с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose
git clone https://github.com/yubazh/kafka-compose
Для начала нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh (скрипт не мой, в конце будут ссылки на материалы, откуда "позаимствованы" некоторые автоматизации)
./generate.sh
Данный скрипт генерирует все необходимые файлы и размещает их в директориях certificate-authority, keystore, pem, truststore. После этого необходимо заменить ip адрес 192.168.0.188 на свой в файле docker-compose.yml в трех местах:
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29093
Для запуска контейнеров используем:
docker-compose up -d
Ждем пока все успешно поднимется и переходим в http://localhost:8080 для доступа в kafka-ui. Используем admin/admin для входа. Для доступа к grafana переходим на http://localhost:3000.
Для остановки контейнеров без удаления volume используем:
docker-compose down
Для остановки контейнеров с удалением всех volume используем:
docker-compose down -v
Для работы с кластером присутствуют файлы admin.properties и client.properties.da/client.properties.va, содержащие необходимые настройки.
Более подробное разъяснение
Начнем также с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose
git clone https://github.com/yubazh/kafka-compose
Нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh.
./generate.sh
Данный скрипт генерирует все необходимые файлы для работы ssl и размещает их в директориях certificate-authority, keystore, pem, truststore.
Далее рассмотрим docker-compose.yml посервисно.
kafka-0:
kafka-0:
# название контейнера по которому к нему можно обращаться
container_name: kafka-0
# hostname контейнера
hostname: kafka-0
# образ который мы будем использовать, а именно bitnami kafka v 3.7
image: docker.io/bitnami/kafka:3.7
ports:
# для подключению к каждому из брокеров будем использовать вывешенный наружу порт 29092/29093/29094
- "29092:29092"
# также вывешиваем порты 29096-29097 которые ведут к порту 9404. на этот порт вывешиваются метрики jmx-exporter'a
- "29095:9404"
restart: always
networks:
- kafka
environment:
# KAFKA CLUSTER - в данном разделе содержатся переменные для кластеризации кафки
# у каждой из нод должен быть свой уникальный ID внутри кластера
- KAFKA_CFG_NODE_ID=0
# указываем ID кластера. должен быть идентичен внутри кластера у всех нод
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# указываем, что каждый брокер будет исполнять роль broker'a и controller'a. контроллер заменяет отсутствующий у нас zookeeper
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# перечисляем список нод, которые исполняют роль controller'a и будут собраны вместе для кворума
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
# указываем какой пользователь будет обладать правами superuser'a
- KAFKA_CFG_SUPER_USERS=User:sa
# LISTENERS - в данном разделе содержатся переменные для настройка способов подключения к брокеру
# указываем название самой сущности "listener" и порты, на которых брокер будет принимать запросы
- KAFKA_CFG_LISTENERS=INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093
# указываем адрес и порт, по которым клиенты смогут подключаться к кластеру. контроллер здесь не указывается
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29092
# указываем протоколы, по которым будут работать описанные выше сущности "listener"
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,CONTROLLER:SASL_SSL
# BROKER SETTINGS - настройки именно broker'a (process_role=broker)
# указываем, что описанный ранее listener с названием INTERNAL будет использоватся для брокеров
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
# указываем пользователя, которого будет использовать broker
- KAFKA_INTER_BROKER_USER=sa
# указываем пароль от указанного пользователя для broker'a
- KAFKA_INTER_BROKER_PASSWORD=000000
# указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512)
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
# CONTROLLER SETTINGS - настройки для controller'a (process_role=controller)
# указываем, что описанный ранее listener с названием CONTROLLER будет использоватся для контроллеров
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512)
- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
# указываем пользователя, которого будет использовать controller
- KAFKA_CONTROLLER_USER=sa
# указываем пароль от указанного пользователя для controller'a
- KAFKA_CONTROLLER_PASSWORD=000000
# CLIENT SETTINGS - настройки клиентов, которые будут подключаться из вне
# указываем, что описанный ранее listener с названием EXTERNAL будет использоватся для клиентов
- KAFKA_CLIENT_LISTENER_NAME=EXTERNAL
# перечисляем через запятую создаваемых пользователей при запуске кластера
- KAFKA_CLIENT_USERS=sa,da,va
# перечисляем через запятую пароли пользователей, которых указали выше
- KAFKA_CLIENT_PASSWORDS=000000,111111,222222
# ACL - список управления доступом
# для работы ACL нужно обязательно указать authorizer_class_name
- KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# запрещаем создание топиков любыми пользователями, если ACL отсутствуют
- KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false
# SSL - настройки сертификатов
# выключаем проверку hostname
- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
# пароль от сертификата (зашит по умолчанию в generate.sh)
- KAFKA_CERTIFICATE_PASSWORD=supersecret
# указываем тип сертификата (jks или pem)
- KAFKA_TLS_TYPE=JKS
# SASL - настройки аутентификации
# указываем механизм SASL
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
# COMMON SETTINGS - общие настройки
# запрещаем автоматическое создание топиков
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# дополнительные аргументы, указываемые при запуске. мы используем их для вывешивания метрик jmx-exporter'a
- EXTRA_ARGS=-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.19.0.jar=9404:/opt/jmx-exporter/kafka-3_0_0.yml
volumes:
# volume с данными kafka
- kafka_0_data:/bitnami/kafka
# пробрасываем в контейнер скачанный заранее jmx-exporter
- ./jmx-exporter:/opt/jmx-exporter
# пробрасываем в контейнер сгенерированные ранее сертификаты
- ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
healthcheck:
# проверка состояния контейнера. проверка происходит по готовности порта 9091
test: "bash -c 'printf \"\" > /dev/tcp/127.0.0.1/9091; exit $$?;'"
interval: 5s
timeout: 10s
retries: 3
start_period: 30s
Отдельно стоит остановиться на следующих вещах:
мы используем Apache Kafka Raft (KRaft). т.е. мы не используем zookeeper совсем. для использования KRaft необходимо указать process_role=controller, а также заполнить все переменные, связанные с этой ролью. обратите внимание на разделы KAFKA CLUSTER и CONTROLLER SETTINGS в docker-compose.yml. (в документации сказано, что KRaft начиная с версии kafka 3.3 является production-ready.)
в документации к bitnami kafka указано, что при запуске кластера в "KRaft mode", KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL поддерживает только механизм PLAIN. поэтому в нашей конфигурации мы используем механизм SASL PLAIN.
так как мы используем механизм PLAIN, то нам необходимо создать пользователей при старте кластера. в отличие от SCRAM, в запущенный брокер добавить юзеров мы уже не сможем. для добавления пользователей используем соответствующие переменные в docker-compose.
для использования ACL, нужно обязательно указать KAFKA_CFG_AUTHORIZER_CLASS_NAME. нашел единственный верный класс: org.apache.kafka.metadata.authorizer.StandardAuthorizer, который необходимо использовать в данной конфигурации.
при описании "внешнего" listener'a (EXTERNAL) обязательно укажите свой внешний ip адрес, по которому будете подключаться к брокерам.
Рассмотрим переменные kafka-ui (это web-ui):
environment:
# CLUSTER SETTINGS
# название кластера в самом веб-интерфейсе. переменная влияет только на отображаемое название
- KAFKA_CLUSTERS_0_NAME=dev-cluster
# перечисляем брокеры, из которых состоит наш кластер. также указываем порт для коннекта
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9091,kafka-1:9091,kafka-2:9091
# указываем по какому протоколу будет происходить подключение, к перечисленным брокерам
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
# указываем механизм SASL
- KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
# указываем использование SASL
- KAFKA_CLUSTERS_0_PROPERTIES_PROTOCOL=SASL
# описываем конфигурацию SASL, а также логин\пароль юзера для коннекта к брокерам
- KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000";
# указываем, что при изменении файла конфигурации kafka-ui, он будет автоматически сразу "перечитан" и применен
- DYNAMIC_CONFIG_ENABLED=true
# AUTH
# указываем что необходимо использовать форму для логина (зайти без логин\пасс уже не получится)
- AUTH_TYPE=LOGIN_FORM
# login от kafka-ui
- SPRING_SECURITY_USER_NAME=admin
# pass от kafka-ui
- SPRING_SECURITY_USER_PASSWORD=admin
# SSL - указываем truststore, который будет использоваться при подключении к брокерам, а также пароль от truststore
- KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION=/kafka.truststore.jks
- KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD=supersecret
Рассмотрим kafka-exporter. Это экспортер который вывешивает метрики kafka. Нас интересует только раздел command:
# перечисляем брокеры:
--kafka.server=kafka-0:9091 --kafka.server=kafka-1:9091 --kafka.server=kafka-2:9091
# указываем, что используем SASL, указываем конкретный механизм, а также логин и пароль
--sasl.enabled --sasl.mechanism=PLAIN --sasl.username=sa --sasl.password=000000
# указываем использование trustore при подключении к брокерам, а также отключаем проверку hostname
--tls.enabled --tls.insecure-skip-tls-verify --tls.ca-file=/kafka.truststore.jks
# уровень логирования
--log.level=debug
В контейнере prometheus нас интересует только передача содержимого директории prometheus в /etc/prometheus. Она содержит prometheus.yml, который в свою очередь содержит scrape_config. В нем указано откуда нужно собирать метрики.
Рассмотрим контейнер с графаной:
environment:
# AUTH
# указываем логин и пароль от пользователя admin
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
# также указываем, что можно работать с графаной без аутентификации
- GF_AUTH_ANONYMOUS_ENABLED=true
volumes:
# здесь расположены файлы с настройкой графаны, а именно конкретного datasource (указываем на поднятный ранее prometheus), а также указываем автоматическую загрузку дашбордов из файлов
- ./grafana/provisioning:/etc/grafana/provisioning
# передаем в контейнер json'ы дашбордов
- ./grafana/dashboards:/var/lib/grafana/dashboards
Для запуска контейнеров используем:
docker-compose up -d
Ждем пока все компоненты запустятся. В логах kafka-[0:2] мы можем посмотреть загруженные параметры в разделе "INFO KafkaConfig values:". Вы его не пропустите в логе.
Далее можем перейти на http://localhost:8080 и визуально оценить работает ли всё корректно. На что обратить внимание: на главной странице должно быть корректно отображено название кластера, которое мы передали в переменной в kafka-ui. Также должна быть корректна отражена версия кластера (в нашем случае 3.7-IV4). Далее, если ACL корректно включён, то в списке слева обязательно должна быть ссылка на "ACL"
Также можно посетить графану на http://localhost:3000. Выбираем "3 палочки" (как его в некоторых меню называют - гамбургер) в верхнем левом углу и переходим в Dashboards:
Выбираем нужный дашборд и смотрим. Данные появятся после начала работы с кафкой (создания топиков, записи в них и так далее). Пока что всё пусто.
Для проверки работы кластера можно использовать kafka в запущенных контейнерах, но мне удобнее было установить утилиты на свою машину. Воспользуемся руководством quickstart на оф сайте kafka https://kafka.apache.org/quickstart:
wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
rm kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0/bin
Для удобства поместите в данную директорию (kafka_2.13-3.7.0/bin) файлы admin.properties и client.properties.da из репозитория. Эти файлы содержат необходимые для подключения параметры. Пройдемся по содержимум admin.properties (обратите внимание на 7ую строчку, вам нужно путь к своему сгенерированному ранее truststore файлу):
# укажем механизм SASL
sasl.mechanism=PLAIN
# укажем настройки jaas.config, которые используются при подключении, а также логин и пароль юзера, который будет подключаться
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000";
# используем протокол SASL_SSL
security.protocol=SASL_SSL
# обязательно укажите truststore
ssl.truststore.location=/home/user/kafka/kafka-compose/truststore/kafka.truststore.jks
# пароль от kafka.truststore.jks
ssl.truststore.password=supersecret
# этот параметр отключает сравнение hostname, указанного в сертификате, с хостнеймом брокера.
# если коннектитесь через ip адрес своей машины,
# то без указания этой переменной могут сыпаться ошибки. обратите на это внимание
ssl.endpoint.identification.algorithm=
Далее выполним некоторые команды для проверки работы кластера (замените айпи адрес на свой). Обратите внимание что используем для работы admin.properties (админский конфиг):
# создаем топик test-topic с тремя партициями и фактором репликации равным трем
./kafka-topics.sh --bootstrap-server 192.168.0.188:29092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config admin.properties
# добавляем юзеру "da" возможность работать с топиком test-topic, выполнять любые операции
./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --add --allow-principal User:da --operation All --group '*' --topic test-topic --command-config admin.properties
# проверяем ACL для топика test-topic
./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --list --topic test-topic --command-config admin.properties
Вывод будет такой:
Далее откроем 2 консоли, в одной запустим producer, а во второй consumer, и увидим что все работает верно. И producer и consumer будем запускать от пользователя da (т.е. с конфигом client.properties.da)
# стартуем producer
./kafka-console-producer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --producer.config client.properties.da
# после этого можете в командной строке ввести сообщения
# стартуем consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --consumer.config client.properties.da --from-beginning
# выведем все введенные ранее сообщения
Должно получиться так:
После этого можно будет проверить дашборды еще раз. Графики должны обновиться.
Для хранения информации kafka и prometheus использует docker volume. Поэтому если вы хотите остановить контейнеры без потери информации, используйте следующую команду в директории с docker-compose.yml:
docker-compose down
Если же вы захотите остановить все контейнеры и удалить volume с информацией, то используйте команду:
docker-compose down -v
При подготовке статьи использовались следующие материалы:
https://jaehyeon.me/blog/2023-07-20-kafka-development-with-docker-part-11/
это блог дата инженера. блог очень интересный. именно данный цикл статей содержит информацию по поднятию кластера кафки через docker-compose. все исходники выложены на github автора. именно отсюда взят generate.sh скрипт. минус сборки автра - очень старая версия кафки. произошли некоторые изменения, как в kafka, так и именно в bitnami контейнерах. в данном блоге есть код producer и consumer на pythonhttps://medium.com/@penkov.vladimir/kafka-cluster-with-ui-and-metrics-easy-setup-d12d1b94eccf
описание сборки кафки с мониторингом, используя docker-compose @papiroskohttps://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md
официальный репо bitnami kafka image в README.md которого есть хорошая документация к образам