Как стать автором
Обновить

Пример микросервисной архитектуры с Saga на MassTransit

Время на прочтение9 мин
Количество просмотров18K

Привет, Хабр! В общем работаю я значит Архитектором Программных Решений. Мы тут монолиты на микросервисы переводим поэтому я решил для наших разработчиков написать пример проекта с сагой и за одно может оно и вам понадобиться поэтому выложил сюда. Статья будет дополняться по мере поступления вопросов от вас и от наших разработчиков.

Saga - последовательность локальных транзакций.

Существует два способа координации саг:

1) Хореография - когда участники обмениваться сообщениями без централизованной точки управления. По сути команда в которой нет лидера и каждый друг с другом договариваются кто что делает и когда. Самый частый способ что я видел на практике. Хорош тем что проще всего в реализации. Плох тем что у вас появляются жирные микросервисы которые многое знают друг про друга. Плох тем что процесс растекается по микросервисам и его иногда очень сложно понять. А вызывает B который вызывает С и D и потом D опять вызывает A а C вызывает F.

Заказчик говорит, что хочет новую фичу Аналитику. Аналитик пишет ТЗ и говорит Программисту написать код. Программист создает реализацию и говорит Тестеру протестировать его. Если Тестер находит баг, то говорит Программисту починить баг.

2) Оркестр - оркестратор говорит участникам, какие операции нужно выполнить. По сути команда у которой есть лидер, который говорит каждому что ему делать и когда. Хорош тем что у вас набор простых микросервисов которые друг про друга не знают и друг с другом не общаются. Плох тем что у вас появляется очень сложный и жирный оркестратор который знает про всех и все. Классический пример по такому принципу работает BPM Camunda и ее фреймворк Zeebe для саг. Бизнес процессы, которые вы описываете в ней превращаются в саги. Тут Camunda выступает в качестве оркестратора вашей системы.

Заказчик говорит, что хочет новую фичу Менеджеру. Менеджер говорит Аналитику написать ТЗ. Когда ТЗ готово Менеджер говорит Программисту сделать реализацию. Когда реализация готова Менеджер говорит Тестеру проверить реализацию. Если тестер нашел баг, то он говори об этом Менеджеру и Менеджер говори Программисту его починить.

Я делал саги и через MassTransit и через NServiceBus. Тут будет пример простого проекта на MassTransit который я считаю более удобным и лаконичным чем NServiceBus.

MassTransit использует Saga c оркестратором.

MassTransit поддерживает RabbitMQ, Kafka, Azure Service Bus, SQS, Event Hub для передачи сообщений.

 MassTransit поддерживает Entity Framework Core, Dapper, Marten, NHibernate, как абстракции для хранения состояния Saga c поддержкой всех БД, поддерживаемых этими библиотеками и так же   умеет работать с Redis, Mongo, Cosmos Db, Azure Table.

Саги MassTransit используются для описания процесса изменения состояния системы для исполнения, которого нужно сделать запрос более чем в один сетевой сервис. Они используются:

  1. Для обеспечения порядка исполнения. Чтобы действие Б выполнилось строго после действия А.

  2. Для компенсации действий. Если действие Б провалилось по какой-то причине, то сага может выполнить компенсирующее запросы для действия А. Например, вернуть деньги на счет.

  3. Для обеспечения консистентной системы. Например, когда нужно изменить состояние микросервиса А и микросервиса Б строго в соответствии друг с другом.

Наша сага будет максимально простой. Без компенсирующих действий. Суть ее логики: У нас есть отдельно микросервис предметов из которого можно взять определенное количество предметов или добавить. Есть микросервис денег откуда можно либо взять, либо добавить определенное количество денег. Мы проработаем простой сценарий где сага сначала берет из микросервиса денег определенную сумму (холдирует ее) и потом забирает из микросервиса предметов определенное количество (уменьшает количество на складе) предметов.

Диаграмма размещения:

Диаграмма последовательности:

Схема запросов:

MoneyMicroservice

Для начала необходимые зависимости можно поставить командами:

Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
Install-Package MassTransit.EntityFramework

Так же необходимо поставить на RabbitMq Delayed Message Exchange

Создаем сообщения запрос и ответ для работы с шиной

//Рекомендуется для контрактов использовать интерфейсы чтобы не было возможности 
//в них прикрутить какую-то логику.
//Хотя уже с новой версией C# где можно реализации и интерфейсам прикреплять 
//подход больше не актуален
public interface IGetMoneyRequest
{
    public Guid OrderId { get; }
}

public interface IGetMoneyResponse
{
    public Guid OrderId { get; }
}

Создаем обработчик этого сообщения

public class GetMoneyConsumer : IConsumer<IGetMoneyRequest>
{
    public Task Consume(ConsumeContext<IGetMoneyRequest> context)
    {
        return context.RespondAsync<IGetMoneyResponse>(new { context.Message.OrderId });
    }
}

Дальше добавляем в стартапе наш обработчик в список обработчиков

builder.Services.AddMassTransit(cfg =>
{
  //Для сообщения AddMoneyRequest будет установлен адрес add-money-request
    cfg.SetKebabCaseEndpointNameFormatter();
  //Чтобы работала обработка запросов надо поставить расширение на RabbitMq rabbitmq_delayed_message_exchange
    cfg.AddDelayedMessageScheduler();
    //Тут регистрируем наши обработчики сообщений
    cfg.AddConsumer<AddMoneyConsumer>();
    cfg.AddConsumer<GetMoneyConsumer>();
  //Настройка подлючения к RabbitMq
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
      //Использовать паттерн OutBox - либо все сообщений одной пачкой сразу отправляются 
      //либо не будет отправлено ни одно из сообщений. 
     //Это нужно, когда вам, например, нужно послать две команды сразу CreateOrder 
      // и SendEmail только при условии, что отправятся оба либо ни одно из них.
        rbfc.UseInMemoryOutbox();
      //Повторные попытки обработать запрос.
        rbfc.UseMessageRetry(r =>
        {
          //Повторять 3 раза каждый раз увеличивая между повторами 
          //интервал на 1 секунду. Начать с интервала в 1 секунду.
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
      //Использовать отложенные сообщения в том числе с помощью них можно 
      //делать таймауты
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
      //Записываем какие сообщения мы слушаем. Вызывать этот метод обязательно
      //иначе обработчики не будут реагировать на сообщения.
        rbfc.ConfigureEndpoints(brc);
    });
}) 
    .AddMassTransitHostedService();

ItemsMicroservice

По сути точно такой же как и предыдущий

public interface IGetItemsRequest
{
    public Guid OrderId { get; }
}

public interface IGetItemsResponse
{
    public Guid OrderId { get; }
}

public class GetItemsConsumer : IConsumer<IGetItemsRequest>
{
    public Task Consume(ConsumeContext<IGetItemsRequest> context)
    {
        return context.RespondAsync<IGetItemsResponse>(new { OrderId = context.Message.OrderId });
    }
}
    
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    cfg.AddConsumer<AddItemsConsumer>();
    cfg.AddConsumer<GetItemsConsumer>();
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
})
    .AddMassTransitHostedService();

SagasMicroservice

Тут наши саги находятся, и они оркеструют процесс покупки предметов.

Сообщений запрос и ответ от саги:

public class BuyItemsRequest
{
    public Guid OrderId { get; set; }
}

public class BuyItemsResponse
{
    public Guid OrderId { get; set; }
    public string ErrorMessage { get; set; }
}

Состояние саги:

public sealed class BuyItemsSagaState : SagaStateMachineInstance
{
		//Идентификатор по которому мы отличаем один процесс от другого.
    public Guid CorrelationId { get; set; }
    //Текущее состояние саги ака Failed, GetItemsPending и т.д.
    public string? CurrentState { get; set; }
    //Тут мы сохраняем идентификатор запроса что запустил нашу сагу
    //чтобы ответить на него
    public Guid? RequestId { get; set; }
    //Тут мы сохраняем адрес откуда пришел запрос который запустил нашу сагу
    //чтобы ответить на него
    public Uri? ResponseAddress { get; set; }
}

Сага для процесса покупки предметов:

public sealed class BuyItemsSaga : MassTransitStateMachine<BuyItemsSagaState>
{
    private readonly ILogger<BuyItemsSaga> _logger;

    public BuyItemsSaga(ILogger<BuyItemsSaga> logger)
    {
        _logger = logger;
        //Указываем куда будем записывать текущее состояние саги (Pending,Faulted)
        InstanceState(x => x.CurrentState);
        //Указываем что слушаем событие OrderId у которого равен нашему CorrelationId у саги
        //Либо если нет саги с таким CorrelationId то создаем его с ним.
        Event<BuyItemsRequest>(() => BuyItems, x => x.CorrelateById(y => y.Message.OrderId));
       //Указываем какие запросы будем делать из саги
       Request(
            () => GetMoney
            );
        Request(
         () => GetItems
         );
        //Указываем как будем реагировать на сообщения в стартовом состоянии
        Initially(

            When(BuyItems)
            .Then(x =>
            {
            //Сохраняем идентификатор запроса и его адрес при старте саги чтобы потом на него ответить
                if (!x.TryGetPayload(out SagaConsumeContext<BuyItemsSagaState, BuyItemsRequest> payload))
                    throw new Exception("Unable to retrieve required payload for callback data.");
                x.Saga.RequestId = payload.RequestId;
                x.Saga.ResponseAddress = payload.ResponseAddress;
            })
            //Совершаем запрос к микросевису MoneyMicroservice
            .Request(GetMoney, x => x.Init<IGetMoneyRequest>(new { OrderId = x.Data.OrderId }))
           //Переводим сагу в состояние GetMoney.Pending
           .TransitionTo(GetMoney.Pending)

            );

        //Описываем то как наша сага будет реагировать на сообщения находясь в 
        //состоянии GetMoney.Pending
        During(GetMoney.Pending,
            //Когда приходи сообщение что запрос прошел успешно делаем новый запрос
            //теперь уже в микросервис ItemsMicroservice
            When(GetMoney.Completed)
            .Request(GetItems, x => x.Init<IGetItemsRequest>(new { OrderId = x.Data.OrderId }))
            .TransitionTo(GetItems.Pending),
            //При ошибке отвечаем тому, кто инициировал запрос сообщением с текстом ошибки
            When(GetMoney.Faulted)
              .ThenAsync(async context =>
              { 
                //Тут можно сделать какие-то компенсирующие действия. 
               //Например, вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Money " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),
            //При таймауте отвечаем с сообщением что произошел таймаут
            When(GetMoney.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Money");
               })
            .TransitionTo(Failed)

             );

        During(GetItems.Pending,
            //При успешном ответе от микросервиса предметов 
            //отвечаем без ошибки и переводим сагу в финальное состояние.
            When(GetItems.Completed)
              .ThenAsync(async context =>
              {
                  await RespondFromSaga(context, null);
              })
            .Finalize(),

            When(GetItems.Faulted)
              .ThenAsync(async context =>
              {
                   //Тут можно сделать какие-то компенсирующие действия. 
                  //Например, вернуть деньги куда-то на счет.
                  await RespondFromSaga(context, "Faulted On Get Items " + string.Join("; ", context.Data.Exceptions.Select(x => x.Message)));
              })
            .TransitionTo(Failed),

            When(GetItems.TimeoutExpired)
               .ThenAsync(async context =>
               {
                   await RespondFromSaga(context, "Timeout Expired On Get Items");
               })
            .TransitionTo(Failed)

            );
    }
    //Запрос на получение денег
    public Request<BuyItemsSagaState, IGetMoneyRequest, IGetMoneyResponse> GetMoney { get; set; }
    //Запрос на получение предметов
    public Request<BuyItemsSagaState, IGetItemsRequest, IGetItemsResponse> GetItems { get; set; }
   //Событие стартующее нашу сагу.
   public Event<BuyItemsRequest> BuyItems { get; set; }
   //Одно из наших кастомных состояний в которое может перейти сага
    public State Failed { get; set; }
    //Метод для ответного сообщения
    //Тут нужно явно использовать ResponseAddress и RequestId 
    //сохраненные ранее чтобы ответить ровно тому, кто сделал запрос
    private static async Task RespondFromSaga<T>(BehaviorContext<BuyItemsSagaState, T> context, string error) where T : class
    {
        var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
        await endpoint.Send(new BuyItemsResponse
        {
            OrderId = context.Saga.CorrelationId,
            ErrorMessage = error
        }, r => r.RequestId = context.Saga.RequestId);
    }
}

Регистрируем сагу в стартапе

builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.AddDelayedMessageScheduler();
    //Тут добавляем сагу с указанием что будем сохранять ее в БД 
    //с помощью EF и будем использовать пессимистичный режим конкуренции за ресурсы
    cfg.AddSagaStateMachine<BuyItemsSaga, BuyItemsSagaState>()
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
        r.ExistingDbContext<SagasDbContext>();
        r.LockStatementProvider = new PostgresLockStatementProvider();
    });
    cfg.UsingRabbitMq((brc, rbfc) =>
    {
        rbfc.UseInMemoryOutbox();
        rbfc.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        });
        rbfc.UseDelayedMessageScheduler();
        rbfc.Host("localhost", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        rbfc.ConfigureEndpoints(brc);
    });
});

ApiGateway

Его работа — это просто перевести перевести http запрос в ampq запрос в шину. Дождаться ответа и вернуть ответ пользователю (фронтенду)

public class BuyItemsRequstModel
{
    public Guid OrderId { get; set; }
}

[ApiController]
[Route("api/v1/items")]
public class ItemsController : ControllerBase
{
    //Интерфейс MassTransit через который идет работа с сообщениями
    private readonly IBus _bus;
    private readonly ILogger<ItemsController> logger;

    public ItemsController(IBus bus, ILogger<ItemsController> logger)
    {
        _bus = bus;
        this.logger = logger;
    }

    [HttpPost("buy")]
    public async Task<BuyItemsResponse> BuyAsync(BuyItemsRequstModel model)
    {
       //Делаем запрос в шину и ждем ответа от саги. 
       //Ответ придёт из RabbitMq или словим ошибку таймаута запроса
        logger.LogInformation("Start!");
        var response = await _bus.Request<BuyItemsRequest, BuyItemsResponse>(model);
        logger.LogInformation("End!");
        //Возвращаем сообщение что было в ответе
        return response.Message;
    }
}

Так же можно все делать просто на событиях без Request/Response только это потребует больше кода и можно их через SignalR гонять на фронт.

Исходники

Microservices With Sagas

Благодарности

Спасибо @questor@alexs0ff за помощь. Всем добра :)

Теги:
Хабы:
Всего голосов 13: ↑6 и ↓7+1
Комментарии25

Публикации

Истории

Работа

Ближайшие события

2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань