Pull to refresh
555.35
Альфа-Банк
Лучший мобильный банк по версии Markswebb

Event Sourcing и Saga с помощью Marten и Wolverine на C# и немного модульного монолита

Level of difficultyHard
Reading time16 min
Views3.7K

В этой статье:

Мы откажемся от контроллеров, MediatR‑а и MassTransit‑а, всё вышеперечисленное нам заменит Wolverine. Отольём в граните модульный монолит, имплементируем регистрацию событий используя Marten. Пример всего этого безобразия находится тут.

Начнём с Wolverine

Как утверждают создатели, Wolwerine — это новое поколение реализации паттернов «Медиатор» и «Шина Сообщений», библиотека «с батарейками в комплекте».

Сообщения: их можно обрабатывать, их можно посылать, их можно планировать, их можно ожидать.
Сообщения: их можно обрабатывать, их можно посылать, их можно планировать, их можно ожидать.

Wolverine позволяет:

  • Обрабатывать HTTP‑запросы с помощью обработчиков Wolverine, находящихся, в том числе, в библиотеках.

  • Отправлять сообщения (команды и запросы) в обработчики и получать результат выполнения.

  • Отправлять сообщения (команды и запросы) в обработчики находящиеся в других приложениях, в этом случае нам понадобится транспорт. «Из коробки» Wolverine работает со следующими видами транспорта: RabbitMQ, Azure Service Bus, Amazon SQS, TCP, Sql Server, PostgreSQL, MQTT, Kafka. В примере я использую Kafka в качестве транспорта для некоторых сообщений, Wolverine позволяет выбирать какое сообщение каким транспортом будет отправлено.

  • Реализовывать паттерн Saga. Сага от Wolverine поддерживает такой тип сообщений, как «сообщения по таймауту», что позволяет в «автоматическом режиме» завершать «забытые» саги.

  • Отправлять сообщения в режиме «Ping — Pong», в документации это называется «cascading messages».

Wolverine имеет интеграцию с Marten «из коробки», что даёт возможность, например, отправлять все команды и запросы сразу в Кафку.

Marten

Marten ‑.Net библиотека, которая призвана устранить «boilerplate» и позволить Вам сфокусироваться на доставке бизнес ценностей, если верить её создателям. Marten является надстройкой над PostgreSQL, которая позволяет использовать эту БД в качестве аналога MongoDB или EventStoreDB. Также производитель обещает Strong Consistency (но есть нюансы), гибкие стратегии индексирования, улучшенные linq — запросы, встроенную реализацию паттерна Outbox/Inbox «из коробки», поддержку Multi‑tenancy, интеграцию с Asp.Net Core и другие не всем понятные слова на английском:

https://martendb.io/introduction.html.

В примере реализована регистрация событий на Marten, и, если сравнивать связку EventStoreDB плюс MongoDB, (хороший пример находится здесь), то, таки да, в сравнении с ними Marten потребует минимум кода, т.к. read‑часть предоставлена «из коробки».

Mодульный монолит

Модульный монолит — вид архитектуры, подразумевающий разбиение приложения на модули, каждый из которых с увеличением масштаба приложения может быть конвертирован в отдельный микро‑сервис, и, как можно догадаться, модуль является реализацией ограниченного контекста в коде. Модули «знают» лишь контракты друг друга и взаимодействуют так же, как это делали бы микро‑сервисы, плюс взаимодействие с помощью той или иной реализации паттерна Command Dispatcher.

Плюсы: мы экономим на написании инфраструктурного кода, необходимого для создания отдельного приложения под каждый модуль, что нам пришлось бы сделать в случае микро‑сервисной архитектуры, в тоже время, есть шанс избежать спагетти‑кода. Мы экономим на написании интеграционного кода, т.к. модули находятся в одном процессе. Сохранять данные из разных модулей в рамках одной транзакции тоже не проблема, т.к. все модули работают с одним и тем же подключением к БД. Создать новый модуль гораздо проще, чем новый микро-сервис.

Минусы: это монолит, модуль не является единицей развёртывания.

Расово верный подкаст о Modular Monolith для.Net можно найти тут и там же пример.

Ну, или в рамках импортозамещения, беседа и пример от DevBrothers, тоже изрядно.

Предметная область

Она проста и кратка, как тост Булдакова. Есть «Персона», у персоны есть «Счета», на счета приходят «Платежи». Каждое действие: создание персоны, добавление счёта, добавление платежа должно быть подтверждено либо отвергнуто. На выходе мы должны получить список персон и сальдо по каждой из них, с сохранением истории того, как было достигнуто нынешнее состояние.

Добавление новых данных в БД – только по окончанию саги:

В Swagger это выглядит так:

В пункте 1 нужно забрать ид. саги, и в течении 3 минут, отправить запрос с ним в пункте 2.
В пункте 1 нужно забрать ид. саги, и в течении 3 минут, отправить запрос с ним в пункте 2.

В решении имеется проект API, из которого уделена папка Controllers, она не нужна, так как все HTTP-вызовы должны обрабатываться модулями.

У нас три модуля: Модуль по работе с агрегатом Персона (PersonModule), модуль успешного завершения саг (SagaApprovementModule) и модуль отрицательного завершения саг (SagaRejectionModule).

HTTP EndPoint-ы Wolverine

Начнём с SagaApprovementModule: он содержит библиотеку SagaApprovement.Contracts с контрактами модуля и библиотеку SagaApprovement, в которой находятся эндпоинты Wolverine:

// <summary>
/// EndPoint завершения саги добавления счёта, добавления платежа, создания персоны.
/// Во всех трёх обработчиках один из параметров - впрыск ссылки на объект шины сообщений.
/// </summary>
public static class ApproveEndPoints
{
    /// <summary>
    /// Отправляем в сагу добавления счёта сообщение разешающее добавление счёта.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-add-account-saga")]
    public static ValueTask Handle(ApproveAccountCommand command, IMessageBus bus) => bus.PublishAsync(new AccountApproved(command.SagaId));

    /// <summary>
    /// Отправляем в сагу добавления платежа сообщение разешающее добавление платежа.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-add-payment-saga")]
    public static ValueTask Handle(ApprovePaymentCommand command, IMessageBus bus) => bus.PublishAsync(new PaymentApproved(command.SagaId));
    

    /// <summary>
    /// Отправляем в сагу создания персоны сообщение разешающее создание персоны.
    /// </summary>
    /// <param name="command"></param>
    /// <param name="bus"></param>
    /// <returns></returns>
    [WolverinePost("approve-person-creation-saga")]
    public static ValueTask Handle(ApprovePersonCreationCommand command, IMessageBus bus) => bus.PublishAsync(new PersonApproved(command.SagaId));
}

В классе находятся три конечные точки с адресами: «approve‑add‑account‑saga», «approve‑add‑payment‑saga», «approve‑person‑creation‑saga». Чтобы всё это работало, необходим класс, имеющий в своём названии «EndPoint» или «EndPoints», и содержащий методы с атрибутами WolverinePost, WolverineGet и т. п. Больше ничего не нужно.

В руководстве Wolverine указано, что предпочтительным является впрыск зависимости в метод. В строке приведённой выше, мы впрыскиваем интерфейс шины сообщений Wolverine. Далее мы можем выбрать один из трёх вариантов:

  • bus.PublishAsync — публикует сообщение в шине. Даже если нет ни одного обработчика — метод завершится успешно, если обработчики есть — метод не будет ожидать результатов их выполнения.

  • bus.SendAsync — публикует сообщение в шине, но ожидает, что есть обработчики этого сообщения, если они есть — не дожидается завершения их выполнения.

  • bus.InvokeAsync — публикует сообщение, ожидает что есть обработчик, возвращает результат выполнения обработчика отправителю.

В примере выше я публикую в шине сообщение AccountApproved. Обработчик этого сообщения находится в саге AddAccountSagа. Чтобы сага принимала сообщение, нужно определить метод Handle, в котором один из параметров является классом сообщения. Обработчик выглядит следующим образом:

/// <summary>
/// Успешное завершение саги, добавляем аккаунт.
/// </summary>
/// <param name="_"></param>
/// <param name="addAccountService">сервис добавления счёта.</param>
public async void Handle(AccountApproved _, IAddAccountService addAccountService)
{
    // Обращаемся к сервису добавления аккаунта,
    // отправляя туда данные из состояния саги.
    await addAccountService.CreateAccount(PersonId, AccountName);

    // Завершаем сагу.
    MarkCompleted();
}

Saga на Wolverine

/// <summary>
/// Сага добавления аккаунта. 
/// </summary>
public class AddAccountSaga : Saga
{
    /// <summary>
    /// Идентификатор саги.
    /// </summary>
    public string? Id { get; set; }

    /// <summary>
    /// Идентификатор персоны.
    /// </summary>
    public string PersonId { get; set; }

    /// <summary>
    /// Наименование аккаунта.
    /// </summary>
    public string AccountName { get; set; }

    /// <summary>
    /// Обработчик старта саги. 
    /// Название Start зарезервировано Wolverine. 
    /// Сообщение принимаемое в этом методе в качетсве первого параметра - будет считаться стартовым
    /// сообщением саги.
    /// Стартовый обработчик должен вернуть сагу. 
    /// В нашем случае сагу и сообщение завершающее сагу по таймауту.
    /// </summary>
    /// <param name="addAccountSagaStarted">Стартовое сообщение саги</param>
    /// <returns></returns>
    public static (AddAccountSaga, AddAccountTimeoutExpired) Start(AddAccountSagaStarted addAccountSagaStarted) => (new AddAccountSaga
    {
        //заполняем состояние саги данными.
        Id = addAccountSagaStarted.AddAccountSagaId,
        PersonId = addAccountSagaStarted.PersonId,
        AccountName = addAccountSagaStarted.AccountName
    },
    new AddAccountTimeoutExpired(addAccountSagaStarted.AddAccountSagaId));

    /// <summary>
    /// Успешное завершение саги, добавляем аккаунт.
    /// </summary>
    /// <param name="_"></param>
    /// <param name="addAccountService">сервис добавления счёта.</param>
    public async void Handle(AccountApproved _, IAddAccountService addAccountService)
    {
        // Обращаемся к сервису добавления аккаунта,
        // отправляя туда данные из состояния саги.
        await addAccountService.CreateAccount(PersonId, AccountName);

        // Завершаем сагу.
        MarkCompleted();
    }

    /// <summary>
    /// Хэндлер отрицательного завершения саги.
    /// </summary>
    /// <param name="_"></param>
    public void Handle(AccountRejected _) => MarkCompleted();

    /// <summary>
    /// Хэндлер завершения саги по таймауту.
    /// MarkCompleted - закрывает сагу.
    /// </summary>
    /// <param name="_"></param>
    public void Handle(AddAccountTimeoutExpired _) => MarkCompleted();
}

Создавать саги с Wolverine просто:

  • Наследуем класс от класса Saga.

  • Определяем поля, являющиеся состоянием саги. Состояние будет доступно во всех обработчиках сообщений саги. (стр.6–20).

  • Определяем обработчик стартового сообщения саги. Он должен иметь название Start, а первый параметр будет считаться сообщением, с которого начинается сага. (стр.31).

  • Определяем остальные обработчики саги. Первые параметры в обработчиках — это сообщения, которые должна обрабатывать сага.

Метод Start должен вернуть экземпляр саги. В нашем случае он возвращает кортеж из экземпляра саги и сообщения, которое закроет сагу по таймауту, если про неё «забыли». Состояние саги будет сохранено в БД, а затем будет извлекаться из неё при следующих срабатываниях, описанных в саге обработчиков.

Все необходимые сервисы впрыскиваем вторым и следующими параметрами в обработчиках. Как сделано строке № 45.

В настройках, при добавлении Wolverine, можно указать, каким транспортом отправлять сообщения:

//Будем публиковать в кафке ниже приведённые события.
opts.PublishMessage<PersonApproved>().ToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.PublishMessage<PersonRejected>().ToKafkaTopic("CreatePersonUseCase.PersonRejected");

И указать, откуда могут прийти входящие интеграционные сообщения:

//Будем получать из топиков кафки следующие события.
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonApproved");
opts.ListenToKafkaTopic("CreatePersonUseCase.PersonRejected");

Собственно, это всё, что нужно для создания саги.

Marten и Event Sourcing

Что такое Event Sourcing можно почитать тут, к статье приложен хороший пример, глядя на который, можно оценить на столько Marten облегчает имплементацию регистрации событий.

«Из коробки» в Marten реализован демон и механизм проекций. Проекции обновляются при сохранении новых событий. К проекциям можно обращаться с помощью Linq‑запросов примерно также, как если бы мы работали с реляционными БД. Да‑да, нам не нужен Mongo или другая БД, не нужно имплементировать демон и подписки, чтобы получать снимки. Всё это уже сделал для нас Marten. Нужно создать проекцию и подключить её в Marten, и это не сложно.

Перейдём к матчасти. В примере найдём файл Repository.cs — это реализация репозитория, сохраняющего события в потоки агрегатов в БД и восстанавливающего состояния агрегатов по событиям из БД.

public sealed class Repository(IDocumentStore store) : IRepository
{
    //Marten document store
    private readonly IDocumentStore store = store;

    /// Получаем несохранённые события из агрегата и сохраняем их.
    public async Task StoreAsync(Aggregate aggregate, CancellationToken ct = default)
    {
        // получаем сессию для работы с событиями.
        await using var session = await store.LightweightSerializableSessionAsync(token: ct);
        // получаем список несохранённых событий из агрегата
        var events = aggregate.GetUncommittedEvents().ToArray();
        // добавляем события в стрим с идентификатором aggregate.Id
        session.Events.Append(aggregate.Id, aggregate.Version, events);

        // сохраняем изменения.
        await session.SaveChangesAsync(ct);
        // очищаем список несохранённых событий.
        aggregate.ClearUncommittedEvents();
    }

    /// Восстанавливаем состояние агрегата по событиям.
    public async Task<T> LoadAsync<T>(
        string id,
        int? version = null,
        CancellationToken ct = default
    ) where T : Aggregate
    {
        // получаем сессию для работы с событиями.
        await using var session = await store.LightweightSerializableSessionAsync(token: ct);
        // восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
        // при этом Marten вызовет методы Apply для каждого из сохранённых событий.

        var stream = await session.Events.FetchForWriting<T>(id, ct);

        return stream.Aggregate;        
    }
}

Это весь код, который необходим для репозитория write-части модуля. В read-части можно обойтись вообще без репозитория.

/// <summary>
/// Endpoint получения данных о персонах.
/// </summary>
public static class GetPersonWithSumEndPoint
{
    /// <summary>
    /// Получаем персону по её идентификатору.
    /// В метод впрыскиваем сессию для получения read-модели.
    /// </summary>
    /// <param name="getPersonsWithSumCommand"></param>
    /// <param name="session"></param>
    /// <returns></returns>
    /// <exception cref="Exception"></exception>
    [WolverineGet("person/person")]
    public static async Task<string> Handle(GetPersonWithSumQuery getPersonsWithSumCommand, IQuerySession session)
    {
        var person = await session
            .Query<PersonWithSum>()
            .FirstOrDefaultAsync(c => c.Id == getPersonsWithSumCommand.PersonId) ?? throw new Exception($"Person not found.");

        return JsonConvert.SerializeObject(person, Formatting.Indented);
    }

    /// <summary>
    /// Получаем список всех персон (IRL это плохо, но для примера можно кмк.)
    /// Впрыскиваем в метод сессию для получения списка read-моделей.
    /// </summary>
    /// <param name="getPersonsWithSumCommand"></param>
    /// <param name="session"></param>
    /// <returns>Список персон с сальдо.</returns>
    /// <exception cref="Exception"></exception>
    [WolverineGet("person/persons")]
    public static async Task<string> Handle(GetPersonsWithSumQuery getPersonsWithSumCommand, IQuerySession session)
    {
        var persons = await session
            .Query<PersonWithSum>().ToListAsync() ?? throw new Exception($"Persons not found.");

        return JsonConvert.SerializeObject(persons, Formatting.Indented);
    }
}

Класс GetPersonWithSumEndPoint, как не трудно догадаться, является HTTP-EndPoint-ом. Чтобы получить данные из проекций, нам достаточно выполнить linq-запрос.

await session.Query<PersonWithSum>().ToListAsync()

Получаем все данные из проекции, но можно добавить Where, Take, Skip и пр. для ограничения выборки. Никакой разницы с выборкой из реляционных БД.

Проекции

Проекции в Marten могут быть следующих видов:

  • Проекции агрегатов (Aggregate Projection): Live — проецируем на лету, нужна только модель, класс проекции создавать не нужно; Multi‑Stream — проекции с возможностью группировки событий, срезов событий, разбивки по tenantId и пр.; Custom — ещё более широкие возможности, чем в предыдущем Multi‑Stream.

  • Проекции событий (Event Projections): позволяют явно определять операции создания документов из отдельных событий.

  • Пользовательские (custom) проекции: проекции, наследуемые от IProjection, всё делаете сами с нуля. Остальные перечисленные в этом списке типы проекций наследуются от тех или иных классов и уже имеют какой‑то функционал.

  • Inline проекции: события проецируются одной транзакции с сохранением событий.

  • Flat Table Projection: позволяет создать ADO.NET таблицу, добавить в неё столбцы с помощью AddColumn, и проецировать данные прямо в неё.

В примере можно найти проекцию агрегатов на основе SingleStreamProjection.

Модель проекции выглядит таким образом:

/// <summary>
/// Модель персоны, используется в проекции PersonWithSumProjection. В модель добавлено поле Saldo.
/// </summary>
public class PersonWithSum
{
    /// <summary>
    /// Идентификатор персоны.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// ФИО
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// ИНН
    /// </summary>
    public string Inn { get; set; }

    /// <summary>
    /// Сальдо.
    /// </summary>
    public decimal Saldo { get; set; }

    public long Version { get; private set; }

    /// <summary>
    /// Счета.
    /// </summary>
    public List<Account> Accounts = new List<Account>();

    /// <summary>
    /// Методы Apply будут вызваны Marten при построении проекции.
    /// </summary>
    /// <param name="event"></param>
    public void Apply(PersonCreated @event)
    {
        Id = @event.Id;
        Name = @event.Name;
        Inn = @event.Inn;
        Version++;
    }

    public void Apply(PersonNameChanged @event)
    {
        Name = @event.NewName;
        Version++;
    }

    public void Apply(PersonInnChanged @event)
    {
        Inn = @event.NewInn;
        Version++;
    }

    public void Apply(AccountCreated @event)
    {
        var account = new Account(@event.AccountId, @event.Name, new List<Payment>());
        Accounts.Add(account);
        Version++;
    }

    public void Apply(PaymentCreated @event)
    {
        var payment = new Payment(@event.Id, @event.Sum, @event.PaymentType);
        var account = Accounts.FirstOrDefault(x => x.Id == @event.AccountId) ?? throw new ArgumentNullException($"Счёт не найден с ид {@event.AccountId}");
        account.Payments.Add(payment);

        Saldo = @event.PaymentType == (int)PaymentTypeEnum.Credit ? Saldo + @event.Sum : Saldo - @event.Sum;

        Version++;
    }
}

Методы Apply будут вызваны в проекции.

Класс проекции выглядит так:

/// <summary>
/// Проекция событий агрегата PersonAggreate. Проекция вычисляет сальдо по каждому из агрегатов.
/// </summary>
public class PersonWithSumProjection : SingleStreamProjection<PersonWithSum>
{
    public PersonWithSumProjection()
    {
        // Вызываются методы Apply модели PersonWithSum
        ProjectEvent<PersonCreated>((item, @event) => item.Apply(@event));
        ProjectEvent<PersonInnChanged>((item, @event) => item.Apply(@event));
        ProjectEvent<PersonNameChanged>((item, @event) => item.Apply(@event));
        ProjectEvent<AccountCreated>((item, @event) => item.Apply(@event));
        // В этом Apply вычисляется сальдо.
        ProjectEvent<PaymentCreated>((item, @event) => item.Apply(@event));
    }
}

В проекции мы указываем, какие именно события она будет обрабатывать и какими методами модели.

Затем проекцию нужно добавить в Мартен.

options.Projections.Add<PersonWithSumProjection>(ProjectionLifecycle.Async);

LifeTime указан асинхронный, то есть проекция строится асинхронно, после сохранения событий. Можно указать Inline, тогда проекция будет строиться в рамках одной транзакции вместе с сохранением событий.

Агрегаты

Marten не предоставляет базовых классов для написания агрегатов. Поэтому пишем сами: наружу выставляем свойства для чтения. Изменение состояния — только через методы агрегата. Если возникла ошибка — записываем соответствующее событие в список подлежащих сохранению событий, вместо генерации исключения.

Marten ожидает, что в Вашем агрегате реализованы методы Apply для каждого из доменных событий, если это так, то Marten сможет восстановить состояние агрегата, даже если Apply являются приватными методами. Например, в агрегате описаны следующие методы Apply:

protected void Apply(PersonNameChanged @event)
{
    Name = @event.NewName;
    Version++;
}

protected void Apply(PersonInnChanged @event)
{
    Inn = @event.NewInn;
    Version++;
}

protected void Apply(AccountCreated @event)
{
    var account = Account.Create(@event.AccountId, @event.Name);
    _accounts.Add(account);
    Version++;
}

Когда мы читаем агрегат из базы с помощью метода LoadAsync репозитория,

/// Восстанавливаем состояние агрегата по событиям.
public async Task<T> LoadAsync<T>(
    string id,
    int? version = null,
    CancellationToken ct = default
) where T : Aggregate
{
    // получаем сессию для работы с событиями.
    await using var session = await store.LightweightSerializableSessionAsync(token: ct);
    // восстанавливаем состояние агрегата, читая из бд события стрима агрегата.
    // при этом Marten вызовет методы Apply для каждого из сохранённых событий.

    var stream = await session.Events.FetchForWriting<T>(id, ct);

    return stream.Aggregate;        
}

в строке 15 мы обращаемся к свойству stream.Aggregate, при обращении к нему Marten вызовет выше упомянутые методы Apply, и применит события.

Пример агрегата находится в файле: PersonAggregate.cs.

Подписки

Подписки тоже имеются в Marten.

/// <summary>
/// Подписка на события типа PersonApproved.
/// </summary>
public class PersonApprovedToKafkaSubscription : SubscriptionBase
{
    private readonly IServiceProvider _serviceProvider;

    public PersonApprovedToKafkaSubscription(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;

        SubscriptionName = nameof(PersonApprovedToKafkaSubscription);

        // Подписываемся только на события типа PersonApproved
        IncludeType<PersonApproved>();

        // настраиваем сколько событий демон будет извлекать за раз
        // и сколько будет держать в памяти.
        Options.BatchSize = 1000;
        Options.MaximumHopperSize = 10000;

        // Позиция с которой читаем события (с текущего события)
        Options.SubscribeFromPresent();
    }

    /// <summary>
    /// Обрабатываем события.
    /// </summary>
    public override async Task<IChangeListener> ProcessEventsAsync(
        EventRange page,
        ISubscriptionController controller,
        IDocumentOperations operations,
        CancellationToken cancellationToken)
    {
        // с помощью Woverine будем отправлять интеграционные события в кафку.
        var messageBus = _serviceProvider.GetService<IMessageBus>() ?? throw new ArgumentNullException("Шина событий не зарегистрирована в IoC");

        foreach (var @event in page.Events)
        {
            await messageBus.PublishAsync(
                new PersonApprovedIntegrationEvent(@event.Data.GetType().Name, JsonConvert.SerializeObject(@event.Data)));
        }

        return NullChangeListener.Instance;
    }
}

Это подписка на один тип событий. После получения доменного события PersonApproved, на его основе формируется интеграционное событие PersonApprovedIntegrationEvent, которое отправляется в Кафку, как пример.

Replay событий

Иногда нужно заново построить проекцию, в Marten тоже можно это делать.

[HttpGet]
[Route("replay")]
public async Task Replay(
    [FromServices] IDocumentStore store, CancellationToken cancellation)
{
    using var daemon = await store.BuildProjectionDaemonAsync();

    // Fire up everything!
    await daemon.StartAllAsync();

    // or instead, rebuild a single projection
    //await daemon.RebuildProjectionAsync("a projection name", 5.Minutes(), cancellation);

    // or a single projection by its type
    await daemon.RebuildProjectionAsync<PersonWithSumProjection>(cancellation);

    // Be careful with this. Wait until the async daemon has completely
    // caught up with the currently known high water mark
    await daemon.WaitForNonStaleData(1.Minutes());

    // Start a single projection shard
    //await daemon.StartAgentAsync("shard name", cancellation);

    // Or change your mind and stop the shard you just started
    //await daemon.StopAgentAsync("shard name");

    // No, shut them all down!
    await daemon.StopAllAsync();
}

Итого

В целом, Wolverine+Marten способны значительно уменьшить объём шаблонного кода за счёт широкой гаммы решений «из коробки». Однако, есть нюанс — если вы стремитесь к Strong Consistency — у решений на базе Marten могут возникать проблемы с производительностью, о чём говорят предупреждения на официальном сайте.

Удачи!

Tags:
Hubs:
Total votes 19: ↑18 and ↓1+22
Comments3

Articles

Information

Website
digital.alfabank.ru
Registered
Founded
1990
Employees
over 10,000 employees
Location
Россия