Всем привет. На этот раз хотел бы поделиться материалом, связанным непосредственно с devops работой. Недавно возникла потребность раскатить kafka кластер в kubernetes. В ходе развертывания возникло очень много сложностей, встречено множество подводных камней, и, естественно, в большинстве случаев рецепта в интернете найдено не было, поэтому приходилось искать решения самостоятельно методом проб и ошибок. Все, что здесь будет описано это сугубо личный опыт на одном из проектов. Сегодня я расскажу как с нуля раскатить dev контур bitnami/kafka кластера с помощью helm чартов, как обезопасить ваш кластер kafka и какие сложности могут вам встретиться. Погнали!
Немного о себе
Меня зовут Бальцер Артем, я fullstack Java/Kotlin разработчик. Вот идет уже 9й год разработки на Java и связанных непосредственно с нею технологиях. Был и опыт фриланса, создавал приложения под ключ, и опыт работы в банке, и на государственных проектах. Пишу все, начиная с высоконагруженных java/kotlin backend сервисов и мультиплееров, заканчивая frontend приложениями, shell-скриптами и библиотеками на C/C++.
Требования
Данный материал предполагает, что вы как минимум имеете небольшой опыт работы с кластером kubernetes и kafka, умеете работать с kube-конфигами, утилитой kubectl, понимаете что такое pod, service. Требуется понимать архитектуру kafka, цикл жизни сообщений внутри брокера, что такое топики, партиции, потребители и производители. Также неплохо бы знать на базовом уровне TLS протокол и принципы работы с ним для выстраивания защищенного соединения.
Начало работы
Для того, чтобы получить готовое решение было необходимо понять в какой среде будет работать kafka. Можно запустить standalone вариант kafka, можно запустить в докере, а можно в k8s. Главные преимущества нахождения кафки в кубере я выделяю следующие:
Масштабируемость. Благодаря возможностям кубера это сделать очень легко.
Гибкость настройки и деплоя. В качестве инструментов настройки опять же используются helm чарты. В них можно удобно поменять необходимые параметры окружения и быстро обновить стенд.
Итак, выбрали кубер. Теперь необходимо выбрать чарты. На просторах интернета мне встретились 2 самых используемых версии: bitnami/kafka и confluent. Второй вариант из этих двух является частью большой инфраструктуры confluentinc, что само по себе избыточно, поэтому был выбран bitnami.
Примечания:
По умолчанию префикс во всех командах - kafka-dev, это наименование релиза helm upgrade.
По умолчанию, рабочий неймспейс - kafka
Чарт-обертка
Ссылка на мой чарт с инструкциями и утилитами.
Мною выше была создана чарт-обертка вокруг bitnami/kafka чарта версии 27.1.0. Чарт также содержит Kafdrop, необходимый для удобного просмотра кафка кластера. Для его запуска потребуется установка helm на свой локальный компьютер. Убедитесь, что вы подключены к своему удаленному куберу с помощью kubeconfig. При необходимости используйте minikube.
Внутри чарта вы также можете найти вложенную папку tls, внутри которой будет происходить работа с ключами, сертификатами и секретами кластера, но об этом позже.
Немного о helm: это очень полезный инструмент, пакетный менеджер для kubernetes. Он позволяет запустить в кластере даже самое архитектурно сложное приложение единственной командой в консоли. Helm работает на основе helm-charts, это своего рода инструкции что брать, откуда брать, как разворачивать и с какими переменными окружения.
Весь репозиторий это helm-chart, за исключением tls скриптов. Чарты состоят из дескриптора чарта Chart.yaml, субчартов charts/*, шаблонов чарта templates/* и файла со значениями, используемыми при работе чарта values.yaml. Субчарты могут быть размещены физически вместе с вашим основным чартом, либо быть загружены в формате .tgz как зависимость.
Первый запуск
Для начала, поднимем кафка кластер с PLAINTEXT слушателями и без какого либо шифрования/аутентификации. Выделим размер volume под брокеры и zookeeper по 1гб в values-v1.yaml:
kafka:
enabled: true
controller:
replicaCount: 0
automountServiceAccountToken: true
broker:
replicaCount: 2
persistence:
size: 1Gi
automountServiceAccountToken: true
kraft:
enabled: false
zookeeper:
enabled: true
persistence:
size: 1Gi
listeners:
client:
protocol: PLAINTEXT
controller:
protocol: PLAINTEXT
interbroker:
protocol: PLAINTEXT
Перед запуском апгрейда выполните:
helm dependency update
Затем:
helm upgrade kafka-dev . -n kafka -f ./values-v1.yaml --wait --install --render-subchart-notes
Команда имеет следующий формат:
helm upgrade <имя релиза> . -n <неймспейс> -f ./<уточняющий values.yaml файл> --wait --install --render-subchart-notes
После каждого запуска апгрейда вы всегда можете увидеть заметки такого формата:
Они очень информативны и полезны. Чтобы получить их используйте флаг --render-subchart-notes. Запомним эти эндпойнты, они пригодятся позже.
Исходя из заметок, доступ внутри кластера доступен по следующим эндпоинтам:
kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
...
kafka-dev-broker-n.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
Где n=replicaCount-1
Для теста брокеров можно выполнить поочередно команды создания и чтения списка топиков:
kubectl exec -it kafka-dev-broker-0 -n kafka -c kafka -- kafka-topics.sh --create --partitions 1 --topic mytopic --bootstrap-server kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092,kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
kubectl exec -it kafka-dev-broker-0 -n kafka -c kafka -- kafka-topics.sh --list --bootstrap-server kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092,kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
External access
Но все же будет лучше, если вы скачаете shell-консольные кафка утилиты. Сделать это можно любым удобным способом, я же просто скачал саму кафку с официального сайта https://kafka.apache.org/quickstart и распаковал в папку kafka. И это еще не все. Для доступа извне потребуется включение externalAccess. Сделать это можно несколькими путями, либо настройкой балансировщика, либо nodeport. Выберем первый вариант.
Предположим, что у вас уже настроен балансировщик, у вас есть IP адрес балансировщика (допустим, далее - 1.1.1.1) и спецификация values.yaml чарта ingress-nginx/ingress-nginx.
В нем необходимо определить внешние tcp порты:
tcp:
9094: "kafka/kafka-dev-broker-0-external:9094"
9095: "kafka/kafka-dev-broker-1-external:9094"
В данном случае настроены 2 реплики брокера, если у вас их будет больше то добавьте соответствующее кол-во строк. Префикс kafka в начале - имя неймспейса, а все что после слеша - наименование сервисов реплик брокера. Порт 9094 для внутреннего EXTERNAL слушателя.
Обновляем балансировщик командой:
helm upgrade <release_name> ingress-nginx/ingress-nginx -f ./<путь к values.yaml> -n kube-system
Далее, скорее всего, вам необходимо добавить обработчики портов в cloud консоли, будь то яндекс,google или AWS. В yandex-cloud это Network Load Balancer/Балансировщики. Добавляете обработчик, где целевой порт - сгенерированный порт формата 3****, просмотреть вы его можете в сервисе балансировщика в самом кубере после апгрейда.
Далее неободимо открыть тот самый externalAccess в values-v2.yaml:
kafka:
enabled: true
controller:
replicaCount: 0
automountServiceAccountToken: true
broker:
replicaCount: 2
persistence:
size: 1Gi
automountServiceAccountToken: true
kraft:
enabled: false
zookeeper:
enabled: true
persistence:
size: 1Gi
listeners:
client:
protocol: PLAINTEXT
controller:
protocol: PLAINTEXT
interbroker:
protocol: PLAINTEXT
external:
protocol: PLAINTEXT
externalAccess:
enabled: true
broker:
service:
type: ClusterIP
domain: 1.1.1.1
Обновляем командой:
helm upgrade kafka-dev . -n kafka -f ./values-v2.yaml --wait --install --render-subchart-notes
Чтобы убедиться, что все сделано правильно также есть пара команд из helm notes:
kubectl exec -it kafka-dev-broker-0 -n kafka -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners
kubectl exec -it kafka-dev-broker-1 -n kafka -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners
В итоге, в advertised.listeners должен лежать EXTERNAL://1.1.1.1:9094 и EXTERNAL://1.1.1.1:9095 , где 1.1.1.1 как было написано выше - адрес вашего балансировщика.
Внимание! Адрес и порт EXTERNAL слушателя должен полностью совпадать с внешним адресом и портом настроенным на балансировщике 1к1. В зависимости от кол-ва реплик, при старте кластера внешний порт в advertised.listeners брокеров меняется от 9094 и далее по возрастанию. Именно по этому внешние порты брокеров на балансировщике всегда начинаются с 9094 в том числе (см. код выше), изменить их не получится.
Для настройки nodeport обратитесь к документации bitnami https://github.com/bitnami/charts/tree/main/bitnami/kafka
Теперь тестировать локально гораздо легче:
kafka/bin/kafka-topics.sh --bootstrap-server 1.1.1.1:9094,1.1.1.1:9095 --list
Kafdrop
Далее будет полезно запустить Kafdrop. values-v3.yaml:
kafka:
enabled: true
controller:
replicaCount: 0
automountServiceAccountToken: true
broker:
replicaCount: 2
persistence:
size: 1Gi
automountServiceAccountToken: true
kraft:
enabled: false
zookeeper:
enabled: true
persistence:
size: 1Gi
listeners:
client:
protocol: PLAINTEXT
controller:
protocol: PLAINTEXT
interbroker:
protocol: PLAINTEXT
external:
protocol: PLAINTEXT
externalAccess:
enabled: true
broker:
service:
type: ClusterIP
domain: 1.1.1.1
kafdrop:
enabled: true
replicaCount: 1
kafka:
brokerConnect: kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092, kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
service:
type: ClusterIP
targetPort: 8080
resources:
limits:
cpu: 100m
memory: 500Mi
requests:
cpu: 10m
memory: 400Mi
ingress:
enabled: true
annotations:
kubernetes.io/ingress.class: nginx
hosts:
- kafdrop-dev.k8s.yourdomain.com
Значения в kafdrio.kafka.brokerConnect взяты все из тех же notes. Доступ внутренний, поэтому происходит по 9092 порту. Ингресс установим kafdrop-dev.k8s.yourdomain.com либо любой другой. После запуска команды:
helm upgrade kafka-dev . -n kafka -f ./values-v3.yaml --wait --install --render-subchart-notes
По адресу вышепрописанного хоста ингресса вы увидете рабочую kafdrop страницу
TLS/SSL
Для настройки защищенного канала придется работать с утилитами openssl и keytool. Можете использовать keystore explorer, что тоже является хорошим инструментом.
TLS соединение предполагается быть двухсторонним с аутентификацией, т.е. mTLS (клиент предъявляет свой сертификат).
Потребуются следующие пары ключей:
Certificate authority - самоподписная CA пара ключей
Пара ключей для слушателей kafka
Пара ключей для zookeeper. Примечание: для корректного взаимодействия с zookeeper рекомендуется выпустить две пары ключей отдельно для клиентской и серверной аутентификации вместно одной совмещенной пары. Данный вариант используется исключительно для наглядности.
Пара ключей клиента кафки (mTLS аутентификация). Для клиентов можно сгенерировать n-е кол-во ключей.
Для генерации будут использоваться следующие утилиты из директории /tls :
generate-ca.sh - генерация своего CA сертификата и ключа. Не имеет входных данных, поля сертификата вы можете описать прямо в скрипте.
generate-ssl.sh - генерация пар ключей и хранилищ <keypairname>.keystore.jks и <keypairname>.truststore.jks. Имеет параметры $1 - путь к CA-паре ключей и $2 - наименование сгенерированной пары ключей и хранилищ.
generate-client-ssl.sh - генерация клиентских пар ключей и хранилищ <keypairname>.keystore.jks и <keypairname>.truststore.jks. Отличие от предыдущего в возможности указать свой .cnf файл на все N ключей для описания полей сертификата. Имеет параметры $1 - путь к CA-паре ключей и $2 - наименование сгенерированной пары ключей и хранилищ, $3 - путь к .cnf файлу.
generate-secrets.sh - генерация секретов в кластере kubernetes. Имеет параметры: $1 - наименование секрета в формате <secret_name>-jks, а также одновременно соответствует наименованию сгенерированных у вас локально хранилищ <secret_name>.keystore.jks и <secret_name>.truststore.jks, $2 - наименование неймспейса кластера, $3 - наименование файла <filename>.truststore.jks хранимого внутри секрета кубера (различие есть, например в размещении в kafka и в zookeeper - в данном случае будут использоваться хранилища с соответствующим именем).
generate-all-kafka.sh - генерация всех ключей, необходимых для старта приложений
sign-ssl.sh - подписание клиентского сертификата в формате .csr. Параметры: $1 - путь к CA, $2 - наименование сертификата на выходе, $3 - путь к .csr.
Для конфигурации openssl используются .cnf файлы, на примере openssl-kafka.cnf:
[req]
default_bits = 4096
encrypt_key = yes
default_md = sha256
prompt = no
utf8 = yes
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
C = RU
ST = Moscow
L = Moscow
O = ORG
CN = Service cert
[v3_req]
basicConstraints = CA:FALSE
subjectKeyIdentifier = hash
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
subjectAltName = DNS:kafka-dev,DNS:kafka-dev.kafka.svc,DNS:kafka-dev.kafka.svc.cluster.local,DNS:kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local,DNS:kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local,IP:1.1.1.1
Обратите внимание на subjectAltName параметр. Здесь используются все доменные имена, выданные вам в helm notes при апгрейде релиза. Это касается как kafka так и zookeeper. В конце находится IP балансировщика. Для зукипера балансировщик не нужен, если вы не хотите подключаться к нему напрямую в том числе. Чем больше реплик приложения, тем больше будет доменных имен.
openssl-zookeeper.cnf:
[req]
default_bits = 4096
encrypt_key = yes
default_md = sha256
prompt = no
utf8 = yes
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
C = RU
ST = Moscow
L = Moscow
O = ORG
CN = Service cert
[v3_req]
basicConstraints = CA:FALSE
subjectKeyIdentifier = hash
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
subjectAltName = DNS:kafka-dev-zookeeper,DNS:kafka-dev-zookeeper.kafka.svc,DNS:kafka-dev-zookeeper.kafka.svc.cluster.local,DNS:kafka-dev-zookeeper-0.kafka-dev-zookeeper-headless.kafka.svc.cluster.local,DNS:kafka-dev-zookeeper-1.kafka-dev-zookeeper-headless.kafka.svc.cluster.local
openssl-kafka-client.cnf:
[req]
default_bits = 4096
encrypt_key = yes
default_md = sha256
prompt = no
utf8 = yes
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
C = RU
ST = Moscow
L = Moscow
O = ORG
CN = Service client cert
[v3_req]
basicConstraints = CA:FALSE
subjectKeyIdentifier = hash
extendedKeyUsage = clientAuth
Запустим поочередно команды:
cd tls
sh generate-ca.sh
sh generate-all-kafka.sh
После запуска в директории tls вы обнаружите:
kafka.keystore.jks и kafka.truststore.jks
zookeeper.keystore.jks и zookeeper.truststore.jks
kafka-client.keystore.jks и kafka-client.truststore.jks
а также сами пары ключей с соответствующими именами
соответствующие kafka-jks, zookeeper-jks, kafka-client-jks секреты в кластере кубера
Пароль ко всем хранилищам по умолчанию - changeit
Теперь запустим апгрейд по values-v4.yaml:
kafka:
enabled: true
controller:
replicaCount: 0
automountServiceAccountToken: true
broker:
replicaCount: 2
persistence:
size: 1Gi
automountServiceAccountToken: true
kraft:
enabled: false
zookeeper:
enabled: true
persistence:
size: 1Gi
listeners:
client:
protocol: PLAINTEXT
controller:
protocol: PLAINTEXT
interbroker:
protocol: PLAINTEXT
external:
protocol: SSL
sslClientAuth: required
tls:
type: JKS
existingSecret: kafka-tls
keystorePassword: changeit
truststorePassword: changeit
externalAccess:
enabled: true
broker:
service:
type: ClusterIP
domain: 1.1.1.1
kafdrop:
enabled: true
replicaCount: 1
kafka:
brokerConnect: kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092, kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
service:
type: ClusterIP
targetPort: 8080
resources:
limits:
cpu: 100m
memory: 500Mi
requests:
cpu: 10m
memory: 400Mi
ingress:
enabled: true
annotations:
kubernetes.io/ingress.class: nginx
hosts:
- kafdrop-dev.k8s.local.ru
helm upgrade kafka-dev . -n kafka -f ./values-v4.yaml --wait --install --render-subchart-notes
Проверим внешнее соединение по SSL. Для этого нужно создадим локально папку kafka_config, в ней разместим kafka-client.* хранилища и config.properties следующего содержания:
ssl.truststore.location=/.../kafka_config/kafka-client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/.../kafka_config/kafka-client.keystore.jks
ssl.keystore.password=changeit
security.protocol=SSL
Затем попробуем прочитать список топиков:
kafka/bin/kafka-topics.sh --bootstrap-server 1.1.1.1:9094,1.1.1.1:9095 --command-config ./kafka_config/config.properties --list
Теперь попробуем поднять полностью защищенный кластер. values-v5.yaml:
kafka:
enabled: true
image:
debug: true
controller:
replicaCount: 0
automountServiceAccountToken: true
broker:
replicaCount: 2
persistence:
size: 1Gi
automountServiceAccountToken: true
kraft:
enabled: false
listeners:
client:
protocol: SSL
sslClientAuth: required
controller:
protocol: SSL
sslClientAuth: required
interbroker:
protocol: SSL
sslClientAuth: required
external:
protocol: SSL
sslClientAuth: required
tls:
type: JKS
existingSecret: kafka-jks
keystorePassword: changeit
truststorePassword: changeit
zookeeper:
enabled: true
existingSecret: zookeeper-jks
keystorePassword: changeit
truststorePassword: changeit
externalAccess:
enabled: true
broker:
service:
type: ClusterIP
domain: 1.1.1.1
zookeeper:
enabled: true
persistence:
size: 1Gi
tls:
client:
enabled: true
auth: need
existingSecret: zookeeper-jks
keystorePassword: changeit
truststorePassword: changeit
quorum:
enabled: true
auth: need
existingSecret: zookeeper-jks
keystorePassword: changeit
truststorePassword: changeit
networkPolicy:
enabled: false
kafdrop:
enabled: true
replicaCount: 1
kafka:
brokerConnect: kafka-dev-broker-0.kafka-dev-broker-headless.kafka.svc.cluster.local:9092, kafka-dev-broker-1.kafka-dev-broker-headless.kafka.svc.cluster.local:9092
ssl:
enable: true
properties: |
ssl.truststore.location=/tmp/stores/kafka.truststore.jks
ssl.keystore.location=/tmp/stores/kafka.keystore.jks
ssl.truststore.password: changeit
ssl.keystore.password: changeit
security.protocol: SSL
secretName: "kafka-client-jks"
service:
type: ClusterIP
targetPort: 8080
jvm:
opts: "-Xms228M -Xmx228M"
resources:
limits:
cpu: 100m
memory: 600Mi
requests:
cpu: 10m
memory: 500Mi
ingress:
enabled: true
annotations:
kubernetes.io/ingress.class: nginx
hosts:
- kafdrop-dev.k8s.local.ru
helm upgrade kafka-dev . -n kafka -f ./values-v5.yaml --wait --install --render-subchart-notes
Обратите внимание: zookeeper не работает с хранилищами формата pem, только с jks. Также обязательно наличие параметра zookeeper.networkPolicy.enabled=false , т.к. чарты зукипера по умолчанию выставляют файервол и брокеры не могут до него достучаться, отваливаясь по таймауту. Для надежности, повторите предыдущий шаг с вызовом kafka/bin/kafka-topics.sh.
Генерация внешних клиентских ключей и их подписание
Корректный путь - когда клиент сам генерирует пару ключей, приватный ключ оставляет у себя, а публичный отдает на подпись CA. Сделать это можно следующим способом.
openssl genrsa -out external-client.key 4096
openssl req \
-new -key external-client.key \
-out external-client.csr \
-config <путь к .cnf файлу>
Далее владелец ресурса подписывает external-client.csr:
sh sign-ssl.sh ./ca external-client ./external-client.csr
Подписанный external-client.crt сертификат затем может использоваться клиентом.
Для проверки подписи можете создать дополнительный ca, подписать им и сохранить их в новые jks хранилища. Используя их на клиенте вы получите
SunCertPathBuilderException: unable to find valid certification path to requested target
Авторизация и дополнительная аутентификация
Опционально, можно подключить и авторизацию. В кафка авторизация реализована в формате SASL - Simple Authentication and Security Layer, он включает в себя несколько различных способов аутентификации:
GSSAPI (Kerberos authentication)
OAUTHBEARER - отлично подойдет для интеграции с keycloak
Так как SSL достаточен для аутентификации - SASL не требуется, но может потребоваться при разделении на пользователей и read/write permissions на базе ACL. Для того, чтобы включить и SSL и SASL одновременно, необходимо объявить слушателям протокол SASL_SSL. Здесь лежит очень занятная статья, помогающая настроить SASL.
Итог
Этот материал поможет развернуть защищенный кластер kafka, но следует понимать, что шифрование и аутентификация может иногда неблагоприятно воздействовать на производительность кластера.
На этом все, спасибо за внимание к моей статье.