Как стать автором
Поиск
Написать публикацию
Обновить

Паттерн Saga через MassTransit. Оркестрация vs Хореография

Уровень сложностиСредний
Время на прочтение23 мин
Количество просмотров1.6K

Сегодня многие разработчики активно используют архитектуру микросервисов. Идея кажется простой: разбить приложение на небольшие независимые сервисы и соединить их между собой. На словах всё звучит логично, но на практике появляются сложности - особенно когда нужно управлять распределёнными транзакциями.

Главная проблема в том, чтобы обеспечить согласованность данных: если один сервис завершит операцию с ошибкой, мы должны корректно откатить или компенсировать изменения в других, чтобы система оставалась надёжной и давала правильный бизнес-результат.

В этой статье мы разберём Saga с нуля, простыми словами и на понятных примерах. Материал подойдёт как для первого знакомства с темой, так и в качестве пошагового гайда по её реализации в C#. Мы рассмотрим два основных подхода — хореографию и оркестрацию, разберём, чем они отличаются, и в каких случаях что выбрать.

В конце статьи вы сможете скачать пример готового проекта с GitHub и сразу попробовать всё на практике.

Описание проблемы

Представим задачу по созданию оформлении заказа в интернет-магазине. Он состоит из трех сервисов перечисленных ниже.

  • Inventory - работа со складом, инвенторизацией;

  • Delivery - интеграция с курьерскими службами и состояние доставки;

  • Order - работа с заказом и интеграция с эквайринговыми (платежными) сервисами.

Заказ можно оплатить только изначально сохранив карту в системе. Когда пользователь оформляет заказ, необходимо выполнить несколько шагов:

  1. проверить, сохранена ли карта пользователя;

  2. проверить наличие товаров;

  3. создать заказ и выполнить рекуррентный (автоматический) платеж;

  4. забронировать товары на складе;

  5. запросить у курьерской службы доставку товара.

Что произойдет если не использовать межсервисные транзакции?

Представим что пользователь пытается купить что-то в магазине, но курьерская служба не может доставить товар. Начинается обработка запроса:

  • карта сохранена и доступна для рекуррентного списания;

  • нужные товары в наличии;

  • создается заказ, платеж проходит;

  • товары забронированы для пользователя;

  • курьерская служба приняла запрос за доставку, но отказала.

Что произошло? Товары уже забронированы, оплата по ним прошла. Логично предположить, что можно в сервисе Delivery просто обрабатывать ответ службы доставки и по условию откатывать изменения в сервисах Inventory и Order обычным запросом в очередь сообщений. Подобная логика может работать до того момента, пока не нужно откатывать изменения большем кол-ве сервисов.

Как только по каким-то причинам курьерская служба откажется доставлять заказ, придется делать дополнительные условия по откату уже в Delivery сервисе, и теперь необходимо уже отметить бронь и вернуть деньги пользователю.

Как становится понятно, чем больше действий необходимо откатить, тем больше трудностей и багов может всплыть. Решением для таких случаев может оказаться паттерн Saga.

Межсервисные транзакции (Saga)

Определимся с терминами:

Saga - это последовательность локальных транзакций, где каждая операция выполняется в рамках отдельного сервиса. Если одна из операций завершилась неудачно, выполняется компенсирующая транзакция, отменяющая предыдущие шаги.

SagaStateMachine - это конкретная реализация паттерна с использованием MassTransit и механизма конечных автоматов.

Конечный автомат (или finite state machine, FSM) - это модель, описывающая систему, которая может находиться в одном из конечного числа состояний. Эта система может изменять своё состояние в ответ на входные события или условия.

CorrelationId - это уникальный идентификатор, который используется для связывания всех сообщений, участвующих в одном бизнес-процессе.

Когда начинается выполнение саги, ей присваивается CorrelationId. Все входящие и исходящие сообщения в рамках этой саги будут содержать этот же идентификатор. Это позволяет приложению отслеживать, к какому экземпляру саги относится конкретное сообщение.

Существует два типа реализации Saga.

  1. Хореография (Choreography) - децентрализованный подход. взаимодействие между сервисами осуществляется через события. Каждый участник сам решает, что делать, подписываясь на события и инициируя следующие шаги.

  2. Оркестрация (Orchestration) - централизованный подход. Один сервис (оркестратор) управляет шагами саги, вызывает участников и реагирует на ответы.

В каких случаях Saga излишняя?

Операция ограничена одним сервисом
Если все изменения данных происходят в рамках одной базы/сервиса - проще использовать транзакцию базы данных.
Saga нужна только при распределённости по нескольким сервисам с разными хранилищами.

Нетребовательность к консистентности
Если бизнес-процесс может жить с eventual consistency (система не гарантирует мгновенной согласованности данных) без сложных компенсаций - достаточно простого обмена событиями.
Пример: обновление статистики, отправка писем, логирование - можно сделать асинхронно, без откатов.

Отсутствие цепочки зависимостей
Saga особенно полезна, когда шаги зависят друг от друга (успех одного - условие для следующего).
Если же шаги выполняются независимо и их можно запускать параллельно - Saga лишь добавит лишнюю синхронизацию.

Дешёвые и идемпотентные операции
Если операции легко повторить и нет риска оставить систему в «полу-состоянии», проще сделать ретраи или дедупликацию сообщений, чем городить Saga с компенсирующими действиями.

Простая компенсация без оркестрации
Если «откат» - это одна простая операция, которую можно вызвать при ошибке, нет смысла строить полноценный Saga-оркестратор.

Важнее скорость, чем согласованность
Saga - это дополнительные сообщения, ожидания и логики отката, что увеличивает latency.
Если приоритет - это мгновенная реакция (например, онлайн-игры, трейдинг), лучше использовать другие подходы.

💡 Общий принцип:
Saga оправдана, когда:

  • есть несколько сервисов с разными хранилищами;

  • процесс критичен по консистентности;

  • нет возможности простой ручной компенсации;

  • операции зависимы и требуют чёткой последовательности.

Какой способ выбрать?

Для каждой ситуации подходят разные варианты. Возможные критерии выбора:

Критерий

Оркестрация

Хореография

Отказоустойчивость

Оркестратор централизованно управляет состоянием саги, может сохранять его в БД. Это упрощает отладку, повторное выполнение операций (retries), восстановление цепочки после сбоев, реализацию дедлайнов и таймаутов.

Каждое действие инициируется ивентом, состояние распределено по участникам. Потеря сообщения или сбой сервиса нарушает цепочку, восстановление требует дополнительных механизмов (DLQ, идемпотентность, наблюдаемость).

Связанность

Сервисы напрямую связаны с оркестратором и знают его команды. Это повышает связанность (coupling), особенно при изменении бизнес-логики.

Сервисы реагируют на события и работают независимо. Слабая связанность (loose coupling), проще расширять систему, добавляя новые сервисы, просто подписывая их на события.

Отладка

Централизованное логирование и трекинг. Можно посмотреть в БД оркестратора и понять, на каком шаге застряла сага.

Проследить цепочку сложнее, особенно в распределённой среде. Требуется distributed tracing (OpenTelemetry) и наблюдение за всей цепочкой сообщений.

Гибкость

Подходит при частых изменениях бизнес-логики - достаточно внести изменения в оркестратор.

Хороша для стабильной логики и автономных систем. Изменение последовательности требует координации между несколькими сервисами.


Оркестрация

Подход, при котором централизованный компонент (оркестратор) управляет взаимодействием между микросервисами.

Оркестратор:

  • вызывает участников по шагам;

  • контролирует последовательность;

  • обрабатывает ошибки;

  • решает, что делать дальше.Как дирижёр в оркестре: один руководит - остальные исполняют.

Что включать в оркестрацию?

На самом деле не все шаги правильно включать в оркестрацию. Необходимо выделить цепочку запросов, которая действительно должна быть в качестве межсервисной транзакции. Разберем пример с последовательностью заказа.

  1. Проверить сохранена ли карта пользователя и проверить наличие товаров - эти действия можно выполнить до инициализации саги. Они являются обычными запросами.

  2. Проверить наличие товаров, забронировать товары на складе, создать заказ и выполнить рекуррентный (автоматически) платеж, запросить у курьерской службы доставку товара - ивенты, которые должны быть в транзакции.

  3. Отправить уведомление пользователю - отправка уведомления является ивентом, но не должна быть включенна в Saga т.к. её успех не важен в самой последовательности заказа. Если необходимо в любом случае отправить уведомление, то можно воспользоваться паттерном MessageOutbox.

Заключение: в оркестрации до инициализации саги можно выдополнить различные проверочные действия. Эти проверки можно назвать pre-conditions. Если они не выполняются - и сагу начинать не надо.

Можно ли не делать отдельный сервис для оркестрации?

Микросервисы должны придерживаться принципу единой ответственности, в большинстве случаев правильно будет создать отдельный сервис для оркестрации.

Но в реальности не всегда это уместно. Если на проекте и так много сервисов, то не очень хочется делать еще один запускаемый проект, который будет нагружать сервер. Можно поступить таким образом:

  1. выделить все сервисы которые участвуют в оркестрации;

  2. определить к какому домену больше всего относится вся цепочке ивентов;

  3. использовать сервис, который является ключевым.

Например, если вам необходимо сделать регистрацию пользователя через Saga, то скорее всего у вас есть User или Profile сервис, который является основным в цепочке ивентов. При необходимости, SagaStateMashine можно описать в нем.

Реализация

Необходимо реализовать цепочку по этой схеме:

Цепочка оркестрации
Цепочка оркестрации

Контракты

Так как для оркестрации нужен отдельный сервис, то для него, изначально, необходимо определить контракты. Вариант контрактов для описываемого примера.

  • OrchestrationOrderCreateEvent - создание заказа;

  • OrchestrationOrderCreateEventCompleted - создание заказа прошло успешно;

  • OrchestrationOrderCreateEventFailed - создание заказа выдало ошибку;

  • OrchestrationInventoryGoodsBookedInWarehouseEvent -бронирование товара на складе;

  • OrchestrationInventoryGoodsBookedInWarehouseEventCompleted - бронирование товара на складе прошло успешно;

  • OrchestrationInventoryGoodsBookedInWarehouseEventFailed - бронирование товара на складе выдало ошибку;

  • OrchestrationDeliverySendEvent - передача товара в доставку;

  • OrchestrationDeliverySendEventCompleted - передача товара в доставку успешно;

  • OrchestrationDeliverySendEventFailed - передача товара в доставку выдало ошибку;

  • OrchestrationInventoryCancelBookingGoodsInWarehouseEvent - compensation logic отмены бронирования товара на складе;

  • OrchestrationOrderCancelEvent - compensation logic создания заказа.

Ивенты делятся на 3 типа.

  • Направляемые сервису. Например, OrchestrationOrderCreateEvent, этот ивент направляется из Saga в Order сервис

  • Ответа от сервиса. Если был обработан OrchestrationOrderCreateEvent, то ответом может быть OrchestrationOrderCreateEventСompleted или OrchestrationOrderCreateEventFailed

  • Compensation logic. Ивент о компенсирующем действии при ошибке, например, OrchestrationOrderCancelEvent, говорит о том, что заказ необходимо отменить.

Экземпляр SagaStateMashine

До определения саги необходимо описать, какой контекст хранит оркестратор в рамках транзакции. Для этого необходимо унаследоваться от SagaStateMachineInstance

public class OrderSaga : SagaStateMachineInstance
{
    // Payload Data
    public IEnumerable<GoodViewModel> Goods { get; set; } = null!;
    public string DeliveryAddress { get; set; } = null!;
    public Guid UserId { get; set; }
    
    // Saga state
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = null!;
    
    // Audit info
    public DateTime CreatedAt { get; set; }
    public DateTime? UpdateAt { get; set; }
    public DateTime? CompletedAt { get; set; }
    
    // Callback 
    public Guid? RequestId { get; set; }
    public Uri? ResponseAddress { get; set; }
}

Как видно, поля контекста разделены на 4 части:

  • payload data - бизнес-данные, передаваемые через сагу;

  • saga state - технические поля, управляющие состоянием саги;

  • audit info - временные метки для аудита и отладки;

  • callback - данные ответа/запроса для Request/Response паттерна.

Инициализирующее событие

После того как описан контекст саги, необходимо понять, как его инициализировать. В большинстве случаев для этого можно использовать данные из раздела Payload - бизнес-информацию, связанную с процессом.

Кроме полезной нагрузки важно определить CorrelationId. Это уникальный идентификатор, который используется для связывания всех сообщений, участвующих в одной саге или бизнес-процессе. Его значение должно отражать суть операции. В нашем случае - это создание заказа, и в качестве CorrelationId логично использовать OrderId.

public record OrchestrationInitializeSagaEvent(
    //CorrelationId
    Guid OrderId,
    //Payload Data
    IEnumerable<GoodViewModel> Goods, 
    Guid UserId, 
    string DeliveryAddress);

Конечный автомат (Finite State Mashine)

Для начала нужно унаследоваться от MassTransitStateMachine передав в generic описанный ранее контекст:

public class OrderSagaStateMachine : MassTransitStateMachine<OrderSaga>
{
    private readonly ILogger<OrderSagaStateMachine> _logger;
    
    public OrderSagaStateMachine(ILogger<OrderSagaStateMachine> logger)
    {
        _logger = logger;
	}
}

Оркестрация в MassTransit реализуется с помощью конечного автомата (FSM - Finite State Machine). Это означает, что мы должны заранее описать все возможные события и состояния, которые могут возникнуть в рамках саги.

Для начала определим все события, на которые будет реагировать сага - это, как правило, ответы от внешних сервисов:

private Event<OrchestrationInitializeSagaEvent> OnSagaStarted { get; set; } = null!;
private Event<OrchestrationInventoryGoodsBookedInWarehouseEventCompleted> OnGoodsBookedInWarehouseCompleted { get; set; } = null!;
private Event<OrchestrationInventoryGoodsBookedInWarehouseEventFailed> OnGoodsBookedInWarehouseFailed { get; set; } = null!;
private Event<OrchestrationOrderCreateEventCompleted> OnOrderCreateCompleted { get; set; } = null!;
private Event<OrchestrationOrderCreateEventFailed> OnOrderCreateFailed { get; set; } = null!;
private Event<OrchestrationDeliverySendEventCompleted> OnDeliverySendEventCompleted { get; set; } = null!;
private Event<OrchestrationDeliverySendEventFailed> OnDeliverySendEventFailed { get; set; } = null!;

Следующим шагом нужно определить возможные состояния конечного автомата. Их можно выделить на основе вышеуказанных событий:

private State OrderCreated { get; set; } = null!; // Создание заказа
private State BookingGoodsInWarehouse { get; set; } = null!; // Бронирование товаров на складе
private State DeliverySend { get; set; } = null!; // Передача в доставку
private State Failed { get; set; } = null!; // Провал

Конструктор конечного автомата

В конструкторе конечного автомата инициализируются состояния и события. Причем каждое событие связанно одним OrderId, который выступает в качестве CorrelationId

public OrderSagaStateMachine(ILogger<OrderSagaStateMachine> logger)
{
    _logger = logger;
    
    State(() => OrderCreated);
    State(() => BookingGoodsInWarehouse);
    State(() => DeliverySend);
    State(() => Failed);
    
    Event(() => OnSagaStarted, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnGoodsBookedInWarehouseCompleted, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnGoodsBookedInWarehouseFailed, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnOrderCreateCompleted, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnOrderCreateFailed, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnDeliverySendEventCompleted, context => context.CorrelateById(x => x.Message.OrderId));
    Event(() => OnDeliverySendEventFailed, context => context.CorrelateById(x => x.Message.OrderId));

Необходимо объявить поле в котором будет хранится имя State внутри экземпляра SagaStateMashine

	InstanceState(x => x.CurrentState);

Описание инициализации

В этом же конструкторе необходим описать, что будет происходить при инициализации:

Initially(WhenSagaStarted());

private EventActivityBinder<OrderSaga, OrchestrationInitializeSagaEvent> WhenSagaStarted()
{
    return When(OnSagaStarted)
        .Then(InitializeSaga)
        .Then(LogSagaState)
        .Publish(context =>
        new OrchestrationInventoryGoodsBookedInWarehouseEvent(context.Message.OrderId, context.Message.Goods.ToDictionary(x => x.Id, model => model.Count)))
        .TransitionTo(BookingGoodsInWarehouse);
}

private void InitializeSaga(BehaviorContext<OrderSaga, OrchestrationInitializeSagaEvent> context)
{
    context.Saga.Goods = context.Message.Goods;
    context.Saga.DeliveryAddress = context.Message.DeliveryAddress;
    context.Saga.CorrelationId = context.Message.OrderId;
    context.Saga.UserId = context.Message.UserId;
    context.Saga.RequestId = context.RequestId;
    context.Saga.ResponseAddress = context.ResponseAddress;
    context.Saga.CreatedAt = DateTime.UtcNow;
}

private void LogSagaState<TEvent>(BehaviorContext<OrderSaga, TEvent> context) where TEvent : class
{
    _logger.LogInformation($"{nameof(OrderSaga)} | correlationId: {context.Saga.CorrelationId} | event: {context.Event.Name}");
    context.Saga.UpdateAt = DateTime.UtcNow;
}

В методе WhenSagaStarted описана реакция на событие OnSagaStarted:

  1. первым делом вызывается метод InitializeSaga который заполняет данными контекст Saga;

  2. происходит лог текущего состояния;

  3. отправляется событие о бронировании товаров в сервис инвентаризации;

  4. выполняется переход на следующий State (BookingGoodsInWarehouse).

Описание реакции на события

Для описания реакций на события используется блок During:

During(BookingGoodsInWarehouse,
  When(OnGoodsBookedInWarehouseCompleted)
    .Publish(...)
    .TransitionTo(OrderCreated),

  When(OnGoodsBookedInWarehouseFailed)
    .Publish(...)
    .ThenAsync(...)
    .TransitionTo(Failed)
);

Что такое During:
During(State, ...) определяет поведение реакций на события, которые принимаются, когда экземпляр саги находится в указанном состоянии (state). Это позволяет различать логику, которая должна выполняться на разных этапах процесса. Аналогично Initially(...), только применяется не для начального состояния, а для одного или нескольких состояний.
Когда приходит определённое событие (When), выполняется последовательность действий (ThenThenAsyncPublish).
Экземпляр саги переходит в новое состояние (TransitionTo).

Рассмотрим на примере:

During(BookingGoodsInWarehouse,  
    When(OnGoodsBookedInWarehouseCompleted)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationOrderCreateEvent(context.Message.OrderId, context.Saga.Goods, context.Saga.UserId, context.Saga.DeliveryAddress))  
        .TransitionTo(OrderCreated),  
    When(OnGoodsBookedInWarehouseFailed)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationInventoryCancelBookingGoodsInWarehouseEvent(context.CorrelationId!.Value, context.Saga.Goods.ToDictionary(id => id.Id, model => model.Count)))  
        .ThenAsync(async context => await RespondFromSaga(context, new SagaResponse(context.ConversationId!.Value, context.Message.ProblemDetails)))  
        .TransitionTo(Failed));

Если FSM находится в состоянии BookingGoodsInWarehouse, то она ожидает события и реагирует на них.

  • OnGoodsBookedInWarehouseCompleted -> Товары успешно забронированы на складе.

    1. залогировать текущее состояние;

    2. опубликовать ивент OrchestrationOrderCreateEvent для создании заказа;

    3. изменить состояние Saga на OrderCreated.

  • OnGoodsBookedInWarehouseFailed -> Ошибка брони товаров на складе

    1. залогировать текущее состояние;

    2. опубликовать ивент OrchestrationInventoryCancelBookingGoodsInWarehouseEvent для выполнения замещающего действия;

    3. отдать результат, если кто-то его ожидает;

    4. изменить состояние Saga на Failed.

Каждый During разделяет логику по этапам: важные операции публикуются только при конкретной фазе. После публикации ключевых событий состояние саги меняется с помощью TransitionTo(...), чтобы последующие вызовы During(...) обрабатывались правильно в новом состоянии. Это обеспечивает ясную, отделённую логику оркестрации и использование модели Saga для управления долгоживущими бизнес-процессами.

Описание реакции на иное состояние

Если FSM почему-то получило неизвестное ей состояние или у состояния нет обработчика, то оно попадает в блок DuringAny(...). Это блок аналогичен другим During только он не ждет определенное состояние. Обычно в этом блоке описываются завершающие действия при ошибке:

DuringAny(WhenDeliverySendFailed());  
  
private EventActivityBinder<OrderSaga, OrchestrationDeliverySendEventFailed> WhenDeliverySendFailed()  
    => When(OnDeliverySendEventFailed)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationInventoryCancelBookingGoodsInWarehouseEvent(context.CorrelationId!.Value, context.Saga.Goods.ToDictionary(id => id.Id, model => model.Count)))  
        .Publish(context => new OrchestrationOrderCancelEvent(context.CorrelationId!.Value))  
        .TransitionTo(Failed);

В данном примере, когда сервис доставки опубликовал ошибку Saga провела компенсирующие действия и перевела состояние в Failed.

Типовая реакция сервиса на событие из Saga

Рассмотрим тот же пример до состояния BookingGoodsInWarehouse. Перед переходом в него публикуется событие OrchestrationInventoryGoodsBookedInWarehouseEvent. В свою очередь, сервис инвентаризации слушает очередь на наличие этого события и реагирует на него:

public class OrchestrationInventoryGoodsBookedInWarehouseEventConsumer(IInventoryService inventoryService) : IConsumer<OrchestrationInventoryGoodsBookedInWarehouseEvent>  
{  
    public async Task Consume(ConsumeContext<OrchestrationInventoryGoodsBookedInWarehouseEvent> context)  
    {        
	    try  
        {  
            await inventoryService.BookGoodsAsync(context.Message.GoodBooks, context.CancellationToken);  
        }        
        catch (ArgumentOutOfRangeException e)  
        {            
        await context.Publish(new OrchestrationInventoryGoodsBookedInWarehouseEventFailed(context.Message.OrderId,  
                new ProblemDetails()  
                {  
                    Details = e.Message,  
                    Instance = nameof(OrchestrationInventoryGoodsBookedInWarehouseEventConsumer),  
                    Status = (int)HttpStatusCode.InternalServerError,  
                    Title = HttpStatusCode.InternalServerError.ToString(),  
                    Type = "BookError"  
                }));  
            return;  
        }  
        await context.Publish(new OrchestrationInventoryGoodsBookedInWarehouseEventCompleted(context.Message.OrderId));  
    }}

Как видно, если бронирование происходит успешно, то публикуется событие OrchestrationInventoryGoodsBookedInWarehouseEventCompleted, в ином случае публикуется событие OrchestrationInventoryGoodsBookedInWarehouseEventFailed.
Эти события Saga отлавливает в виде Event:

Event(() => OnGoodsBookedInWarehouseCompleted, context => context.CorrelateById(x => x.Message.OrderId));  
Event(() => OnGoodsBookedInWarehouseFailed, context => context.CorrelateById(x => x.Message.OrderId));

Бизнес логика

Для полного понимания картины рассмотрим пример с сагой с точки зрения бизнес логики:

Initially(WhenSagaStarted());  
  
During(BookingGoodsInWarehouse,  
    When(OnGoodsBookedInWarehouseCompleted)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationOrderCreateEvent(context.Message.OrderId, context.Saga.Goods, context.Saga.UserId, context.Saga.DeliveryAddress))  
        .TransitionTo(OrderCreated),  
    When(OnGoodsBookedInWarehouseFailed)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationInventoryCancelBookingGoodsInWarehouseEvent(context.CorrelationId!.Value, context.Saga.Goods.ToDictionary(id => id.Id, model => model.Count)))  
        .ThenAsync(async context => await RespondFromSaga(context, new SagaResponse(context.ConversationId!.Value, context.Message.ProblemDetails)))  
        .TransitionTo(Failed));  
  
During(OrderCreated,  
    When(OnOrderCreateCompleted)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationDeliverySendEvent(context.Message.OrderId, context.Saga.Goods, context.Saga.UserId, context.Saga.DeliveryAddress))  
        .TransitionTo(DeliverySend),  
    When(OnOrderCreateFailed)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationInventoryCancelBookingGoodsInWarehouseEvent(context.CorrelationId!.Value, context.Saga.Goods.ToDictionary(id => id.Id, model => model.Count)))  
        .ThenAsync(async context => await RespondFromSaga(context, new SagaResponse(context.CorrelationId!.Value, context.Message.ProblemDetails)))  
        .TransitionTo(Failed));  
  
During(DeliverySend,  
When(OnDeliverySendEventCompleted)  
        .Then(LogSagaState)  
        .ThenAsync(async context => await RespondFromSaga(context, new SagaResponse(context.CorrelationId!.Value)))  
        .TransitionTo(Final),  
        WhenDeliverySendFailed());  
  
DuringAy(WhenDeliverySendFailed());

//------------------------------------------

private EventActivityBinder<OrderSaga, OrchestrationDeliverySendEventFailed> WhenDeliverySendFailed()  
    => When(OnDeliverySendEventFailed)  
        .Then(LogSagaState)  
        .Publish(context => new OrchestrationInventoryCancelBookingGoodsInWarehouseEvent(context.CorrelationId!.Value, context.Saga.Goods.ToDictionary(id => id.Id, model => model.Count)))  
        .Publish(context => new OrchestrationOrderCancelEvent(context.CorrelationId!.Value))  
        .TransitionTo(Failed);  
  
private EventActivityBinder<OrderSaga, OrchestrationInitializeSagaEvent> WhenSagaStarted()  
{  
    return When(OnSagaStarted)  
        .Then(InitializeSaga)  
        .Then(LogSagaState)  
        .Publish(context =>  
            new OrchestrationInventoryGoodsBookedInWarehouseEvent(context.Message.OrderId, context.Message.Goods.ToDictionary(x => x.Id, model => model.Count)))  
        .TransitionTo(BookingGoodsInWarehouse);  
}
  • Инициализация. Создается контекст Saga, публикуется OrchestrationInventoryGoodsBookedInWarehouseEvent, который бронирует товары в сервисе инвентаризации. Переход в состояние BookingGoodsInWarehouse.

  • Состояние BookingGoodsInWarehouse.

    • Если сервис инвентаризации опубликовал событие об успешной брони товаров (OrchestrationInventoryGoodsBookedInWarehouseEventCompleted), то публикуется событие OrchestrationOrderCreateEvent, которое создает заказ на оплату. Переход в состояние OrderCreated.

    • Если сервис инвентаризации опубликовал событие об ошибке (OrchestrationInventoryGoodsBookedInWarehouseEventFailed), то публикуется событие OrchestrationInventoryCancelBookingGoodsInWarehouseEvent, которое выполняет компенсирующее действие, отменяя бронь товара. Переход в состояние Failed.

  • Состояние OrderCreated:

    • Если сервис заказа опубликовал событие об успешном создании заказа на оплату (OrchestrationOrderCreateEventCompleted), то публикуется событие OrchestrationDeliverySendEvent, которое оправляет товар в доставку. Переход в состояние DeliverySend.

    • Если сервис заказа опубликовал событие об ошибке (OrchestrationOrderCreateEventFailed), то публикуется событие OrchestrationInventoryCancelBookingGoodsInWarehouseEvent, которое выполняет компенсирующее действие, отменяя бронь товара. Переход в состояние Failed.

  • Состояние DeliverySend:

    • Если сервис доставки опубликовал событие об успешной передачи в доставку (OrchestrationDeliverySendEventCompleted), завершить Saga переходом в состояние Final

    • Если сервис доставки опубликовал событие об ошибке (OrchestrationDeliverySendEventFailed), то публикуется событие OrchestrationInventoryCancelBookingGoodsInWarehouseEvent, которое выполняет компенсирующее действие, отменяя бронь товара и публикуется событие OrchestrationOrderCancelEvent, которое выполняет компенсирующее действие, удаляя заказ. Переход в состояние Failed.

  • Иное состояние. Выполнение всех компенсирующих действий

Хореография

Хореография не про иной способ отправки сообщений в очередь, она про идею слабой связанности. В хореографии нет центральной точки, которая бы проверила всё перед запуском.

В хореографии:

  1. saga начинается сразу (например, заказ создан);

  2. каждый участник реагирует на событие;

  3. если что-то пошло не так, участник публикует событие об ошибке;

  4. остальные участники могут отменить свои действия, если это необходимо (compensation logic).

Реализация

Необходимо реализовать цепочку по этой схеме:

Цепочка хореографии
Цепочка хореографии

Контракты

В отличии от оркестрации не должно быть единого контракта. Каждый сервис реализует свои контракты и знает о контрактах других сервисов.

  • Order

    • OrderCreateCommand - команда создания заказа.

    • OrderCreateEventCompleted - создание заказа прошло успешно.

    • OrderCreateEventFailed - создание заказа выдало ошибку.

  • Inventory

    • InventoryGoodsBookedInWarehouseEventCompleted - бронирование товара на складе прошло успешно.

    • InventoryGoodsBookedInWarehouseEventFailed - бронирование товара на складе выдало ошибку.

  • Delivery

    • DeliverySendEventCompleted - передача товара в доставку успешно.

    • DeliverySendEventFailed - передача товара в доставку выдало ошибку.

Как видно, нет событий направляемых в сервисы для выполнения каких либо действий, например, бронирование товара или передача товара в доставку. Вместо этого сервисы реагируют на события других сервисов:

OrderCreateCommand ->
    OrderCreateEventCompleted ->
    OrderCreateEventCompletedConsumer() ->
    InventoryGoodsBookedInWarehouseEventCompleted ->
    InventoryGoodsBookedInWarehouseEventCompletedConsumer() ->
    DeliverySendEventCompleted ->
    DeliverySendEventCompletedConsumer() ->

Также видно отсутствие событий для Compensation logic. Вместо этого все сервисы будут просто реагировать на события с ошибкой других сервисов и проводить Compensation logic.

_Стоит заметить, что иногда используется одно единственное событие (например, OrderCreateEventFailed), которое отправляется всеми сервисами в цепочке при ошибке, для запуска compensation logic.

Общий контекст

Хореография не может хранить общий контекст распределенной транзакции, поэтому контекст приходится передавать в рамках всех запросов:

public record OrderCreateEventCompleted(Guid OrderId, Guid UserId, IEnumerable<GoodViewModel> CartItems, string Address);

public record InventoryGoodsBookedInWarehouseEventCompleted(Guid OrderId, Guid UserId, IEnumerable<GoodViewModel> CartItems, string Address);

public record DeliverySendEventCompleted(Guid OrderId);

Как видно, у двух первых событий одинаковые поля. Так как DeliverySendEventCompleted является последним, его вовсе можно исключить из цепочки запросов.

Начало цепочки запросов

У хореграфии нет как такого инициализующего метода, поэтому в качестве него может выступить обычная команда, которая отправит правильное событие для начала цепочки:

public record OrderCreateCommand(Guid UserId, IEnumerable<GoodViewModel> CartItems, string Address);

В данном случае это команда, которая создает заказ на оплату.

Ее обработчик выполняет pre-conditions по проверке банковской карты, после выполняет логику по заказу. Если создание заказа успешное, то отправляется событие OrderCreateEventCompleted, которое оповещает всех слушателей (сервисов) об успешности создании заказа. При ошибке создания заказа, или проблемах с картой будет оправлено событие OrderCreateEventFailed.

public class OrderCreateCommandConsumer(ILogger<OrderCreateCommandConsumer> logger, ICardService cardService, IOrderService orderService) : IConsumer<OrderCreateCommand>  
{  
    public async Task Consume(ConsumeContext<OrderCreateCommand> context)  
    {        var card = await cardService.GetFirstCardByUserId(context.Message.UserId, context.CancellationToken);  
        if (card is null)  
        {            
	        var errorMessage = $"The user {context.Message.UserId} did not save any card";  
            logger.LogError($"[{nameof(OrderCreateCommandConsumer)}]. Message: {errorMessage}");  
            var problemDetails = new ProblemDetails()  
            {  
                Details = errorMessage,  
                Instance = nameof(OrderCreateCommandConsumer),  
                Status = (int)HttpStatusCode.InternalServerError,  
                Title = HttpStatusCode.InternalServerError.ToString(),  
                Type = "CardError"  
            };  
            await context.RespondAsync(new MqResult<Guid>(problemDetails));  
            return;  
        }  
        var orderId = Guid.NewGuid();  
  
        var orderCreationModel = new OrderCreationModel  
        {  
            UserId = context.Message.UserId,  
            DeliveryAddress = context.Message.Address,  
            CartItems = context.Message.CartItems.Select(x => x.Name).ToList(),  
            Amount = context.Message.CartItems.Sum(x => x.Price * x.Count)  
        };  
        try  
        {  
            await orderService.InsertAsync(orderCreationModel, orderId, context.CancellationToken);  
        }        
        catch (Exception e)  
        {            
	        logger.LogError($"[{nameof(OrderCreateCommandConsumer)}]. Message: {e.Message}");  
            var problemDetails = new ProblemDetails()  
            {  
                Details = e.Message,  
                Instance = nameof(OrderCreateCommandConsumer),  
                Status = (int)HttpStatusCode.InternalServerError,  
                Title = HttpStatusCode.InternalServerError.ToString(),  
                Type = "OrderError"  
            };  
            await context.Publish(new OrderCreateEventFailed(orderId, problemDetails));  
            await context.RespondAsync(new MqResult<Guid>(problemDetails));  
            return;  
        }        
        await context.Publish(new OrderCreateEventCompleted(orderId, context.Message.UserId, context.Message.CartItems, context.Message.Address));  
        await context.RespondAsync(new MqResult<Guid>(orderId));  
        logger.LogCritical($"[{nameof(OrderCreateCommandConsumer)}]. Message: Successfully create order {orderId}");  
    }}

Типовая реакция сервиса на событие из Saga

После создания заказа Order сервис публикует OrderCreateEventCompleted, который, в свою очередь, слушает Inventory сервис:

public class OrderCreateEventCompletedConsumer(ILogger<OrderCreateEventCompletedConsumer> logger, IInventoryService inventoryService) : IConsumer<OrderCreateEventCompleted>  
{  
    public async Task Consume(ConsumeContext<OrderCreateEventCompleted> context)  
    {        
	    //Checking the availability of goods  
        var goodIds = context.Message.CartItems.Select(s => s.Id);  
        var availabilityGoods = await inventoryService.CheckAvailabilityAsync(goodIds, context.CancellationToken);  
        try  
        {  
            ValidateGoodsAvailability(context.Message.CartItems, availabilityGoods);  
            await inventoryService.BookGoodsAsync(context.Message.CartItems.ToDictionary(x => x.Id, i => i.Count), context.CancellationToken);  
        }        
        catch (InvalidOperationException e)  
        {            logger.LogError($"[{nameof(OrderCreateEventCompletedConsumer)}]. Message: {e.Message}");  
            await context.Publish(new InventoryGoodsBookedInWarehouseEventFailed(context.Message.OrderId,  
                new ProblemDetails()  
                {  
                    Details = e.Message,  
                    Instance = nameof(OrderCreateEventCompletedConsumer),  
                    Status = (int)HttpStatusCode.InternalServerError,  
                    Title = HttpStatusCode.InternalServerError.ToString(),  
                    Type = "BookError"  
                }));  
            return;  
        }        
        catch (ArgumentOutOfRangeException e)  
        {            logger.LogError($"[{nameof(OrderCreateEventCompletedConsumer)}]. Message: {e.Message}");  
            await context.Publish(new InventoryGoodsBookedInWarehouseEventFailed(context.Message.OrderId,  
                new ProblemDetails()  
                {  
                    Details = e.Message,  
                    Instance = nameof(OrderCreateEventCompletedConsumer),  
                    Status = (int)HttpStatusCode.InternalServerError,  
                    Title = HttpStatusCode.InternalServerError.ToString(),  
                    Type = "BookError"  
                }));  
            return;  
        }  
        await context.Publish(new InventoryGoodsBookedInWarehouseEventCompleted(context.Message.OrderId, context.Message.UserId, context.Message.CartItems, context.Message.Address));
    }
}

Как видно, по большей части, реакция на событие в хореографии аналогично реакции в оркестрации - событие обрабатывается, выполняются действия, публикуется следующее событие. Но, в отличии от оркестрации, тут нет событий ответа от сервиса

Бизнес логика

Для полного понимания картины рассмотрим пример с сагой с точки зрения бизнес логики:

Начало цепочки. Создается команда OrderCreateCommand, в которой выполняются необходимые pre-conditions. Если заказ успешно создан, то публикуется OrderCreateEventCompleted, иначе публикуется OrderCreateEventFailed.

  • Inventory сервис.

    • Если сервис получил событие о создании заказа (OrderCreateEventCompleted), то бронируются товары, при успешном бронировании сервис отправляет событие InventoryGoodsBookedInWarehouseEventCompleted, иначе отправляется событие InventoryGoodsBookedInWarehouseEventFailed.

    • Если сервис получил событие о ошибке доставки (DeliverySendEventFailed), то бронь товара отменяется.

  • Delivery сервис.

    • Если сервис получил событие об успешном бронировании товаров (InventoryGoodsBookedInWarehouseEventCompleted), то товары передаются в доставку и публикуется событие DeliverySendEventCompleted, иначе публикуется событие DeliverySendEventFailed.

  • Order сервис.

    • Если сервис получил событие об ошибке бронировании товаров (InventoryGoodsBookedInWarehouseEventFailed), то заказ удаляется.

    • Если сервис получил событие об ошибке отправки товаров в доставку (DeliverySendEventFailed), то заказ удаляется.


📂 Пример на GitHub
Вот ссылка на репозиторий с примером проекта. В нём реализованы оба подхода работы с Saga - хореография и оркестрация.
Скачайте проект, запустите оба варианта и сравните их в действии 0 так вы получите полное понимание того, как этот паттерн работает на практике.

Отдельное спасибо @alexander_kuznetsov за помощь в редактировании статьи

Теги:
Хабы:
+5
Комментарии6

Публикации

Ближайшие события