В микросервисной архитектуре мы постоянно сталкиваемся с задачей: сохранить изменения в базе и гарантированно отправить событие в Kafka. На первый взгляд звучит просто — сделал транзакцию, отправил сообщение, закоммитил. Но в реальности между базой данных и брокером сообщений никакой общей транзакции нет.
Именно здесь мы сталкиваемся с самыми частыми ошибками:
В БД данные сохранились, а сообщение в Kafka не улетело
или наоборот — Сообщение ушло, а транзакция в БД откатилась.
Такие баги сложно воспроизвести, сложно отлаживать и ещё сложнее объяснять, почему данные между сервисами разъезжаются.
Паттерн Transaction Outbox решает эту проблему уже много лет. Он изолирует бизнес-логику от технической — мы сохраняем событие в таблицу внутри транзакции, а отправкой сообщений занимается отдельный компонент. Но каждая команда и каждый сервис реализуют Outbox по-своему — где-то шедулер, где-то cron, где-то retry-таблица, где-то кастомный SELECT FOR UPDATE. В итоге код размазывается, дублируется и становится неподдерживаемым.
В этой статье я покажу, как я вынес всю логику Transaction Outbox в отдельный Spring Boot Starter, который можно подключить одной зависимостью. Он создаёт таблицу Outbox, конфигурирует шедулер, отвечает за отправку в Kafka и очистку, позволяя микросервисам сосредоточиться только на бизнес-логике.
Что такое transaction outbox паттерн
Transaction Outbox — это архитектурный паттерн, который гарантирует надёжную доставку событий во внешние системы (Kafka, RabbitMQ, Webhooks) за счёт сохранения сообщений в отдельную outbox-таблицу внутри той же транзакции, что и бизнес-изменения в БД.
Отправка сообщений выполняется асинхронно — отдельным процессом или шедулером, который читает таблицу outbox и публикует события.
Паттерн решает проблему несогласованности данных между БД и брокером сообщений, когда:
транзакция в БД может закоммититься, а отправка в Kafka — упасть
сообщение может улететь в Kafka, но транзакция БД — откатится
Напишем spring-boot-starter для реализации паттерна
Я буду использовать стек Kotlin + Spring + JOOQ. Этот стартер также подойдет под Java + Spring + JPA. Мой выбор основан на стеке технологий на проекте)
Мы будем сохранять данные о сообщении в бд, отправлять их шедулером в кафку в нужные топики.
Начнем с создания файла resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
куда мы пропишем путь до класса конфигурации:
com.example.outbox.configuration.OutBoxAutoConfiguration
Так как нам нужно сохранять данные в таблицу, которую нужно создать специально для стартера, есть несколько вариантов:
Миграция находится в стартере
Плюсы:Единая точка нахождения кода для структуры outbox-таблицы.
Не нужно копировать миграции в каждый сервис — подключил стартер, и всё само прилетело.
Гарантия совместимости — версия стартера = версия схемы.
Минимум человеческого фактора
Минусы:
Нужно аккуратно работать с flyway-путями, чтобы не пересекаться с миграциями приложения.(Мы используем R миграцию)
Миграция находится на стороне сервисов
Плюсы:
Полный контроль у сервиса — может расширять схему, накатывать кастомные поля, индексы.
Можно тонко настраивать миграции под свои юзкейсы.
Минусы:
Легко получить рассинхрон стартер ↔ схема БД.
Каждый сервис обязан вручную добавлять миграцию — копипаста, ошибки.
Сложнее обновлять стартер (надо помнить о версиях миграций).
Мы же будем выбирать первый вариант, будем использовать flyway, но миграцию сделаем с префиксом R, которая не будет мешать другим миграциям с префиксом V. Так как checksum миграции изменяться не будет, то она будет наката 1 раз, также будем использовать CREATE TABLE IF NOT EXISTS.
Напишем скрипт миграции resources/db/migration/R__create-outbox-table.sql:
CREATE TABLE IF NOT EXISTS outbox_messages ( id UUID NOT NULL PRIMARY KEY, status TEXT NOT NULL DEFAULT 'WAITING', payload JSONB NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT LOCALTIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT LOCALTIMESTAMP );
Таблица очень простая, она может изменяться в зависимости от ваших бизнес требований. Например добавить поля object_id или object_type для идентификации объекта, если их может быть несколько и тд. Также тут стоит по дефолту status = 'WAITITNG', тут тоже все зависит от вашей статусной модели.
Затем нам нужно сохранить объект в бд, тут опять же все зависит от вашего стека, либо это обычный JpaRepository и метод save, либо кастомное сохранение объекта в зависимости от вашей логики.
В моем случае метод выглядит так:
fun <T> createMessage(messageObject: T) { val payload = objectMapper.writeValueAsString(messageObject) newRecord().apply { this.id = UUID.randomUUID() this.payload = JSONB.valueOf(payload) this.insert() } }
Также нам нужен метод для поиска объектов и отправки, так как мы получаем объекты, чтобы их изменить (после отправки меняется статус), я достаю их через select for update
fun findWaiting(limit: Int = 500) = dslContext.selectFrom(OUTBOX_MESSAGES) .where(OUTBOX_MESSAGES.STATUS.eq(OutboxMessageStatus.WAITING)) .orderBy(OUTBOX_MESSAGES.CREATED_AT.asc()) .limit(limit) .forUpdate() .noWait() .fetch()
Также я добавил бы метод очистки таблицы, тут все зависит от того, сколько вы хотите хранить сообщения и собираетесь ли чистить таблицу:
fun deleteOldMessages(deleteDays: Int = 30) { val currentDate = LocalDateTime.now() val thirtyDaysAgo = currentDate.minus(deleteDays, ChronoUnit.DAYS) dslContext.deleteFrom(OUTBOX_MESSAGES) .where(OUTBOX_MESSAGES.UPDATED_AT.lessOrEqual(thirtyDaysAgo)) .execute() }
Давайте теперь напишем OutboxProperties для того, чтобы задать параметры конфигурации:
data class OutboxProperties( var sendTopic: String = "", // Здесь может быть массив, если вы собираетесь отправлять в несколько топиков var limitMessage: Int = 500, var deleteOldMessageCron: String = "0 0 12 * * *", var deleteMessageDays: Int = 30, var sendMessageDelay: String = "60000" )
Теперь давайте напишем сервис, который будет отправлять сообщения в kafka:
open class OutboxMessageService( private val outboxProperties: OutboxProperties, private val outboxMessageRepository: OutboxMessageRepository, private val rtmKafkaTemplate: KafkaTemplate<String, JsonNode>, private val objectMapper: ObjectMapper ) { private val logger = LoggerFactory.getLogger(OutboxMessageService::class.java) @Transactional open fun deleteOldMessages() { outboxMessageRepository.deleteOldMessages(outboxProperties.deleteMessageDays) } @Transactional open fun findAndSendMessages(): List<OutboxMessagesRecord> { val waitingMessages = outboxMessageRepository.findWaiting(outboxProperties.limitMessage) if (waitingMessages.isEmpty()) { logger.info("Сообщений для отправки в топик ${outboxProperties.sendTopic} не найдено") return emptyList() } waitingMessages.forEach { logger.info("Отправка сообщения с id ${it.id} в топик $outboxProperties") sendMessage(objectMapper.readTree(it.payload?.data()?.trim())) } val updateIds = waitingMessages.mapNotNull { it.id } outboxMessageRepository.updateSentStatusByIds(updateIds) return waitingMessages } private fun sendMessage(message: JsonNode) { rtmKafkaTemplate.send( outboxProperties.sendTopic, message ) } }
Следующим шагом будет написание OutboxMessagesScheduler, который будет раз в N время работать с сообщениями:
class OutboxMessagesScheduler( private val outboxMessageService: OutboxMessageService, private val properties: OutboxProperties ) { private val logger = LoggerFactory.getLogger(OutboxMessagesScheduler::class.java) @Scheduled(fixedDelayString = "\${outbox.send-message-delay:60000}") fun sendMessage() { logger.info("Начало отправки сообщений в топик ${properties.sendTopic}") var waitingMessages = outboxMessageService.findAndSendMessages() while (waitingMessages.isNotEmpty()) { waitingMessages = outboxMessageService.findAndSendMessages() } logger.info("Отправка сообщений в топик ${properties.sendTopic} завершена") } @Scheduled(cron = "\${outbox.delete-old-message-cron:0 0 12 * * *}") fun deleteOldMessages() { logger.info("Начало очистки таблицы сообщений") outboxMessageService.deleteOldMessages() logger.info("Очистка сообщений завершена") } }
И наконец напишем OutBoxAutoConfiguration для конфигурации наших бинов:
@Configuration @EnableScheduling @EnableConfigurationProperties(OutBoxAutoConfiguration::class) @ConfigurationPropertiesScan("com.example.outbox.configuration") class OutBoxAutoConfiguration { @Bean fun outBoxRepository( @Value("\${spring.flyway.default-schema}") schema: String, dslContext: DSLContext, objectMapper: ObjectMapper ): OutboxMessageRepository { dslContext.settings().withRenderMapping( RenderMapping() .withSchemata( MappedSchema().withInput("public").withOutput(schema) ) ) return OutboxMessageRepository(dslContext, objectMapper) } @Bean @ConfigurationProperties(prefix = "outbox") fun customOutBoxProperties() = OutboxProperties() @Bean fun outboxService( outboxProperties: OutboxProperties, outboxRepository: OutboxMessageRepository, rtmKafkaTemplate: KafkaTemplate<String, JsonNode>, objectMapper: ObjectMapper, ) = OutboxMessageService(outboxProperties, outboxRepository, rtmKafkaTemplate, objectMapper) @Bean fun scheduler( outboxMessageService: OutboxMessageService, properties: OutboxProperties ) = OutboxMessagesScheduler(outboxMessageService, properties) }
Значения конфигов в application.yml файле:
outbox: send-topic: тут топик limit-message: 120 delete-old-message-cron: 1 * * * * * delete-message-days: 30 outbox.send-message-delay: 1000
Теперь остается только подключить стартер в ваш проект, сохранить сообщение через repository и все, г��тово)
Расширения, которые стоит добавить в реальном проекте
В этой статье мы сосредоточились на минимально жизнеспособной реализации Transaction Outbox, чтобы показать саму идею и архитектурный подход.
Однако в реальных системах почти всегда добавляют:
Retry-механику с backoff и лимитом попыток
Идемпотентность на стороне consumer’ов через таблицу processed_messages (сохранять идентификаторы в сервисе чтения сообщения и не читать повторно сообщения с тем же id)
Метрики (успешно/ошибка/время обработки)
Dead Letter Queue при превышении лимита retry
Эти элементы легко надстраиваются поверх базового стартера и не усложняют его архитектуру.
Итог
Паттерн Transaction Outbox — один из самых надежных способов обеспечить согласованность данных между вашей БД и брокером сообщений. Он устраняет проблему двойной записи, разрывов транзакций и потерянных событий. В этой статье мы разобрали, как реализовать этот паттерн в виде удобного Spring Boot Starter’а, который инкапсулирует логику и позволяет сервисам сосредоточиться на бизнес - функциональности.
Всем спасибо за внимание и хорошего дня!
