Проблема CRUD-подхода
// Проблемы:
// 1. История изменений теряется
// 2. Конфликты при конкурентных обновлениях (или использование блокировок)
func UpdateOrderStatus(orderID string, status Status) error {
return db.Exec("UPDATE orders SET status=? WHERE id=?", status, orderID)
}
Решение: CQRS и Event Sourcing
Архитектурное ядро

Ключевые компоненты архитектуры
1. Команда (Command)
Запрос на выполнение действия ("Завершить заказ", "Списать средства")

Характеристики:
Может быть отклонена бизнес-правилами
Не возвращает данные (только статус выполнения)
Пример:
type CompleteOrderCommand struct {
OrderID string
UserID string
}
2. Агрегат (Aggregate)

Хранитель бизнес-правил, который:
Восстанавливает текущее состояние из истории событий
Проверяет возможность выполнения команды
Генерирует новые события при успешной проверке
Важно: Агрегат не сохраняет состояние, только содержит логику.
Пример:
// LoadFromHistory восстанавливает состояние (оптимизировано с учетом снапшотов)
func LoadFromHistory(events []Event, snapshot *Snapshot) *OrderAggregate {
agg := &OrderAggregate{
id: events[0].AggregateID(),
}
if snapshot != nil {
agg.applySnapshot(snapshot)
}
for _, event := range events {
if event.Version() > agg.version {
agg.applyEvent(event)
}
}
return agg
}
// Complete обрабатывает команду
func (a *OrderAggregate) Complete(cmd CompleteOrderCommand) ([]Event, error) {
if a.status != Paid {
return nil, fmt.Errorf("order %s must be paid first", a.id)
}
return []Event{
NewOrderCompletedEvent(a.id, cmd.UserID, a.version+1),
}, nil
}
// applyEvent применяет событие к состоянию
func (a *OrderAggregate) applyEvent(event Event) {
switch e := event.(type) {
case OrderCompletedEvent:
a.status = Completed
a.version = e.Version()
// обработка других типов событий
}
}
// applySnapshot применяет снапшот
func (a *OrderAggregate) applySnapshot(s Snapshot) {
a.version = s.Version
a.status = s.Status
// другие поля
}
// TakeSnapshot создает снапшот
func (a *OrderAggregate) TakeSnapshot() Snapshot {
return Snapshot{
AggregateID: a.id,
Version: a.version,
Status: a.status,
// другие поля
}
}
3. Событие (Event)
Неизменяемая запись о произошедшем изменении.

Свойства:
Содержит все релевантные данные
Сериализуемо и сохраняемо
Пример:
// Факт произошедшего изменения
type OrderCompletedEvent struct {
OrderID string
UserID string
CreatedAt time.Time
}
4. Хранилище событий (Event Store)

Append-only журнал, который:
Гарантирует сохранение событий
Позволяет воспроизвести историю для любого агрегата
Реализует оптимистичные блокировки через версии
Пример:
func (es *EventStore) Append(aggregateID string, events []Event, expectedVersion int) error {
currentVersion := es.GetVersion(aggregateID)
if expectedVersion != currentVersion {
return ErrConcurrentModification
}
// Append-only запись
for _, event := range events {
record := EventRecord{
ID: uuid.New(),
AggregateID: aggregateID,
Version: currentVersion+1,
Type: event.Type(),
Data: event.Data(),
Timestamp: time.Now(),
}
es.db.Create(&record)
currentVersion++
}
return nil
}
5. Шина событий (Event Bus)

Асинхронная доставка событий подписчикам через:
Внутрипроцессные каналы
Внешние брокеры
6. Проектор (Projector)
Трансформирует события в оптимизированные модели чтения.

Особенности:
Создает несколько различных представлений
Работает асинхронно и независимо
Допускает eventual consistency
Пример:
func (p *OrderProjector) HandleEvent(event Event) {
switch e := event.(type) {
case OrderCompletedEvent:
return p.updateOrderStatus(e.OrderID, "completed")
// ... другие типы событий
}
return nil
}
7. Обработчик запросов (Query Handler)
Поставщик данных для чтения, работающий с проекциями.

Пример:
func GetOrderSummary(orderID string) (*OrderSummary, error) {
var summary OrderSummary
err := db.Where("id = ?", orderID).First(&summary).Error
return &summary, err
}
Минимальный цикл
// 1. Получение команды
cmd := CompleteOrderCommand{OrderID: "123", UserID: "u456"}
// 2. Загрузка событий
events := eventStore.Load("123")
// 3. Восстановление агрегата
order := LoadOrder(events)
// 4. Обработка команды
newEvents, err := order.Complete(cmd)
// 5. Сохранение событий
eventStore.Save("123", newEvents, order.Version)
// 6. Публикация событий
for _, event := range newEvents {
eventBus.Publish(event)
}
// 7. Запрос данных (где-то в другом месте)
summary := GetOrderSummary("123")
fmt.Println(summary.Status) // "completed"
Ключевые потоки данных
Командный поток
Клиент → CommandHandler → EventStore → Aggregate → Сохранение → EventBus
Поток запросов
EventBus → Projector → ReadDB ← QueryHandler ← Клиент
Сравнение с традиционным подходом
Традиционный CRUD | CQRS/ES |
---|---|
|
|
Текущее состояние в БД | Состояние = Σ всех событий |
Нет истории изменений | Полный аудит автоматически |
Блокировки для консистентности | Оптимистичные блокировки через версии |
Одна модель для чтения/записи | Раздельные оптимизированные модели |
Ключевые выводы
Фундаментальный сдвиг парадигмы
Состояние системы = история всех событий, а не последний snapshot данных.
Преимущество: Полный аудит и возможность "переиграть" историю.Жесткое разделение ответственности
Команды (Write): "Сделай что-то" → Генерируют события
Запросы (Read): "Покажи данные" → Читают оптимизированные проекции Результат: Независимое масштабирование операций записи и чтения.
Агрегаты - хранители бизнес-логики
Не хранят состояние постоянно, а воссоздают его из событий и принимают решения.События как источник истины
Неизменяемые факты
Содержат всю информацию об изменении
Append-only хранилище = гарантия сохранности истории
Проекции - гибкие представления
Можно создавать несколько специализированных моделей для разных задач.