❯ Введение
Суть проблемы предельно проста: если внешний сетевой запрос завершится успешно, а фиксация изменений в БД сорвется, ваша система зависнет в несогласованном состоянии, которое невозможно откатить. В этой статье мы подробно, шаг за шагом, разберем реально работающую реализацию связки паттернов Transactional Outbox (исходящие транзакции) + Result Table (таблица результатов) + Saga Compensation (компенсирующие транзакции) на языке Scala с использованием Play Framework, Slick и Pekko.
Если ваше приложение одновременно сохраняет данные в базу и отправляет запросы к внешним веб-сервисам, у вас гарантированно возникнет проблема несогласованности данных (проблема двойной записи). Паттерны Transactional Outbox и Saga – это классические, описанные во всех учебниках решения. Однако большинство статей на эту тему ограничиваются голой теорией. Мы же пойдем дальше.
Мы создадим полноценное рабочее приложение, которое вы сможете склонировать, запустить у себя и адаптировать под свои задачи. Проект объединяет три паттерна, которые идеально дополняют друг друга. Transactional Outbox (исходящий почтовый ящик) дает железную гарантию того, что каждое событие будет обработано, даже если само приложение внезапно упадет посреди отправки запроса. Result Table (таблица результатов) фиксирует детали каждого вызова API, ответы сторонних систем и полученные идентификаторы, которые понадобятся, если операцию придется отменить. Saga Compensation (компенсирующие транзакции) – если, к примеру, четвертый шаг цепочки завершится сбоем, этот механизм автоматически и в правильном порядке откатит предыдущие три шага.
В нашем примере используются Play Framework, Scala 3, PostgreSQL, Slick и акторы Pekko. Тем не менее вся архитектура абсолютно не зависит от языка программирования или фреймворка. Вы можете заменить эти библиотеки на любые аналоги – сами концепции останутся неизменными.
Весь материал статьи построен на базе готового проекта. Настоятельно рекомендую склонировать репозиторий и держать код перед глазами по ходу чтения. Каждый фрагмент кода мы берем напрямую из репозитория:
git clone https://github.com/hanishi/never-call-apis-inside-database-transactions cd never-call-apis-inside-database-transactions
Чтобы запустить его локально:
docker-compose up -d postgres sbt run # Открыть http://localhost:9000
В проект встроен интерактивный симулятор внешних служб (склад, биллинг, доставка, проверка на мошенничество) с настраиваемой вероятностью сбоев, удобный веб-интерфейс для оформления заказов и провоцирования ошибок, а также подробный журнал аудита для каждого вызова API. О том, как запустить конкретные сценарии, мы поговорим в разделе «Время экспериментов» в конце статьи.
Итак, поехали.
❯ Зачем всё это нужно?
Если ваша система выполняет хотя бы одно из следующих действий, вы неизбежно сталкиваетесь с проблемой двойной записи (dual-write problem):
Сохраняет данные в локальную базу, а затем вызывает внешнее API, меняющее состояние сторонней системы (списание денег через платежный шлюз, резервирование товара на складе, планирование логистики на стороне курьерской службы или публикация события в Kafka). Обратите внимание: запросы только на чтение (например, загрузка кредитного рейтинга) подобных проблем не создают, поскольку на той стороне просто нечего откатывать при аварии.
Координирует работу нескольких сервисов в рамках одной сквозной бизнес-операции (например: заблокировать товар → проверить на фрод → запланировать доставку → провести оплату).
Должна уметь откатить частично выполненную работу, если один из этапов длинной цепочки завершился неудачей.
Без этих паттернов ваша система рано или поздно окажется в ситуации, когда деньги с клиента успешно списаны, а самого заказа в базе нет; товары на складе заблокированы под заказы-невидимки, или курьеры спешат везти посылку, которая была давно отменена. Паттерн Outbox устраняет подобные нестыковки непосредственно на уровне архитектурного дизайна.
Прежде чем начать:
В примерах мы используем Scala 3 и три ключевые библиотеки. Вот краткий экскурс для быстрого погружения:
Slick: функционально-реляционная библиотека для Scala, обеспечивающая типобезопасность запросов к СУБД. В коде вам будет регулярно встречаться тип
DBIO[T]. Это декларативное описание операции с базой данных, возвращающее результат типаT. Оно похоже на то, какFuture[T]описывает асинхронную операцию. Сам по себеDBIOне запускается, пока вы явно не передадите его методуdb.run(...). Вы можете объединять несколько цепочекDBIOс помощьюfor-comprehensionи оборачивать их в.transactionally, чтобы СУБД выполнила их как единую атомарную транзакцию.Apache Pekko: фреймворк для параллельного программирования на базе модели акторов (открытый преемник Акка). Акторы – это ультралегкие изолированные сущности, обрабатывающие сообщения строго поочередно, что делает их идеальными кандидатами для построения надежных фоновых обработчиков.
Play Framework: веб-фреймворк для Scala, использующий формат HOCON (
application.conf) для удобной конфигурации приложения.
❯ В чем корень проблемы?
Представьте, что вы разрабатываете интернет-магазин. Когда покупатель оформляет заказ, вам нужно выполнить цепочку действий:
Сохранить заказ в базу данных
Зарезервировать товары на складе
Запланировать доставку
Списать деньги с карты
Библиотека Slick предлагает удобную лазейку: метод DBIO.from() преобразует любой Future в DBIO. Это позволяет выстраивать асинхронную работу и запросы к БД в единую цепочку с помощью for-comprehension. Штука удобная, но в ней скрыта опасная ловушка. В Scala запросы к внешним API обычно возвращают Future[T] (неважно, используете ли вы WSClient из состава Play, библиотеку sttp или любой другой HTTP-клиент). А поскольку DBIO.from() проглатывает вообще любой Future, у разработчика возникает соблазн упаковать туда сетевые вызовы и засунуть всё это внутрь блока .transactionally. Код компилируется без единой ошибки, типы идеально сходятся, и всё выглядит так, будто весь процесс застрахован единой транзакцией:
db.run { (for { orderId <- orders += order // Запись в БД _ <- DBIO.from(inventoryApi.reserve(orderId)) // HTTP-вызов, обернутый в DBIO _ <- DBIO.from(shippingApi.schedule(orderId)) // HTTP-вызов, обернутый в DBIO _ <- DBIO.from(billingApi.charge(orderId)) // HTTP-вызов, обернутый в DBIO _ <- shipments += Shipment(orderId, shippingRes) // Запись в БД } yield orderId).transactionally }
Но здесь кроется подвох: транзакция СУБД контролирует исключительно процессы внутри самой базы данных. С точки зрения транзакции, HTTP-запросы внутри DBIO.from() – это операции по принципу «выстрелил и забыл». Они выполняются, их побочные эффекты материализуются мгновенно, а у базы просто нет механизмов, чтобы откатить изменения на сторонних серверах. Представьте, что произойдет, если последняя запись в БД сорвется:
Создание заказа в БД → успешно
Вызов API склада → 200 OK (товар зарезервирован)
Вызов API службы доставки → 200 OK (доставка запланирована)
Вызов API платежной системы → 200 OK (деньги списаны)
Запись информации о доставке в БД → сбой (дедлок, сетевой таймаут – неважно)
База данных откатывает транзакцию
Итог: у клиента списали деньги, доставка вовсю готовится, на складе товар заблокирован… а в вашей базе данных нет и следа самого заказа! Внешние сервисы понятия не имеют, что транзакция в вашей локальной БД развалилась. Они свою работу сделали и спят спокойно.
Простая попытка разделить эти операции тоже не спасет:
// Шаг 1: Сохраняем в БД val orderId = db.run(orders += order).transactionally // Успешно // Шаг 2: Вызываем внешние сервисы inventoryApi.reserve(orderId) // Успешно shippingApi.schedule(orderId) // Здесь приложение падает; доставка не вызвана billingApi.charge(orderId) // Этот вызов даже не начинался
Если приложение потерпит крах в зазоре между шагом 1 и шагом 2, заказ останется лежать в БД мертвым грузом – больше ничего не произойдет. Никакой повторный запуск не сработает, потому что идентификатор orderId бесследно стерся из оперативной памяти.
Распределенных транзакций, способных бесшовно связать разнородные системы, в реальном мире не существует. СУБД гарантирует атомарность только для собственных таблиц, но она бессильна перед сторонними HTTP API, брокерами Kafka или чужими платежными шлюзами. Каждая система фиксирует изменения независимо, падает независимо и абсолютно не подозревает о том, что происходит у соседей.
❯ Решение: паттерн Transactional Outbox
Выход на удивление прост: забудьте о вызовах внешних API во время обработки клиентского запроса. Вместо этого запишите намерения (что должно произойти) прямо в базу данных, а всю грязную работу по отправке запросов поручите фоновому процессу.
То есть вместо прямой отправки запросов на склад, доставку и оплату мы добавляем специальную запись в таблицу outbox_events – и делаем это в рамках той же транзакции базы данных, в которой создается сам заказ:
BEGIN; INSERT INTO orders (customer_id, total_amount, ...) VALUES ('C-123', 99.99, ...); INSERT INTO outbox_events (event_type, payloads, status) VALUES ('OrderCreated', '{...}', 'PENDING'); COMMIT;
По ходу транзакции не происходит никаких сетевых вызовов – только быстрые и надежные локальные записи в таблицы.
Специальный фоновый обработчик (в нашей реализации – актор Pekko под названием OutboxProcessor) непрерывно сканирует эту таблицу. Обнаружив событие в статусе PENDING (ожидает обработки), он резервирует его под себя, отправляет реальные HTTP-запросы и меняет статус записи на PROCESSED (обработано). Если приложение упадет до того, как воркер доберется до записи, событие никуда не денется из БД. Воркер просто подхватит его сразу после перезапуска.
Это полностью решает проблему «API сработал, а БД откатилась», поскольку к внешним вызовам мы переходим только после успешной фиксации локальной транзакции.
❯ А что делать при частичных сбоях?
Паттерн Outbox гарантирует доставку и обработку каждого события. Но представьте: воркер поочередно совершил два успешных вызова, а на третьем споткнулся:
1. Зарезервировать товар → успешно (reservationId: "RES-456") 2. Запланировать доставку → успешно (shipmentId: "SHIP-789") 3. Списать оплату → сбой после 3 попыток отправки
Товар на складе заблокирован, курьеры уже пакуют посылку, но оплата так и не прошла. Очевидно, нужно откатить шаги 1 и 2. Причем делать это нужно строго в обратном порядке (LIFO – последним вошёл, первым вышел), ведь оформление доставки логически зависит от резерва на складе.
Перед нами классический паттерн Saga Compensation (компенсирующая транзакция): при сбое на промежуточном этапе мы последовательно отменяем все успешные предыдущие шаги в обратном порядке. Сначала отменяем доставку, затем снимаем резерв со склада.
Но чтобы выполнить откаты, система должна точно знать, какие шаги завершились успехом и какие данные они нам вернули. Например, для вызова отмены доставки сервису нужен конкретный shipmentId, сгенерированный сторонней службой. Вот почему нам жизненно необходим паттерн Result Table (таблица результатов) – подробнейший аудит-лог, куда записывается каждый сетевой запрос, его результат и полный текст ответа API.
Эти три паттерна безупречно работают в связке:
Паттерн | Какую проблему решает | Как именно |
|---|---|---|
Transactional Outbox | Двойная запись (dual-write) | Атомарно записывает заказ и событие в БД |
Result Table | «Что именно мне откатывать?» | Регистрирует каждый вызов API и ответ от него |
Saga compensation | Частичные сбои в цепочке | Отменяет успешные вызовы в обратном порядке (LIFO) |
Разберем создание всех трех компонентов шаг за шагом.
❯ Схема базы данных
Прежде чем переходить к коду, давайте взглянем на две основные таблицы, на которых держится вся магия Outbox.
❯ orders – бизнес-данные
Эту таблицу вы бы спроектировали в любом случае. Никакой специфики Outbox здесь нет:
CREATE TABLE orders ( id BIGSERIAL PRIMARY KEY, customer_id VARCHAR(255) NOT NULL, total_amount DECIMAL(10, 2) NOT NULL, shipping_type VARCHAR(20) NOT NULL DEFAULT 'domestic', order_status VARCHAR(50) NOT NULL DEFAULT 'PENDING', created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW(), deleted BOOLEAN NOT NULL DEFAULT FALSE );
❯ outbox_events – план работ
Каждая строка в этой таблице представляет собой конкретное бизнес-событие, информацию о котором нужно передать внешним API. Чтением и записью здесь заправляет вспомогательный класс OutboxHelper (его мы разберем далее):
CREATE TABLE outbox_events ( id BIGSERIAL PRIMARY KEY, aggregate_id VARCHAR(255) NOT NULL, -- к какому заказу (или сущности) относится событие event_type VARCHAR(255) NOT NULL, -- например, "OrderCreated", "OrderStatusUpdated" payloads JSONB NOT NULL, -- специфические данные для каждого адресата (один ключ на API) status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING → PROCESSING → PROCESSED retry_count INT NOT NULL DEFAULT 0, idempotency_key VARCHAR(512) NOT NULL, -- защищает от дублирования событий next_retry_at TIMESTAMP WITH TIME ZONE, -- когда повторить попытку в случае сбоя -- ... плюс временные метки, трекинг ошибок и т.д. ); -- Только одно активное событие на связку агрегат + тип (защита от дубликатов) CREATE UNIQUE INDEX idx_outbox_idempotency ON outbox_events (idempotency_key) WHERE status != 'PROCESSED';
СУБД-колонка payloads с типом JSONB хранит карту (map), где ключи – это названия систем-получателей (например, "inventory", "billing"), а значения – передаваемые им данные. Совсем скоро мы увидим, как строятся эти структуры при объявлении доменных событий.
Для базовой реализации Outbox-паттерна достаточно этих двух таблиц. Остальные таблицы мы добавим позже, когда дойдем до логики логов результатов и проведения компенсирующих транзакций.
❯ Атомарная запись событий
База данных спроектирована, теперь напишем код для ее наполнения. Наша главная цель – сделать так, чтобы любая бизнес-операция, требующая внешних вызовов, атомарно регистрировала и бизнес-данные, и соответствующее событие в одной общей транзакции.
❯ Доменные события
Каждая бизнес-операция генерирует определенное доменное событие. Например, событие OrderCreatedEvent точно знает, в какие сервисы нужно отправить данные и какие структуры им для этого нужны:
sealed trait DomainEvent { def aggregateId: String // к какой сущности относится (например, ID заказа) def eventType: String // например, "OrderCreated" def toPayloads: Map[String, DestinationConfig] // специфические данные для каждого адресата } case class OrderCreatedEvent( orderId: Long, customerId: String, totalAmount: BigDecimal, shippingType: String, timestamp: Instant = Instant.now() ) extends DomainEvent { override def aggregateId = orderId.toString override def eventType = "OrderCreated" override def toPayloads = Map( "inventory" -> DestinationConfig(payload = Some(Json.obj( "orderId" -> orderId, "totalAmount" -> totalAmount, "shippingType" -> shippingType ))), "fraudCheck" -> DestinationConfig(payload = Some(Json.obj( "orderId" -> orderId, "customerId" -> customerId, "totalAmount" -> totalAmount ))), "shipping" -> DestinationConfig(payload = Some(Json.obj( "customerId" -> customerId, "shippingType" -> shippingType, "totalAmount" -> totalAmount ))), "billing" -> DestinationConfig(payload = Some(Json.obj( "amount" -> totalAmount, "currency" -> "USD" ))) ) }
Каждая служба-адресат получает только те данные, которые ей действительно необходимы. Сервису проверки на мошенничество (fraudCheck) уходят идентификатор клиента и сумма покупки для скоринга рисков. Сервису оплаты (billing) – только сумма и валюта платежа.
❯ Трейт OutboxHelper
Трейт OutboxHelper берет на себя гарантию атомарности. Он оборачивает выполнение вашей бизнес-логики и сохранение события в системный метод .transactionally. В итоге они либо запишутся в базу вместе, либо обе операции будут откачены:
trait OutboxHelper { protected val outbox = TableQuery[OutboxTable] protected def withEvent[T](event: DomainEvent)(action: DBIO[T]) (using ec: ExecutionContext): DBIO[T] = (for { result <- action // Ваша запись бизнес-данных (например, INSERT INTO orders) _ <- saveEvent(event) // INSERT INTO outbox_events } yield result).transactionally // ← обе операции в рамках одной транзакции protected def withEventFactory[T](action: DBIO[T])(eventFactory: T => DomainEvent) (using ec: ExecutionContext): DBIO[T] = (for { result <- action _ <- saveEvent(eventFactory(result)) // событие зависит от результата выполнения действия (например, автоинкрементный ID) } yield result).transactionally }
Здесь предусмотрено два удобных сценария использования:
withEventпринимает уже созданное событие и саму процедуру изменения БД. Он идеален для ситуаций, когда все реквизиты события у вас на руках еще до обращения к базе (например, при отмене заказа, когда его ID давно известен).withEventFactoryпринимает СУБД-действие и фабричный методT => DomainEvent. Сначала выполняется запись бизнес-данных, результат этой записи передается в фабрику, а та генерирует событие. Без этого не обойтись, если параметры события зависят от значений, генерируемых самой СУБД (например, от автоинкрементного первичного ключа заказа). Вы просто не сможете заранее создатьOrderCreatedEventбезorderId, а самorderIdстанет известен только после выполнения инструкцииINSERT. МетодwithEventFactoryрешает эту дилемму: он откладывает сборку события ровно до того момента, когда у нас появится сгенерированный базой ID, при этом по-прежнему гарантируя безупречную атомарность в рамках единой транзакции.
Этот блок написан под Slick: сущности DBIO, TableQuery и метод .transactionally – чисто его специфика. Но сам архитектурный паттерн абсолютно универсален. Ваша задача – связать воедино сохранение бизнес-сущности и запись события в одной транзакции СУБД. В Hibernate/JPA для этого используют аннотацию @Transactional, в jOOQ – вызов dsl.transaction(...), а на голом JDBC – инструкцию connection.setAutoCommit(false).
❯ Собираем всё вместе в репозитории
Репозиторий OrderRepository просто примешивает трейт OutboxHelper. Теперь создание нового заказа превращается в элегантную однострочную конструкцию:
@Singleton class OrderRepository @Inject() ()(using ec: ExecutionContext) extends OutboxHelper { private val orders = TableQuery[OrderTable] def createWithEvent(order: Order): DBIO[Long] = withEventFactory((orders returning orders.map(_.id)) += order) { orderId => OrderCreatedEvent(orderId, order.customerId, order.totalAmount, order.shippingType) } }
Во время вызова метода createWithEvent на стороне СУБД выполняется следующий SQL-сценарий:
BEGIN; INSERT INTO orders (customer_id, total_amount, shipping_type, order_status) VALUES ('C-123', 99.99, 'domestic', 'PENDING') RETURNING id; -- Возвращает 123 INSERT INTO outbox_events (aggregate_id, event_type, payloads, idempotency_key, status) VALUES ('123', 'OrderCreated', '{"inventory": {...}, "fraudCheck": {...}, "shipping": {...}, "billing": {...}}', '123:OrderCreated', 'PENDING'); COMMIT;
В результате либо сохранятся обе записи, либо ни одна из них. Сервисному уровню остается лишь выполнить привычный метод db.run(...). В терминах Slick эта функция принимает DBIO (описание плана СУБД-операций), запускает его физическое выполнение и возвращает привычный Future с полученным значением:
class OrderService @Inject() (orderRepo: OrderRepository, ...) (using ec: ExecutionContext, db: Database) { def createOrder(order: Order): Future[Long] = db.run(orderRepo.createWithEvent(order)) // createWithEvent возвращает DBIO[Long] (описание плана СУБД-транзакции) // db.run(...) запускает транзакцию на выполнение и возвращает Future[Long] (сгенерированный ID заказа) }
Отлично. Теперь заказ благополучно сохранен в базу, а в таблице событий дожидается своего часа запись со статусом PENDING. Никаких лишних HTTP-запросов во время транзакции выполнено не было. Теперь перейдем к обработке накопившейся очереди.
❯ Обработка событий: OutboxProcessor
В нашей таблице появилось событие в состоянии PENDING. Логично, что теперь нужен фоновый обработчик, который прочитает эту запись и дернет внешние API. За это и отвечает OutboxProcessor – фоновый типизированный актор Pekko.
❯ Безопасный захват событий в конкурентной среде
В высоконагруженных продакшен-системах события обрабатывают сразу несколько параллельных воркеров. Но как гарантировать, что два независимых процесса не схватят одновременно одну и ту же запись? Эту проблему решает встроенная конструкция PostgreSQL FOR UPDATE SKIP LOCKED всего за один атомарный запрос:
def findAndClaimUnprocessed(limit: Int = 100): DBIO[Seq[OutboxEvent]] = { sql""" WITH claimed AS ( SELECT id FROM outbox_events WHERE status = 'PENDING' AND (next_retry_at IS NULL OR next_retry_at <= NOW()) ORDER BY created_at LIMIT $limit FOR UPDATE SKIP LOCKED ) UPDATE outbox_events e SET status = 'PROCESSING', status_changed_at = NOW() FROM claimed WHERE e.id = claimed.id RETURNING e.* """.as[OutboxEvent] }
С помощью обобщенного табличного выражения (CTE) этот запрос выполняет три важнейших действия за одно обращение к диску (атомарно):
Поиск (Select). Временная таблица
WITH claimedнаходит строки в статусе'PENDING', время повторной отправки которых подошло (или полеnext_retry_atвовсе пусто). Сортировка поcreated_atгарантирует обработку строго по хронологии, а параметрLIMITограничивает размер порции.Блокировка (Lock). Ключевым элементом здесь выступает инструкция
FOR UPDATE SKIP LOCKED. Она накладывает монопольную блокировку на выбранные строки, но при этом воркеры не зависают в ожидании освобождения занятых записей, а просто пропускают заблокированные (SKIP LOCKED). Если запустить три параллельных воркера одновременно, каждый из них получит чисто свой уникальный пакет событий – без пересечений, конфликтов или повторной обработки.Обновление и возврат данных (Update & Return). Внешний оператор
UPDATEпереводит выбранные записи из статуса'PENDING'в'PROCESSING'и мгновенно возвращает их содержимое в приложение. За один единственный сетевой цикл мы находим, блокируем, резервируем и забираем нужные события!
❯ Вызов внешних API
Захватив событие, процессор переходит к публикации – то есть поочередно дергает внешние HTTP-эндпоинты. Дальнейшие действия зависят от ответа серверов:
private def publishEvent(event: OutboxEvent): Future[Boolean] = publisher.publish(event).flatMap { case PublishResult.Success => db.run(outboxRepo.markProcessed(event.id)).map(_ => true) case PublishResult.Retryable(error, retryAfter) => handleRetryableFailure(event, error, retryAfter) case PublishResult.NonRetryable(error) => handleNonRetryableFailure(event, error) }
Успех (Success). Помечаем запись в базе как
PROCESSED. Задача решена.Повторяемый сбой (Retryable failure) (например, при ошибке 503 Service Unavailable). Планируем повтор с экспоненциальной задержкой (выжидаем 2, 4, затем 8 секунд). Если сервер вернул заголовок
Retry-After, ориентируемся строго на него.Неповторяемый сбой (Non-retryable failure) (например, критический статус ответа 400 Bad Request). Отправляем событие напрямую в очередь недоставленных сообщений (DLQ – Dead Letter Queue).
Если лимит повторных попыток исчерпан (по умолчанию разрешено 3 попытки), событие отправляется в очередь недоставленных сообщений для запуска процедуры автоматического отката (о ней мы подробно поговорим в разделе «Автоматическая компенсация»).
❯ Мгновенный запуск процессов через механизмы LISTEN и NOTIFY
Вместо того чтобы мучить базу данных постоянными опросами каждые несколько секунд и плодить лишние задержки, мы настроим триггер PostgreSQL. Он будет мгновенно будить наш обработчик при добавлении новой записи:
CREATE OR REPLACE FUNCTION notify_new_outbox_event() RETURNS trigger AS $$ BEGIN IF NEW.status = 'PENDING' THEN PERFORM pg_notify('outbox_events_channel', NEW.id::text); END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE TRIGGER outbox_event_inserted AFTER INSERT OR UPDATE OF status ON outbox_events FOR EACH ROW EXECUTE FUNCTION notify_new_outbox_event();
Разумеется, подобные конструкции специфичны для PostgreSQL. Однако именно благодаря богатейшему встроенному функционалу (LISTEN/NOTIFY, FOR UPDATE SKIP LOCKED, поддержка типа JSONB, частичные индексы) Postgres выглядит идеальным выбором под паттерн Outbox. В проекте из репозитория весь этот потенциал выжат по полной программе.
Мостом, связывающим уведомления из Postgres с нашим OutboxProcessor, служит источник Pekko Streams (PostgresListenNotifyStream). Он постоянно держит соединение в канале уведомлений, буферизирует поступающие ID событий и отправляет актору команду ProcessUnhandledEvent. Использование реактивного стрима из коробки дает нам управление нагрузкой (backpressure), автоматическое восстановление соединения с растущим таймаутом и безопасное выключение фонового процесса. Как только транзакция с заказом и событием успешно фиксируется (о чем говорилось в разделе «Атомарная запись событий»), СУБД активирует триггер, стрим мгновенно перехватывает сигнал, и процессор приступает к работе буквально через миллисекунды. А если по каким-то причинам механизмы LISTEN/NOTIFY недоступны, система автоматически переключается в классический режим периодического опроса таблиц.
❯ Веерная рассылка (fan-out) и условная маршрутизация
До сих пор мы рассматривали обработку одиночных вызовов. Но наше событие OrderCreated должно поочередно облететь целый ряд систем, причем в строгом порядке. В нашей реализации правила маршрутизации вынесены целиком в файл конфигурации application.conf – никакой жесткой привязки к адресам в программном коде. Это дает потрясающую гибкость: подключать внешние системы, менять URL или изменять порядок рассылки можно прямо «на лету», без перекомпиляции приложения:
outbox.http.fanout { OrderCreated = ["inventory", "fraudCheck", "shipping", "billing"] OrderStatusUpdated = ["notifications"] }
Термин «веерная рассылка» (fan-out) в данном случае означает, что на одно доменное событие завязана целая пачка вызовов. Эти запросы выполняются последовательно, а не параллельно. Это сделано намеренно: последующие сервисы могут изменять свое поведение в зависимости от результатов предыдущих (например, биллинг сопоставляет риск-скоринг от фрода для выбора платежного шлюза). Тело каждого HTTP-запроса извлекается из той самой карты toPayloads, которую мы подготовили внутри структуры нашего доменного события, где ключи в точности соответствуют именам систем-получателей. Каждому такому получателю сопоставляется конкретный эндпоинт в конфигурации:
outbox.http.routes { inventory { url = "http://localhost:9000/api/inventory/reserve" method = "POST" timeout = 5 seconds } fraudCheck { url = "http://localhost:9000/api/fraud/check" method = "POST" timeout = 5 seconds } }
После каждого успешного запроса результат немедленно сохраняется в третью таблицу – aggregate_results. Это наш журнал аудита. Таблица фиксирует отправленные данные, полные ответы от API и признак успешности вызова:
CREATE TABLE aggregate_results ( id BIGSERIAL PRIMARY KEY, aggregate_id VARCHAR(255) NOT NULL, -- к какому заказу относится destination VARCHAR(255) NOT NULL, -- получатель (например, "inventory", "shipping") request_payload JSONB, -- что именно мы отправили в API response_payload JSONB, -- что API вернул нам в ответ success BOOLEAN NOT NULL, -- прошел ли вызов успешно? fanout_order INT NOT NULL DEFAULT 0, -- порядковый номер шага в рассылке (0, 1, 2...) -- ... плюс целевой URL, HTTP-метод, статус-код, время выполнения и т.д. );
Зачем тащить в СУБД весь JSON-ответ целиком? Дело в том, что если на этапе оплаты произойдет сбой, системе придется отменять ранее созданную доставку. А для этого нам позарез нужен идентификатор shipmentId, сгенерированный сторонним сервисом во время прямого запроса. Это уникальное значение лежит как раз внутри сохраненного response_payload. Колонка fanout_order фиксирует порядковый номер шага (0, 1, 2…). Во время проведения компенсирующей транзакции мы пойдем по этим номерам в обратном направлении (LIFO), тем самым раскручивая клубок зависимостей без риска нарушить порядок этапов отмены.
❯ Условная маршрутизация
Пока всё выглядит просто – одному получателю соответствует строго один URL. Но что делать, если адрес назначения колеблется в зависимости от контекста заказа? Например, если для параметра "shippingType" указано значение "domestic" (внутри страны), доставка должна уходить на внутреннее API курьерской службы, а если "international" – на шлюз глобального почтового оператора.
Для этого мы можем задекларировать динамический список маршрутов в конфигурационном блоке routes в виде пар «URL + условие». При публикации обработчик последовательно проверяет эти правила сверху вниз и выбирает первый совпавший вариант:
shipping { method = "POST" routes = [{ url = "http://localhost:9000/api/domestic-shipping" condition { jsonPath = "$.shippingType" # извлекаем это поле из входящих данных operator = "eq" # проверяем на равенство value = "domestic" # ожидаемое значение } }, { url = "http://localhost:9000/api/international-shipping" condition { jsonPath = "$.shippingType" operator = "eq" value = "international" } }] }
Добежав до этапа доставки (shipping), модуль публикации открывает тело исходящего запроса (например, {"shippingType": "domestic", ...}) и вычисляет выражение $.shippingType == "domestic". Условие соблюдено, а значит, система выполнит вызов POST /api/domestic-shipping. Если бы в параметре доставки стояла международная почта, первая проверка благополучно провалилась бы и система переключилась бы на второй URL.
Реализации условного блока поддерживает широкий набор стандартных операторов сравнения: eq (равно), ne (не равно), gt (больше), gte (больше или равно), lt (меньше), lte (меньше или равно), contains (содержит) и exists (существует/присутствует).
❯ Цепочка принятия решений: роутинг по ответам смежных сервисов
По-настоящему условная маршрутизация раскрывается тогда, когда логика маршрута строится вокруг данных, которых не было в изначальном запросе, – то есть параметров, возвращенных одной из соседних систем на ранних этапах.
Рассмотрим процесс оплаты. В клиентском запросе нет и не может быть оценки репутационных рисков – эту информацию генерирует наш собственный сервис фрод-скоринга (fraudCheck). При этом бизнес-логика требует: транзакции с низким риском проводить через рядовой платежный шлюз, а рискованные платежи отправлять специализированной системе контроля высокорисковых операций. Поскольку проверка на фрод выполняется до инициации платежа (в списке веерной рассылки она расположена раньше), к моменту запуска биллинга в системе уже сохранен исчерпывающий ответ от скоринг-сервиса.
Чтобы связать их воедино, мы используем специальное поле previousDestination. Оно дает инструкции обработчику: «проверяй указанное условие не во входящем payload, а в сохраненном ответе от указанного смежного сервиса»:
billing { method = "POST" routes = [{ url = "http://localhost:9000/api/billing" condition { jsonPath = "$.riskScore" operator = "lt" value = "50" previousDestination = "fraudCheck" # ← смотрим на ответ fraudCheck, а не на исходный запрос } }, { url = "http://localhost:9000/api/high-value-processing" condition { jsonPath = "$.riskScore" operator = "gte" value = "50" previousDestination = "fraudCheck" } }] }
Каким образом публикатор получает доступ к ответам пройденных систем? Всё благодаря сущности RoutingContext. Это хранилище данных, которое аккумулирует возвращаемые JSON-ответы на протяжении всего жизненного цикла цепочки:
case class RoutingContext(destinationResponses: Map[String, JsValue] = Map.empty) { def withResponse(destination: String, response: JsValue): RoutingContext = copy(destinationResponses = destinationResponses + (destination -> response)) }
Вот как весь этот путь выглядит для события OrderCreated по шагам:
1. Обращаемся к складу (inventory) → 200 OK, ответ: {"reservationId": "RES-456"} → Результат записывается в aggregate_results → В контекст добавляется: {inventory: {"reservationId": "RES-456"}} 2. Проверяем операцию на защищенность (fraudCheck) → 200 OK, ответ: {"riskScore": 25, ...} → Результат записывается in aggregate_results → В контекст добавляется: {inventory: {...}, fraudCheck: {"riskScore": 25}} 3. Рассчитываем маршрут доставки (shipping) → Проверяем исходное тело запроса: $.shippingType == "domestic"? Да. → Выполняем вызов: POST /api/domestic-shipping → Сохраняем результат, обновляем контекст 4. Рассчитываем маршрут биллинга (billing) → Видим параметр previousDestination = "fraudCheck" и заглядываем в context["fraudCheck"] → Проверяем: $.riskScore < 50? В скоринге записано 25 → Да! → Выполняем вызов: POST /api/billing (стандартная оплата, без повышенного контроля) → Сохраняем результат в бэкенд и закрываем цепочку
Если бы скоринг-система вернула более тревожное значение вроде {"riskScore": 75}, на четвертом шаге сработал бы альтернативный маршрут до /api/high-value-processing. Самое приятное, что логика принятия решений диктуется исключительно гибкими конфигурациями и реальными данными – код приложения при этом остается железобетонным и защищенным от лишних изменений.
❯ Описание логики отмены через конфигурацию
В идеальном сценарии все четыре сервиса успешно завершают работу и событие помечается статусом PROCESSED. Но что если на каком-то этапе произойдет критическая ошибка? Прежде чем углубляться в архитектуру механизма откатов, ответим на фундаментальный вопрос: как нашей системе понять, каким образом отменить конкретный HTTP-вызов?
Вспомним, как шел прямой процесс. Веб-сервер отправил запрос службе доставки:
POST /api/domestic-shipping Body: {"customerId": "C-123", "shippingType": "domestic", ...}
И в ответ прилетел документ:
{"shipmentId": "SHIP-789", "orderId": "123", "carrier": "FedEx"}
И исходящий запрос, и ответ сервиса благополучно лежат в нашей таблице aggregate_results (как мы разбирали в разделе «Веерная рассылка (Fan-Out) и условная маршрутизация»). Чтобы отменить эту доставку, нам нужно прийти на эндпоинт:
POST /api/domestic-shipping/SHIP-789/cancel
Разумеется, на этапе проектирования или компиляции мы не могли знать значение SHIP-789 – идентификатор сгенерировала сторонняя служба доставки в реальном времени. Единственное место, откуда мы можем его выудить, – это поле сохраненного JSON-ответа.
❯ Обучаем систему искусству отката
Любой шаг веерной рассылки имеет право содержать в конфигурации специальный блок revert. Начнем с самого простого примера отмены – из каталога склада (inventory):
inventory { url = "http://localhost:9000/api/inventory/reserve" method = "POST" revert { url = "http://localhost:9000/api/inventory/{reservationId}/release" method = "DELETE" extract { reservationId = "response:$.reservationId" } } }
Разберем архитектуру этого блока:
extract(извлечение): дает указание системе выдернуть значениеreservationIdиз тела JSON-ответа прямого вызова с помощью выражения JSONPath$.reservationId. Приставкаresponse:явно указывает, что копать нужно в сохраненном ответе (в то время как приставкаrequest:заставила бы искать во входящем payload исходного запроса).url: использует заполнитель{reservationId}в качестве шаблона. Парсер автоматически подставит туда извлеченную строку.method: задает метод DELETE. Передавать тело запроса в данном случае не требуется.
Таким образом, если при успешной попытке нам вернулось {"reservationId": "RES-456", ...}, компенсирующий вызов преобразуется в:
DELETE /api/inventory/RES-456/release
Те системы доставки данных, у которых блок revert отсутствует (например, сканирование fraudCheck), процессор отмены просто проигнорирует. Это исключительно читающие операции, которые логически не требуют обратного действия.
❯ Сложный пример: отмена запланированной доставки
Процесс отмены доставки устроен немного сложнее: здесь получателю требуется полноценное тело POST-запроса, собранное из данных как исходного запроса, так и прилетевшего ранее ответа:
shipping { routes = [{ url = "http://localhost:9000/api/domestic-shipping" condition { jsonPath = "$.shippingType", operator = "eq", value = "domestic" } revert { url = "http://localhost:9000/api/domestic-shipping/{shipmentId}/cancel" method = "POST" extract { shipmentId = "response:$.shipmentId" # Из ответа API службы доставки orderId = "response:$.orderId" # Тоже из ответа customerId = "request:$.customerId" # Из исходного тела запроса, который мы слали } payload = """{"reason": "payment_failed", "shipmentId": "{shipmentId}", "orderId": "{orderId}", "customerId": "{customerId}"}""" } }] }
В данном случае блок extract берет на себя извлечение трех параметров: двух из ответа и одного из запроса. Затем все найденные фигурные скобки и в строке URL, и в строке payload заполняются полученными данными:
URL: /api/domestic-shipping/{shipmentId}/cancel → /api/domestic-shipping/SHIP-789/cancel Payload: {"reason": "payment_failed", "shipmentId": "{shipmentId}", ...} → {"reason": "payment_failed", "shipmentId": "SHIP-789", "orderId": "123", "customerId": "C-123"}
Фабричный объект RevertEndpointBuilder связывает воедино эти настройки с сохраненными в БД телами запросов/ответов и выдает полностью сконфигурированный объект для отправки в сеть:
object RevertEndpointBuilder { def buildRevertEndpoint( revertConfig: RevertConfig, requestPayload: Option[JsValue], responsePayload: Option[JsValue], baseDestination: String, headers: Map[String, String], timeout: FiniteDuration ): Try[(HttpEndpointConfig, Option[JsValue])] = for { placeholderValues <- extractPlaceholderValues(revertConfig, requestPayload, responsePayload) revertUrl <- buildRevertUrl(revertConfig.url, placeholderValues) } yield ( HttpEndpointConfig( destinationName = baseDestination, url = Some(revertUrl), method = revertConfig.method, headers = headers, timeout = timeout ), buildRevertPayload(revertConfig, placeholderValues) ) }
При любых конфигурационных изменениях на стороне сторонних сервисов (сменились пути, обновился формат параметров) вы просто правите файлы настроек. Сам RevertEndpointBuilder не привязан к предметной области: он ничего не знает ни о балансе склада, ни о перевозчиках. Это абстрактный парсер, который беспрекословно следует заданным правилам извлечения и подстановки.
❯ Автоматическая компенсация (обработчик DLQ)
Мы разобрались с тем, как формируются запросы отмены, теперь перейдем к механизму, который их дергает. Как отмечалось в процессе разбора фоновых процессов, при сетевых таймаутах OutboxProcessor предпринимает повторные попытки с нарастающей паузой (2, 4, 8 секунд). Но если все попытки достучаться провалились, наступает фаза компенсации: нам нужно планомерно стереть следы всех действий, которые к этому моменту успели успешно выполниться на сторонних серверах.
Допустим, оплата после 3 честных попыток выбросила белый флаг. В базе данных таблица aggregate_results будет выглядеть следующим образом:
destination | success | fanout_order | response_payload --------------+---------+--------------+----------------------------------------- inventory | true | 0 | {"reservationId": "RES-456", ...} fraudCheck | true | 1 | {"riskScore": 25, ...} shipping | true | 2 | {"shipmentId": "SHIP-789", ...} billing | false | 3 | null
Три шага завершились успехом, последний – критической ошибкой. На складе забронировано, курьеры грузят коробки, но транзакция по карте не прошла. Придется разматывать цепочку 0-2 в обратную сторону. Именно на этом этапе в игру вступает паттерн Saga Compensation.
Чтобы не усложнять и не устраивать попытки отмены прямо внутри рабочего потока (ведь откат тоже может сорваться из-за сетевого сбоя, заведя систему в еще более дремучие дебри), разработчик использует изящный шаг. OutboxProcessor немедленно убирает проблемное событие с радаров основной таблицы и отгружает его в специальную накопительную таблицу dead_letter_events. Тут будут регистрироваться все инциденты, требующие принудительного отката:
CREATE TABLE dead_letter_events ( id BIGSERIAL PRIMARY KEY, original_event_id BIGINT NOT NULL, -- ссылка на исходную запись в outbox_events aggregate_id VARCHAR(255) NOT NULL, event_type VARCHAR(255) NOT NULL, payloads JSONB NOT NULL, -- резервная копия исходных данных status VARCHAR(20) NOT NULL DEFAULT 'PENDING', reason VARCHAR(1024) NOT NULL, -- причина сбоя, напр. "MAX_RETRIES_EXCEEDED" -- ... параметры отслеживания попыток отката );
Обработкой этой таблицы занимается автономный воркер DLQProcessor. Подобно нашему основному процессору, это типизированный Pekko-актор, однако он запущен не сам по себе, а как дочерний элемент по отношению к OutboxProcessor. В терминологии Pekko/Akka это означает постоянный супервизор-контроль над его жизненным циклом: если в коде дочернего процессора возникнет фатальная ошибка, родительский актор мгновенно перезапустит его без потери состояния. Каждый рабочий экземпляр OutboxProcessor инициализирует собственного ребенка-обработчика DLQProcessor. Соответственно, если на благо базы трудятся 3 параллельных воркера outbox, система автоматически выделит им 3 дублирующих DLQ-процессора.
❯ Алгоритм работы DLQ-процессора
Шаги проведения компенсирующих действий выглядят так:
Выборка результатов (Query). Выгружаем из
aggregate_resultsсписок всех успешных прямых вызовов, отсортированный по убыванию номера шагаfanout_order DESC(это и есть LIFO / порядок от последнего к первому).Анализ конфигураций (Check). Для каждой записи смотрим, описан ли для нее компенсирующий блок
revertв файле настроек. Шаги без него (как условный фрод-скоринг) просто отбрасываем.Формирование и отправка (Send). Собираем сетевой запрос с помощью
RevertEndpointBuilderи шлем его по сети, предварительно убедившись, что откат для этого конкретного шага не выполнялся ранее.Завершение. Меняем статус события в таблице DLQ на
PROCESSED.
Если наложить этот сценарий на наш пример, шаги компенсации пройдут так:
shipping (fanout_order=2): Есть revert-блок → POST /api/domestic-shipping/SHIP-789/cancel fraudCheck (fanout_order=1): Нет revert-блока → Пропускаем (только чтение, отмена не нужна) inventory (fanout_order=0): Есть revert-блок → DELETE /api/inventory/RES-456/release
Главной точкой входа в процедуру отката является метод revertDLQEvent. Первым делом он запрашивает из СУБД всё успешное наследие прямого пути (по ID заказа). Если во время выстраивания цепочки первый же вызов API потерпел неудачу, то и откатывать нам нечего – база просто моментально отметит DLQ-событие как успешно закрытое. Если же успешные шаги были, трансляция передается методу publishRevertEvent, который поочередно раскрутит все шаги и отправит нужные HTTP-запросы в обратном порядке:
private def revertDLQEvent(dlqEvent: DeadLetterEvent): Future[Boolean] = { db.run( resultRepo.findByAggregateId(dlqEvent.aggregateId, Result.Success, includeReverts = false) ).flatMap { successful => if (successful.isEmpty) { db.run(dlqRepo.markProcessed(dlqEvent.id)).map(_ => true) } else { publishRevertEvent(dlqEvent, successful) // собираем и вызываем эндпоинты отмены в порядке LIFO } } }
❯ Защита от повторной компенсации (идемпотентность)
Перед отправкой компенсирующего запроса наш фоновый воркер обязательно убеждается, что данная операция не была отменена ранее. Все вызовы без исключений сохраняются в логах aggregate_results. Чтобы на уровне БД легко разделять их на прямые и компенсирующие, к имени целевого сервиса при записи отката прибавляется суффикс .revert: прямой вызов сохранится как "shipping", а компенсирующий – как "shipping.revert". Отсюда предельно прозрачная проверка на повторы: ищем в таблице результатов успешную запись с ключом "shipping.revert". Нашли? Значит, шаг отмены уже позади и его можно спокойно пропустить.
Описанный ниже метод разом проверяет все целевые точки. Он опрашивает базу по каждому получателю параллельно и возвращает список названий сервисов, которые уже благополучно пережили отмену:
private def checkAlreadyCompensated( aggregateId: String, successfulResults: Seq[AggregateResult] ): Future[Set[String]] = Future.sequence( successfulResults.map { result => db.run(resultRepo.findSuccessfulRevert(aggregateId, result.destination)) .map(alreadyCompensated => (result.destination, alreadyCompensated.isDefined)) } ).map(_.filter(_._2).map(_._1).toSet)
Подобный барьер идемпотентности жизненно необходим, так как DLQ-процесс в силу внешних причин (аварийная перезагрузка серверов, деплой новой версии сервиса) может прерваться посреди выполнения. Без такой глубокой проверки повторный запуск недовыполненных задач мог бы привести к крайне неприятным последствиям: например, дважды перевести деньги обратно покупателю или отправить избыточный запрос отмены в доставку, которая давно закрыта.
❯ Ручная отмена: повторное использование движка компенсаций
Пока мы говорили об автоматическом запуске процедур отката при аварийном падении API. Но как быть в классической ситуации, когда сам пользователь нажимает кнопку «Отменить заказ»? Прелесть архитектуры в том, что мы можем переиспользовать уже написанный движок отмены. Секрет кроется в простом соглашении об именовании событий: добавление восклицательного знака ! перед типом доменного события служит сигналом для OutboxProcessor о том, что перед ним не обычный прямой процесс, а директива на проведение компенсации.
Класс события OrderCancelledEvent как раз и реализует этот элегантный подход:
case class OrderCancelledEvent( orderId: Long, reason: String, timestamp: Instant = Instant.now() ) extends DomainEvent { override def aggregateId: String = orderId.toString override def eventType: String = "!OrderCreated" // ← Префикс ! дает сигнал на проведение компенсации // ... }
Наш репозиторий оформляет запрос на отмену и запись события о компенсации в общую СУБД-транзакцию – точно так же, как мы делали при первичном создании корзины. Метод cancelWithEvent сначала проверяет факт существования заказа в базе, после чего обновляет его статус на CANCELLED и атомарно дописывает событие !OrderCreated в очередь:
def cancelWithEvent(orderId: Long, reason: String): DBIO[Int] = for { orderOpt <- findById(orderId) _ <- orderOpt match { case Some(_) => DBIO.successful(()) case None => DBIO.failed(new NoSuchElementException(s"Order $orderId not found")) } updated <- withEvent(OrderCancelledEvent(orderId = orderId, reason = reason)) { orders.filter(_.id === orderId) .map(o => (o.orderStatus, o.updatedAt)) .update(("CANCELLED", Instant.now())) } } yield updated
Обнаружив при парсинге свежего события символ !, OutboxProcessor автоматически перестраивает рабочий режим с прямого выполнения веерной отправки на процедуру отката. Вот как выглядит этот пошаговый алгоритм:
Очистка имени. Срезаем системный префикс со служебной строки:
!OrderCreatedпревращается обратно в исходныйOrderCreated.Поиск конфигурации. Считываем из файлов настроек схему связей для нужного события:
OrderCreated→["inventory", "fraudCheck", "shipping", "billing"].Инверсия шагов (LIFO). Переворачиваем полученный список задом наперед:
["billing", "shipping", "fraudCheck", "inventory"].Получение логов. Для каждого шага ищем в таблице результатов
aggregate_resultsлог успешного прямого выполнения.Построение запроса и выполнение. С помощью
RevertEndpointBuilderвоссоздаем эндпоинты отмены и отправляем запросы в сеть (пропуская сервисы без revert-конфигураций).Фиксация результата. Помечаем исходное событие в базе как полностью обработанное.
Обратите внимание: OutboxProcessor использует абсолютно те же программные классы RevertEndpointBuilder и методы проверки дубликатов, что и фоновый DLQProcessor. Вся система обладает врожденным свойством идемпотентности: перед инициацией отработки отката она всегда запрашивает базу на предмет наличия признака .revert у каждого адресата. Если нетерпеливый пользователь кликнет по кнопке «Отменить заказ» несколько раз подряд, повторная задача моментально обнаружит, что все шаги цепи отката уже проведены, и завершится как мгновенная пустая операция.
❯ События отмены защищены от попадания в DLQ
В случае если само компенсирующее событие (!OrderCreated) терпит крах после исчерпания всех лимитов повтора, система ни в коем случае не пытается запустить для него процедуру отката. Идея «компенсировать отмену компенсации» увела бы систему в бесконечный цикл вызовов. Вместо этого строка помечается критическим флагом отказа и отправляется ожидать ручного разбора нашей дежурной сменой инженеров:
if (isRevertEvent) { log.error( s"Revert event ${event.id} (${event.eventType}) exceeded retries - " + s"marking as FAILED, requires manual intervention" ) db.run(outboxRepo.markProcessed(event.id)).map(_ => false) }
Компонент | Ручная отмена ( | Автоматическая (через DLQ) |
|---|---|---|
Триггер инициации | Действие пользователя (клик по кнопке отмены) | Сбой прямого запроса API после лимита попыток |
Таблица хранения |
|
|
Скорость выполнения | Сразу при обработке входящей очереди | После исчерпания попыток основного прохода |
Крах отката | Требует внимания службы поддержки | Аналогично требует ручного разбора в БД |
❯ Инициализация: EventProcessingService
Мы детально проанализировали все независимые шестеренки движка: от записи событий в БД до веерных рассылок, динамической маршрутизации отмены и механизмов DLQ. Пришло время собрать этот пазл воедино. В архитектуре Play Framework синглтоны с нетерпеливой инициализацией (eager singletons) автоматически запускаются прямо на старте системы. Именно таким звеном выступает EventProcessingService. При поднятии приложения он разворачивает пул акторов, отправляет стартовое сообщение активации работы фонового воркера и аккуратно регистрирует слушателя завершения сессии для корректного выхода из процессов:
@Singleton class EventProcessingService @Inject() ( lifecycle: ApplicationLifecycle, publisher: EventPublisher, outboxRepo: OutboxRepository, dlqRepo: DeadLetterRepository, resultRepo: DestinationResultRepository, eventRouter: EventRouter, config: Configuration )(using system: ActorSystem[Nothing], ec: ExecutionContext, db: Database) extends Logging { val outboxActor: ActorRef[OutboxProcessor.Command] = if (poolSize > 1) { system.systemActorOf( OutboxProcessorRouter(publisher, outboxRepo, dlqRepo, resultRepo, eventRouter, pollInterval, batchSize, poolSize, maxRetries, useListenNotify, staleCleanupEnabled, staleTimeoutMinutes, cleanupInterval), "outbox-processor-pool" ) } else { system.systemActorOf( OutboxProcessor(publisher, outboxRepo, dlqRepo, resultRepo, eventRouter, pollInterval, batchSize, maxRetries, useListenNotify, staleCleanupEnabled, staleTimeoutMinutes, cleanupInterval), "outbox-processor" ) } // Запуск процесса outboxActor ! OutboxProcessor.ProcessUnhandledEvent // Корректное завершение работы lifecycle.addStopHook { () => outboxActor.ask(replyTo => OutboxProcessor.Stop(replyTo)) .map(_ => logger.info("Процессор Outbox успешно остановлен")) } }
Конфигурационный файл содержит следующие параметры:
outbox { pollInterval = 2 seconds batchSize = 100 poolSize = 3 # 3 параллельных воркера maxRetries = 3 useListenNotify = true # Мгновенная реакция через LISTEN/NOTIFY enableStaleEventCleanup = true staleEventTimeoutMinutes = 5 # Сброс зависших событий через 5 минут cleanupInterval = 1 minute dlq { maxRetries = 3 pollInterval = 2 seconds } }
Настроив параметр poolSize = 3, мы параллельно запускаем трех акторов OutboxProcessor под управлением балансировщика, причем за каждым из них закреплен персональный дочерний подпроцесс DLQProcessor. А неизменная конструкция FOR UPDATE SKIP LOCKED на уровне СУБД железобетонно страхует систему от ситуации, когда два разных потока параллельно захватят одну и ту же строчку.
❯ Наглядный пример: как это работает
Давайте проследим за кулисами реального сбоя: от создания корзины до экстренной отмены операций из-за отказа платежного шлюза.
Шаг 1: Пользователь создает заказ
POST /api/orders Body: {"customerId": "C-123", "totalAmount": 99.99, "shippingType": "domestic"}
Шаг 2: Атомарная транзакция в базу
BEGIN; INSERT INTO orders (...) RETURNING id; -- Возвращает 123 INSERT INTO outbox_events (aggregate_id='123', event_type='OrderCreated', payloads='{"inventory": {...}, "fraudCheck": {...}, "shipping": {...}, "billing": {...}}', status='PENDING'); COMMIT; -- Триггер PostgreSQL посылает сигнал pg_notify('outbox_events_channel', '123')
Шаг 3: Воркер забирает событие и начинает веерную отправку
SELECT ... FROM outbox_events WHERE status='PENDING' ... FOR UPDATE SKIP LOCKED; UPDATE outbox_events SET status='PROCESSING' WHERE id=456; POST /api/inventory/reserve → 200 OK (reservationId: RES-456) → UPSERT aggregate_results POST /api/fraud/check → 200 OK (riskScore: 25) → UPSERT aggregate_results POST /api/domestic-shipping → 200 OK (shipmentId: SHIP-789) → UPSERT aggregate_results POST /api/billing → 503 Service Unavailable → UPSERT aggregate_results
Шаг 4: Повторные попытки при таймаутах биллинга
Попытка 1: ожидание 2с → 503 Попытка 2: ожидание 4с → 503 Попытка 3: ожидание 8с → 503
Шаг 5: Перемещение в очередь недоставленных сообщений (DLQ)
INSERT INTO dead_letter_events (original_event_id=456, aggregate_id='123', event_type='OrderCreated', status='PENDING', reason='MAX_RETRIES_EXCEEDED'); UPDATE outbox_events SET status='PROCESSED', moved_to_dlq=true WHERE id=456;
Шаг 6: DLQProcessor последовательно выполняет откат в порядке LIFO
Query: SELECT * FROM aggregate_results WHERE aggregate_id='123' AND success=true ORDER BY fanout_order DESC → [shipping(2), fraudCheck(1), inventory(0)] POST /api/domestic-shipping/SHIP-789/cancel → 200 OK → UPSERT (shipping.revert) Пропускаем fraudCheck (нет revert-конфигурации) DELETE /api/inventory/RES-456/release → 200 OK → UPSERT (inventory.revert) UPDATE dead_letter_events SET status='PROCESSED' WHERE id=...;
Шаг 7: целостность данных спасена!
Заказ корректно записан в нашей базе данных и имеет статус сбоя оплаты, но все сопутствующие внешние резервы на серверах партнеров были чисто компенсированы в автоматическом режиме. При этом таблица aggregate_results теперь содержит исчерпывающую историю каждого сетевого чиха – со всеми входящими и исходящими телами прямых и компенсирующих вызовов.
❯ О чем важно помнить
Безусловно, связка паттернов Transactional Outbox, Result Table и Saga Compensation – это не какая-то серебряная пуля. Вам по-прежнему будут нужны качественный мониторинг для отслеживания зависших процессов, триггеры алертов на переполнение очередей DLQ и резервные регламенты работы для тех редких инфраструктурных инцидентов, когда автоматическая компенсация тоже падает, требуя человеческого вмешательства.
Но эти паттерны в корне меняют природу и критичность сбоев ваших систем. Без них вы обречены бороться с коварной бессистемной рассинхронизацией данных: деньгами, списанными за несуществующие заказы, заблокированными на веки вечные складами и фурами, выехавшими по отмененным накладным. Это те самые страшные баги, выковыривать которые приходится путем болезненных ручных правок СУБД глубокой ночью под крики руководства.
Заменив хаос на эти паттерны, вы переведете потенциальные аварии (зависшую в попытках строку DLQ, таймаут эндпоинта отмены или устаревшую запись, ожидающую повторной обработки) в категорию наблюдаемых, легко прослеживаемых и полностью контролируемых событий. Благодаря таблице aggregate_results у вас перед глазами будет доскональная история каждого вызова, а таблица событий dead_letter_events всегда любезно укажет пальцем, какой шаг дал сбои и почему это произошло.
❯ Подведем итоги
Запомните главное правило: никогда не дергайте внешние API внутри активных транзакций базы данных. Просто зафиксируйте факт намерения в локальной таблице, поручите отправку фоновому асинхронному воркеру, а при возникновении форс-мажоров доверьтесь отполированному механизму компенсирующих транзакций.
В рамках этой статьи мы во всеоружии подошли к решению фундаментальной проблемы двойной записи, подстерегающей любые архитектуры при попытке атомарно связать воедино базу данных с внешним сетевым миром. Эти три паттерна превращают непредсказуемый хаос рассинхронизации данных в абсолютно прозрачные, предсказуемые и легко исправимые инциденты.
Наш демо-проект предлагает готовую к бою рабочую архитектуру, которую вы прямо сейчас можете скопировать и адаптировать под свой домашний технологический стек.
Исходный код проекта. Все разобранные в этой статье примеры кода вы можете в деталях изучить в репозитории проекта: never-call-apis-inside-database-transactions.
Может быть интересно:

Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале ↩
