Всем привет! В данной статье будет описан один из возможных вариантов реализации паттерна outbox transactional.
План данной статьи такой: вначале немного теории, а потом на примере двух микросервисов будет показала реализация данного паттерна с помощью postgresql, debezium и kafka.
Данная статья будет полезна для разработчиков, которые не встречались с данным паттерном.
Вначале теория и самый главный вопрос: зачем нужен данный паттерн и какую он решает задачу?
Паттерн Transactional Outbox используется в распределенных транзакциях, то есть в транзакциях, которые проходят и затрагивают несколько микровервисов, для обеспечения гарантированной доставки сообщения от одного микросервиса до другого. Данный паттерн используется там, где очень критично, чтобы сообщение не потерялось, даже при сбоях в системе. Outbox паттерн получил распространение именно в микросервисной архитектуре, где нет возможности с помощью одной аннотации обеспечить транзакционность какого-то конкретного метода.
Теперь перейдем к практике.
Допустим у нас имеется два микросервиса: order-service, в котором клиент заказывает какой-то товар и bank-service, в котором происходит оплата за этот товар.
Весь код микросервисов будет доступен по ссылкам order-service и bank-service.
Задача состоит в том, чтобы при заказе товара обязательно произошло списание денег за него, если денег недостаточно или возникла ошибка на стороне банка - микросервис по заказам должен узнал об этом.
Для реализации этого паттерна я буду использовать postgresql, debezium и kafka.
Весь код описывать не буду, только самые важные моменты.
Общая картина реализации этой задачи будет выглядеть следующим образом. В первом микросервисе по заказам клиент будет заказывать товар. На основании этого заказа будет сгенерированы записи в несколько таблиц - в основную таблицу payments и во вспомогательную outbox_payments. На событие записи в последнюю таблицу будет настроен debezium, и как только там появится новая запись - он ее отправит в топик kafka. На этот топик будет подписан консьюмер из второго микросервиса по оплатам, он получит это сообщение и продолжит с ним работу. В случае, если debezium по каким то причинам будет недоступен, в то время когда придет новая запись в таблицу outbox_payments, он ее прочитает сразу как восстановит свою работу после сбоя. Также если кафка упадет и debezium не сможет отправить сообщение в топик, то он все равно продолжит попытки - пока kafka не поднимется. Таким образом, гарантируется, что сообщение будет доставлено в топик. Тут сразу надо оговориться, что возможна ситуация, когда сообщение будет доставлено несколько раз. Поэтому для гарантии обработки каждого сообщения на стороне сервиса оплаты только один раз, необходимо дополнительно реализовать проверку уникального значения операции, проверяя не обрабатывалось ли уже это сообщение. После того как сервис с оплатой сделает списание денег или нет, так как денег может быть недостаточно, или произошла какая-то другая ошибка, то сервис оплаты отправляет в топик в кафку сообщение об статусе данной операции. Данное сообщение читает консьюмер с первого микросервиса по заказам, ищет по уникальному идентификатору операцию и изменяет ее статус. Таким образом, операция заказа завершается.
Всем кто хочет поработать с этими микросервисами может их запустить, там имеются скрипты которые накатят все таблицы, а также тестовые данные. Предварительно, конечно, необходимо поднять в докере необходимые контейнеры для работы микросервисов и настроить коннектор debezium, чтобы он слушал необходимую таблицу.
Давайте по порядку остановимся на некоторых важных моментах в этой реализации.
Начнем с файла docker-compose.yml в микросервисе по заказам. Он выглядит следующим образом.
services: postgres-db: image: debezium/postgres:17 command: ["postgres", "-c", "wal_level=logical"] environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: admin POSTGRES_DB: orders_database healthcheck: test: [ "CMD-SHELL", "pg_isready", "-d", "orders_database" ] interval: 10s timeout: 3s retries: 3 ports: - "15432:5432" volumes: - ../infrastructure/db/create_db.sql:/docker-entrypoint-initdb.d/create_db.sql restart: unless-stopped kafka-connect: image: debezium/connect:2.4 ports: - "8083:8083" depends_on: kafka: condition: service_healthy environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: 1 CONFIG_STORAGE_TOPIC: my_connect_configs OFFSET_STORAGE_TOPIC: my_connect_offsets STATUS_STORAGE_TOPIC: my_connect_statuses KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false" CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" CONNECT_PLUGIN_PATH: /kafka/connect zookeeper: image: confluentinc/cp-zookeeper:6.2.4 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "2181" ] interval: 10s timeout: 3s retries: 3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 kafka: image: confluentinc/cp-kafka:6.2.4 depends_on: zookeeper: condition: service_healthy ports: - 29092:29092 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "9092" ] interval: 10s timeout: 3s retries: 3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: OUTSIDE://:29092,INTERNAL://:9092 KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:29092,INTERNAL://kafka:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always depends_on: kafka: condition: service_healthy environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
Тут мы поднимаем несколько контейнеров:
1) postgres-db — контейнер с нашей базой данных. И тут уже важный момент - необходимо при его поднятии выполнить команду ["postgres", "-c", "wal_level=logical"]. Эта команда нужна, чтобы запустить PostgreSQL с включённым логическим журналированием WAL. Без этого Debezium просто не сможет работать, так как он не создаст logical replication slot и не сможет читать WAL как поток событий.
2) kafka-connect — контейнер с Debezium, который и будет звеном между нашей базой данных и кафкой.
3) zookeeper — необходим для работы кафки, хотя сейчас в более новых версиях кафка обходится без него.
4) kafka — контейнер с брокером сообщений.
5) kafka-ui — само название говорит за себя, он нужен чтобы посмотреть что происходит в кафке.
Файл docker-compose.yml в микросервисе по оплатам выглядит следующим образом:
services: service-db: image: postgres:18-alpine3.22 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: admin POSTGRES_DB: bank_database healthcheck: test: [ "CMD-SHELL", "pg_isready", "-d", "bank_database" ] interval: 10s timeout: 3s retries: 3 ports: - "25432:5432" volumes: - ../infrastructure/db/create_db.sql:/docker-entrypoint-initdb.d/create_db.sql restart: unless-stopped
Тут есть только база данных постгрес.
После того как мы поднимем с помощью команды docker-compose up -d, наши контейнеры в докере мы должны увидеть следующее.

Теперь проверим какой уровень журналирования WAL в базе данных в микросервисе по заказам. Для этого необходимо настроить какой-то ui для пр��смотра баз данных, я использую Dbeaver, и открыть нужную базу данных — это будет orders_database и в ней выполнить команду SHOW wal_level; Должны увидеть, что уровень журналирования действительно logical.

Далее запускаем два наших микросервиса. Они должны выполнить все свои скрипты и подняться без ошибок. В микросервисе по заказам есть скрипт, который выглядит следующим образом
DROP PUBLICATION IF EXISTS dbz_publication; CREATE PUBLICATION dbz_publication FOR TABLE public.outbox_payments WITH (publish = 'insert'); ALTER TABLE outbox_payments REPLICA IDENTITY FULL;
Первая команда удаляет существующую публикацию с именем dbz_publication, если она была создана ранее. Второй командой мы создаем саму публикацию — это механизм в постгрес, который определяет, какие данные будут копироваться в поток репликации. Также здесь мы ограничиваем область видимости только одной таблицей outbox_payments и debezium будет следить только за ней. WITH (publish = 'insert') означает, что debezium будет следить только за вставками новых записей, изменения (UPDATE) и удаления (DELETE) будут игнорироваться.
Чтобы проверить все ли прошло хорошо можно выполнить команду
SELECT * FROM pg_publication_tables WHERE tablename = 'outbox_payments';
в базе данных orders_database.
Мы должны получить следующее.

Следующим шагом будет настройка коннектора с кафкой. Для этого необходимо отправить запрос POST на http://localhost:8083/connectors
{ "name": "payment-outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres-db", "database.port": "5432", "database.user": "postgres", "database.password": "admin", "database.dbname": "orders_database", "topic.prefix": "payment_events", "table.include.list": "public.outbox_payments", "plugin.name": "pgoutput", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } }
Порт 8083 это порт где развернут контейнер с debezium. Мы должны получить ответ 201 Created.

Чтобы проверить все ли прошло хорошо - снова выполняем команду в нашей базе данных orders_database.
SELECT * FROM pg_replication_slots WHERE slot_name = 'debezium';
Получим следующий ответ.

Давайте сейчас немного поподробнее рассмотрим тот запрос, который мы отправили для создания коннектора с кафкой.
{ "name": "payment-outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres-db", "database.port": "5432", "database.user": "postgres", "database.password": "admin", "database.dbname": "orders_database", "topic.prefix": "payment_events", "table.include.list": "public.outbox_payments", "plugin.name": "pgoutput", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } }
Основные настройки подключения:
connector.class: Указывает, что мы используем именно PostgreSQL коннектор.
database.hostname, port, user, password, dbname: Реквизиты для входа в нашу базу orders_database. Коннектор подключается к ней как обычный клиент, но с правами на репликацию.
table.include.list: коннектор будет игнорировать все остальные таблицы в базе, кроме public.outbox_payments. Это критично для производительности.
topic.prefix: Это префикс для имени топика в кафка. Итоговое имя топика обычно строится по схеме: префикс.схема.таблица. В нашем случае это будет payment_events.public.outbox_payments. Чтобы убедиться в этом, что у нас есть такой топик зайдем на http://localhost:8080, так как на этом порту у нас развернут контейнер с kafka-ui. Перейдем в раздел Topics и там мы должны увидеть перечень наших топиков среди которых должен быть и payment_events.public.outbox_payments.

Сейчас когда все настроено можно тестировать как все сработает.
Для этого отправим запрос на заказ. Это будет заказ 10 пачек кофе.
POST на http://localhost:8086/api/v1/orders
{ "productCode": "668488bb-dbf8-4b6a-9149-836d5092ae84", "amount": 10, "currency": "RUB", "telephone": "+79991234567", "note": "order coffee" }

Мы получили ответ 200 и уникальный код bb5157b6-c5e3-4242-b316-c14ceb4aa59b — это код нашей операции, по которой все будет происходить. Также в нашей таблице outbox_payments появилась запись.

Далее ее прочитал debezium и отправил в топик в кафку payment_events.public.outbox_payments

На стороне микросервиса по оплатам, мы получили это сообщение. Проверили или нет такого сообщения в таблице accepted_payments_events, если нет — выполнили оплату, сохранили запись об этом в таблице accepted_payments_events и организовали отправку события в кафку об обработке оплаты.
@Override @Transactional public void makePaymentByTelephone(OutboxPaymentDto paymentEvent) { if (!acceptedPaymentsEventRepository.existsByAggregateId(paymentEvent.getAggregateId())){ ClientDto client = clientService.findByTelephone(paymentEvent.getPayload().getTelephone()); BankAccount account = client.getBankAccounts().stream() .findFirst() .orElseThrow(()-> new RuntimeException("Bank account does not found!")); Map<String, String> exceptions = new HashMap<>(); PaymentPayload paymentPayload = paymentEvent.getPayload(); accountValidationService.validationPayment(exceptions, paymentPayload.getCurrency(), paymentPayload.getAmount(), account); Payment payment = buildPayment(paymentPayload, account); PaymentResultEvent paymentResultEvent = buildPaymentResultEvent(paymentPayload); if (!exceptions.isEmpty()) { payment.setPaymentStatus(PaymentStatus.CANCELED); paymentResultEvent.setPaymentStatus(PaymentStatus.CANCELED); paymentResultEvent.setNote(getErrorMessages(exceptions)); } else { account.setAmount(account.getAmount().subtract(paymentPayload.getAmount())); bankAccountService.save(account); payment.setPaymentStatus(PaymentStatus.APPROVED); paymentResultEvent.setPaymentStatus(PaymentStatus.APPROVED); } paymentRepository.save(payment); acceptedPaymentsEventRepository.save(buildAcceptedPaymentsEvent(paymentPayload.getPaymentCode())); eventPublisher.publishEvent(paymentResultEvent); } }
Отправка обратного сообщения в кафку производится через ApplicationEventPublisher.
@Component @RequiredArgsConstructor public class PaymentResultListener { private final KafkaPaymentResultSendService kafkaService; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void handlePaymentResult(PaymentResultEvent event) { kafkaService.sendPaymentResultEvent(event); } }
Это сделано для того чтобы сообщение в кафку отправилось только после выполнения оплаты и сохранения ее в базу данных. В случае если произойдет ошибка и транзакция откатится - сообщение в кафку не отправится.
Также видим, что в топике payment-result-event обратное сообщение появилось.

Это сообщение обработалось на стороне сервиса заказа и статус этой операции изменился на APPROVED в таблице outbox_payments.
Также для того чтобы в таблице outbox_payments не было слишком много записей реализован шедулер, который каждые 5 минут удаляет все записи со статусом APPROVED.
@Slf4j @Component @RequiredArgsConstructor public class OutboxCleanupScheduler { private final OutboxPaymentRepository outboxRepository; @Scheduled(cron = "${app.scheduling.outbox-cleanup-cron}") public void cleanUpApprovedPayments() { log.info("Starting cleanup of APPROVED outbox payments..."); try { outboxRepository.deleteByStatus("APPROVED"); log.info("Cleanup finished successfully."); } catch (Exception e) { log.error("Error during outbox cleanup: {}", e.getMessage()); } } }
Сейчас смоделируем несколько критических ситуаций которые могут иметь место.
Ситуация 1 — недостаточно денег.
Отправим запрос на заказ 10000 пачек кофе.
POST на http://localhost:8086/api/v1/orders
{ "productCode": "b7c3d258-bcbe-4bae-a219-0afe0528c9c3", "amount": 10000, "currency": "RUB", "telephone": "+79991234567", "note": "order coffee" }
Если посмотрим в таблицу payments orders_database, то увидим что данная операция не прошла.

В note есть пояснение причины NOT_ENOUGH_MONEY: Not enough money in the account!
Ситуация 2 – debezium отвалился.
Для этого зайдем в докер и остановим контейнер с ним.

Сделаем еще один запрос на заказ теперь 100 пачек кофе.

Если зайти в кафку и посмотреть в топик, то ожидаемо мы не найдем нового сообщения.
Запустим снова debezium. Как только он восстановится - он проверит таблицу outbox_payments на новые записи и отправит их в кафку.

Дальше все пошло по цепочке, которую мы уже рассматривали.
Ситуация 3 — по какой-то причине отвалилась кафка.
Для этого идем в докер и глушим кафку.

Далее отправляем запрос на 200 пачек кофе.

Видим в таблице outbox_payments, что статус пока IN_PROGRESS.

Запускаем кафку снова. Ждем пока поднимется.
Видим, что сообщение доставлено в топик.

Видим, что в таблице outbox_payments статус поменялся на APPROVED.
Таким образом наше сообщение обработалось на стороне микросервиса по оплате и вернулось обратно.
Ситуация 4 — микросервис по оплате недоступен.
Останавливаем наш микросервис по оплате.
Далее отправляем запрос на 300 пачек кофе.

В топике сообщение появилось.

В таблице outbox_payments данная операция пока в IN_PROGRESS

Запускаем микросервис по оплате. Как только микросервис поднимется он получит сообщение с кафки и начнет с ним работать.
На стороне заказа — статус этой операции поменялся на APPROVED.

Еще один момент, который хотелось бы подсветить — это то, что если консьюмер на стороне оплаты по какой-то причине не может обработать сообщение, которое пришло в топик, то для таких сообщений необходимо организовать, например, отдельную очередь - Dead Letter Queue (DLQ). В неe отправляются сообщения которые не могут быть обработаны, например, может не совпадать формат даты. Реализация очереди Dead Letter Queue не будет рассматриваться в рамках этой статьи.
Таким образом, мы рассмотрели паттерн outbox на примере работы двух микросервисов. С помощью этого паттерна можно организовать надежное взаимодействие микросервисов, при котором сообщение от одного микросервиса должно обязательно дойти до второго и по дороге не потеряться даже при сбоях работы debezium, брокера сообщений или принимающего сервиса.
Всем спасибо, кто дочитал статью до конца. Всем пока!
