Как стать автором
Обновить

Реализация Transactional outbox pattern и немного DDD

Уровень сложностиСредний
Время на прочтение8 мин
Количество просмотров16K

Уверен, что многие, кто интересовался подходами к разработке микросервисной архитектуры, знакомы с трудами Криса Ричардсона на эту тему и уже встречали transactional outbox pattern. А для тех, кто не знаком, кратко расскажу основную идею:

В микросервисной архитектуре не редка ситуация коммуникации микросервисов посредством публикации событий. Происходит изменение состояния какой-либо сущности в одном из микросервисов и для информирования других сервисов публикуется событие об изменение данной сущности. Проблема кроется в том, что эти два действия - сохранение состояния и публикация события - обычно не происходят атомарно. А это означает, что на любом из указанных этапов что-то может пойти не так, и одно из действий не будет выполнено, что оставит систему в несогласованном состоянии. В качестве решения предлагается в начале в рамках одной транзакции атомарно сохранять и изменения и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.

Перед тем, как изобретать свой велосипед, я, разумеется, ознакомился с чужими. И все они мне показались решениями "в лоб". Везде над методом навешивается аннотация Transactional и в самом методе сначала сохраняется сущность, а следом происходит сохранение события, связанного с ней.

@Transactional
public SaleId createWidget(Sale sale) {
  var saleId = saleRepository.save(sale);
  messageQueue.saveMessage(StockReductionEvent.of(sale.item(), sale.amount()));
  return saleId;
}

Вариант вполне себе рабочий, но можно и лучше.

Используемый стек

Представленные примеры реализованы на языке Kotlin с использованием Spring, базы данных MongoDB. Но все тоже самое может быть легко реализовано и с использованием реляционной модели.

Сам Крис Ричардсон, рассказывая о Transactional outbox pattern, оперирует понятиями из DDD, такими как агрегат и доменное событие. Я же решил пойти еще немного дальше и реализовать данную интеграцию с использованием DDD в более явной форме.

Пару слов о DDD. А точнее о тактических шаблонах DDD.

Объект-значение (Value Object) - объект, для которого не важна своя идентичность, и один объект может быть легко заменен на другой такой же объект с такими же свойствами. В качестве примера обычно приводят деньги. Money - характеризуется валютой и своим достоинством. Купюра 1000 рублей абсолютно эквивалентна для кассира любой другой тысяча рублевой купюре. В Kotlin объект-значение может быть представлен классом данных.

Сущность (Entity) - в качестве противопоставления объекту-значению идет сущность - то что мы хотим идентифицировать и отличать от других подобных объектов. Опять же пример с деньгами: если мы уже не кассиры, для которых важен только наминал купюры, а допустим криминалисты, имеющие дело с фальшивыми купюрами. Тогда нас может заинтересовать не только номинал, но еще и серийный номер конкретной купюры. Для сравнения объектов сущности используется не набор ее свойств, а выполняется сравнение только индентификаторов, а свойства могут изменяться в процессе жизненного цикла объекта.

Entity
abstract class Entity<ID : Serializable>(val id: ID) {

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other == null || this::class != other::class) return false

        val casted = other as Entity<*>

        if (id != casted.id) return false

        return true
    }

    override fun hashCode() = id.hashCode()
}

Сущность характеризуется только своей идентичностью (id)

Доменное событие (Domain Event) - важное для предметной области и для бизнеса событие об изменение состояния сущности.

DomainEvent
abstract class DomainEvent<A : RootAggregate<A, ID>, ID : Serializable, BODY : Any>(
    val id: Id,
    val type: String,
    val aggregate: String,
    val aggregateId: ID,
    val occurredOn: ZonedDateTime,
    val body: BODY?
) {
    constructor(aggregate: A, type: String, body: BODY? = null) : this(
        Id(),
        "${aggregate.aggregateName}.$type",
        aggregate.aggregateName,
        aggregate.id,
        ZonedDateTime.now(),
        body
    )

    data class Id(val id: UUID? = UUID.randomUUID())


    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other == null || this::class != other::class)
            return false

        val casted = this::class.cast(other)

        if (id != casted.id) return false

        return true
    }

    override fun hashCode() = id.hashCode()

    override fun toString(): String =
        "${this::class.simpleName}(id=$id, type='$type', aggregate='$aggregate', aggregateId=$aggregateId, occurredOn=$occurredOn, body=$body)"

}

Агрегат (Aggregate) - рассматривается как совокупность сущностей и объектов значений. В качестве примера можно взять пост и комментарии к нему: и то и то могут быть сущностями, однако комментарий не может жить отдельно от поста. Поэтому можно сказать, что пост является корнем агрегата. В первом же приближении можно считать, что сущность и агрегат - это одно и тоже. Особенно данное утверждение может быть верно, если мы более склонны к проектированию мелкокластреных агрегатов только с одной сущностью. Технически же предполагается, что репозиторий существует только для агрегата, но никак не для сущности, т.е. любые операции по персистентности могут производиться только через агрегат.

Как было сказано выше, доменное событие непосредственно связано с изменениями в какой-либо сущности, например, "был добавлен новый комментарий". Так как доступ ко всем сущностям агрегата осуществляется через сам агрегат, то вполне логично, что владельцем доменного события также является агрегат.

RootAggregate
abstract class RootAggregate<A : RootAggregate<A, ID>, ID : Serializable>(id: ID) : Entity<ID>(id) {
    companion object {
        private val camelRegex = "(?<=[a-zA-Z])[A-Z]".toRegex()
    }

    private fun String.camelToSnakeCase(): String {
        return camelRegex.replace(this) { "_${it.value}" }.lowercase(Locale.getDefault())
    }

    @Transient
    val aggregateName = this::class.simpleName!!.camelToSnakeCase()

    @Transient
    private val _events = mutableListOf<DomainEvent<A, ID, *>>()
    val events: List<DomainEvent<A, ID, *>>
        get() = _events.toList()

    protected fun addEvent(event: DomainEvent<A, ID, *>) {
        _events.add(event)
    }

    fun clearEvents() {
        _events.clear()
    }
}

Таким образом одновременно при сохранении самого агрегата из него могут извлекаться и сохраняться так же и доменные события.

DomainRepository
interface DomainRepository<A : RootAggregate<A, ID>, ID : Serializable> {
    fun save(aggregate: A): A

    fun saveAll(aggregates: Iterable<A>): List<A>

    fun findById(id: ID): A?
  
    fun deleteById(id: ID)
    
    fun delete(aggregate: A)
}

Наличие специально выделенного DomainRepository позволяет отслеживать сохранение агрегатов и применять к этому процессу дополнительную логику по организации сохранения объектов в рамках одной транзакции. Тут и приходит на помощь BeanPostProcessor, который заменит исходный объект на прокси с новой сквозной функциональностью.

TransactionalOutboxBeanPostProcessor
import org.aopalliance.intercept.MethodInterceptor
import org.springframework.aop.framework.ProxyFactory
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.BeanFactoryAware
import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.core.Ordered
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.transaction.support.TransactionTemplate
import pro.korobovn.domaintypes.DomainRepository
import pro.korobovn.domaintypes.RootAggregate


class TransactionalOutboxBeanPostProcessor : BeanPostProcessor, Ordered, BeanFactoryAware {

    private lateinit var transactionTemplate: TransactionTemplate

    private lateinit var mongoTemplate: MongoTemplate

    private lateinit var domainEventRepository: DomainEventRepository

    override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any = bean

    override fun postProcessAfterInitialization(bean: Any, beanName: String): Any =
        if (bean is DomainRepository<*, *>) createTransactionalProxy(bean) else bean

    private fun createTransactionalProxy(bean: Any) =
        ProxyFactory(bean)
            .apply {
                addAdvice(saveInterceptor)
                addAdvice(saveAllInterceptor)
            }.proxy

    private val saveInterceptor = MethodInterceptor { invocation ->
        if (
            invocation.method.name == "save" &&
            RootAggregate::class.java.isAssignableFrom(invocation.method.returnType) &&
            invocation.arguments.firstOrNull() is RootAggregate<*, *> &&
            (invocation.arguments.firstOrNull() as RootAggregate<*, *>).events.isNotEmpty()
        ) {
            val aggregate = invocation.arguments.firstOrNull() as RootAggregate<*, *>
            transactionTemplate.execute {
                domainEventRepository.insertAll(aggregate.events)
                invocation.proceed()
            }
        } else invocation.proceed()
    }

    private val saveAllInterceptor = MethodInterceptor { invocation ->
        if (
            invocation.method.name == "saveAll" &&
            List::class.java.isAssignableFrom(invocation.method.returnType) &&
            invocation.arguments.firstOrNull() is Iterable<*>
        ) {
            @Suppress("UNCHECKED_CAST")
            val aggregates = invocation.arguments.firstOrNull() as Iterable<RootAggregate<*, *>>
            val events = aggregates.flatMap(RootAggregate<*, *>::events).toList()
            if (events.isNotEmpty())
                transactionTemplate.execute {
                    domainEventRepository.insertAll(events)
                    invocation.proceed()
                }
            else invocation.proceed()
        } else invocation.proceed()
    }

    override fun getOrder(): Int = Ordered.LOWEST_PRECEDENCE

    override fun setBeanFactory(beanFactory: BeanFactory) {
        this.transactionTemplate = beanFactory.getBean(TransactionTemplate::class.java)
        this.mongoTemplate = beanFactory.getBean(DomainEventRepository::class.java)
    }

}

TransactionalOutboxBeanPostProcessor отслеживает бины, которые реализуют DomainRepository, создает для таких бинов прокси, которые в свою очередь добавляют запуск транзакции при сохранении самого агрегата.

Публикация

За публикацию событий из базы данных отвечает отдельный процесс, который отслеживает изменения и выполняет публикацию. Данная задача может быть реализована с применением различных технологий, например, для монго можно использовать Mongo Change Streams - это аналог WAL (Write-Ahead Logging) у реляционных баз данных. Или же, в более простом варианте, изменения из базы могут запрашиваться через определенный интервал времени. В качестве брокера у нас используется RabbitMQ, но опять же ничего не мешает использовать любую другую технологию. При публикации события ожидается подтверждение успешной доставки сообщения до RabbitMQ и только после этого удаляется из очереди отправленное событие. Только потом уже выполняется публикация следующих сообщений. Все это обеспечивает строгий порядок публикации событий.

Итоги

В итоге наша программная модель превращается в нечто подобное:

Агрегат
@Document
class Cart(id: Id = Id()) : RootAggregate<Cart, Cart.Id>(id) {

    data class Id(val id: UUID = UUID.randomUUID()) : Serializable

    data class Item(val name: String, val quantity: Quantity = Quantity.ONE)

    private var _items: MutableList<Item> = mutableListOf()

    val items: List<Item>
        get() = _items.toList()

    fun add(item: Item) {
        _items.add(item)
        addEvent(CartWasChangedEvent.itemWasAdded(this, item))
    }


    override fun toString(): String {
        return "Cart(id=$id, items=$items)"
    }

}

Тут мы видим, что при изменении корзины сам агрегат фактически размещает addEvent событие об изменение своего состояния CartWasChangedEvent. Формально выполнили простой перенос размещения события со слоя сервисов на слой доменной логики. Эти изменения делают доменную модель менее анемичной, агрегат самостоятельно отвечает за соблюдение своих инвариантов о необходимости размещение события при изменении, не полагаясь на уровень сервисов.

CartWasChangedEvent
class CartWasChangedEvent : DomainEvent<Cart, Cart.Id, CartWasChangedEvent.Body> {
    enum class ChangeType {
        ADDED, REMOVED
    }

    data class Body(val type: ChangeType, val item: Cart.Item)

    @PersistenceCreator
    private constructor(
        id: Id,
        type: String,
        aggregate: String,
        aggregateId: Cart.Id,
        occurredOn: ZonedDateTime,
        body: Body
    ) : super(id, type, aggregate, aggregateId, occurredOn, body)

    constructor(aggregate: Cart, body: Body) : super(aggregate, "changed.added", body)

    companion object {
        fun itemWasAdded(cart: Cart, item: Cart.Item) = CartWasChangedEvent(cart, Body(ChangeType.ADDED, item))
    }
}

У события имеется два констуктора. Первый конструктор для восстановления объекта из базы данных - приватный, вызывается только фреймворком. Второй конструктор - пользовательский.

На уровне сервиса все происходит крайне незаметно для разработчика: выполняется только сохранение агрегата, все остальное происходит без дополнительного шаблонного кода. Логическая сложность того, о чем должен помнить разработчик, снижается.

Дислайк

На мой взгляд, основным недостатком предложенного подхода, как и его основным преимуществом, является тесная интеграция с концепцией и программной моделью DDD. Но, к сожалению, DDD не является серебряной пулей и подходит не к любому кейсу. Например, я сталкивался с ситуацией, когда происходило изменение состояния (маппинг) и требовалось опубликовать сообщение. По сути это было не доменное событие, а команда для другого сервиса на запуск дальнейшей обработки. При желании можно было бы "натянуть" концепцию DDD и на описанный кейс: ввести новый агрегат, которого нет фактически в домене, создать новое событие связанное с этим агрегатом, и затем в другом сервисе отслеживать это событие и должным образом реагировать на него. Но все это выглядело крайне притянутым за уши. По этой причине при разработке данного кейса было решено отказаться от DDD и пойти более традиционным путем, который оказался даже проще и выглядел более эстетично.

Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+1
Комментарии15

Публикации

Истории

Работа

Java разработчик
339 вакансий

Ближайшие события