Как стать автором
Обновить

Saga и Event Sourcing с Axon. Первое знакомство

Уровень сложностиСредний
Время на прочтение11 мин
Количество просмотров6.7K

Почему Saga

Все, кто занимается разработкой микросервисов, так или иначе решают для себя вопрос: как обеспечить согласованность бизнес-транзакций, в которых участвуют данные нескольких сервисов. Разумеется, лучшее решение - отсутствие такой необходимости. Но не всегда это возможно.

Один из признанных ИТ-сообществом ответов - это, так называемый Saga паттерн. О том, что это такое, написано достаточно много, а мы займёмся его реализацией. 

Почему Axon

Одним из фреймворков, где есть встроенная java-реализация Saga является Axon. При этом мы сразу получаем в довесок Event sourcing (который подразумевается при использовании Saga), CQRS и в целом - основу для DDD.

Выглядело это всё довольно привлекательно: дополнительно ознакомиться с этими  микросервисными паттернами и их плюсами на практике.

Event sourcing мне был интересен, в первую очередь, как возможная альтернатива feign client для взаимодействия между микросервисами. Так же, надо сказать, что event sourcing подразумевает встроенную очередь сообщений и мне была любопытна эта альтернатива таким популярным брокерам сообщений, как Kafka и RabbitMQ. Не последнюю роль сыграла и хорошая интеграция со стороны Spring Boot. Люблю интегрированные решения.

Следует заметить, что Axon Framework и Axon Server являются опенсорсными и бесплатными проектами. Но если нужна кластеризация, то придётся покупать весьма платный Axon Server Enterprise.

Что здесь и для кого

В статье я поделюсь своим первым опытом реализации saga, некоторыми соображениями и выводами, которые сделал во время знакомства с инфраструктурой Axon. Поскольку речь идёт о "первом контакте", не следует ожидать исчерпывающей широты и глубины познания данной темы. Статья будет полезна тем, кто начинает осваивать Axon или просто хочет к нему присмотреться.

Здесь не будет пошагового тюториала создания всего приложения. Если нужно, рекомендую посмотреть тут. Я сосредоточусь на подробной реализации саги и описании взаимосвязи элементов event sourcing в Axon. В русскоязычных источниках я не нашёл детальной информации. А в англоязычных она оказалось неполной. Поэтому решил написать.

Ссылка на гитхаб репозиторий будет в конце.

Кейс

В качестве основы для опытов с Axon-сагой я взял следующий простой кейс: два микросервиса, один из которых отвечает за безопасность (назовем его IDP), а другой - оперирует финансовыми данными клиента (назовём его Account). Предположим всё это происходит в рамках интернет-магазина. Когда регистрируется новый клиент, IDP должен сохранить в свою базу его имя пользователя, пароль и роль в системе. При этом Acccount-сервис должен открыть у себя для этого пользователя счёт и сохранить его email для отправки сообщений. Эти действия и их результаты в хранилищах обоих сервисов должны быть согласованы. Нельзя допустить, чтобы учётка клиента в сервисе IDP существовала, а счёт для него в сервисе Account не был открыт. Или наоборот. Напомню, что в мире микросервисов подразумевается, что любой из сервисов в любой момент бизнес-транзакции или её отката может оказаться недоступен. Однако в конечном счёте, когда все участники транзакции окажутся онлайн, их состояния должны оказаться согласованными. Т.е. данные клиента должны либо присутствовать в обоих сервисах, либо отсутствовать в обоих.

Дисклеймер

Замечу, что пример искусственный. Не следует искать в нем других смыслов кроме, как - проверить нашу сагу. И да, чтобы реально оценить мощь Axon, вам понадобится большая нагрузка, больше сервисов, но это уже совсем другая история.

Как всё устроено

Для того, чтобы разобраться, как всё работает, мне потребовалось нарисовать вот такую схему.

Axon. Схема взаимосвязей.
Axon. Схема взаимосвязей.

Иначе трудно с непривычки держать в голове весь этот механизм. А последовательность взаимодействий в нём для традиционного (не event sourcing) подхода весьма неочевидна. Те диаграммы, которые я нашел в интернет (в т.ч. в официальной документации), отражают лишь общие архитектурные концепции.

На моей схеме голубым цветом выделены слои контроллера и сервисов, коричневым - хранилища, а зелёным - объекты специфичные для event sourcing. Красным цветом подписаны сущности Axon. Aggregate, Saga, Command, Query и Event синтаксически являются классами. А хэндлеры, как вы догадались - это методы.

Взаимному расположению отдельных элементов не придавайте значения. Оно продиктовано исключительно топологией (постарался избежать пересечения стрелок).

Далее все это будет описано подробно.

CQRS. Почему два хранилища.

Мантра CQRS, как известно, расшифровывается как Command and Query Responsibility Segregation. Простым языком, это означает, что при таком подходе все операции делятся на модифицирующие состояние бизнес-сущностей (Commands) и возвращающие состояние (Queries). Первые только меняют состояние и ничего не возвращают. Вторые - напротив, позволяют получить текущее состояние, но не должны менять его. Обычно это подразумевает отдельные хранилища для операций чтения и записи. Считается, что это позволяет раздельно масштабировать рабочие нагрузки чтения и записи. Axon реализует этот подход. Посмотрим на хранилища поближе.

Data store

Это - “read-store” в концепции CQRS. Тип СУБД и её схему рекомендуется оптимизировать для быстрого чтения. Можно в т.ч. денормализовать базу для этой же цели. При внедрении event sourcing в существующий проект, это может быть хранилище, которое до сих пор использовалось, как read-write. Просто привычную entity теперь начинают называть "проекцией агрегата". Вроде пока без особых сюрпризов.. Но вот он - первый! Read - специализация - это условность. На самом деле, присутствуют синхронизирующие операции записи. На схеме это входящая стрелочка с надписью - "изменения". И в самом деле: иначе как бы мы могли писать в одно хранилище и читать актуальные изменения из другого? Причём происходит эта синхронизация не "сама" и не автоматически, как можно было бы ожидать от этого волшебного фреймворка. Каждую операцию синхронизации вам придётся программировать ручками. Однако Axon предоставляет для этого инфраструктуру.

Если вы будете использовать встроенный в Axon Server “write-store”, то "read-store" тоже придется модифицировать. Потребуется ряд служебных таблиц. Axon может самостоятельно их создать, если вы в application properties включите spring.jpa.hibernate.ddl-auto=update. Вот как это будет выглядеть:

Изначально была только таблица account.

Event store

Это - write-store в концепции CQRS. В случае Axon, можно не заботиться о нём. Axon server, умеет "из коробки" создавать это хранилище и управлять им. Хотя, при желании, вы можете создать и настроить своё. В это хранилище Axon пишет не сами состояния объектов (агрегатов), а список изменяющих событий. Что-то типа лога аудита. Но когда Axon нужно получить текущее состояние агрегата, он, "Внимание!" не лезет в read-хранилище, а восстанавливает состояние, читая из event-store цепочку событий. И тут мы понимаем, что write-специализация на поверку тоже оказалась условностью. Это хранилище используется и для чтения. Для его оптимизации Axon под капотом использует снапшоты, которые позволяют не читать очень длинные цепочки событий.

Напомню, что агрегат в парадигме DDD - это класс, описывающий одну или несколько взаимосвязанных бизнес-сущностей - основу микросервиса.

Цепочка чтения состояния

Чтение не требуется для реализации Saga. Описываю для полноты картины.
Если вам не терпится - можете пропустить эту часть.

В моём примере всё начинается в REST API контроллере, но в общем случае, источником запроса (Query) может быть любой метод. Сам query представляет из себя DTO, который содержит поля - параметры для выполнения запроса. Для каждого типа запроса описывается свой класс query.

Обрабатывается query методом, помеченным аннотацией @QueryHandler. Параметром метода должен быть соответствующий объект типа query. В обработчике - обычные сервисные вызовы к data store.

Для обработки query не требуется хранилище event store и Axon Server. Всё будет работать без них.

Цепочка изменения состояния

Изменение инициируется командой (command). Каждая команда, так же, как и query должна быть предварительно описана соответствующим классом c полями - параметрами команды. В любом месте кода можно создать экземпляр команды и запустить её на выполнение. Команда всегда адресована конкретному экземпляру агрегата.

Для каждой команды должен существовать один обработчик - CommandHandler, помеченный соответствующей аннотацией и принимающий команду на вход. На моей схеме он находится в агрегате, но это не обязательно. Задача обработчика команды - проверить корректность команды и сгенерировать событие (Event).

События - изменяют состояние агрегата и сообщают об этом подписчикам.

Подписчики события - это методы, помеченные аннотациями:@EventSourcingHandler, @SagaEventHandler или @EventHаndler. Первый вариант предназначен для обработки события внутри агрегата и изменения его состояния. Второй - как ясно из названия, применяется внутри саги. Он обрабатывает событие и создаёт команду для следующего действия. Третий - может использоваться где угодно. В обязательном порядке - в сервисе, который синхронизирует изменения агрегата с read-хранилищем.

Сага

Это класс, который по определению затрагивает несколько микросервисов. При этом он не задействует напрямую никакие сущности этих микросервисов. Сага оперирует только командами и событиями, которые могут находиться в общей для микросервисов библиотеке. Тем не менее, нравится вам это или нет, но в Axon сага должна быть частью кода одного из микросервисов, т.к. ей для работы нужен контекст приложения.

В моём примере сага выглядит вот так:

@Saga
public class UserSignUpSaga {

    private static final String USERID_ASSOCIATION = "userId";
    private static final String ACCOUNTID_ASSOCIATION = "accountId";
    private static final String USER_STORE_DEADLINE = "UserStoreDeadline";

    @Autowired
    // Здесь используем transient, потому, что сага сериализуется и не стоит сериализовать весьма объемный commandGateway
    private transient CommandGateway commandGateway;
    private String userId;

    //Я такое не очень люблю, но последующие поля используются для передачи данных между хандлерами
    private String username;
    private String email;
    private String userStoreDeadlineId;
    private boolean userCreated;


    @StartSaga
    //Экземпляр саги привязывается к идентификатору первого обработанного события. После этого, события с другими идентификаторами не направляются данному экземпляру саги.
    @SagaEventHandler(associationProperty = USERID_ASSOCIATION)
    //Обрабатываем событие “зарегистрирован пользователь”. Получаем бин DeadLineManager в виде параметра
    public void handle(UserSignedUpEvent userRegisteredEvent, DeadlineManager deadlineManager) {
        this.userId = userRegisteredEvent.getUserId();
        this.email = userRegisteredEvent.getEmail();
        //Отправляем команду на создание учетной записи пользователя
        commandGateway.send(new CreateUserCommand(userId, userRegisteredEvent.getUsername(), userRegisteredEvent.getPassword()));
        //Устанавливаем "таймер" на 30 сек. сохраняем его id
        this.userStoreDeadlineId = deadlineManager.schedule(Duration.of(30, ChronoUnit.SECONDS), USER_STORE_DEADLINE);
    }

    @SagaEventHandler(associationProperty = USERID_ASSOCIATION)
    //Обрабатываем событие "создана учетная запись пользователя"
    public void handle(UserCreatedEvent userCreatedEvent) {
        //Генерируем уникальный id для счета пользователя
        String accountId = UUID.randomUUID().toString();
        //Ассоциируем сагу с данным счетом пользователя
        SagaLifecycle.associateWith(ACCOUNTID_ASSOCIATION, accountId);
        userCreated = true;
        this.username = userCreatedEvent.getUsername();
        //Запускаем команду создания счета
        commandGateway.send(new CreateAccountCommand(accountId, this.username, this.email));
    }

    @SagaEventHandler(associationProperty = ACCOUNTID_ASSOCIATION)
    //Обрабатываем событие "создан счет пользователя"
    public void handle(AccountCreatedEvent accountCreatedEvent, DeadlineManager deadlineManager) {
        //Отменяем запланированное по времени задание
        deadlineManager.cancelSchedule(USER_STORE_DEADLINE, this.userStoreDeadlineId);
        //Завершаем сагу
        SagaLifecycle.end();
    }

    //Запускаем откат создания пользователя, если для него не был открыт счет (сага не закончилась успешно в течение указанного интервала времени)
    @DeadlineHandler(deadlineName = USER_STORE_DEADLINE)
    @EndSaga
    public void on() {
        if (userCreated) {
            commandGateway.send(new RollbackUserCreationCommand(this.userId, this.username));
        }
    }

}

Идентификаторы

Для того, чтобы механизм команд и событий в Axon работал, необходимо соблюдать следующие правила относительно идентификаторов:

  1. У каждого агрегата, команды и события должно быть поле, содержащее уникальный идентификатор. Обычно это String, который генерируется с помощью UUID.randomUUID().toString(). У команды, события и агрегата этот идентификатор может совпадать. Но у двух агрегатов - нет.

  2. Названия полей-идентификаторов у любых двух агрегатов или у любых двух событий тоже не должны совпадать. Т.е. нельзя, к примеру в агрегате Account назвать идентифицирующее поле id, если поле с таким названием уже есть в агрегате User.

Эти требования могут приводить к некоторой избыточности в виде создания дополнительных полей.

Предположим, в нашем приложении имя каждого пользователя уникально. В сервисе IDP оно используется для entity User, а в Account-сервисе - в entity Account. По этому полю происходит "слабое связывание" сущностей микросервисов.

В подобном случае, в отличие от старых добрых entity, для агрегатов вам придется дополнительно создать в каждом еще одно идентифицирующее поле, причем с уникальным названием и значением.

Хорошая новость заключается в том, что в проекции агрегата, т.е. в read-хранилище можно ничего не менять. Набор полей агрегата может не совпадать с entity. Главное для read-хранилища способность отвечать на все запросы (query).

SagaEventHandler

Основа саги в Axon - это последовательность обработчиков событий. При обработке событий могут запускаться команды, которые, как мы уже знаем, могут генерировать новые события.

Каждый хэндлер может иметь произвольное число параметров. Первый параметр - тип события, на который он реагирует, последующие параметры могут быть любыми бинами, которые вы хотите использовать в методе.

При первой обработке события происходит ассоциация (связка) данного экземпляра саги с парой ключ-значение экземпляра события. Ключом является свойство, указываемое в associationProperty. Значение берется из соответствующего геттера обрабатываемого события.

В дальнейшем эти ассоциации можно менять с помощью метода associateWith(свойство, значение) и removeAssociationWith(свойство, значение). С этого момента сага начинает получать только те события, у которых есть такое свойство, с указанным значением. В экземплярах событий и команд идентификаторы обычно содержат идентификаторы агрегатов. Таким образом экземпляр саги ассоциируется с обработкой конкретных экземпляров агрегатов. Например, если в системе зарегистрирован пользователь с userId="123", которому открывается счёт с accountId="456", то данный экземпляр саги начинает получать и обрабатывать только экземпляры событий, касающиеся данного пользователя и этого счёта.

Завершение саги

Сага считается законченной при вызове метода  SagaLifecycle.end() или при вызове хандлера, помеченного аннотацией @EndSaga.

DeadlineHandler

Этот обработчик отвечает за то, чтобы сага не могла остаться в незавершённом состоянии. Речь идёт о случае, когда завершающее событие не наступило в течение заданного времени.

В нашем случае, это когда учётная запись пользователя создана, а счёт для него не открыт в течение 30 секунд.

Для планирования фоновых событий используется бин DeadLineManager. Он подерживает варианты создания на основе сторонних планировщиков: Quartz, JobRunr, DB-sheduler. Кстати, для каждого из них есть автоконфигурирующие стартеры в Spring Boot. 

Мы же используем четвёртый вариант: SipmpleDeadlineManager, который очень просто настраивается и подходит для иллюстрации в нашем примере. Вот его минимальная конфигурация:

@Configuration
public class DomainConfiguration {
    @Bean
    public DeadlineManager deadlineManager(
            org.axonframework.config.Configuration configuration,
            TransactionManager transactionManager,
            SpanFactory spanFactory
    ){
        ScopeAwareProvider scopeAwareProvider = new ConfigurationScopeAwareProvider(configuration);
        return SimpleDeadlineManager.builder()
                .scopeAwareProvider(scopeAwareProvider)
                .transactionManager(transactionManager)
                .spanFactory(spanFactory).build();
    }
}
Примечание

В промышленной разработке SipmpleDeadlineManager применять не рекомендуется, т.к. он не использует хранилище и соответственно хранит запланированные действия только до перезапуска сервиса.

Итог. Что дальше

Поскольку основным мотивом была реализация саги, мой вывод касается только этого аспекта. Мне показалось, что реализация с использованием Axon тянет за собой довольно много дополнительного кода. Это неудивительно, потому, что сага - это только маленькая часть функциональности фреймворка.

Но всё познаётся в сравнении. Следующий вариант реализации саги я намерен попробовать с Kafka. Возможно результаты станут следующей статьёй.

Приглашаю опытных «аксоноводов» и «аксоноведов», пользуясь случаем, дополнить или поправить автора, если где-то не прав.

Обещанная ссылка на репозиторий.

Updated

Реализованный вариант саги с Kafka здесь. Описание пока не делал.

Теги:
Хабы:
Всего голосов 7: ↑6 и ↓1+7
Комментарии4

Публикации

Истории

Работа

Java разработчик
305 вакансий

Ближайшие события

22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
11 – 13 декабря
Международная конференция по AI/ML «AI Journey»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань