
В мире разработки программного обеспечения постоянно появляются новые подходы к построению систем. Event-Driven архитектура выступает одним из наиболее перспективных решений для создания гибких и масштабируемых приложений.
В данной статье мы расскажем о реализации Event-Driven архитектуры (EDA) с использованием FastAPI и паттерна Publish/Subscribe. Рассмотрим основные концепции, компоненты и практические примеры кода для создания масштабируемой системы обмена сообщениями.
Что такое Event-Driven архитектура (EDA)?
Event-Driven архитектура (EDA) — это архитектурный стиль, в котором компоненты системы взаимодействуют путем генерации, обнаружения и обработки событий. Событие — это значимое изменение состояния, о котором оповещаются все заинтересованные компоненты. Этот подход позволяет создавать слабосвязанные системы, где компоненты могут развиваться независимо друг от друга.
Главный принцип Pub/Sub заключается в разделении отправителей сообщений (издателей) и получателей (подписчиков), что позволяет им взаимодействовать без прямой зависимости друг от друга. Основные сущности:
Издатель (Publisher) — источник сообщений, который:
Формирует сообщения определенного формата
Публикует их в одну или несколько тем/каналов
Не имеет информации о том, кто получит сообщения
Не ожидает ответа от получателей
Может быть любым компонентом системы, генерирующим события или данные
Подписчик (Subscriber) — получатель сообщений, который:
Выражает интерес к конкретным темам через механизм подписки
Получает сообщения только из тем, на которые подписан
Обрабатывает сообщения по мере их поступления
Может динамически подписываться и отписываться от тем
Может фильтровать получаемые сообщения по дополнительным критериям
Брокер сообщений (Message Broker) — ключевой посредник, который:
Обеспечивает инфраструктуру для передачи сообщений
Хранит информацию о подписках
Управляет темами/каналами сообщений
Выполняет маршрутизацию сообщений от издателей к соответствующим подписчикам
Может обеспечивать надежность доставки (гарантии доставки сообщений)
Часто включает механизмы буферизации сообщений при временной недоступности подписчиков
Тема (Topic) / Канал (Channel) — именованный канал для категоризации сообщений:
Представляет логическую категорию или группу сообщений
Служит фильтром при доставке: подписчики получают сообщения только из выбранных тем
Может иметь иерархическую структуру (например, "orders/new", "orders/processed")
Может поддерживать фильтрацию по шаблонам (например, "orders/*")
Сообщение (Message) — единица передачи информации:
Содержит полезную нагрузку (payload)
Может включать метаданные (временные метки, идентификаторы, заголовки)
Имеет определенную структуру и формат (JSON, XML, бинарный и т.д.)
Может содержать информацию о приоритете или времени жизни

Реализация Pub/Sub архитектуры на примере сервиса по обработки заказов
В ходе этой статьи мы разработаем простую систему заказов. При создании заказа пользователи, подписанные на рассылку, должны получить уведомление по email о том, что заказ создан. Соответственно, у нас будет два сервиса:
orders - отвечает за создание заказов и отправку отправку события (event)
notifications - отвечает за возможность подписаться на определенные события и обработку сообщений о создании заказа.
Основные компоненты:
FastAPI — основной web-сервер, на нем будут реализованы наши сервисы
Kafka брокер — распределённая платформа потоковой обработки событий
PostgreSQL - база данных для хранения заказов и подписок пользователей
Pydantic модели — описание структуры событий
Разработка Publisher (FastAPI) — отправка событий
В качестве фреймворка был выбран fast api. Он идеально подходит для разработки микросервисов, за счет своей простоты, скорости и асинхронности. В основе структуры проекта используется слоистая архитектура, которая предполагает разбиение приложение на слои и внедрение зависимостей, где каждый внешний слой ничего не знает о внутреннем. Основные слои это:
Router (Контроллеры / Веб-слой): обработка HTTP-запросов и ответов.
Service (Бизнес-логика): основная логика приложения (без привязки к API или БД).
Repository (Доступ к данным): работа с базой данных или другим хранилищем.

Нам необходимо создать следующую структуру проекта:

alembic
Миграции базы данных.
alembic.ini — конфигурационный файл Alembic для работы с миграциями.
api
Слой представления (presentation layer) — маршруты FastAPI.
v1/ — версия API (позволяет в будущем легко поддерживать разные версии).
orders.py — роуты для работы с заказами.
db
Работа с базой данных.
db.py — инициализация подключения к базе данных.
events
Работа с событиями в рамках паттерна Pub/Sub.
create_topics.py — создание топиков Kafka.
kafka_topics.py — описание топиков Kafka.
publisher.py — логика публикации событий (паблишер Kafka).
schemas.py — схемы для сообщений, которые отправляются через Kafka.
models
Модели базы данных (ORM модели).
orders.py — модель заказа.
repositories
Репозитории — слой доступа к данным.
abc_repositories.py — абстрактные классы репозиториев (интерфейсы).
orders_repository.py — конкретная реализация репозитория заказов.
schemas
Pydantic-схемы для валидации входных и выходных данных API.
order.py — схемы для работы с заказами.
services
Бизнес-логика приложения.
orders_service.py — сервисный слой для обработки заказов.
В этой статье мы не будем подробно останавливаться на этапах реализации всех слоев приложения, так как нас интересует взаимодействие с брокером. В качестве брокера используется Apache Kafka — мощная платформа для реализации Event-Driven Architecture, которая обеспечивает надежную передачу, хранение и обработку событий. Правильная настройка Kafka критически важна для построения масштабируемой и отказоустойчивой системы. В этом разделе мы рассмотрим ключевые аспекты конфигурации Kafka для эффективной работы с событиями в EDA.
Данный брокер поддерживает 3 политки гарантий доставки сообщений:
Тип гарантии | Описание | Преимущества | Недостатки |
At Most Once | Сообщение доставляется не более одного раза. Возможна потеря данных. | Низкая задержка, высокая пропускная способность | Риск потери сообщений |
At Least Once | Сообщение доставляется как минимум один раз. Возможны дубликаты. | Надёжность, отсутствие потерь | Необходима обработка дубликатов |
Exactly Once | Сообщение доставляется ровно один раз. Ни потерь, ни дубликатов. | Максимальная надёжность | Высокие накладные расходы |
Для различных задач могут быть использованы разные семантики доставки. Как правило, золотой серединой является At Least Once — он обеспечивает баланс между надёжностью и производительностью. Для критических систем, например платежей, рекомендуется использовать Exactly Once.
Политика Exactly Once достигается за счет:
Идемпотентности продюсера (enable.idempotence=true) – предотвращает дубли при отправке.
Транзакционной моделью между Producer и Consumer (isolation.level=read_committed).
Перед запуском приложения нам необходимо убедиться, что созданы топики — это категории или потоки сообщений, к которым публикуются данные. Каждое событие относится к определённому топику (подобно таблицам в базе данных или очередям в системах обмена сообщениями).
Кроме того, у каждого топика есть партиции — это единицы параллелизма в Kafka, которые позволяют распределять обработку данных между несколькими потребителями.
Если нужные топики отсутствуют, их нужно создать. В нашем случае мы будем использовать скрипт, который будет запускаться при старте приложения. Обратите внимание, что это демонстрационный пример: в промышленной эксплуатации, как правило, для этого применяются инструменты автоматизации инфраструктуры, такие как Terraform, Ansible и другие.
Код реализующий создание топиков:
# events/kafka_topics
from aiokafka.admin import NewTopic
KAFKA_TOPICS = [NewTopic("orders_created", num_partitions=3, replication_factor=1)]
num_partitions (число разделов) - Этот параметр указывает, сколько разделов (partitions) будет у топика. Каждый раздел является независимым логическим блоком, в который записываются данные. Количество разделов влияет на параллельность обработки сообщений (больше разделов — больше параллельных потоков для записи и чтения). Однако чем больше разделов, тем сложнее управление топиком, так как увеличивается нагрузка на брокеры.
replication_factor (фактор репликации) - Этот параметр указывает, сколько копий данных в топике будет храниться на различных брокерах Kafka. Обычно, чем выше репликация, тем выше доступность и отказоустойчивость. В данном демонстрационном примере указано 1. В продакшн системах рекомендуется указывать минимум 3 реплики.
# events/create_topics.py
class KafkaTopicManager:
def __init__(self, settings: Settings) -> None:
self.settings = settings
async def __aenter__(self) -> Self:
self.admin_client = AIOKafkaAdminClient(bootstrap_servers=self.settings.kafka.kafka_url)
await self.admin_client.start()
return self
async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Any
) -> None:
if self.admin_client:
await self.admin_client.close()
return None
async def create_topics(self, topics: list[NewTopic]) -> None:
if not self.admin_client:
raise RuntimeError("Admin client is not started")
existing_topics = await self.admin_client.list_topics()
topics_to_create = [topic for topic in topics if topic.name not in existing_topics]
if not topics_to_create:
logger.info("All topics already exist.")
return
await self.admin_client.create_topics(new_topics=topics_to_create)
logger.info(f"Created topics: {[topic.name for topic in topics_to_create]}")
return None
# main.py
from events.publisher import KafkaProducerBase
from events.create_topics import KafkaTopicManager
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from events.kafka_topics import KAFKA_TOPICS
from depends import container
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
async with KafkaTopicManager(settings) as manager:
await manager.create_topics(KAFKA_TOPICS)
kafka: KafkaProducerBase = container.resolve(KafkaProducerBase)
await kafka.start()
yield
Реализация producer
Далее реализуем producer, который будет отвечать за отправку сообщений при создании заявки.
# events/publisher.py
import logging
from aiokafka import AIOKafkaProducer
from events.schemas import KafkaEventBase, OrderCreateEvent
from settings import Settings
logger = logging.getLogger(__name__)
class KafkaProducerBase:
def __init__(self, settings: Settings):
self._settings = settings
self._producer = AIOKafkaProducer(bootstrap_servers=self._settings.kafka.kafka_url, acks=1)
async def start(self) -> None:
await self._producer.start()
logging.info("Kafka producer started")
return None
async def stop(self) -> None:
if self._producer:
await self._producer.stop()
logging.info("Kafka producer stopped")
return None
async def send(self, topic: str, message: KafkaEventBase) -> None:
if not self._producer:
raise RuntimeError("Kafka producer not started")
await self._producer.send_and_wait(topic, message.to_kafka_bytes())
return None
class OrderProducer:
def __init__(self, kafka: KafkaProducerBase) -> None:
self._kafka = kafka
async def publish_order(self, message: OrderCreateEvent) -> None:
await self._kafka.send("orders_created", message)
logging.info(f"Send event 'orders_created' {message.id}")
return None
В данном случае при инициализации продюсера используется параметр acks=1, что обеспечивает политику доставки At Least Once. При такой настройке продюсер будет ожидать от брокера подтверждения о записи на диск одного брокера. Это гарантирует доставку как минимум один раз.
Если бы нам потребовалось ужесточить политику доставки, то необходимо:
1. Включить параметр: enable_idempotence=True и acks=’all’
2. При отправки сообщений использовать транзакции:
async with self._producer.transaction():
await self._producer.send_and_wait(topic, message.to_kafka_bytes())
Теперь мы можем добавить в сервисный слой зависимость и реализовать отправку эвента при создании заявки:
#services/order_service.py
from events.publisher import OrderProducer
from events.schemas import OrderCreateEvent
from repositories.orders_repository import OrdersRepository
from schemas.order import OrderCreateSchema, OrderResponse
class OrderService:
def __init__(self, repository: OrdersRepository, producer: OrderProducer):
self.repository = repository
self.producer = producer
async def create_order(self, data: OrderCreateSchema) -> OrderResponse:
result = await self.repository.create(data)
await self.producer.publish_order(
message=OrderCreateEvent(id=result.id, user_id=result.user_id, total_price=result.total_price)
)
return OrderResponse(**result.model_dump())
Разработка Subscriber (Consumer) — обработка событий
Теперь необходимо реализовать сервис, который будет отвечать за отправку уведомлений. Его основная возможность — дать пользователю подписаться на уведомления и обрабатывать эвенты при создании заявки. Структура проекта будет та же, что и у orders. Основное отличие заключается в том, что на стороне notifications будет консьюмер, который будет обрабатывать эвенты о создании заявки.
Для начала реализуем сервис, который будет отправлять уведомления:
import logging
from email.message import EmailMessage
import aiosmtplib
from enums.events import SubscriberEventType
from events.schemas import OrderCreateEvent
from repositories.subscribers_reposiroties import SubscribersRepository
from schemas.email import OrderEmailMessage
from settings import Settings
logger = logging.getLogger(__name__)
class EmailSendError(Exception):
pass
class EmailService:
def __init__(self, settings: Settings, repository: SubscribersRepository):
self.settings = settings
self.repository = repository
async def _send_email(self, message: OrderEmailMessage) -> None:
logger.info("Sending email")
email = EmailMessage()
email["From"] = self.settings.email.sender
email["Subject"] = message.subject
email.set_content(message.body)
try:
result = await aiosmtplib.send(
email,
recipients=message.recipients,
hostname=self.settings.email.host,
port=self.settings.email.port,
username=self.settings.email.username,
password=self.settings.email.password,
use_tls=self.settings.email.use_tls,
)
logger.info(f"Email was sent: {result}")
except Exception as e:
raise EmailSendError(f"Failed to send email - {e}")
async def notification_on_order_create(self, event: OrderCreateEvent) -> None:
emails = await self.repository.get_emails_for_notification(SubscriberEventType.order_create)
await self._send_email(
OrderEmailMessage(
recipients=emails,
subject=f"Была создана заявка - {event.id}",
body=f"Создана заявка на сумму - {event.total_price}",
)
)
Здесь мы получаем все электронные почты пользователей, которые оформили подписку на уведомления, и отправляем им email о том, что была создана заявка.
Теперь реализуем консьюмер, который будет обрабатывать эвенты:
#events/handler.py
import asyncio
import json
import logging
from asyncio import AbstractEventLoop, Task
from typing import Callable, Dict, Coroutine, Any
from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
from events.schemas import OrderCreateEvent
from service.email import EmailSendError, EmailService
from settings import Settings
logger = logging.getLogger(__name__)
class KafkaConsumerBase:
def __init__(self, settings: Settings, loop: AbstractEventLoop) -> None:
# Инициализация базового Kafka-потребителя
self._settings = settings
self._loop = loop
self._consumer = AIOKafkaConsumer(
bootstrap_servers=self._settings.kafka.kafka_url, # Адреса Kafka-брокеров
group_id="fastapi-consumer", # ID группы потребителей
loop=self._loop, # Event loop для асинхронной работы
auto_offset_reset="earliest", # Стратегия чтения: начинать с самого раннего доступного сообщения
)
self._handlers: Dict[str, Callable[[dict[Any, Any]], Coroutine[Any, Any, None]]] = {}
# Словарь для хранения обработчиков по каждому топику
self._task: Task[None] | None = None # Задача для фонового чтения сообщений
def register_handler(self, topic: str, handler: Callable[[dict[Any, Any]], Coroutine[Any, Any, None]]) -> None:
# Регистрируем обработчик для указанного топика
self._handlers[topic] = handler
async def start(self) -> None:
# Запускаем потребителя и подписываемся на все топики, для которых зарегистрированы обработчики
await self._consumer.start()
await self._consumer.subscribe(topics=list(self._handlers.keys()))
logging.info("Kafka consumer started")
# Запускаем фоновую задачу для обработки сообщений
self._task = asyncio.create_task(self._consume())
async def stop(self) -> None:
# Останавливаем потребителя и завершаем фоновую задачу
if self._task:
self._task.cancel()
if self._consumer:
await self._consumer.stop()
logging.info("Kafka consumer stopped")
async def _consume(self) -> None:
# Основной цикл обработки входящих сообщений Kafka
try:
async for msg in self._consumer:
topic = msg.topic # Определяем из какого топика пришло сообщение
value = json.loads(msg.value.decode("utf-8")) # Декодируем сообщение в JSON
handler = self._handlers.get(topic)
if handler:
await handler(value) # Вызываем обработчик, передавая ему сообщение
else:
logging.info(f"No handler for topic {topic}")
except asyncio.CancelledError:
pass
class OrderConsumerService(KafkaConsumerBase):
# Конкретная реализация потребителя для событий заказов
def __init__(self, settings: Settings, loop: AbstractEventLoop, email_service: EmailService):
super().__init__(settings, loop)
self.email_service = email_service
# Регистрируем обработчик событий для топика "orders_created"
self.register_handler("orders_created", self.handle_order_created)
async def handle_order_created(self, data: dict) -> None:
logger.info("Event received")
try:
event = OrderCreateEvent.model_validate(data)
except ValidationError as e:
# Если данные некорректные -- логируем ошибку и пропускаем сообщение
logger.info(f"Error data for event order created - {e}")
return
try:
# Отправляем email-уведомление о создании заказа
await self.email_service.notification_on_order_create(event)
logger.info(f"Send email 'order_created' - {event.id}")
Заключение
Паттерн Pub/Sub в связке с Kafka и FastAPI позволяет строить масштабируемые и слабосвязанные системы.
Ключевые плюсы:
Минимальная зависимость компонентов.
Простота масштабирования.
Гибкость в обработке событий.
С правильным управлением топиками, грамотной организацией продюсеров и консьюмеров можно обеспечить высокую надёжность доставки сообщений даже в сложных продакшн-системах.
Ссылки на репозитории:
https://github.com/aarbatskov/orders-example
https://github.com/aarbatskov/notifications-example