Привет, Хабр! Решил я значит на время отойти от Scala, Idris и прочего ФП и чуть чуть поговорить о Event Store — базе данных в которой можно сохранят события в потоки событий. Как в старой доброй книге у нас тоже мушкетёров на самом деле 4 и четвертый это DDD. Сначала я с помощью Event Storming выделю команды, события и сущности с ними связанные. Потом сделаю на их основе сохранение состояния объекта и его восстановление. Буду я делать в этой статье обычный TodoList. За подробностями добро пожаловать под кат.
Содержание
- Три мушкетёра — Event Sourcing, Event Storming и Event Store — вступают в бой: Часть 1 — пробуем Event Store ДБ
Ссылки
Исходники
Образы docker image
Event Store
Event Soucing
Event Storming
Собственно говоря Event Store это БД которая предназначена для хранения событий. Так же она умеет создавать подписки на события чтобы их можно было как-то обрабатывать. Так же там есть проекции которые так же реагируют на события и на их основе аккумулирую какие-то данные. Например можно при событии TodoCreated увеличивать какой-то счетчик Count в проекции. Пока что в этой части я буду использовать Event Store как Read и Write Db. Дальше в следующих статьях я создам отдельную БД для чтения в которую будут данные записываться на основе событий сохраненных в БД для записи тобишь Event Store. Так же будет пример того как делать «Путешествия во времени» откатывая систему к состоянию которая она имела в прошлом.
И так начнем Event Stroming. Обычно для его проведения собирают все заинтересованных людей и экспертов которые рассказывают какие события в той предметной области которую ПО будет моделировать. Например для ПО завода — ИзделиеИзготовлено. Для игры — ПолученУрон. Для Финансового ПО — ДенгиЗачисленыНаСчет и прочее в этом духе. Так как наша предметная область это максимально простой TodoList то событий у нас будет немного. И так, запишем на доске события нашей предметной области (домена).
Теперь добавим команды которые эти события вызывают.
Далее сгруппируем эти события и команды вокруг сущности с изменением состояния которой они связаны.
Команды у меня превратятся просто в названия методов сервиса. Приступим к реализации.
Сначала опишем События в коде.
public interface IDomainEvent
{
//Техническое поле. Для сохранения id события в Event Strore
Guid EventId { get; }
//Техническое поле. Сюда будем записывать номер события в стриме Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id созданного Todo
public Guid Id { get; set; }
//Имя созданного Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
Теперь наше ядро — сущность:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException("Пустой список событий");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException("Пустое название задачи");
if (Id == default)
throw new ApplicationException("Пустой идентификатор задачи");
}
}
Подключаемся к Event Store:
services.AddSingleton(sp =>
{
//Подключается к TCP и в случае разрыва соединения пытается восстановить соединение.
//В самой строке есть опции для всего этого. Можно их в документации на оф сайте глянуть.
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
И так, главная часть. Собственно сохранение и чтение событий из Event Store:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id события
e.EventId,
//Тип события
e.GetType().Name,
//В виде Json (True|False)
true,
//Тело события
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//Метаданные события
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//Добавляем в коллекцию событий сущности наше событие
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
Хранилище сущностей выглядит совсем просто. Мы достаем из EventStore события сущности и восстанавливаем ее из них или просто сохраняем события сущности.
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
Сервис в котором происходит работа с репозиторием и сущностью:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
Ну собственно пока ничего впечатляющего. В следующих статься когда я добавлю отдельную БД для чтения все заиграет другими красками. Это сразу нам повесит консистентность со временем. Event Store и SQL БД по принципу мастер — слейв. Одна белая ES и много черных MS SQL из которых читают данные.
Лирическое отступление. В свете последних событий не мог не пошутить по поводу мастер слейв и черных белых. Эхе, уходит эпоха, будем внукам говорить что жили во времена когда базы при репликации называли мастер и слейв.
В системах где много чтения и мало записи данных (таких большинство) это даст прирост скорости работы. Собственно сама репликация мастер слейв она направленна на то что у вас замедляется запись (как и с индексами) но взамен ускоряется чтение за счет распределения нагрузки по нескольким БД.