Основы репликации в Kafka
Привет, Хабр!
Репликация в Apache Kafka - это механизм, который обеспечивает доступность и надежность в обработке потоков данных. Она представляет собой процесс дублирования данных с одной части темы в другие, называемые репликами.
В этой статье рассмотрим основы репликации в кафке.
Архитектура репликации в Kafka
Каждый брокер - это сервер, который принимает, хранит и обрабатывает потоки данных. Брокеры работают в кластере и объединяются вместе
Брокеры хранят сообщения, называемые темами. Эти сообщения могут быть как временными данными, так и постоянными записями, в зависимости от конфигурации. Они позволяют клиентам публиковать и потреблять сообщения в темах. Это основной способ обмена данными в Kafka.
Каждая тема разделяется на партиции, которые могут распределяться между брокерами. Партиции позволяют достичь параллелизма и распределения нагрузки.
Когда клиент отправляет сообщение в Kafka, брокер принимает его и определяет, в какую тему и партицию оно должно быть размещено. Каждая тема может иметь несколько партиций, и брокеры обрабатывают записи в параллели. Клиенты, подписанные на темы, могут читать сообщения из разных партиций параллельно, что обеспечивает высокую производительность и масштабируемость.
Лидеры и реплики
Каждая партиция в Kafka имеет одну основную реплику, которая называется лидером. Лидер отвечает за прием и запись всех новых сообщений в партиции. Он служит как точка доступа для клиентов, отправляющих и получающих данные. Лидер гарантирует упорядоченность сообщений в партиции и контролирует запись данных.
Каждая партиция также имеет дополнительные реплики, которые называются фолловерами. Реплики являются копиями данных из лидера. Они хранят данные в синхронизированном состоянии с лидером. Если лидер становится недоступным, одна из реплик может быстро перейти в роль лидера.
Лидер регулярно отправляет данные фолловерам, чтобы они могли обновлять свои копии данных и оставаться в синхронизированном состоянии.
Клиенты, отправляющие запросы на запись данных, обращаются к лидеру. Лидер сохраняет упорядоченность и записывает данные, а затем реплицирует их на фолловеры.
Конфигурация и настройка репликации
Сначала создается тема, которую необходимо реплицировать. Указываются параметры репликации, указав количество реплик, которые хотим создать:
bin/kafka-topics.sh --create --topic my-replicated-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
Здесь создается тема my-replicated-topic с 3 партициями и репликацией на 2 брокерах.
Далее, если брокеры Kafka настроены правильно для репликации. В конфигурационном файле server.properties каждого брокера указываются параметры broker.id, listeners, и log.dirs. Пример:
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
Далее можно запустить брокеры с этой конфигурацией:
bin/kafka-server-start.sh config/server.properties
Для того чтобы включить репликацию для созданной темы указываются параметры min.insync.replicas и acks при отправке сообщений, это можно сделать с помощью дефолт библио кафки в питоне:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092', acks='all', min_insync_replicas=2)
producer.send('my-replicated-topic', key=b'key', value=b'value')
Устанавливаем acks в all, что означает, что сообщение будет считаться подтвержденным только после записи на лидере и минимум двух репликах.
Можно настроить мониторинг репликации. Kafka предоставляет Admin API, с помощью которого можно получить информацию о брокерах, темах и репликаций:
from kafka import KafkaAdminClient
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')
topic_name = 'my-replicated-topic'
description = admin_client.describe_topics([topic_name])
for topic, info in description.items():
print(f"Topic: {topic}, Replication: {info['num_partitions']} partitions, {info['replication_factor']} replicas")
Управление репликацией
Добавление реплик в существующую тему может быть необходимо для увеличения распределения нагрузки. Для этого можно использовать команду kafka-reassign-partitions.sh
, например:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
increase-replication-factor.json содержит конфигурацию для добавления реплик, например:
{
"version": 1,
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [0, 1, 2, 3]
}
]
}
Удаление реплик также может потребоваться в ситуациях, когда нужно уменьшить нагрузку или изменить конфигурацию темы. Для этого также используется команда kafka-reassign-partitions.sh
, например:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file decrease-replication-factor.json --execute
decrease-replication-factor.json содержит конфигурацию для удаления реплики.
Kafka также предоставляет возможность автоматического управления репликацией с помощью балансировщика реплик:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file automatic-replica-balance.json --execute
Где "automatic-replica-balance.json" содержит конфигурацию для автоматического управления репликацией.
Добавление реплик в простом виде можно реализовать так:
from confluent_kafka.admin import AdminClient, NewPartitions
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
# увеличить репликацию для партиции "my-topic-0" на нвом брокере
new_partitions = [NewPartitions(topic='my-topic', partition=0, replication_factor=4)]
admin_client.create_partitions(new_partitions)
Удаление реплик:
from confluent_kafka.admin import AdminClient, NewPartitions
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
# уменьшить репликацию для партиции "my-topic-1" удалив одну реплику
new_partitions = [NewPartitions(topic='my-topic', partition=1, replication_factor=2)]
admin_client.create_partitions(new_partitions)
Обработка сбоев
Лидер - это реплика партиции, которая является ответственной за обработку всех записей для этой партиции. Если лидер сбоится, кафка автоматически выбирает нового лидера из существующих реплик. К примеру если лидером была реплика на брокере 1 и он вышел из строя, Kafka выберет новым лидером одну из других реплик.
Реплика - это копия партиции данных. Если одна из реплик сбоится, данные остаются доступными благодаря другим репликам.
Если у партиции есть три реплики, и одна из них столкнется с сбоем, данные останутся доступными благодаря оставшимся двум репликам.
Kafka гарантирует, что записи завершаются только после того, как они будут сохранены на лидере и у минимального числа реплик, мин. число в свою очередь конфигурируется параметром min.insync.replicas
.
Если min.insync.replicas=2
, кафка будет завершать записи только после сохранения их на лидере и как минимум одной реплике.
Репликация позволяет обеспечить надежность в системах обработки данных.
На этом всё. Если вы планируете более глубокое изучение Apache Kafka, рекомендую обратить внимание на одноименный онлайн-курс от моих коллег из OTUS. А с полным каталогом курсов вы можете ознакомиться по ссылке.