В этой статье мы разработаем и реализуем событийно-ориентированное приложение с использованием Kafka в Python. Для примера мы возьмем заказ мебели в приложении типа IKEA. Это просто пример, а не то, что происходит на самом деле в IKEA.
Мы будем делать приложение на локальном компьютере, но для производственных сред вы можете использовать облачный провайдер, такой как AWS, GCP или Azure.
Давайте посмотрим, что у нас есть в приведенной выше архитектуре:
Frontend (внешний интерфейс). Это может быть мобильное или веб-приложение, где пользователь заказывает товар. Когда пользователь выбирает и заказывает мебель с помощью приложения, внешний интерфейс обращается к бэкэнду.
Orders Backend (бэкэнд заказов). Он принимает заказ из внешнего интерфейса со всеми данными, связанными с этим заказом, а затем записывает в Kafka тему под названием «order_details». Тема «order_details» будет содержать всю информацию, относящуюся к одному отдельному заказу. Это будет простой Python-файл. Вы можете задеплоить этот сервис вместе с остальными, как микросервисы в облако, используя, например, облачный запуск на GCP или Lambda на AWS.
Transactions Backend (бэкэнд транзакций). Подписывается на тему «order_details» в Kafka, поэтому всякий раз, когда кто-то пишет в тему, бэкэнд транзакций будет читать сообщение и обрабатывать его в режиме реального времени. Бэкэнд транзакций будет выполнять обработку кредитных карт и некоторые другие проверки, чтобы убедиться, что заказ подтвержден. Как только заказ будет подтвержден, он отправит ответ в другую тему Kafka под названием «order_confirmed». Эта тема нужна, чтобы собирать все данные, которые относятся к подтвержденному заказу.
Email Backend (бэкэнд электронной почты). Подписывается на тему «order_confirmed» и отправляет пользователю электронное письмо с подтверждением, когда заказ подтвержден. Он также может отправить сообщение в тему, например «order_email_sent».
Analytics Backend (бэкэнд аналитики). Он подписывается на тему «order_confirmed» и выполняет по ней какую-то аналитику. Например, он может агрегировать общее количество заказов в этот день и общее количество доходов, полученных от разных заказов. Затем мы можем отправить результат аналитики по теме «analytics_result».
Dashboard. У нас может быть служба для получения каких-либо данных из разных тем и отправки их на панель инструментов для визуализации. Здесь мы просто используем один сервис для них обоих в Python для простоты, но вы можете легко их разделить.
В этом посте нам понадобятся следующее:
kafka-python
flask
Чтобы запустить Kafka локально, можно использовать следующий компоновочный файл с кластером Kafka с одним брокером, а также одним zookeeper'ом и некоторыми другими компонентами Kafka, такими как центр управления UI (UI control-center), реестр схем (schema-registry) и т.д.
## docker-compose-kafka.yml
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29093:29093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
kafka-tools:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka-tools
container_name: kafka-tools
command: ["tail", "-f", "/dev/null"]
network_mode: "host"
schema-registry:
image: confluentinc/cp-schema-registry:5.4.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
control-center:
image: confluentinc/cp-enterprise-control-center:5.4.0
hostname: control-center
container_name: control-center
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
Также можно использовать Kafka-UI и Conduktor. UI-центр управления просто показывает сообщения, которые отправляются, когда UI-страница темы открыта. Для Kafka-UI вы можете добавить следующие коды в свой файл компоновки вместо части центра управления:
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081
Затем нам нужно запустить docker-compose -f docker-compose-kafka.yml up -d
, чтобы запустить Kafka со всеми компонентами. Обратите внимание, что Kafka должна быть запущена, когда мы хотим протестировать внешние и внутренние службы, которым необходимо отправлять или получать данные из тем Kafka.
Мы можем проверить, все ли работает, с помощью docker-compose -f docker-compose-kafka.yml ps
команды:
NAME COMMAND SERVICE STATUS PORTS
broker "/etc/confluent/dock…" broker running 0.0.0.0:9092->9092/tcp, 0.0.0.0:29093->29093/tcp
kafka-tools "tail -f /dev/null" kafka-tools running
kafka-ui "/bin/sh -c 'java $J…" kafka-ui running 0.0.0.0:8080->8080/tcp
schema-registry "/etc/confluent/dock…" schema-registry running 0.0.0.0:8081->8081/tcp
zookeeper "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
В дополнение к вышеупомянутому файлу docker-compose для Kafka у нас будет еще один для наших микросервисов. Вы можете добавлять сервисы один за другим в файл docker-compose и тестировать их.
Обратите внимание, что нужно для подключения ваших producers и consumers к брокеру Kafka:
Если вы используете ту же сеть, поместив микросервис в тот же файл docker-compose, что и Kafka. Или используя отдельный файл docker-compose и настроив сеть, как сеть Kafka, вы можете использовать
broker:29092
.Если вы запускаете свой сервис локально на том же компьютере без его докеризации, вы можете использовать в своем коде его
localhost:9092
.Если вы хотите запустить Kafka на одной машине, а свои службы на другой машине, вам нужно использовать в своем коде
<kafka machine ip>:29093
.
Начнем с backend-сервисов. Интерфейс мы добавим позже.
Бэкэнд заказов
Теперь давайте перейдем к backend'у заказов. Мы докеризуем приложение и помещаем его в отдельный файл docker-compose с именем docker-compose-services.yml
и настраиваем сеть так же, как сеть Kafka. orders_backend.py
— это flask-приложение и выглядит оно следующим образом:
# orders_backend.py
import json
import time
from kafka import KafkaProducer
from flask import Flask, jsonify, request
ORDER_KAFKA_TOPIC = 'order_details'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`
app = Flask(__name__)
## from inside docker compose network - when add the service to compose file -> orders_backend:v1
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
# post endpoint to get user id , order id, user email, and order details
@app.route('/order', methods=['POST'])
def order():
user_id = request.json['user_id']
order_id = request.json['order_id']
user_email = request.json['user_email']
order_details = request.json['order_details']
order = {}
order['user_id'] = user_id
order['order_id'] = order_id
order['user_email'] = user_email
order['order_details'] = order_details
order['time'] = time.time()
producer.send(ORDER_KAFKA_TOPIC, order)
print("Sent order details {} to kafka topic: {}".format(order, ORDER_KAFKA_TOPIC))
return jsonify(order)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=5002, debug=True)
У него есть конечная точка публикации для получения заказа и публикации его в теме Kafka с именем «order_details».
Затем вы можете легко докеризовать этот сервис. Вот докерфайл:
FROM python:3.9.7-slim
RUN pip install -U pip
RUN pip install pipenv
WORKDIR /app
COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install - system - deploy
COPY [ "orders_backend.py", "./" ]
EXPOSE 5002
ENTRYPOINT ["python", "orders_backend.py"]
Затем мы можем создать образ, используя:
docker build -t orders_backend:v1 .
Файл docker-compose будет выглядеть следующим образом:
# docker-compose-services.yml
version: "1"
services:
orders_backend:
restart: always
image: orders_backend:v1
ports:
- "5002:5002"
networks:
- ikea-ordering-kafka_default
networks:
ikea-ordering-kafka_default:
external: true
Мы можем использовать postman для проверки:
Мы также можем видеть сообщения по теме в пользовательском интерфейсе:
Бэкэнд транзакций
Это простой сервис для прослушивания темы «order_details», выполнения какой-либо обработки данных и отправки подтвержденного сообщения в тему «order_confirmed»:
# transactions_backend.py
import json
import time
from kafka import KafkaConsumer, KafkaProducer
OERDER_KAFKA_TOPIC = 'order_details'
ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`
consumer = KafkaConsumer(OERDER_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
while True:
for message in consumer:
print("Received order details: {}".format(message.value))
user_id = message.value['user_id']
order_id = message.value['order_id']
user_email = message.value['user_email']
order_details = message.value['order_details']
time = message.value['time']
## do some suff on the order and check the confirmation
order_confirmed = {}
order_confirmed['user_id'] = user_id
order_confirmed['order_id'] = order_id
order_confirmed['user_email'] = user_email
order_confirmed['order_details'] = order_details
order_confirmed['time'] = time
order_confirmed['status'] = 'confirmed'
producer.send(ORDER_CONFIRMED_KAFKA_TOPIC, order_confirmed)
print("Sent order details {} to kafka topic: {}".format(order_confirmed, ORDER_CONFIRMED_KAFKA_TOPIC))
Докер-файл выглядит следующим образом:
FROM python:3.9.7-slim
RUN pip install -U pip
RUN pip install pipenv
WORKDIR /app
COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install - system - deploy
COPY [ "transactions_backend.py", "./" ]
ENTRYPOINT ["python", "transactions_backend.py"]
Вы можете создать этот образ и обновить docker-compose-services.yml
файл:
# docker-compose-services.yml
version: "1"
services:
orders_backend:
restart: always
image: orders_backend:v1
ports:
- "5002:5002"
networks:
- ikea-ordering-kafka_default
transactions_backend:
restart: always
image: transactions_backend:v1
ports:
- "5003:5003"
networks:
- ikea-ordering-kafka_default
networks:
ikea-ordering-kafka_default:
external: true
Затем, ещё раз протестировав сервисы с помощью postman, мы можем увидеть сообщения, приходящие в тему:
Бэкэнд электронной почты
Код этой службы выглядит следующим образом:
# email_backend.py
import json
import time
from kafka import KafkaConsumer, KafkaProducer
# from flask import Flask, jsonify, request
ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
EMAIL_SENT_KAFKA_TOPIC = 'order_email_sent'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
def send_email(user_id, order_id, user_email, order_details, time, status):
print("Sending email to user: {} with order details: {}".format(user_email, order_details))
# send email to user
# ...
# ...
# ...
# ...
return True
while True:
for message in consumer:
# read data from consumer and call the send_email() function
print("Received order details: {}".format(message.value))
user_id = message.value['user_id']
order_id = message.value['order_id']
user_email = message.value['user_email']
order_details = message.value['order_details']
time = message.value['time']
status = message.value['status']
email_send_status = send_email(user_id, order_id, user_email, order_details, time, status)
email_sent = {}
email_sent['user_id'] = user_id
email_sent['order_id'] = order_id
email_sent['user_email'] = user_email
email_sent['order_details'] = order_details
email_sent['time'] = time
email_sent['status'] = email_send_status
producer.send(EMAIL_SENT_KAFKA_TOPIC, email_sent)
print("Sent email details {} to kafka topic: {}".format(email_sent, EMAIL_SENT_KAFKA_TOPIC))
Докер-файл также похож на предыдущие с небольшими изменениями имени python-файла. Затем вы можете создать образ, обновить файл docker-compose и запустить его. После отправки нескольких новых сообщений через postman мы можем увидеть сообщения в теме в пользовательском интерфейсе:
Бэкэнд аналитики
Код следующего сервиса получает подтвержденный заказ и рассчитывает общее количество заказов и общий доход:
# analytics_backend.py
import json
import time
from kafka import KafkaConsumer, KafkaProducer
ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
ANALYTICS_KAFKA_TOPIC = 'analytics_result'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
total_revenue = 0
total_orders_count = 0
while True:
for message in consumer:
# read data from consumer and do some analytics on it
print("Received order details: {}".format(message.value))
order_details = message.value['order_details']
total_revenue += int(order_details['price'])
total_orders_count += 1
analytics = {}
analytics['total_revenue'] = total_revenue
analytics['total_orders_count'] = total_orders_count
producer.send(ANALYTICS_KAFKA_TOPIC, analytics)
print("Sent analytics details {} to kafka topic: {}".format(analytics, ANALYTICS_KAFKA_TOPIC))
Докер-файл снова точно такой же, как и предыдущий, но с небольшой модификацией. Создайте образ, обновите файл компоновки и, наконец, запустите его.
Мы можем отправить несколько новых сообщений через postman и увидеть сообщения в теме в пользовательском интерфейсе:
Вот и все на этот раз. Надеемся, вы получили общее представление о том, как использовать Kafka в своих проектах.
А чтобы еще больше узнать о том, как разработчикам можно использовать Kafka в работе, вы можете прийти на наш курс «Apache Kafka для разработчиков». Это углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres, который переведёт вас на новый уровень владения инструментом.