В этой статье я хочу показать как можно использовать Kafka в дата-инженерии и как её "пощупать".
Я не хотел бы повторяться по важным моментам, которые касаются архитектуры Kafka, поэтому рекомендую ознакомиться с данным видео.
В нём хорошо рассказано про основные концепции, которые будут дальше использоваться в статье, такие как:
Что такое
producer
.Что такое
consumer
.Что такое
topic
.Что такое
offset
.Что такое
commit
.Что такое
partition
.Что такое
replication
.
Весь код, который будет использоваться в статье будет доступен в моём репозитории.
Разворачивание сервиса
Начнём с того, что развернем Kafka локально в Docker. Для этого создадим docker-compose.yaml
со следующим кодом:
version: '3.8'
services:
zookeeper:
image: 'confluentinc/cp-zookeeper:7.7.0'
hostname: zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- '2181:2181'
kafka:
image: 'confluentinc/cp-kafka:7.7.0'
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- '9092:9092'
- '19092:19092'
kafka-ui:
image: 'provectuslabs/kafka-ui:v0.7.2'
container_name: kafka-ui
ports:
- '8080:8080'
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
depends_on:
- kafka
networks:
default:
name: kafka-network
Чтобы запустить все сервисы выполним команду:
docker-compose up -d
После этого у нас запустится Kafka, ZooKeeper и UI for Apache Kafka.
UI for Apache Kafka будет доступен по адресу http://localhost:8080/ через него можно будет: создавать topic
, удалять topic
, смотреть сообщения в topic
и прочее. Очень удобный инструмент для работы с Kafka.
Создание и удаление topic
В данном разделе мы с вами попробуем создавать и удалять topic
.
Создание и удаление topic через CLI
Чтобы создать topic
нужно выполнить команды ниже.
Зайти в контейнер с Kafka:
docker exec -it kafka /bin/bash
Создание topic
test
в Kafka:
kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Просмотр всех доступных topic
в Kafka:
kafka-topics --list --bootstrap-server kafka:9092
Удаление topic
test
в Kafka:
kafka-topics --delete --topic test --bootstrap-server kafka:9092
Создание и удаление topic через Python
Если вам удобнее взаимодействовать с Kafka через Python, то это не проблема.
Для работы с Kafka нам понадобится библиотека confluent-kafka. В примерах ниже я использую версию 2.5.0. Весь код и список всех зависимостей находится в моём репозитории.
Точно также эти операции можно произвести без подключения к контейнеру c Kafka, а через Python.
Чтобы создать topic
через Kafka:
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})
def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None:
"""
Функция для создания `topic` в Kafka
:param a: AdminClient с параметрами инициализации. Default `None`. :param topics: Список `topic` для создания. Default `None`. :return: Ничего не возвращает
"""
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
example_create_topics(
a=admin_client,
topics=['test'],
)
Важно: IDE может ругаться, что модуля NewTopic
не существует, но он есть. Это официальный пакет. Это касается версии 2.5.0.
Чтобы удалить topic
:
from confluent_kafka.admin import AdminClient
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})
def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None:
"""
Функция для удаления `topic` в Kafka.
:param a: AdminClient с параметрами инициализации. Default `None`. :param topics: Список `topic` для удаления. Default `None`. :return: Ничего не возвращает.
"""
fs = a.delete_topics(topics, operation_timeout=30)
# Wait for operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} deleted".format(topic))
except Exception as e:
print("Failed to delete topic {}: {}".format(topic, e))
example_delete_topics(
a=admin_client,
topics=['test'],
)
Больше примеров использования библиотеки confluent_kafka
в официальном GitHub проекта.
Kafka CLI
CLI является популярным вариантов для взаимодействия с Kafka. Изначально его нет на вашем устройстве, поэтому необходимо его скачать следующей командой:
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
Затем распаковать:
tar -xzf kafka_2.13-3.8.0.tgz
После выполнения данных команд мы можем использовать CLI для взаимодействия с Kafka.
Важно: Все исполняемые файлы находятся в папке bin
. Поэтому стоит обратить внимание, что все скрипты будут выполнять из неё.
Чтобы перейти в папку bin
нужно выполнить команду:
cd kafka_2.13-3.8.0/bin/
Запись в Kafka через CLI
Чтобы произвести запись в Kafka выполним команду:
echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
Или так:
echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test
Важно: Мне привычнее вызывать скрипт командой sh
, но можно и через ./
.
Ещё можно создать producer
в интерактивном режиме командой:
sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
После создания такого producer
у нас появляется возможность писать все сообщения, которые хотим.
После выполнения команды у нас появится [>
и после чего мы сможем вводить сообщения для Kafka.
Для выхода из интерактивного режима несколько раз нажмите CTRL + C
.
Чтение из Kafka через CLI
Важно: topic
в Kafka можно читать "с конца" и "с начала".
Чтобы начать читать с самого начала:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning
Чтобы начать читать с конца и получать только новые сообщения:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test
Kafka Python
Как было описано выше мы можем взаимодействовать с Kafka через Python. Поэтому сейчас рассмотрим также операции записи и чтения с использованием Python.
Запись в Kafka через Python
Я приведу пример той записи, которая может появиться в вашей Kafka – это информация о пользователе.
Запись будет содержать: uuid
, first_name
, last_name
, middle_name
.
Вы можете запустить код ниже и в topic
my_topic
начнут записываться значения.
import json
import time
from confluent_kafka import Producer
from faker import Faker
import uuid_utils as uuid
def generate_list_of_dict() -> dict[str, str]:
fake = Faker(locale='ru_RU')
return {
'uuid': str(uuid.uuid7()),
'first_name': fake.first_name(),
'last_name': fake.last_name(),
'middle_name': fake.middle_name(),
}
# Define the Kafka configuration
conf = {'bootstrap.servers': "localhost:19092"}
# Create a Producer instance with the above configuration
producer = Producer(conf)
while True:
# Define some data to send to Kafka
data = generate_list_of_dict()
# Convert the data to a JSON string
data_str = json.dumps(data)
# Produce a message to the "my_topic" topic
producer.produce(topic="my_topic", value=data_str)
# Flush the producer to ensure all messages are sent
producer.flush()
# Sleep for a second before producing the next set of messages
time.sleep(3)
Важно: Если topic
ранее не был создан, то он создастся при первой записи.
Чтение из Kafka через Python
Для того чтобы прочитать значения из Kafka нам необходимо создать consumer
. Функция ниже имеет возможность прочитать topic
с самого начала и с определённого offset
.
from confluent_kafka import Consumer, KafkaError, TopicPartition
def consume_messages(topic: str = None, offset: int = None) -> None:
conf = {
'bootstrap.servers': 'localhost:19092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
if offset is not None:
partitions = consumer.list_topics(topic).topics[topic].partitions
for partition in partitions:
consumer.assign([TopicPartition(topic, partition, offset)])
else:
consumer.subscribe([topic])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError:
print('Reached end of partition')
else:
print(f'Error: {msg.error()}')
else:
print(f'Received message: {msg.value().decode("utf-8")}')
except KeyboardInterrupt:
pass
finally:
consumer.close()
# Читать с начала
consume_messages('test')
# Читать с определенного offset
# consume_messages('test', offset=5)
Ранее мы читали topic
в Kafka без использования групп и поэтому атрибут --from-beginning
срабатывал каждый раз при вызове (каждый раз создавалась новая группа).
Но при создании consumer
через Python указание group.id
является обязательным и поэтому мы можем столкнуться со следующей проблемой: если мы один раз прочитали topic
, то при перезапуске кода мы начнем читать только новые сообщения и даже атрибут auto.offset.reset
не поможет.
А всё это происходит, потому что мы произвели commit
(фиксацию) offset
для группы.
Чтобы проверить на каком сейчас offset
находится группа необходимо выполнить команду в Kafka:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe
И мы увидим, что мы прочитали все сообщения. Поэтому offset
стоит на последнем сообщении в topic
.
CURRENT-OFFSET
говорит о том на какомoffset
находится группа.LOG-END-OFFSET
текущий последний доступныйoffset
дляtopic
Вообще, это не проблема, потому что данный offset
можно "сбросить", для этого необходимо выполнить команду:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test
Также можно прочитать topic
заново изменив group.id
. Но это делать не рекомендуется.
Использование Kafka в дата-инженерии
В дата-инженерии Kafka частый гость, потому что Kafka позволяет быстро и за дёшево покрыть множество бизнес-задач, таких как:
CDC
При реализации CDC вы можете встретиться с Kafka, потому что она является "стандартом" при работе с такого вида событиями.
Если вы хотите понять что такое CDC и какую роль там занимает Kafka вы можете изучить мою статью: CDC на примитивах.
Event-driven
Так как Kafka позволяет нам получать изменения "моментально". В этом определении есть определённые нюансы, но это тема для другого разговора.
Если вернуться к мысли выше, то получая все события "моментально" мы можем на них реагировать.
Для примера: покупатель заходит на сайт нашего интернет-магазина и при заходе в какую-то категорию или раздел мы можем сделать ему какое-то предложение или перестроить для него страницу, в зависимости от его предпочтений или условий заложенных ранее.
Real-time Analytics
Также довольно часто Kafka используется для аналитики в реальном времени. Если к нам сообщения о событиях приходят постоянно и "моментально", то мы можем реагировать на них и следить за своими метриками.
Для примера: маркетинговые акции. Мы запускаем какую-то акцию и сразу смотрим на важные для нас показатели. В зависимости от получаемых значений мы можем изменять условия акции, условия размещения и прочее.
Резюме
Kafka популярный инструмент, поэтому найти литературу, видео и примеры использования – не проблема. В данной статье я показал только верхушку айсберга, который можно изучать и изучать.
Если говорить про взаимодействие c Kafka, то CLI и Python – это не единственные инструменты, к ним можно добавить: PySpark, ClickHouse, Java и прочее.
Кстати, про то как читать из Kafka при помощи ClickHouse было описано в моей статье: CDC на примитивах.
Для более глубокого изучения инструмента рекомендую ознакомиться с книгой: Apache Kafka. Потоковая обработка и анализ данных" (авторы - Нархид Н., Шапира Г., Палино Т., год издания - 2019). В ней описывается много тонкостей и подводных камней при работе с Kafka. Уже вышло второе издание, я его не читал, но судя по наполнению; учтены новые моменты, поэтому порекомендовал бы изучать второе издание.
Ну и самое главное – Теория без практики мертва, практика без теории слепа. Поэтому попробуйте Kafka, даже на pet-проектах или в рамках данной статьи.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.