В прошлой статье я начал с основ CQRS + Event Sourcing. В этот раз я предлагаю продолжить и более подробно посмотреть на ES.
В примере который я выкладывал с моей прошлой статьей магия Event Sourcing’а была скрыта за абстракцией IRepository и двумя методами IRepository.Save() и IRepository.GetById<>().
Для того чтобы поподробнее разобраться что происходит я решил рассказать о процессе сохранения и загрузки агрегата из Event Store на примере проекта Lokad IDDD Sample от Рината Абдулина. У него в аппликейшен сервисах идет прямое обращение к Event Store, без дополнительных абстракций, поэтому все выглядит очень наглядно. Application Service — это аналог CommandHandler, но который обрабатывает все комманды одного агрегата. Такой подход очень удобный и мы тоже на него в одном проекте перешли.
ApplicationService
Интерфейс IApplicationService крайне прост.
В методе Execute мы передаем любые команды и надеемся, что что сервис их перенаправит на нужный обработчик.
Так как у Рината в примере только один аггрегат Customer, то и сервис тоже только один CustomerApplicationService. Собственно поэтому нет необходимости выносить какую-либо логику в базовый класс. Отличное решение для примера на мой взгляд.
Метод Execute передает обработку комманды одной из перегрузок метода When подходящей по сигнатуре. Реализация метода Execute очень простая с использованием динамиков, и не надо бежать рефлексией по всем методам.
Начнем с комманды создания — CreateCustomer.
В реальном проекте у вас между UI и ApplicationService скорее всего будет message queue, ну а для примера Ринат ограничился прямой передачей комманды объекту апликейшен сервиса (см. class ApplicationServer).
После того, как команда CreateCustomer попадает в метод Execute, она перенаправляется в метод When.
В метод Update мы передаем айдишку агрегата и экшен который вызывает метод изменения состояния аггрегата. Вообще на мой взгляд лучше не делать метод Create у аггрегата, а создать еще один конструктор, так как вызов метода Create в данном контексте выглядит немного странным. Мы вроде и создаем агрегат, но почему-то метод Create передаем как метод изменения состояния. С конструктором так бы уже не получилось.
Вернемся к методу Update, задача у него следующая — 1) загрузить все события для текущего аггрегата, 2) создать агрегат и восстановить его состояние использую загруженные события, 3) выполнить переданное действие Action execute над аггрегатом и 4) сохранить новые события если они есть.
В примере, который я показывал в прошлой статье состояние аггрегата хранилось в виде private полей в классе аггрегата. Это не совсем удобно если вы захотите добавить snapshot’ы, т.к. придется как-то высасывать состояние какждый раз или использовать рефлексию. У Рината гораздо более удобный подход — для состояние отдельный класс CustomerState, что позволяет вынести методы проекции из аггрегата и гораздо проще сохранять и загружать snapshot’ы, хоть их и нет в примере.
Как видно, в конструктор агрегату передается список событий этого же аггрегата, как не сложно догадаться, для того чтобы он восстановил своё состояние.
Агрегат в свою очередь делегирует восстановление состояние классу CustomerState.
Приведу код всего класса CustomerState, лишь уберу несколько методов проекции When.
Как видно в конструкторе мы форычем бежим по переданным ивентам, и передаем их в метод Mutate, который в свою очередь напрявляет их дальше в подходящий метод проекции.
Можно заметить что все проперти имеют private setter метод, что гарантирует что состояние извне мы можем изменить только передав соответствующее событие.
Состояние восстановили, теперь можно и попробовать его изменить. Вернемся немного назад к методу изменения состояние. Так как я начал разбираться с коммандой CreateCustomer, то и у агрегата посмотрим метод Create.
Здесь самое место сделать проверку наших бизнесс правил, так как у нас есть доступ к актуальному состоянию агрегата. У нас есть бизнесс правило что Customer может быть создан лишь один раз ( врочем еще есть и техническое ограничение), поэтому при попытки вызвать Create на уже созданном агрегате мы бросаем эксепшен.
Если же все бизнесс правила удовлетворены то в метод Apply передаем событие CustomerCreated. Метод Apply очень прост, лишь передает событие объекту состояния и добавляет его в список текущих изменений.
В большенству случаев на одну операцию с аггрегатом приходится одно событие. Но вот как раз в случае с методом Create событий может быть два.
После того как мы передали в метод Apply событие CustomerCreate, мы может добавить текущему кастомеру приветственный бонус, если удовлетворяетя бизнесс правило что сумма бонуса должена быть больше нуля. В таком случае вызывается метод агрегата AddPayment, который не сореджит никаких проверок а просто инициирует событие CustomerPaymentAdded.
Теперь предстоит сохранить новые события и опубликовать их в Read model. Подозреваю что это делает следующая строчка.
Убедимся…
Ну почти угодал. События сериализуются и сохраняются в append only store (удалять и изменять их мы то не собираемся). Но вот вместо того чтобы отправить их в read-model (на презентационный уровень), Ринат просто выводит их на консоль.
Впрочем для примера этого достаточно.
Если вы хотите посмотреть как это все будет работать с очередью сообщений можете посмотреть пример на гитхабе из моей предыдущей статьи. Я его немного изменил и тоже внес часть инфраструктуры Event Sourcing’a в солюшен. На этом примере я хочу показать как можно добавить снэпшоты.
Снэпшоты нужны чтобы делать промежуточные снимки состояния аггрегата. Это позволяем не закгружать весь поток событий агрегата, а лишь только те которые произошли после того как мы сделали последний снэпшот.
Итак посмотрим на реализацию.
В классе UserCommandHandler есть метод Update очень похожий на тот что у Рината, но у меня он еще сохраняет и загружает снэпшоты. Снэпшоты делаем через каждые 100 версий.
Сначала поднимаем из репозитория снэпшот, потом загружаем поток событий которые произошли после того как мы сделали этот снэпшот. Затем передаем все это конструктору агрегата.
В конструкторе пытаемся достать состояние из снэпшота или создаем новое состояние если нету снэпшота. На полученном состоянии проигрываем все недостающие события, и в итоге получаем актуальную версию агрегата.
После манипуляций с агрегатом, проверяем кратна ли версия агрегата интервалу через который мы делаем снэпшоты, и если это так, то сохраняем новый снэпшот в репозиторий. Чтобы получить из UserCommandHandler’a состояние агрегата пришлось сделать для него internal getter свойство State.
Вот и все, теперь у нас есть снэпшоты, что позволило намного быстрее востанавливать состояние агрегата если у него очень много событий.
Если вам интересна тема CQRS+ES пожалуйста не стесняйтесь задавать вопросы в комментариях. Так же можете писать мне в скайп если нету ака на хабре. Недавно мне в скайп постучался один товарищ из Челябинска и благодаря его вопросам у меня появилось много идей для следующей статьи. Так как в моем распоряжении сейчас побольше свободного времени то я планирую писать их почаще.
В примере который я выкладывал с моей прошлой статьей магия Event Sourcing’а была скрыта за абстракцией IRepository и двумя методами IRepository.Save() и IRepository.GetById<>().
Для того чтобы поподробнее разобраться что происходит я решил рассказать о процессе сохранения и загрузки агрегата из Event Store на примере проекта Lokad IDDD Sample от Рината Абдулина. У него в аппликейшен сервисах идет прямое обращение к Event Store, без дополнительных абстракций, поэтому все выглядит очень наглядно. Application Service — это аналог CommandHandler, но который обрабатывает все комманды одного агрегата. Такой подход очень удобный и мы тоже на него в одном проекте перешли.
ApplicationService
Интерфейс IApplicationService крайне прост.
public interface IApplicationService
{
void Execute(ICommand cmd);
}
В методе Execute мы передаем любые команды и надеемся, что что сервис их перенаправит на нужный обработчик.
Так как у Рината в примере только один аггрегат Customer, то и сервис тоже только один CustomerApplicationService. Собственно поэтому нет необходимости выносить какую-либо логику в базовый класс. Отличное решение для примера на мой взгляд.
Метод Execute передает обработку комманды одной из перегрузок метода When подходящей по сигнатуре. Реализация метода Execute очень простая с использованием динамиков, и не надо бежать рефлексией по всем методам.
public void Execute(ICommand cmd)
{
// pass command to a specific method named when
// that can handle the command
((dynamic)this).When((dynamic)cmd);
}
Начнем с комманды создания — CreateCustomer.
[Serializable]
public sealed class CreateCustomer : ICommand
{
public CustomerId Id { get; set; }
public string Name { get; set; }
public Currency Currency { get; set; }
public override string ToString()
{
return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency);
}
}
В реальном проекте у вас между UI и ApplicationService скорее всего будет message queue, ну а для примера Ринат ограничился прямой передачей комманды объекту апликейшен сервиса (см. class ApplicationServer).
После того, как команда CreateCustomer попадает в метод Execute, она перенаправляется в метод When.
public void When(CreateCustomer c)
{
Update(c.Id, a => a.Create(c.Id,c.Name, c.Currency, _pricingService, DateTime.UtcNow));
}
В метод Update мы передаем айдишку агрегата и экшен который вызывает метод изменения состояния аггрегата. Вообще на мой взгляд лучше не делать метод Create у аггрегата, а создать еще один конструктор, так как вызов метода Create в данном контексте выглядит немного странным. Мы вроде и создаем агрегат, но почему-то метод Create передаем как метод изменения состояния. С конструктором так бы уже не получилось.
Вернемся к методу Update, задача у него следующая — 1) загрузить все события для текущего аггрегата, 2) создать агрегат и восстановить его состояние использую загруженные события, 3) выполнить переданное действие Action execute над аггрегатом и 4) сохранить новые события если они есть.
void Update(CustomerId id, Action<Customer> execute)
{
// Load event stream from the store
EventStream stream = _eventStore.LoadEventStream(id);
// create new Customer aggregate from the history
Customer customer = new Customer(stream.Events);
// execute delegated action
execute(customer);
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);
}
Восстановление состояния
В примере, который я показывал в прошлой статье состояние аггрегата хранилось в виде private полей в классе аггрегата. Это не совсем удобно если вы захотите добавить snapshot’ы, т.к. придется как-то высасывать состояние какждый раз или использовать рефлексию. У Рината гораздо более удобный подход — для состояние отдельный класс CustomerState, что позволяет вынести методы проекции из аггрегата и гораздо проще сохранять и загружать snapshot’ы, хоть их и нет в примере.
Как видно, в конструктор агрегату передается список событий этого же аггрегата, как не сложно догадаться, для того чтобы он восстановил своё состояние.
Агрегат в свою очередь делегирует восстановление состояние классу CustomerState.
/// <summary>
/// Aggregate state, which is separate from this class in order to ensure,
/// that we modify it ONLY by passing events.
/// </summary>
readonly CustomerState _state;
public Customer(IEnumerable<IEvent> events)
{
_state = new CustomerState(events);
}
Приведу код всего класса CustomerState, лишь уберу несколько методов проекции When.
/// <summary>
/// This is the state of the customer aggregate.
/// It can be mutated only by passing events to it.
/// </summary>
public class CustomerState
{
public string Name { get; private set; }
public bool Created { get; private set; }
public CustomerId Id { get; private set; }
public bool ConsumptionLocked { get; private set; }
public bool ManualBilling { get; private set; }
public Currency Currency { get; private set; }
public CurrencyAmount Balance { get; private set; }
public int MaxTransactionId { get; private set; }
public CustomerState(IEnumerable<IEvent> events)
{
foreach (var e in events)
{
Mutate(e);
}
}
...
public void When(CustomerCreated e)
{
Created = true;
Name = e.Name;
Id = e.Id;
Currency = e.Currency;
Balance = new CurrencyAmount(0, e.Currency);
}
public void When(CustomerRenamed e)
{
Name = e.Name;
}
public void Mutate(IEvent e)
{
// .NET magic to call one of the 'When' handlers with
// matching signature
((dynamic) this).When((dynamic)e);
}
}
Как видно в конструкторе мы форычем бежим по переданным ивентам, и передаем их в метод Mutate, который в свою очередь напрявляет их дальше в подходящий метод проекции.
Можно заметить что все проперти имеют private setter метод, что гарантирует что состояние извне мы можем изменить только передав соответствующее событие.
Состояние восстановили, теперь можно и попробовать его изменить. Вернемся немного назад к методу изменения состояние. Так как я начал разбираться с коммандой CreateCustomer, то и у агрегата посмотрим метод Create.
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
if (_state.Created)
throw new InvalidOperationException("Customer was already created");
Apply(new CustomerCreated
{
Created = utc,
Name = name,
Id = id,
Currency = currency
});
var bonus = service.GetWelcomeBonus(currency);
if (bonus.Amount > 0)
AddPayment("Welcome bonus", bonus, utc);
}
Здесь самое место сделать проверку наших бизнесс правил, так как у нас есть доступ к актуальному состоянию агрегата. У нас есть бизнесс правило что Customer может быть создан лишь один раз ( врочем еще есть и техническое ограничение), поэтому при попытки вызвать Create на уже созданном агрегате мы бросаем эксепшен.
Если же все бизнесс правила удовлетворены то в метод Apply передаем событие CustomerCreated. Метод Apply очень прост, лишь передает событие объекту состояния и добавляет его в список текущих изменений.
public readonly IList<IEvent> Changes = new List<IEvent>();
readonly CustomerState _state;
void Apply(IEvent e)
{
// pass each event to modify current in-memory state
_state.Mutate(e);
// append event to change list for further persistence
Changes.Add(e);
}
В большенству случаев на одну операцию с аггрегатом приходится одно событие. Но вот как раз в случае с методом Create событий может быть два.
После того как мы передали в метод Apply событие CustomerCreate, мы может добавить текущему кастомеру приветственный бонус, если удовлетворяетя бизнесс правило что сумма бонуса должена быть больше нуля. В таком случае вызывается метод агрегата AddPayment, который не сореджит никаких проверок а просто инициирует событие CustomerPaymentAdded.
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
Apply(new CustomerPaymentAdded()
{
Id = _state.Id,
Payment = amount,
NewBalance = _state.Balance + amount,
PaymentName = name,
Transaction = _state.MaxTransactionId + 1,
TimeUtc = utc
});
}
Теперь предстоит сохранить новые события и опубликовать их в Read model. Подозреваю что это делает следующая строчка.
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);
Убедимся…
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
if (events.Count == 0)
return;
var name = IdentityToString(id);
var data = SerializeEvent(events.ToArray());
try
{
_appendOnlyStore.Append(name, data, originalVersion);
}
catch(AppendOnlyStoreConcurrencyException e)
{
// load server events
var server = LoadEventStream(id, 0, int.MaxValue);
// throw a real problem
throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events);
}
// technically there should be a parallel process that queries new changes
// from the event store and sends them via messages (avoiding 2PC problem).
// however, for demo purposes, we'll just send them to the console from here
Console.ForegroundColor = ConsoleColor.DarkGreen;
foreach (var @event in events)
{
Console.WriteLine(" {0} r{1} Event: {2}", id,originalVersion, @event);
}
Console.ForegroundColor = ConsoleColor.DarkGray;
}
Ну почти угодал. События сериализуются и сохраняются в append only store (удалять и изменять их мы то не собираемся). Но вот вместо того чтобы отправить их в read-model (на презентационный уровень), Ринат просто выводит их на консоль.
Впрочем для примера этого достаточно.
Если вы хотите посмотреть как это все будет работать с очередью сообщений можете посмотреть пример на гитхабе из моей предыдущей статьи. Я его немного изменил и тоже внес часть инфраструктуры Event Sourcing’a в солюшен. На этом примере я хочу показать как можно добавить снэпшоты.
Snapshots
Снэпшоты нужны чтобы делать промежуточные снимки состояния аггрегата. Это позволяем не закгружать весь поток событий агрегата, а лишь только те которые произошли после того как мы сделали последний снэпшот.
Итак посмотрим на реализацию.
В классе UserCommandHandler есть метод Update очень похожий на тот что у Рината, но у меня он еще сохраняет и загружает снэпшоты. Снэпшоты делаем через каждые 100 версий.
private const int SnapshotInterval = 100;
Сначала поднимаем из репозитория снэпшот, потом загружаем поток событий которые произошли после того как мы сделали этот снэпшот. Затем передаем все это конструктору агрегата.
private void Update(string userId, Action<UserAR> updateAction)
{
var snapshot = _snapshotRepository.Load(userId);
var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1;
var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue);
var user = new UserAR(snapshot, stream);
updateAction(user);
var originalVersion = stream.GetVersion();
_eventStore.AppendToStream(userId, originalVersion, user.Changes);
var newVersion = originalVersion + 1;
if (newVersion % SnapshotInterval == 0)
{
_snapshotRepository.Save(new Snapshot(userId, newVersion,user.State));
}
}
В конструкторе пытаемся достать состояние из снэпшота или создаем новое состояние если нету снэпшота. На полученном состоянии проигрываем все недостающие события, и в итоге получаем актуальную версию агрегата.
public UserAR(Snapshot snapshot, TransitionStream stream)
{
_state = snapshot != null ? (UserState) snapshot.Payload : new UserState();
foreach (var transition in stream.Read())
{
foreach (var @event in transition.Events)
{
_state.Mutate((IEvent) @event.Data);
}
}
}
После манипуляций с агрегатом, проверяем кратна ли версия агрегата интервалу через который мы делаем снэпшоты, и если это так, то сохраняем новый снэпшот в репозиторий. Чтобы получить из UserCommandHandler’a состояние агрегата пришлось сделать для него internal getter свойство State.
Вот и все, теперь у нас есть снэпшоты, что позволило намного быстрее востанавливать состояние агрегата если у него очень много событий.
Feedback
Если вам интересна тема CQRS+ES пожалуйста не стесняйтесь задавать вопросы в комментариях. Так же можете писать мне в скайп если нету ака на хабре. Недавно мне в скайп постучался один товарищ из Челябинска и благодаря его вопросам у меня появилось много идей для следующей статьи. Так как в моем распоряжении сейчас побольше свободного времени то я планирую писать их почаще.