Зачем нужен event sourcing?

Если идет работа с реальными людьми, то нужно сохранять все действия для последующего «разбора полетов». Для многих это основная бизнес‑цель event sourcing.

Если посмотреть в будущее, то, другими словами, примерно все бизнес‑приложения рано или позже полностью или частично перейдут на эту архитектуру. Почему тогда существующие решения массово не используют? Это все‑таки требует дополнительного проектирования и кода, но при постоянном увеличении требований к ПО это уже сейчас по идее окупается.

Есть дополнительные плюсы и минусы, но они вторичны (влияют на выбор реализации, а не концепции).

Стоит отметить, что это достаточно революционное отклонение от обычной архитектуры. Другими словами, потребуется дополнительное обучение программистов и новые лучшие практики. Для примера негативного опыта можно почитать, например, этот тред: https://news.ycombinator.com/item?id=13 339 972

В статье предполагается, что основы Event Sourcing уже известны. Их немного напомню, но не более того.

Основные проблемы

  • Eventual Consistency или старые данные возвращаются при чтении.

  • Как разбираться с ошибками, которые случились где‑то по середине обработки?

  • Медленное построение онлайн проекций (когда событий много в агрегате)

  • При хранении событий в обычной БД c автоинкрементом иногда теряются событий при обработке, т.к. БД под нагрузкой выделяет блоки автоинкрементных id, которые параллельно сохраняются, а обработчик событий обычно ищет по последним id.

Предлагаемое решение

Для решения этих проблем предполагается следующие изменение архитектуры относительно классической:

  • Вводится понятие «основной проекции» — это проекция, расположенная в той же БД, что и хранилище событий.

  • Основная проекция должна содержать все данные, необходимые для работы команд, встроенные проекторы основной БД и реакторов. Соответственно, при необходимости получения данных они берутся из основной проекции, а не из хранилища событий.

  • Хранилище событий используется для получения истории и при необходимости перестроения основной проекции (полное или частичное).

  • Команда, сохранение события и встроенные проекторы основной БД работают в одной транзакции.

  • Реакторы работают в отдельной транзакции.

  • Живые проекции не рекомендуются (только для специфичных случаев с учетом потенциального наличия задержки). Фактически нет такого понятия как класс агрегата или агрегат в памяти. Идет работа на уровне команд и событий, а агрегат как id только для транзакций и параллельности.

  • Если требуется ограниченная асинхронность, то можно прикрутить рассылку событий через NATS (или другую очередь).

Как решаются проблемы?

  • Нет eventual consistency, т.к. идет классическая работа с данными.

  • Обработка ошибок опять же классическая — если все пошло не так, то транзакция откатывается, а клиент получает сообщение об ошибке.

  • Нет медленной обработки событий при получении данных, т.к. для этого, в основном, используется основная проекция.

  • Нет проблемы с диапазонами автоинкрементов, т.к. при обработке событий мы не полагаемся на эти id.

Что теряется?

  • Полностью асинхронный подход (который потенциально облегчает масштабирование) — его можно применить, но уже частично.

  • Распределенность — ее нет из коробки, нужно думать отдельно.

  • Масштабирование — ее нет из коробки, нужно думать отдельно.

В целом, исходя из принципа простоты, они нам обычно не нужны. Если же все же нужны, то, вероятно, стоит использовать классический подход к event sourcing.

Кто‑то даже может сказать, что это совсем не Event sourcing, если у нас нет агрегата в памяти, который вычисляет свое состояние по событиям. Но у нас есть агрегат в базе данных, который создан по событиям (чем не память, только не оперативная). Так что это все‑таки Event sourcing, но я довольно сильно рад, что не классический, т.к. классический имеет слишком много ограничений для обычных приложений. Я бы сказал, что предлагаемый подход не event driven (EVA), но event sourcing.

Как итог, идет отказ от разделения базы событий и основной проекции. За счет этого решаются проблемы распределенности и асинхронности (их нет). При масштабировании решения я бы скорее использовал партиционирование решения, чем асинхронность и отдельные сервисы. Но, это, понятно, зависит от специфики решения: в разных случаях могут быть разные ответы.

С одной стороны, ничего сверхестественного в подходе нет. С другой, какого‑то описания и термина для такого подхода не встречал. Предлагаю использовать Inline event sourcing (встроенный event sourcing), т.к. он основывается на встроенной проекции.

Стартовый шаблон Event Sourcing приложения

Стартовый шаблон приложения Event sourcing: https://github.com/stepin/kotlin-event-sourcing-app

Данный репозиторий представляет из себя пример приложения, а не отдельный движок. Пока что у меня нет уверенности, что этот движок можно использовать "как есть" в дру��их приложениях. При этом есть уверенность, что начав с этого шаблона, вполне реально развивать приложения.

Данный проект является извлечением общей части из одного из моих личных проектов. Это уже где-то 5ая версия движка (первая вообще была на Golang). При этом версия новая -- возможны какие-то шероховатости первое время.

Шаблон основывается на моем базовом шаблоне Котлин-приложений: https://github.com/stepin/kotlin-bootstrap-app

События

Начинаем с выявления событий и сущностей.

Допустим, у нас есть простая бизнес-сущность Пользователь:

data class User(
  displayName: String,
  firstName: String,
  seconfName: String,
  email: String
)

И мы хотим поддержать следующие сценарии (события):

  • регистрация пользователя

  • смена имени

  • удаление пользователя

Для простоты примера не будем обращать внимание на подтверждения и авторизацию.

Пример события регистрации пользователя:

data class UserRegistered(  
  val email: String,  
  val firstName: String?,  
  val secondName: String?,  
  val displayName: String,  
  override val accountGuid: AccountGuid,  
  override val aggregatorGuid: UserGuid = UUID.randomUUID(),  
  override val guid: EventGuid = UUID.randomUUID(),  
) : UserEvent(eventTypeVersion = 3)
  • 4 основных поля: email, firstName, secondName, displayName

  • guid самого событий (рандомный)

  • aggregator guid = user guid — вот это неудобно, что нет синонима, но можно привыкнуть (и указан typealias UserGuid)

  • account guid — движок рассчитан на мультиаккаунтовые приложения

  • data class — удобно. И еще удобнее, что UserEvent — sealed класс, можно такие конструкции делать:

when (val e = event as UserEvent) {  
  is UserMetaUpdated -> "updated $e"  
  is UserRegistered -> "user registered with id $id ${meta.createdAt} $e"  
  is UserRemoved -> "user ${e.email} deleted at ${meta.createdAt}"  
}

Базовый класс для событий агрегата User выглядит так:

sealed class UserEvent(  
  override val eventTypeVersion: Short = 0,  
) : DomainEvent {  
  override val aggregatorType: String  
    get() = "user"  
  
  override val eventType: String  
    get() = this.javaClass.simpleName  
  
  abstract override val aggregatorGuid: UserGuid  
}
  • реализуется интерфейс DomainEvent движка

  • выставляется typealias UserGuid для aggregatorGuid — необязательно, как документация

  • выставляется тип агрегата

  • выставляется тип события — автоматически берется имя класса события (например, UserRegistered)

  • выставляется версия события в 0 по умолчанию, но это значение событие может переопределить

По сути, от события движок требует 2 вещи:

  • реализации интерфейса DomainEvent

  • корректной сериализации и десериализации JSONB

Остальное на усмотрение разработчика. При этом базовый класс для всех событий агрегата считается хорошей практикой.

Про id/guid: в этом примере подразумевается, что команды работают с guid, а при необходимости join в SQL‑запросах используется id (т.к. быстрее).

Команда

У нас команда — это либо отдельный Spring‑сервис, либо метод внутри Spring‑сервиса. По сути единственный критичный момент — должен использоваться интерфейс EventStorePublisher для публикации событий, а остальное движок не ограничивает.

Команда регистрации:

@Service  
class RegisterUser(  
  private val store: EventStorePublisher,  
  private val userRepository: UserRepository,  
) {  
  data class Params(  
    val email: String,  
    val firstName: String?,  
    val secondName: String?,  
    val displayName: String?,  
  )  
  
  sealed class Response {  
    data class Created(val userGuid: UUID) : Response()  
    data class Error(val errorCode: ErrorCode) : Response()  
  }  
  
  suspend fun execute(params: Params): Response = with(params) {  
    val user = userRepository.findByEmail(email)  
    if (user != null) {  
      return Response.Error(ErrorCode.USER_ALREADY_REGISTERED)  
    }  
  
    val accountGuid = UUID.randomUUID()
    val userGuid = UUID.randomUUID()
  
    val userRegistered = UserRegistered(  
      accountGuid = accountGuid,
      aggregatorGuid = userGuid,  
      email = email,  
      firstName = firstName,  
      secondName = secondName,  
      displayName = displayName ?: calcDisplayName(email, firstName, secondName),  
    ) 
    store.publish(userRegistered)  
  
    val accountCreated = AccountCreated(  
      name = "Неизвестная компания",  
      accountGuid = accountGuid,  
      userGuid = userGuid,  
    )  
    store.publish(accountCreated)  
  
    return Response.Created(userGuid)
  }
}

Возвращаемые от команд значения зависят от бизнес-логики: могут ли быть бизнес-ошибки, нужно ли вернуть guid и т.п. В каких-то случаях может ничего не возвращаться.

Проекторы

Пример 2-х проекторов в одном классе:

@Service  
class UserProjector(  
  private val userRepository: UserRepository,  
  private val accountRepository: AccountRepository,  
) {  
  companion object : Logging  
  
  @Projector  
  suspend fun handleUserRegistered(e: UserRegistered, meta: EventMetadata) {  
    val account = accountRepository.findByGuid(e.accountGuid)  
  
    val u = UserEntity()  
    u.accountGuid = e.accountGuid  
    u.accountId = account?.id ?: 0  
    u.guid = e.aggregatorGuid  
    u.email = e.email  
    u.displayName = e.displayName  
    u.firstName = e.firstName  
    u.secondName = e.secondName  
    u.createdAt = meta.createdAt.toInstant(ZoneOffset.UTC)  
  
    val savedUser = userRepository.save(u)  
    logger.debug { "new user id: ${savedUser.id}" }  
  }  
  
  @Projector  
  suspend fun handleUserRemoved(e: UserRemoved) {  
    val user = getUser(e.aggregatorGuid)  
    userRepository.delete(user)  
  }  
  
  private suspend fun getUser(userGuid: UUID) = userRepository.findByGuid(userGuid)  
    ?: throw DomainException(ErrorCode.USER_NOT_FOUND)
}
  • метод проектора должен быть в Spring‑бине

  • должна быть аннотация @Projector

  • в классе может быть несколько методов — ограничений нет

  • первый аргумент — событие

  • второй (опционально) — метаданные события

  • метод должен быть suspend (в принципе, это ограничение можно снять, но сейчас так в движке, и не планирую использовать без suspend)

  • исключение в проекторе отменит сохранение события

Реакторы

@Service  
class UserRegisteredEmailReactor(  
  private val emailService: SendEmailService,  
) {  
  companion object : Logging  
  
  @Reactor  
  suspend fun handle(e: UserRegistered) {  
    emailService.sendEmailConfirmationEmail(e.displayName, e.email, e.aggregatorGuid.toString())  
  }  
}
  • метод проектора должен быть в Spring‑бине

  • должна быть аннотация @Reactor

  • в классе может быть несколько методов — ограничений нет

  • первый аргумент — событие

  • второй (опционально) — метаданные события

  • метод должен быть suspend (в принципе, это ограничение можно снять, но сейчас так в движке, и не планирую использовать без suspend)

  • исключение в реакторе НЕ отменит сохранение события и запуск других реакторов

Чтение данных

Чтение данных основной проекции — никаких ограничений, как обычно.

Так же доступно чтение событий:

interface EventStoreReader {
  fun <T : DomainEvent> findEventsSinceId(  
    eventIdFrom: Long,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEventsSinceGuid(  
    eventGuidFrom: UUID,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  
 
  fun <T : DomainEvent> findEventsSinceDate(  
    date: LocalDateTime,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEvents(  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>
}

Это API можно использовать для получения истории или для создания асинхронных проекций.

Потенциально можно написать и свое API чтения событий, в jOOQ все для этого есть.

Так же можно делать полную или частичную перегенерацию базы (аргументы старта приложения или кастомный код).

Пример получения истории (естественно, можно смешивать чтение из событий и из основной проекции, т.к. это все в даже одной базе):

@Service  
class DebugService(  
  private val eventStoreReader: EventStoreReader,  
) {  
  suspend fun getUserAudit(userGuid: UUID): List<String> {  
    return eventStoreReader.findEvents<UserEvent>("user", userGuid, maxBatchSize = 100)  
      .map { (id, event, meta) ->  
        when (event) {  
          is UserMetaUpdated -> "updated $event"  
          is UserRegistered -> "user registered with id $id ${meta.createdAt} $event"  
          is UserRemoved -> "user deleted at ${meta.createdAt}"  
        }  
    }  
  }
}

Тут в API немного некрасиво — нет связи «user» и UserEvent. Возможно, имеет смысл передавать базовый класс, но он абстрактный. Если у кого‑то есть идеи как лучше сделать API (без строчки «user» и без приведения «as UserEvent») — будут рад прочитать.

Ограничения

  • В данной реализации Event Bus не внедрен (для трансляции событий через какую‑нибудь Кафку или NATS), н�� ничего не мешает такое прикрутить, если кому‑нибудь будет нужно.

Итог

Кода немного больше за счет выделения отдельной абстракции — Событие. Так же время уходит на саму абстракцию — назвать, выделить поля и т. п.

Для CRUD получается больше кода, но круда не так много как может показаться — нужно приучить себя думать в событиях бизнес‑области, а не создать/удалить запись в таблице базы данных.

В целом, мне нравится, поэтому и решил поделиться с сообществом.

Еще раз (для удобства) ссылка на репо: https://github.com/stepin/kotlin‑event‑sourcing‑app

Only registered users can participate in poll. Log in, please.
Нужно ли упаковывать движок в библиотеку?
42.86%Да, конечно3
28.57%Нет, в моем проекте наверняка нужно будет что-то менять в зависимостях2
28.57%Не нужно, смысла в этом движке все равно нет2
7 users voted. 6 users abstained.