Всем привет! В этой статье рассмотрим наиболее полную реализацию паттерна Transactional Outbox, которую можно будет легко расширять и применять в продакшне. Данная статья будет полезна как для разработчиков, которые еще не встречались с данным паттерном, так и тем, кто уже применял его в своей работе.

Введение

Прежде чем перейти к определению паттерна, определим ключевых акторов:

  • Паблишер - процесс, инициирующий изменения и создающий события.

  • Консьюмер - процесс, обра��атывающий эти события.

Transactional Outbox - это интеграционный паттерн, решающий проблему атомарности обновления базы данных и отправки уведомления во внешние системы. Наивным решением было бы "сначала запишем в БД, а потом отправим в брокер", однако такое решение ломается при сбоях, если процесс упадет между этими двумя шагами, то мы либо потеряем событие, либо отправим дубль.

Идея паттерна

Мы объединяем запись бизнес-сущностей и метаданных события в одну локальную транзакцию. События сохраняются в специальную таблицу outbox, откуда их забирает отдельный воркер. Таким образом мы гарантируем, что либо записалось все, либо ничего. В наиболее популярной реализации, worker записывает событие в топик для обработки другими сервисами, но что если нам нужно не записывать событие в топик, а обращаться напрямую к сервису, например, по gRPC / REST?

На помощь может прийти дополнительная абстракция в виде стратегии со своими DTO. Определив интерфейс с методом processEvent(Object dto), мы сможем подставлять любую логику обработки, не меняя ядро воркера.

Однако, что если мы не можем выполнить нужное действие в данный момент времени? Например, недоступен брокер, либо сервис, к которому мы обращаемся. В таком случае, нам необходимо добавить Retry-механизм. По-хорошему, данный механизм должен иметь экспоненциальный (ступенчатый) таймаут, т.к. задавая константный таймаут мы будем ждать либо слишком долго, либо выполнять слишком много попыток. Например:

  • Первые 3 попытки: интервал 15 секунд.

  • Следующие 5 попыток: интервал 5 минут.

При работе с outbox есть два распространенных подхода:

  1. Удалять обработанные события: меньше места в БД, но теряется история.

  2. Хранить и помечать статусом (например, SENT): есть аудит, но таблица будет расти и ее нужно архивировать/очищать.

Важно помнить: Transactional Outbox гарантирует доставку at-least-once (хотя бы один раз). Это неизбежно означает, что консьюмеры должны быть идемпотентными, так как возможны дубли при ретраях.

Рассмотрим на практическом примере

Пусть у нас есть интернет-магазин, построенный с помощью микросервисной архитектуры, который включает в себя 2 сервиса - сервис заказов и сервис уведомлений. При создании нового заказа нам необходимо отправить уведомление клиенту о том, а в дальнейшем, при обновлении статуса заказа - уведомлять о новых изменениях.

В данном случае архитектура состоит из следующих элементов:

[ Order Service ]
  ├─ orders
  ├─ outbox_events  ← в той же БД
  └─ OutboxPublisher → Kafka

[ Kafka ]

[ Notification Service ]
  └─ @KafkaListener → отправка уведомления

Что именно будем собирать

  • order - создает заказ, пишет запись в outbox_events в той же транзакции, шедулер публикует события в Kafka.

  • notification - слушает топик Kafka и пишет в лог «отправку уведомления».

  • STUB -каждые 2 минуты статус заказа обновляется автоматически, чтобы показать цепочку событий.

Запуск инфраструктуры (PostgreSQL + Kafka)

Репозиторий с данным примером доступен на гитхабе. Клонируем репоизтори и запускаем инфраструктуру и сервисы из корневой папки с помощью, сервисы запускаем в отдельных терминалах:

# запуск сервисов из docker compose
make infra
# запуск сервиса order (port: 8081) -> терминал 1
make order
# запуск сервиса notifcation (port: 8082) -> терминал 2
make notification

Создание заказа

curl -X POST http://localhost:8081/orders \
  -H 'Content-Type: application/json' \
  -d '{"customerEmail":"demo@dreadew.dev","totalPrice":1200}'

Структура базы данных

В микросервисе order-service создаются две основные таблицы: orders (Заказы) и outbox (события). В order-service используется ddl-auto: create-drop, так что таблицы создаются при старте и удаляются при остановке.

spring:
  jpa:
    hibernate:
      ddl-auto: create-drop

Реализация микросервиса order

Сущность заказа

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
public class Order {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false)
  private String customerEmail;

  @Column(nullable = false)
  private BigDecimal totalPrice;

  @Enumerated(EnumType.STRING)
  @Column(nullable = false)
  private OrderStatus status;

  @Column(nullable = false)
  private Instant createdAt;

  @Column(nullable = false)
  private Instant updatedAt;
}

Таблица outbox

@Entity
@Table(name = "outbox_events")
@Getter
@Setter
@NoArgsConstructor
public class OutboxEvent {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false)
  private String eventType;

  @Column(nullable = false)
  private String payload;

  @Column(nullable = false)
  private String status;

  @Column(nullable = false)
  private int retryCount;

  @Column(nullable = false)
  private Instant availableAt;

  @Column(nullable = false)
  private Instant createdAt;

  @Column(nullable = false)
  private Instant updatedAt;
}

Создание заказа + запись в outbox в одной транзакции

@Transactional
public Order createOrder(String email, BigDecimal totalPrice) {
  Instant now = Instant.now();
  Order order = new Order();
  order.setCustomerEmail(email);
  order.setTotalPrice(totalPrice);
  order.setStatus(OrderStatus.NEW);
  order.setCreatedAt(now);
  order.setUpdatedAt(now);
  Order saved = orderRepository.save(order);

  enqueueEvent(saved, now);
  return saved;
}

Паблишер для кафки

@Component
@RequiredArgsConstructor
public class KafkaOutboxPublisher implements OutboxPublisher {

  private final KafkaTemplate<String, String> kafkaTemplate;

  @Value("${app.outbox.topic}")
  private String topic;

  @Override
  public void publish(OutboxEvent event) {
    try {
      kafkaTemplate.send(topic,event.getEventType(), event.getPayload()).get(3, TimeUnit.SECONDS);
    } catch (Exception ex) {
      throw new IllegalStateException("Kafka publish failed", ex);
    }
  }
}

Outbox-процессор с ретраями

@Scheduled(fixedDelayString = "${app.outbox.poll-interval-ms:5000}")
@Transactional
public void processBatch() {
  Instant now = Instant.now();
  List<OutboxEvent> events = outboxEventRepository.findReady(
      List.of(OutboxStatus.NEW.name(), OutboxStatus.FAILED.name()), now);

  for (OutboxEvent event : events) {
    try {
      outboxPublisher.publish(event);
      markSent(event, now);
    } catch (Exception ex) {
      markFailed(event, now, ex);
    }
  }
}

STUB изменения статусов каждые 2 минуты

@Scheduled(fixedDelayString = "${app.orders.stub-status-interval-ms:120000}")
public void updateStatuses() {
  List<Order> orders = orderRepository.findAll();
  for (Order order : orders) {
    OrderStatus next = nextStatus(order.getStatus());
    if (next != order.getStatus()) {
      orderService.updateStatus(order.getId(), next);
    }
  }
}

Реализация консьюмера в сервисе notification

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationListener {

  private final ObjectMapper objectMapper;

  @KafkaListener(topics = "${app.kafka.topic}", groupId = "${spring.kafka.consumer.group-id}")
  public void handle(String payload) throws Exception {
    OrderEventPayload event = objectMapper.readValue(payload, OrderEventPayload.class);
    log.info("Send notification for order {} status {}, email {}",
        event.getOrderId(), event.getStatus(), event.getCustomerEmail());
  }
}

Результаты проверки

Создаем заказ через Postman / curl, либо через команду make create-order

Создание заказа через Postman
Создание заказа через Postman

Проверяем таблицу outbox (make check-outbox), топик в кафке (make check-kafka) и логи микросервиса notifcation-service.

Топик order-events, событие о новом заказе пришло
Топик order-events, событие о новом заказе пришло
Таблица outbox, событие о новом заказе сохранилось в таблицу
Таблица outbox, событие о новом заказе сохранилось в таблицу
Логи в сервисе notification, событие о созданном заказе получено и обработано
Логи в сервисе notification, событие о созданном заказе получено и обработано

Как можем заметить по скринам с логами, событие создалось в БД и отправилось в топик без ретраев. Теперь ожидаем 2 минуты (дефолтный таймаут в коде из репозитория) и снова проверяем логи кафки, сервиса notifcation и записи в таблице outbox.

Топик order-events после создания нового заказа
Топик order-events после создания нового заказа
Таблица outbox, было добавлено новое событие об изменении статуса заказа
Таблица outbox, было добавлено новое событие об изменении статуса заказа

Как можем заметить, события об изменениях статуса создаются, корректно записываются в таблицу outbox и обрабатываются воркером. Теперь отключаем контейнер с кафкой через Docker Desktop / CLI и ожидаем следующего события. После этого проверяем таблицу outbox и логи сервиса order.

Таблица outbox после отключения контейнера кафки
Таблица outbox после отключения контейнера кафки
Логи сервиса order после отключения контейнера кафки
Логи сервиса order после отключения контейнера кафки

Из логов видно, что сообщение не отправилось и перешло в статус FAILED, теперь включаем контейнер с кафкой и ждем пока пройдет Retry-таймаут, затем проверяем таблицу outbox, топик в кафке и логи сервиса notification.

Таблица outbox после перезапуска контейнера кафки
Таблица outbox после перезапуска контейнера кафки
Топик order-events после перезапуска контейнера
Топик order-events после перезапуска контейнера
Логи сервиса notification после перезапуска контейнера кафки
Логи сервиса notification после перезапуска контейнера кафки

Выводы

Transactional Outbox - хороший, практичный способ обеспечить консистентность данных между сервисами без распределенных транзакций. Он дает надежную доставку событий и упрощает гарантию “записали бизнес‑данные - событие тоже сохранено”.

Но у паттерна есть и свои минусы:

  • Асинхронность добавляет задержку между записью и доставкой события.

  • При сбое брокера/сервиса включаются ретраи, что увеличивает лаг и нагрузку.

Тем не менее, в реальных системах это один из самых надежных и понятных способов добиться согласованности между сервисами.