Pull to refresh
0
True Engineering
Лаборатория технологических инноваций

MassTransit, Saga и RabbitMQ для реализации диспетчера процессов

Reading time 10 min
Views 23K

Однажды перед нами встала задача автоматизировать различные workflow в крупной компании. Для нас это значило соединить воедино на момент старта порядка 10 систем. Причем связать всё надо было асинхронно, масштабируемо, надежно.


Упрощённо процесс можно описать как последовательность действий в разных системах, которую нельзя автоматизировать полностью, поскольку она требует человеческого участия. Например, для выбора определенных действий или элементарного согласования, которое необходимо для перехода на следующий этап процесса.


Для решения этой задачи мы решили использовать архитектуру обмена сообщениями через шину данных, и нам отлично подошел MassTransit с его Saga в связке с RabbitMQ.


image

Что из себя представляет Saga?


Saga — это реализация шаблона "Диспетчер процессов" из книги «Шаблоны интеграции корпоративных приложений», который позволяет описать процесс в виде конечного автомата. На вход прилетает какое-то событие, Saga выполняет последовательность действий. При этом на любом из этапов Saga может потребоваться решение человека. Тогда она создает задачу в трекере и «засыпает» на неопределённое время, ожидая новых событий.

Saga реализована на базе Automatonymous. Она декларативно описывается в классе, унаследованном от MassTransitStateMachine<>. Для Saga нужно описать все состояния, принимаемые события и совершаемые действия при наступлении определенных событий. Текущее состояние сохраняется в базе.


Для начала описываем все состояния и события Saga и даём им понятные имена. Выглядит это следующим образом:


public sealed partial class StateMachine
{
    public State AwaitingTaskCreated { get; set; }
    public State AwaitingTaskTakedToWork { get; set; }
    public State AwaitingDecisionAboutTask { get; set; }
    public State Rejected { get; set; }

    public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; }
    public Event<TaskCreatedNotification> TaskCreated { get; set; }
    public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; }
    public Event<TaskDeclinedNotification> TaskDeclined { get; set; }
    public Event<TaskApprovedNotification> TaskApproved { get; set; }

    private void BuildStateMachine()
    {
        InstanceState(x => x.CurrentState);
        Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => 
                        ctx.Message.CorrelationId)
            .SelectId(context => context.Message.CorrelationId));
        Event(() => TaskCreated, x => x.CorrelateById(ctx => 
                        ctx.Message.CorrelationId));
        Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => 
                        ctx.Message.CorrelationId));
        Event(() => TaskDeclined, x => x.CorrelateById(ctx => 
                        ctx.Message.CorrelationId));
        Event(() => TaskApproved, x => x.CorrelateById(ctx => 
                        ctx.Message.CorrelationId));
    }
}

Мы завели partial class, где декларируем списком все состояния и ивенты, и метод BuildStateMachine, в котором описывается корреляция ивентов с Saga. Для этого в каждом ивенте передается специальный параметр CorrelationId — это Guid, который курсирует между всеми связанными системами и в системах мониторинга.


Таким образом, при возникновении каких-либо проблем мы можем восстановить всю картину происходящего по логам из всех связанных систем. Мы отправляем CorrelationId в сообщениях из Saga, его же отправляют системы обратно в нотификациях, чтобы мы могли соотнести сообщение с конкретной Saga.


А вот пример самого класса стейт машины:


public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga>
{
    public StateMachine()
    {
        BuildStateMachine();
        Initially(WhenStartWorkflowCommandReceived());
        During(AwaitingTaskCreatedInPlanner, WhenTaskCreated());
        During(AwaitingTaskTakedToWork, WhenTaskTakedToWork());
        During(AwaitingDecisionAboutTask,
            WhenTaskApproved(),
            WhenTaskDeclined());
    }

    private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand>
                  WhenStartWorkflowCommandReceived()
    {
        return When(StartWorkflowCommandReceived)
            .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data))
            .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance))
            .TransitionTo(AwaitingTaskCreated);
    }

    private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> 
                   WhenTaskCreated()
    {
        return When(DPORPApproveTaskCreatedInPlanner)
            .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data))
            .Send(MailServiceQueue, ctx => new
                        NotifyRequestAuthorThatWorkflowStarted(ctx.Instance))
            .TransitionTo(AwaitingTaskTakedToWork);
    }

    private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> 
                   WhenTaskTakedToWork()
    {
        return When(TaskTakedToWork)
            .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data))
            .TransitionTo(AwaitingDecisionAboutTask);
    }

    private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> 
                   WhenTaskApproved()
    {
        return When(TaskApproved)
            .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data))
            .Finalize();
    }

    private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> 
                   WhenTaskDeclined()
    {
        return When(TaskDeclined)
            .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data))
            .TransitionTo(Rejected);
    }
}

В конструкторе описываются состояния. Прием каждого ивента вынесен в отдельный метод, чтобы сохранить читаемость. Вся логика конструирования сообщений вынесена в сами сообщения, иначе с возрастанием сложности системы Saga довольно быстро распухает.


Стоит внимательно отнестись к выработке конвенций и сохранению читабельности. Из-за императивности C# в нем очень сложно декларировать описание состояний и действий. Даже для простеньких машин состояний начинается настоящий ад.


Теперь несколько слов про SagaInstance. SagaInstance — это класс, унаследованный от SagaStateMachineInstance. Он состоит из объектов и полей, которые характеризуют машину состояний. Грубо говоря, это память Saga. Мы храним там все данные Saga, которые ей понадобятся на протяжении всей жизни. Также в этом классе описана логика изменений этих данных по ходу работы.


Приведём пример:


public class WorkflowSaga 
: SagaStateMachineInstance
, ISagaWithState
, ICreatedOnOffset
, IModifiedOnOffset
, ICreatedBy<string>
, IModifiedBy<string>
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string InitialRequestViewUrl { get; set; }
    public string RequestNumber { get; set; }
    public string RequestAuthor { get; set; }
    public string RequestText { get; set; }
    public byte[] RowVersion { get; set; }
    public string CreatedBy { get; set; }
    public string ModifiedBy { get; set; }
    public DateTimeOffset CreatedOn { get; set; }
    public DateTimeOffset ModifiedOn { get; set; }
    public DateTimeOffset CompletedOn { get; set; }

    public virtual ICollection<RelatedTask> RelatedTasks { get; set; }

    public void SaveGabrielConfigurationRequestInfo(
        ICreateGabrielConfigurationRequestCommand command)
    {
        CorrelationId = command.CorrelationId;
        RequestNumber = command.RequestNumber;
        RequestAuthor = command.Author;
        RequestText = command.RequestText;
        InitialRequestViewUrl = command.InitialRequestViewUrl;
        CreatedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo)
    {
        RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo));
    }

    public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork);
    }

    public void MarkTaskAsApproved(TaskApprovedNotification taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment);
        CompletedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo)
    {
        UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment);
        CompletedOn = RuntimeContext.Current.DateTimeOffset.Now;
    }

    private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, 
    TaskStatus taskStatus, string comment = null)
    {
        var task = RelatedTasks.Single(t => t.Number == taskInfo.Number);
        task.ModifiedBy = taskInfo.TaskModifiedBy;
        task.Comment = comment;
        task.Status = taskStatus;
    }
}

На примере видно, что в SagaInstance сохраняется CorrelationId для корреляции ивентов с Saga и CurrentState для хранения текущего состояния Saga.


Обработка ошибок


Что происходит с Saga, если во время обработки сообщения возникает ошибка? Это важный вопрос, поскольку всем хочется, чтобы машина состояний всегда оставалась консистентной, даже если что-то пошло не так. И в Saga от MassTransit с этим всё хорошо.


Как вы уже успели заметить, в примерах выше нет ни одного try catch блока для обработки исключений. Причина простая: они там не нужны. Если во время обработки сообщения возникает исключение, то сообщение возвращается в очередь, а все изменения откатятся. Так как все манипуляции с данными мы делаем в той же транзакции, что и Saga, транзакция не будет закрыта.


Вообще, манипуляция чем-то кроме Saga в самой Saga — это bad practice. По книжке «Шаблоны интеграции корпоративных приложений», диспетчер процессов должен оставаться максимально «тонким и тупым»: просто раздавать команды системам и следить за состоянием, а сам он ничего делать не должен.


Конечно, есть и более сложные сценарии, когда нужно выполнить какие-то компенсирующие действия для обработки исключений. Тогда используется обработчик машины состояний “.Catch” для перехвата исключения определенного типа и дальнейшего выполнения компенсирующей логики.


А если вам нужно просто залогировать возникшее исключение, то лучше воспользоваться наблюдателем (Observer).


Теперь представим ситуацию, что мы уже выполнили команду Send во время обработки сообщения, после чего возникло исключение. Что же будет с отправленной на данном шаге командой? Ведь всё, что улетело, уже не вернёшь? Но и тут всё продумано.


При конфигурации шины можно включить опцию UseInMemoryOutbox. Эта опция позволяет не отправлять сообщения до того момента, пока текущий шаг не будет выполнен. Если возникнет исключение, то сообщения не отправятся вовсе. Вот выдержка из документации:


/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
/// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published.
/// </summary>
/// <param name="configurator">The pipe configurator</param>
public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator)

Тесты


На первый взгляд, тестирование асинхронной машины состояний — то ещё удовольствие. Но и здесь всё хорошо. MassTransit предоставляет неплохой фреймворк для написания тестов, который полностью удовлетворяет все наши нужды в тестировании машины состояний.


Фреймворк предоставляет InMemory реализацию шины данных (InMemoryTestHarness), которая позволяет отправлять и получать сообщения, минуя RabbitMQ или другую очередь.


Ну и как пример:


[TestFixture]
public class SagaTests : TestFixtureBase
{
    protected const string HostName = "HostName";

    protected InMemoryTestHarness Harness;
    protected StateMachine StateMachine;
    protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, 
    StateMachine> Saga;

    [SetUp]
    public void SetUp()
    {
        StateMachine = (StateMachine)Kernel.
        Get<MassTransitStateMachine<WorkflowSaga>>();

        Harness = new InMemoryTestHarness(HostName);
        Saga = Harness
        .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine);
    }

    [TearDown]
    public async Task TearDown()
    {
        await Harness.Stop();
    }

    protected async Task<WorkflowSaga> InitializeSaga()
    {
        await Harness.Start();

        var command = new TestStartWorkflowCommand
        {
            CorrelationId = SagaId,
            Author = RequestAuthor,
            InitialRequestViewUrl = InitialRequestViewUrl,
            RequestText = RequestText,
            RequestNumber = RequestNumber,
        };

        await Harness.InputQueueSendEndpoint
        .Send<IStartWorkflowCommand>(command);
        // Эта строчка нам нужна, поскольку consume срабатывает не сразу и, 
        // соответственно, и Saga не будет, пока не законсюмим
        Assert.IsTrue(Harness.Consumed
        .Select<IStartWorkflowCommand>().Any());

        var currentSaga = Saga.Created.Contains(SagaId);
        currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>();
        return currentSaga;
    }

    [Test]
    public async Task CheckCurrntStateWhenStartWorkflowCommand()
    {
        var saga = await InitializeSaga();

        Assert.IsNotNull(saga);
        Assert.AreEqual(StateMachine
        .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState);
    }
}

public class WhenTaskCreated : SagaTestsBase
{
    private async Task<WorkflowSaga> InitializeState()
    {
        var saga = await InitializeSaga(true);

        saga.CurrentState = StateMachine.AwaitingTaskCreated.Name;
        InitializeRelatedTask(saga);

        await SendTaskCreatedNotification();

        Assert.IsTrue(Harness.Consumed
        .Select<TaskCreatedNotification>().Any());
        return saga;
    }

    [Test]
    public async Task SaveWorkflowDataWhenTaskCreated()
    {
        var saga = await InitializeState();

        var taskInfo = saga.RelatedPlannerTasks
        .First(task => 
        task.PlannerTaskType == PlannerTaskType.DPORPApprove);

        Assert.AreEqual(TaskNumber, taskInfo.Number);
        Assert.AreEqual(TaskUrl, taskInfo.TaskUrl);
        Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId);
        Assert.AreEqual(TaskStatus.Created, taskInfo.Status);
        Assert.AreEqual(User, taskInfo.ModifiedBy);

        Assert.AreEqual(saga.CurrentState, 
        StateMachine.AwaitingTaskTakedToWork.Name);
    }

    [Test]
    public async Task SendsMailWhenTaskCreated()
    {
        var mailConsumer =  Harness
            .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>>
            (RabbitMqRouting.QueueNames
                .SendEmailsQueueName);

        await InitializeState();

        Assert.IsTrue(mailConsumer.Consumed
        .Select<ISendEmailMessageWithTemplateCommand>().Any());
    }

    private async Task SendTaskCreatedNotification()
    {
        await Harness.InputQueueSendEndpoint
        .Send(new TaskCreatedNotification
        {
            TaskUrl = TaskUrl,
            Number = TaskNumber,
            TaskModifiedBy = User,
            CorrelationId = SagaId
        });
    }
}

Тесты выполняются довольно быстро. Например, на одном компьютере разработчика 850 тестов выполняется примерно 21 секунду.


Полезные советы


В заключении приводим список полезных советов, основанных на нашем опыте.


  1. Контракты и схемы общения через шину лучше всего поместить в приватный nuget. Так у вас не будет различий в именованиях на отправляющей и принимающей сторонах. Также в nuget можно поместить константы с именованием очередей и хостов. Nuget настраивается за день. А также некоторые source controls поддерживают nuget, есть платные частные фиды.


  2. Разберитесь в различиях между Send и Publish. Используйте Send, если у вас один подписчик и вы точно знаете имя очереди, в которую отправляете команду. Publish предназначен для отправки оповещений broadcast-ом. Подробности по ссылке.


  3. Если вам нужно построить Request/Response сообщение, то лучше добавить в контракт имя очереди для ответа, чем использовать схему Request/Response от MassTransit, которую и сам MassTransit предлагает избегать. Так как это сильно уменьшает надежность. Вы теряете все преимущества асинхронности. Но если вам всё же нужно получить ответ в ограниченное время, то лучше использовать прямой вызов. Лучше всего об этом написано во всё той же книге «Шаблоны интеграции корпоративных приложений».


  4. Saga должна быть тонкой. Попытайтесь унести всю тяжелую логику в другие системы. А Saga должна скакать по состояниям и разбрасывать сообщения налево и направо.


  5. Добавьте во все сообщения CorrelationId, который будет курсировать между системами. Так намного проще потом анализировать логи и связывать все сообщения в единую картину. Также поступает и сам Masstransit. CorrelationId добавляется в сообщения при наследовании от интерфейса CorrelatedBy.


    Настройте логи и мониторинг в своих системах, это никогда не повредит. Наш опыт в этой статье.
Tags:
Hubs:
+7
Comments 8
Comments Comments 8

Articles

Information

Website
www.trueengineering.ru
Registered
Founded
Employees
101–200 employees
Location
Россия