
Если вы настроили многоузловой кластер Kafka, то, вероятно, знаете, что в нем есть части конфигурации, общие для кластера, а есть уникальные для каждого узла.
В этой заметке я описываю свой способ проведения централизованного обновления конфигурации брокеров.
Поменяли на одном брокере — настройки применились везде.
Bourne again shell. Погнали!
Синхронизировать настройки будем при помощи rsync+ssh. Подключение к узлам будет под тем же служебным пользователем, под которым работает ПО Kafka.
Но перед синхронизацией необходимо разделить конфигурационный файл server.properties на две отдельные составляющие — server.properties.uniq и server.properties.common.
server.properties.uniq — настройки уникальные для каждого брокера.
server.properties.common — настройки общие для каждого брокера.
Полный конфигурационный файл брокера можно не читать, там много настроек, которые не являются предметом заметки, я его приведу в свёрнутом виде.
server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
broker.rack=DC1
advertised.listeners=SASL_SSL://yamaha1.bercut.com:9093
advertised.host.name=yamaha1.bercut.com
listeners=SASL_SSL://yamaha1.bercut.com:9093
#Kerberos
listener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true \
keyTab="/opt/kafka-secret/kafka-server-yamaha1.keytab" principal="KAFKA/yamaha1.bercut.com";
############################# End of Server Basics ######################
############################# Zookeeper #################################
zookeeper.connection.timeout.ms=18000
zookeeper.connect=yamaha1.bercut.com:2182,yamaha2.bercut.com:2182,yamaha3.bercut.com:2182
## Properties for SSL Zookeeper Security between Zookeeper and Broker
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.truststore.jks
zookeeper.ssl.truststore.password=ChangeThisZKeySecret
zookeeper.ssl.keystore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.keystore.jks
zookeeper.ssl.keystore.password=ChangeThisZKeySecret
zookeeper.set.acl=true
############################ SSL and SASL settings #############################
## put this line if your certificate does not contain FQDN
#ssl.endpoint.identification.algorithm=
ssl.keystore.location=/opt/kafka-config/ssl/kafka/kafka.server.keystore.jks
ssl.keystore.password=ChangeThisKKeySecret
ssl.key.password=ChangeThisKKeySecret
ssl.truststore.location=/opt/kafka-config/ssl/kafka/kafka.server.truststore.jks
ssl.truststore.password=ChangeThisKKeySecret
ssl.protocol=TLSv1.2
security.inter.broker.protocol=SASL_SSL
##ssl.client.auth=required
ssl.client.auth=none
##Properties for SASL beetween the brokers and clients
#sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512,GSSAPI
sasl.kerberos.service.name=kafka
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="broker-admin" password="ChangeThisAdminSecret";
super.users=User:broker-admin
##Properties for Authorization
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# disable this for more security
allow.everyone.if.no.acl.found=false
############################# Socket Server Settings #############################
# Maps listener names to security protocols, the default is for them to be the same.
# See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=8
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=16
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=2
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.min.isr=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
#7 days
#log.retention.hours=168
#4 days
log.retention.ms=345600000
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#1GB
log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created
#1GB
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#5min
log.retention.check.interval.ms=300000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3000
############################# General Topics Settings #############################
delete.topic.enable=false
auto.create.topics.enable=false
config.storage.replication.factor=3
replica.lag.time.max.ms=2000
replica.fetch.wait.max.ms=200
min.insync.replicas=2
default.replication.factor=3
unclean.leader.election.enable = false
num.replica.fetchers=2
############################ consumer local read connections priority ###################
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
Ключевой момент в конфигурации в том, что он разделен на тематические секции, а секция "Server Basics" скриптом автоматически отделится в server.properties.uniq.
Вот как можно сделать сплит. Перед сплитом на всякий случай сохраняем предыдущие версии файлов. Пишем splitcfg.sh.
#!/bin/bash
cfg="server.properties"
mydir=$(dirname "$0")
cd "${mydir}"
umask 026
if [[ -f "${cfg}.uniq" ]]; then
echo Saving old ${cfg}.uniq to ${cfg}.uniq.bak
mv ${cfg}.uniq ${cfg}.uniq.bak
fi
if [[ -f "${cfg}.common" ]]; then
echo Saving old ${cfg}.common to ${cfg}.common.bak
mv ${cfg}.common ${cfg}.common.bak
fi
uniqtxt=false
while read line; do
if [[ "$line" == "############################# Server Basics #############################" ]]; then
uniqtxt=true
fi
if [[ "$uniqtxt" == "true" ]]; then
echo "$line" >> ${cfg}.uniq
else
echo "$line" >> ${cfg}.common
fi
if [[ "$line" == "############################# End of Server Basics ######################" ]]; then
uniqtxt=false
fi
done < <(cat "${cfg}")Отлично, splitcfg.sh готов, также нам понадобится еще один простой скрипт, который склеивает конфигурацию обратно (mergecfg.sh):
#!/bin/bash
cfg="server.properties"
mydir=$(dirname "$0")
cd "${mydir}"
umask 026
cat ${cfg}.uniq ${cfg}.common > $cfgsplitcfg.sh и mergecfg.sh раскладываем на все брокеры.
Теперь нужно сделать выборочную синхронизацию файлов с другими брокерами.
Комментарии по коду:
#!/bin/bash
# Список всех брокеров.
# Скрипт сам поймет, с кем нужно делать синхронизацию, а с кем нет (с собой не надо).
CLUSTER_HOSTS=(10.1.1.1 10.1.1.2 10.1.1.3)
# Пользователь, под которым будет идти синхронизация по rsync+ssh
kafkauser="kafka"
# В моем случае бинарные файлы kafka лежат по пути /opt/kafka_версия,
# а симлинк /opt/kafka указывает на него (для простоты обновления версии).
# Обределяем путь до папки kafka.
kafkadir="$(readlink -f /opt/kafka)"
# SSH ключ, под которым будет идти синхронизация по rsync+ssh
ssh_key="/home/${kafkauser}/.ssh/id_ed25519"
# Временная папка для синхронизации.
tmp_dir="/tmp/kafka-sync"
# Флажок интерективности. Скрипт синхронизации позволяет администратору сравнить
# что поменялось перед перезаписью файлов.
interactive="true"
# Список папок для синхронизации.
file_list="
/opt/jmx_prometheus_javaagent
/opt/kafka
/opt/kafka-config
${kafkadir}/bin
${kafkadir}/config
${kafkadir}/libs
${kafkadir}/LICENSE
${kafkadir}/licenses
${kafkadir}/NOTICE
${kafkadir}/site-docs
"
# Список исключений.
# Не нужно синхронизировать уникальные SSL сертификаты брокеров Kafka
# и узлов Zookeeper.
# Не нужно синхронизировать server.properties, server.properties.uniq и его бэкап.
exclude_file_list="
${kafkadir}/logs/*
/opt/kafka-config/ssl
/opt/kafka-config/ssl/zookeeper
/opt/kafka-config/ssl/kafka
/opt/kafka-config/server.properties.uniq
/opt/kafka-config/server.properties
/opt/kafka-config/server.properties.uniq.bak
"
# Перед каждым элементом списка исключений добавить '--exclude=',
# это необходимо по синтаксису rsync.
exclude_file_list=$(echo "${exclude_file_list}" | sed 's|^| --exclude=|g')
# Разделяем конфигурацию на уникальную и общую (splitcfg.sh приведен выше).
echo "Spliting server.properties"
/opt/kafka-config/splitcfg.sh
# Определяем собственный IP адрес, формируем список хостов для синхронизации
# HOSTS_TO_SYNC, ��е добавляя в него самого себя.
my_ip=$(/sbin/ip add | sed -n '/127.0.0.1/d; s/.*inet \([0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\).*/\1/p;')
for host in ${CLUSTER_HOSTS[@]}; do
[[ "$host" == "$my_ip" ]] && continue
HOSTS_TO_SYNC+=("${host}")
done
# Для каждого remote_host в списке синхронизации HOSTS_TO_SYNC
for remote_host in ${HOSTS_TO_SYNC[@]}; do
# В режиме dry-run (без изменений) ищем файлы,
# которые отличаются на удаленной машине от файлов на локальной машине.
echo "Sync with ${remote_host}."
echo -n "Scanning difference of files... "
diff_files=$(
rsync --recursive \
--relative \
--itemize-changes \
--dry-run \
--delete \
--links \
--checksum \
--group --owner --perms --no-times \
-e "ssh -i ${ssh_key}" \
${exclude_file_list} \
${file_list} \
${kafkauser}@${remote_host}:/
)
echo -e "Done\n#######################################"
# Если найдены файлы, которые отличаются, то вывести подсказку для администратора,
# а затем список найденых отличий.
if [ -n "$(echo ${diff_files})" ] ; then
echo "
Description for string [<>][fd]cstpoguax:
< transferred to the remote host. c different checksum
> received to the local host. s different size
. the item is not being updated T modification time will be set to the transfer time
f file p different permissions
d directory o different owner
L symlink g different group
a ACL information changed
x the extended attribute information changed"
echo -e "#######################################\n\nList of file differences\n"
# Список отличающихся файлов
echo "${diff_files}" | sed '/^$/d'
# Скачиваем файлы на удаленной машине во временную локальную папку
# для проведения сравнения.
echo -en "\n#######################################\nMaking cache of remote files to compare. "
remote_file_list=$(echo "${diff_files}" | awk '{print $2}' | sed '/^ *$/d; /\/bin/d; s/^/'${kafkauser}@${remote_host}:'/')
echo -e "Downloading files to ${tmp_dir}:\n${remote_file_list}\n"
mkdir -p ${tmp_dir}
rsync -e "ssh -i ${ssh_key}" \
--recursive \
--relative \
--links \
$remote_file_list \
${tmp_dir} 2>/dev/null
echo -e "\nDone\n#######################################"
# Для каждого файла смотрим отличия и выводим их для администратора
while read line; do
echo -e "\n\nComparing file\n${line}\n\n"
rfile=$(echo $line | awk '{print $2}')
echo "diff ${remote_host}:${rfile} ${rfile}"
diff ${tmp_dir}/${rfile} ${rfile}
echo -e "\n\n#######################################\n\n"
done < <(echo "${diff_files}")
# Удаляем временную локальную папку
echo "Removing Local ${tmp_dir}"
rm -r ${tmp_dir}
# Если установлен флаг интерактивности, подождать решения администратора,
# пока он визуально просмотрит планируемые изменения и подтвердит их введя "y".
# Для отказа - "n" или "Ctrl" + "C".
if [ "${interactive}" == "true" ]; then
while : ; do
echo -e "\nDo you wish sync changes?"
read LINE
echo "User answer ${LINE}"
[ "${LINE}" == "y" ] && break
[ "${LINE}" == "n" ] && exit 1
done
fi
# Сохранение уникальных настроек удаленного брокера
echo "Building remote server.properties.uniq ... "
ssh -i ${ssh_key} ${kafkauser}@${remote_host} "
/opt/kafka-config/splitcfg.sh
" && echo "Done"
# Синхронизация конфигурации с брокером
echo "Sync with ${remote_host}... "
rsync --recursive \
--relative \
--links \
--delete \
--checksum \
--group --owner --perms --no-times \
-e "ssh -i ${ssh_key}" \
${exclude_file_list} \
${file_list} \
${kafkauser}@${remote_host}:/ \
&& echo "Done"
# Запуск сбора конфигурации server.properties на удаленной машине
echo "Building remote server.properties ... "
ssh -i ${ssh_key} ${kafkauser}@${remote_host} "
/opt/kafka-config/mergecfg.sh
" && echo "Done"
else
echo -e "\n(No changes found)\n"
fi
doneСкрипт выполняет замену конфигурации брокера и узла Zookeeper, однако не выполняет их перезапуск.
Перезапуск брокеров и Zookeeper нужно делать по очереди, контролируя синхронизацию данных в партициях, подключения клиентов к брокерам, состояние балансировок в консьюмер‑группах.
Можно добавить перезапуск в скрипт с задержками, однако я предпочитаю контролировать этот процесс вручную, заглядывая в логи, все‑таки прод. Это позволяет вовремя понять правильный момент для перезапуска, а также отловить потенциальные ошибки в конфигурации — если один брокер начал ругаться при рестарте, то это повод откатить его конфигурацию назад.
