Привет, Хабр!

Репликация в Apache Kafka - это механизм, который обеспечивает доступность и надежность в обработке потоков данных. Она представляет собой процесс дублирования данных с одной части темы в другие, называемые репликами.

В этой статье рассмотрим основы репликации в кафке.

Архитектура репликации в Kafka

Каждый брокер - это сервер, который принимает, хранит и обрабатывает потоки данных. Брокеры работают в кластере и объединяются вместе

Брокеры хранят сообщения, называемые темами. Эти сообщения могут быть как временными данными, так и постоянными записями, в зависимости от конфигурации. Они позволяют клиентам публиковать и потреблять сообщения в темах. Это основной способ обмена данными в Kafka.

Каждая тема разделяется на партиции, которые могут распределяться между брокерами. Партиции позволяют достичь параллелизма и распределения нагрузки.

Когда клиент отправляет сообщение в Kafka, брокер принимает его и определяет, в какую тему и партицию оно должно быть размещено. Каждая тема может иметь несколько партиций, и брокеры обрабатывают записи в параллели. Клиенты, подписанные на темы, могут читать сообщения из разных партиций параллельно, что обеспечивает высокую производительность и масштабируемость.

Лидеры и реплики

Каждая партиция в Kafka имеет одну основную реплику, которая называется лидером. Лидер отвечает за прием и запись всех новых сообщений в партиции. Он служит как точка доступа для клиентов, отправляющих и получающих данные. Лидер гарантирует упорядоченность сообщений в партиции и контролирует запись данных.

Каждая партиция также имеет дополнительные реплики, которые называются фолловерами. Реплики являются копиями данных из лидера. Они хранят данные в синхронизированном состоянии с лидером. Если лидер становится недоступным, одна из реплик может быстро перейти в роль лидера.

Лидер регулярно отправляет данные фолловерам, чтобы они могли обновлять свои копии данных и оставаться в синхронизированном состоянии.

Клиенты, отправляющие запросы на запись данных, обращаются к лидеру. Лидер сохраняет упорядоченность и записывает данные, а затем реплицирует их на фолловеры.

Конфигурация и настройка репликации

Сначала создается тема, которую необходимо реплицировать. Указываются параметры репликации, указав количество реплик, которые хотим создать:

bin/kafka-topics.sh --create --topic my-replicated-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

Здесь создается тема my-replicated-topic с 3 партициями и репликацией на 2 брокерах.

Далее, если брокеры Kafka настроены правильно для репликации. В конфигурационном файле server.properties каждого брокера указываются параметры broker.id, listeners, и log.dirs. Пример:

broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs

Далее можно запустить брокеры с этой конфигурацией:

bin/kafka-server-start.sh config/server.properties

Для того чтобы включить репликацию для созданной темы указываются параметры min.insync.replicas и acks при отправке сообщений, это можно сделать с помощью дефолт библио кафки в питоне:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092', acks='all', min_insync_replicas=2)

producer.send('my-replicated-topic', key=b'key', value=b'value')

Устанавливаем acks в all, что означает, что сообщение будет считаться подтвержденным только после записи на лидере и минимум двух репликах.

Можно настроить мониторинг репликации. Kafka предоставляет Admin API, с помощью которого можно получить информацию о брокерах, темах и репликаций:

from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

topic_name = 'my-replicated-topic'
description = admin_client.describe_topics([topic_name])
for topic, info in description.items():
    print(f"Topic: {topic}, Replication: {info['num_partitions']} partitions, {info['replication_factor']} replicas")

Управление репликацией

Добавление реплик в существующую тему может быть необходимо для увеличения распределения нагрузки. Для этого можно использовать команду kafka-reassign-partitions.sh, например:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

increase-replication-factor.json содержит конфигурацию для добавления реплик, например:

{
  "version": 1,
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [0, 1, 2, 3]
    }
  ]
}

Удаление реплик также может потребоваться в ситуациях, когда нужно уменьшить нагрузку или изменить конфигурацию темы. Для этого также используется команда kafka-reassign-partitions.sh, например:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file decrease-replication-factor.json --execute

decrease-replication-factor.json содержит конфигурацию для удаления реплики.

Kafka также предоставляет возможность автоматического управления репликацией с помощью балансировщика реплик:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file automatic-replica-balance.json --execute

Где "automatic-replica-balance.json" содержит конфигурацию для автоматического управления репликацией.

Добавление реплик в простом виде можно реализовать так:

from confluent_kafka.admin import AdminClient, NewPartitions

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# увеличить репликацию для партиции "my-topic-0" на нвом брокере
new_partitions = [NewPartitions(topic='my-topic', partition=0, replication_factor=4)]
admin_client.create_partitions(new_partitions)

Удаление реплик:

from confluent_kafka.admin import AdminClient, NewPartitions

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# уменьшить репликацию для партиции "my-topic-1" удалив одну реплику
new_partitions = [NewPartitions(topic='my-topic', partition=1, replication_factor=2)]
admin_client.create_partitions(new_partitions)

Обработка сбоев

Лидер - это реплика партиции, которая является ответственной за обработку всех записей для этой партиции. Если лидер сбоится, кафка автоматически выбирает нового лидера из существующих реплик. К примеру если лидером была реплика на брокере 1 и он вышел из строя, Kafka выберет новым лидером одну из других реплик.

Реплика - это копия партиции данных. Если одна из реплик сбоится, данные остаются доступными благодаря другим репликам.

Если у партиции есть три реплики, и одна из них столкнется с сбоем, данные останутся доступными благодаря оставшимся двум репликам.

Kafka гарантирует, что записи завершаются только после того, как они будут сохранены на лидере и у минимального числа реплик, мин. число в свою очередь конфигурируется параметром min.insync.replicas.

Если min.insync.replicas=2, кафка будет завершать записи только после сохранения их на лидере и как минимум одной реплике.


Репликация позволяет обеспечить надежность в системах обработки данных.

На этом всё. Если вы планируете более глубокое изучение Apache Kafka, рекомендую обратить внимание на одноименный онлайн-курс от моих коллег из OTUS. А с полным каталогом курсов вы можете ознакомиться по ссылке.