Pull to refresh

Введение в CQRS + Event Sourcing: Часть 2

Reading time8 min
Views48K
В прошлой статье я начал с основ CQRS + Event Sourcing. В этот раз я предлагаю продолжить и более подробно посмотреть на ES.

В примере который я выкладывал с моей прошлой статьей магия Event Sourcing’а была скрыта за абстракцией IRepository и двумя методами IRepository.Save() и IRepository.GetById<>().
Для того чтобы поподробнее разобраться что происходит я решил рассказать о процессе сохранения и загрузки агрегата из Event Store на примере проекта Lokad IDDD Sample от Рината Абдулина. У него в аппликейшен сервисах идет прямое обращение к Event Store, без дополнительных абстракций, поэтому все выглядит очень наглядно. Application Service — это аналог CommandHandler, но который обрабатывает все комманды одного агрегата. Такой подход очень удобный и мы тоже на него в одном проекте перешли.

ApplicationService


Интерфейс IApplicationService крайне прост.
public interface IApplicationService
{
   void Execute(ICommand cmd);
}

В методе Execute мы передаем любые команды и надеемся, что что сервис их перенаправит на нужный обработчик.

Так как у Рината в примере только один аггрегат Customer, то и сервис тоже только один CustomerApplicationService. Собственно поэтому нет необходимости выносить какую-либо логику в базовый класс. Отличное решение для примера на мой взгляд.

Метод Execute передает обработку комманды одной из перегрузок метода When подходящей по сигнатуре. Реализация метода Execute очень простая с использованием динамиков, и не надо бежать рефлексией по всем методам.
public void Execute(ICommand cmd)
{
   // pass command to a specific method named when
   // that can handle the command
   ((dynamic)this).When((dynamic)cmd);
}

Начнем с комманды создания — CreateCustomer.
[Serializable]
public sealed class CreateCustomer : ICommand
{
   public CustomerId Id { get; set; }
   public string Name { get; set; }
   public Currency Currency { get; set; }

   public override string ToString()
   {
       return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency);
   }
}

В реальном проекте у вас между UI и ApplicationService скорее всего будет message queue, ну а для примера Ринат ограничился прямой передачей комманды объекту апликейшен сервиса (см. class ApplicationServer).
После того, как команда CreateCustomer попадает в метод Execute, она перенаправляется в метод When.
public void When(CreateCustomer c)
{
   Update(c.Id, a => a.Create(c.Id,c.Name, c.Currency, _pricingService, DateTime.UtcNow));
}

В метод Update мы передаем айдишку агрегата и экшен который вызывает метод изменения состояния аггрегата. Вообще на мой взгляд лучше не делать метод Create у аггрегата, а создать еще один конструктор, так как вызов метода Create в данном контексте выглядит немного странным. Мы вроде и создаем агрегат, но почему-то метод Create передаем как метод изменения состояния. С конструктором так бы уже не получилось.

Вернемся к методу Update, задача у него следующая — 1) загрузить все события для текущего аггрегата, 2) создать агрегат и восстановить его состояние использую загруженные события, 3) выполнить переданное действие Action execute над аггрегатом и 4) сохранить новые события если они есть.
void Update(CustomerId id, Action<Customer> execute)
{
   // Load event stream from the store
   EventStream stream = _eventStore.LoadEventStream(id);
   // create new Customer aggregate from the history
   Customer customer = new Customer(stream.Events);
   // execute delegated action
   execute(customer);
   // append resulting changes to the stream
   _eventStore.AppendToStream(id, stream.Version, customer.Changes);
}


Восстановление состояния


В примере, который я показывал в прошлой статье состояние аггрегата хранилось в виде private полей в классе аггрегата. Это не совсем удобно если вы захотите добавить snapshot’ы, т.к. придется как-то высасывать состояние какждый раз или использовать рефлексию. У Рината гораздо более удобный подход — для состояние отдельный класс CustomerState, что позволяет вынести методы проекции из аггрегата и гораздо проще сохранять и загружать snapshot’ы, хоть их и нет в примере.
Как видно, в конструктор агрегату передается список событий этого же аггрегата, как не сложно догадаться, для того чтобы он восстановил своё состояние.
Агрегат в свою очередь делегирует восстановление состояние классу CustomerState.
/// <summary>
/// Aggregate state, which is separate from this class in order to ensure,
/// that we modify it ONLY by passing events.
/// </summary>
readonly CustomerState _state;

public Customer(IEnumerable<IEvent> events)
{
   _state = new CustomerState(events);
}

Приведу код всего класса CustomerState, лишь уберу несколько методов проекции When.
/// <summary>
/// This is the state of the customer aggregate.
/// It can be mutated only by passing events to it.
/// </summary>
public class CustomerState
{
   public string Name { get; private set; }
   public bool Created { get; private set; }
   public CustomerId Id { get; private set; }
   public bool ConsumptionLocked { get; private set; }
   public bool ManualBilling { get; private set; }
   public Currency Currency { get; private set; }
   public CurrencyAmount Balance { get; private set; }

   public int MaxTransactionId { get; private set; }

   public CustomerState(IEnumerable<IEvent> events)
   {
       foreach (var e in events)
       {
           Mutate(e);
       }
   }
...
   public void When(CustomerCreated e)
   {
       Created = true;
       Name = e.Name;
       Id = e.Id;
       Currency = e.Currency;
       Balance = new CurrencyAmount(0, e.Currency);
   }

   public void When(CustomerRenamed e)
   {
       Name = e.Name;
   }

   public void Mutate(IEvent e)
   {
       // .NET magic to call one of the 'When' handlers with
       // matching signature
       ((dynamic) this).When((dynamic)e);
   }
}

Как видно в конструкторе мы форычем бежим по переданным ивентам, и передаем их в метод Mutate, который в свою очередь напрявляет их дальше в подходящий метод проекции.
Можно заметить что все проперти имеют private setter метод, что гарантирует что состояние извне мы можем изменить только передав соответствующее событие.

Состояние восстановили, теперь можно и попробовать его изменить. Вернемся немного назад к методу изменения состояние. Так как я начал разбираться с коммандой CreateCustomer, то и у агрегата посмотрим метод Create.
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
   if (_state.Created)
       throw new InvalidOperationException("Customer was already created");
   Apply(new CustomerCreated
       {
           Created = utc,
           Name = name,
           Id = id,
           Currency = currency
       });

   var bonus = service.GetWelcomeBonus(currency);
   if (bonus.Amount > 0)
       AddPayment("Welcome bonus", bonus, utc);
}

Здесь самое место сделать проверку наших бизнесс правил, так как у нас есть доступ к актуальному состоянию агрегата. У нас есть бизнесс правило что Customer может быть создан лишь один раз ( врочем еще есть и техническое ограничение), поэтому при попытки вызвать Create на уже созданном агрегате мы бросаем эксепшен.
Если же все бизнесс правила удовлетворены то в метод Apply передаем событие CustomerCreated. Метод Apply очень прост, лишь передает событие объекту состояния и добавляет его в список текущих изменений.
public readonly IList<IEvent> Changes = new List<IEvent>();
readonly CustomerState _state;

void Apply(IEvent e)
{
   // pass each event to modify current in-memory state
   _state.Mutate(e);
   // append event to change list for further persistence
   Changes.Add(e);
}

В большенству случаев на одну операцию с аггрегатом приходится одно событие. Но вот как раз в случае с методом Create событий может быть два.
После того как мы передали в метод Apply событие CustomerCreate, мы может добавить текущему кастомеру приветственный бонус, если удовлетворяетя бизнесс правило что сумма бонуса должена быть больше нуля. В таком случае вызывается метод агрегата AddPayment, который не сореджит никаких проверок а просто инициирует событие CustomerPaymentAdded.
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
   Apply(new CustomerPaymentAdded()
       {
           Id = _state.Id,
           Payment = amount,
           NewBalance = _state.Balance + amount,
           PaymentName = name,
           Transaction = _state.MaxTransactionId + 1,
           TimeUtc = utc
       });
}

Теперь предстоит сохранить новые события и опубликовать их в Read model. Подозреваю что это делает следующая строчка.
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);

Убедимся…
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
   if (events.Count == 0)
       return;
   var name = IdentityToString(id);
   var data = SerializeEvent(events.ToArray());
   try
   {
       _appendOnlyStore.Append(name, data, originalVersion);
   }
   catch(AppendOnlyStoreConcurrencyException e)
   {
       // load server events
       var server = LoadEventStream(id, 0, int.MaxValue);
       // throw a real problem
       throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events);
   }

   // technically there should be a parallel process that queries new changes
   // from the event store and sends them via messages (avoiding 2PC problem).
   // however, for demo purposes, we'll just send them to the console from here
   Console.ForegroundColor = ConsoleColor.DarkGreen;
   foreach (var @event in events)
   {
       Console.WriteLine("  {0} r{1} Event: {2}", id,originalVersion, @event);
   }
   Console.ForegroundColor = ConsoleColor.DarkGray;
}

Ну почти угодал. События сериализуются и сохраняются в append only store (удалять и изменять их мы то не собираемся). Но вот вместо того чтобы отправить их в read-model (на презентационный уровень), Ринат просто выводит их на консоль.
Впрочем для примера этого достаточно.
Если вы хотите посмотреть как это все будет работать с очередью сообщений можете посмотреть пример на гитхабе из моей предыдущей статьи. Я его немного изменил и тоже внес часть инфраструктуры Event Sourcing’a в солюшен. На этом примере я хочу показать как можно добавить снэпшоты.

Snapshots


Снэпшоты нужны чтобы делать промежуточные снимки состояния аггрегата. Это позволяем не закгружать весь поток событий агрегата, а лишь только те которые произошли после того как мы сделали последний снэпшот.
Итак посмотрим на реализацию.
В классе UserCommandHandler есть метод Update очень похожий на тот что у Рината, но у меня он еще сохраняет и загружает снэпшоты. Снэпшоты делаем через каждые 100 версий.
private const int SnapshotInterval = 100;

Сначала поднимаем из репозитория снэпшот, потом загружаем поток событий которые произошли после того как мы сделали этот снэпшот. Затем передаем все это конструктору агрегата.
private void Update(string userId, Action<UserAR> updateAction)
{
   var snapshot = _snapshotRepository.Load(userId);
   var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1;
   var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue);
   var user = new UserAR(snapshot, stream);
   updateAction(user);
   var originalVersion = stream.GetVersion();
   _eventStore.AppendToStream(userId, originalVersion, user.Changes);
   var newVersion = originalVersion + 1;
   if (newVersion % SnapshotInterval == 0)
   {
       _snapshotRepository.Save(new Snapshot(userId, newVersion,user.State));
   }
}

В конструкторе пытаемся достать состояние из снэпшота или создаем новое состояние если нету снэпшота. На полученном состоянии проигрываем все недостающие события, и в итоге получаем актуальную версию агрегата.
public UserAR(Snapshot snapshot, TransitionStream stream)
{
   _state = snapshot != null ? (UserState) snapshot.Payload : new UserState();
   foreach (var transition in stream.Read())
   {
       foreach (var @event in transition.Events)
       {
           _state.Mutate((IEvent) @event.Data);
       }
   }
}

После манипуляций с агрегатом, проверяем кратна ли версия агрегата интервалу через который мы делаем снэпшоты, и если это так, то сохраняем новый снэпшот в репозиторий. Чтобы получить из UserCommandHandler’a состояние агрегата пришлось сделать для него internal getter свойство State.

Вот и все, теперь у нас есть снэпшоты, что позволило намного быстрее востанавливать состояние агрегата если у него очень много событий.

Feedback


Если вам интересна тема CQRS+ES пожалуйста не стесняйтесь задавать вопросы в комментариях. Так же можете писать мне в скайп если нету ака на хабре. Недавно мне в скайп постучался один товарищ из Челябинска и благодаря его вопросам у меня появилось много идей для следующей статьи. Так как в моем распоряжении сейчас побольше свободного времени то я планирую писать их почаще.
Tags:
Hubs:
Total votes 14: ↑13 and ↓1+12
Comments39

Articles