
Если вы настроили многоузловой кластер Kafka, то, вероятно, знаете, что в нем есть части конфигурации, общие для кластера, а есть уникальные для каждого узла.
В этой заметке я описываю свой способ проведения централизованного обновления конфигурации брокеров.
Поменяли на одном брокере — настройки применились везде.
Bourne again shell. Погнали!
Синхронизировать настройки будем при помощи rsync+ssh. Подключение к узлам будет под тем же служебным пользователем, под которым работает ПО Kafka.
Но перед синхронизацией необходимо разделить конфигурационный файл server.properties на две отдельные составляющие — server.properties.uniq и server.properties.common.
server.properties.uniq — настройки уникальные для каждого брокера.
server.properties.common — настройки общие для каждого брокера.
Полный конфигурационный файл брокера можно не читать, там много настроек, которые не являются предметом заметки, я его приведу в свёрнутом виде.
server.properties
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 broker.rack=DC1 advertised.listeners=SASL_SSL://yamaha1.bercut.com:9093 advertised.host.name=yamaha1.bercut.com listeners=SASL_SSL://yamaha1.bercut.com:9093 #Kerberos listener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true \ keyTab="/opt/kafka-secret/kafka-server-yamaha1.keytab" principal="KAFKA/yamaha1.bercut.com"; ############################# End of Server Basics ###################### ############################# Zookeeper ################################# zookeeper.connection.timeout.ms=18000 zookeeper.connect=yamaha1.bercut.com:2182,yamaha2.bercut.com:2182,yamaha3.bercut.com:2182 ## Properties for SSL Zookeeper Security between Zookeeper and Broker zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty zookeeper.ssl.client.enable=true zookeeper.ssl.protocol=TLSv1.2 zookeeper.ssl.truststore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.truststore.jks zookeeper.ssl.truststore.password=ChangeThisZKeySecret zookeeper.ssl.keystore.location=/opt/kafka-config/ssl/kafka/zookeeper-client.keystore.jks zookeeper.ssl.keystore.password=ChangeThisZKeySecret zookeeper.set.acl=true ############################ SSL and SASL settings ############################# ## put this line if your certificate does not contain FQDN #ssl.endpoint.identification.algorithm= ssl.keystore.location=/opt/kafka-config/ssl/kafka/kafka.server.keystore.jks ssl.keystore.password=ChangeThisKKeySecret ssl.key.password=ChangeThisKKeySecret ssl.truststore.location=/opt/kafka-config/ssl/kafka/kafka.server.truststore.jks ssl.truststore.password=ChangeThisKKeySecret ssl.protocol=TLSv1.2 security.inter.broker.protocol=SASL_SSL ##ssl.client.auth=required ssl.client.auth=none ##Properties for SASL beetween the brokers and clients #sasl.enabled.mechanisms=SCRAM-SHA-512 sasl.enabled.mechanisms=SCRAM-SHA-512,GSSAPI sasl.kerberos.service.name=kafka sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="broker-admin" password="ChangeThisAdminSecret"; super.users=User:broker-admin ##Properties for Authorization authorizer.class.name=kafka.security.authorizer.AclAuthorizer # disable this for more security allow.everyone.if.no.acl.found=false ############################# Socket Server Settings ############################# # Maps listener names to security protocols, the default is for them to be the same. # See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=8 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=16 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/data/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=2 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=2 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 offsets.topic.min.isr=2 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age #7 days #log.retention.hours=168 #4 days log.retention.ms=345600000 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #1GB log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created #1GB log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies #5min log.retention.check.interval.ms=300000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=3000 ############################# General Topics Settings ############################# delete.topic.enable=false auto.create.topics.enable=false config.storage.replication.factor=3 replica.lag.time.max.ms=2000 replica.fetch.wait.max.ms=200 min.insync.replicas=2 default.replication.factor=3 unclean.leader.election.enable = false num.replica.fetchers=2 ############################ consumer local read connections priority ################### replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
Ключевой момент в конфигурации в том, что он разделен на тематические секции, а секция "Server Basics" скриптом автоматически отделится в server.properties.uniq.
Вот как можно сделать сплит. Перед сплитом на всякий случай сохраняем предыдущие версии файлов. Пишем splitcfg.sh.
#!/bin/bash cfg="server.properties" mydir=$(dirname "$0") cd "${mydir}" umask 026 if [[ -f "${cfg}.uniq" ]]; then echo Saving old ${cfg}.uniq to ${cfg}.uniq.bak mv ${cfg}.uniq ${cfg}.uniq.bak fi if [[ -f "${cfg}.common" ]]; then echo Saving old ${cfg}.common to ${cfg}.common.bak mv ${cfg}.common ${cfg}.common.bak fi uniqtxt=false while read line; do if [[ "$line" == "############################# Server Basics #############################" ]]; then uniqtxt=true fi if [[ "$uniqtxt" == "true" ]]; then echo "$line" >> ${cfg}.uniq else echo "$line" >> ${cfg}.common fi if [[ "$line" == "############################# End of Server Basics ######################" ]]; then uniqtxt=false fi done < <(cat "${cfg}")
Отлично, splitcfg.sh готов, также нам понадобится еще один простой скрипт, который склеивает конфигурацию обратно (mergecfg.sh):
#!/bin/bash cfg="server.properties" mydir=$(dirname "$0") cd "${mydir}" umask 026 cat ${cfg}.uniq ${cfg}.common > $cfg
splitcfg.sh и mergecfg.sh раскладываем на все брокеры.
Теперь ��ужно сделать выборочную синхронизацию файлов с другими брокерами.
Комментарии по коду:
#!/bin/bash # Список всех брокеров. # Скрипт сам поймет, с кем нужно делать синхронизацию, а с кем нет (с собой не надо). CLUSTER_HOSTS=(10.1.1.1 10.1.1.2 10.1.1.3) # Пользователь, под которым будет идти синхронизация по rsync+ssh kafkauser="kafka" # В моем случае бинарные файлы kafka лежат по пути /opt/kafka_версия, # а симлинк /opt/kafka указывает на него (для простоты обновления версии). # Обределяем путь до папки kafka. kafkadir="$(readlink -f /opt/kafka)" # SSH ключ, под которым будет идти синхронизация по rsync+ssh ssh_key="/home/${kafkauser}/.ssh/id_ed25519" # Временная папка для синхронизации. tmp_dir="/tmp/kafka-sync" # Флажок интерективности. Скрипт синхронизации позволяет администратору сравнить # что поменялось перед перезаписью файлов. interactive="true" # Список папок для синхронизации. file_list=" /opt/jmx_prometheus_javaagent /opt/kafka /opt/kafka-config ${kafkadir}/bin ${kafkadir}/config ${kafkadir}/libs ${kafkadir}/LICENSE ${kafkadir}/licenses ${kafkadir}/NOTICE ${kafkadir}/site-docs " # Список исключений. # Не нужно синхронизировать уникальные SSL сертификаты брокеров Kafka # и узлов Zookeeper. # Не нужно синхронизировать server.properties, server.properties.uniq и его бэкап. exclude_file_list=" ${kafkadir}/logs/* /opt/kafka-config/ssl /opt/kafka-config/ssl/zookeeper /opt/kafka-config/ssl/kafka /opt/kafka-config/server.properties.uniq /opt/kafka-config/server.properties /opt/kafka-config/server.properties.uniq.bak " # Перед каждым элементом списка исключений добавить '--exclude=', # это необходимо по синтаксису rsync. exclude_file_list=$(echo "${exclude_file_list}" | sed 's|^| --exclude=|g') # Разделяем конфигурацию на уникальную и общую (splitcfg.sh приведен выше). echo "Spliting server.properties" /opt/kafka-config/splitcfg.sh # Определяем собственный IP адрес, формируем список хостов для синхронизации # HOSTS_TO_SYNC, не добавляя в него самого себя. my_ip=$(/sbin/ip add | sed -n '/127.0.0.1/d; s/.*inet \([0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\).*/\1/p;') for host in ${CLUSTER_HOSTS[@]}; do [[ "$host" == "$my_ip" ]] && continue HOSTS_TO_SYNC+=("${host}") done # Для каждого remote_host в списке синхронизации HOSTS_TO_SYNC for remote_host in ${HOSTS_TO_SYNC[@]}; do # В режиме dry-run (без изменений) ищем файлы, # которые отличаются на удаленной машине от файлов на локальной машине. echo "Sync with ${remote_host}." echo -n "Scanning difference of files... " diff_files=$( rsync --recursive \ --relative \ --itemize-changes \ --dry-run \ --delete \ --links \ --checksum \ --group --owner --perms --no-times \ -e "ssh -i ${ssh_key}" \ ${exclude_file_list} \ ${file_list} \ ${kafkauser}@${remote_host}:/ ) echo -e "Done\n#######################################" # Если найдены файлы, которые отличаются, то вывести подсказку для администратора, # а затем список найденых отличий. if [ -n "$(echo ${diff_files})" ] ; then echo " Description for string [<>][fd]cstpoguax: < transferred to the remote host. c different checksum > received to the local host. s different size . the item is not being updated T modification time will be set to the transfer time f file p different permissions d directory o different owner L symlink g different group a ACL information changed x the extended attribute information changed" echo -e "#######################################\n\nList of file differences\n" # Список отличающихся файлов echo "${diff_files}" | sed '/^$/d' # Скачиваем файлы на удаленной машине во временную локальную папку # для проведения сравнения. echo -en "\n#######################################\nMaking cache of remote files to compare. " remote_file_list=$(echo "${diff_files}" | awk '{print $2}' | sed '/^ *$/d; /\/bin/d; s/^/'${kafkauser}@${remote_host}:'/') echo -e "Downloading files to ${tmp_dir}:\n${remote_file_list}\n" mkdir -p ${tmp_dir} rsync -e "ssh -i ${ssh_key}" \ --recursive \ --relative \ --links \ $remote_file_list \ ${tmp_dir} 2>/dev/null echo -e "\nDone\n#######################################" # Для каждого файла смотрим отличия и выводим их для администратора while read line; do echo -e "\n\nComparing file\n${line}\n\n" rfile=$(echo $line | awk '{print $2}') echo "diff ${remote_host}:${rfile} ${rfile}" diff ${tmp_dir}/${rfile} ${rfile} echo -e "\n\n#######################################\n\n" done < <(echo "${diff_files}") # Удаляем временную локальную папку echo "Removing Local ${tmp_dir}" rm -r ${tmp_dir} # Если установлен флаг интерактивности, подождать решения администратора, # пока он визуально просмотрит планируемые изменения и подтвердит их введя "y". # Для отказа - "n" или "Ctrl" + "C". if [ "${interactive}" == "true" ]; then while : ; do echo -e "\nDo you wish sync changes?" read LINE echo "User answer ${LINE}" [ "${LINE}" == "y" ] && break [ "${LINE}" == "n" ] && exit 1 done fi # Сохранение уникальных настроек удаленного брокера echo "Building remote server.properties.uniq ... " ssh -i ${ssh_key} ${kafkauser}@${remote_host} " /opt/kafka-config/splitcfg.sh " && echo "Done" # Синхронизация конфигурации с брокером echo "Sync with ${remote_host}... " rsync --recursive \ --relative \ --links \ --delete \ --checksum \ --group --owner --perms --no-times \ -e "ssh -i ${ssh_key}" \ ${exclude_file_list} \ ${file_list} \ ${kafkauser}@${remote_host}:/ \ && echo "Done" # Запуск сбора конфигурации server.properties на удаленной машине echo "Building remote server.properties ... " ssh -i ${ssh_key} ${kafkauser}@${remote_host} " /opt/kafka-config/mergecfg.sh " && echo "Done" else echo -e "\n(No changes found)\n" fi done
Скрипт выполняет замену конфигурации брокера и узла Zookeeper, однако не выполняет их перезапуск.
Перезапуск брокеров и Zookeeper нужно делать по очереди, контролируя синхронизацию данных в партициях, подключения клиентов к брокерам, состояние балансировок в консьюмер‑группах.
Можно добавить перезапуск в скрипт с задержками, однако я предпочитаю контролировать этот процесс вручную, заглядывая в логи, все‑таки прод. Это позволяет вовремя понять правильный момент для перезапуска, а также отловить потенциальные ошибки в конфигурации — если один брокер начал ругаться при рестарте, то это повод откатить его конфигурацию назад.
