В своем проекте мы столкнулись с необходимостью осуществить определенные действия с заявкой в нашей микросервисной архитектуре между несколькими сервисами. Причем если одно или несколько действий завершаются неудачно, то все следующие действия должны быть отменены, оставив состояние сущностей неизменным. Также в случае получения нефинальной ошибки мы хотели бы попробовать довести процесс до финала, не откатывая предыдущие шаги.
В монолитном приложении это было бы сделать легко, опираясь на гарантии удовлетворяющей требования ACID базы данных. В нашем же случае такой фокус не пройдет, наш бизнес-процесс затрагивает сразу несколько баз данных и брокеров сообщений, следовательно, решить эту задачу, запустив одну транзакцию, невозможно.
Поскольку наверняка не мы первые столкнулись с такой проблемой, то решили изучить существующие способы решения этой задачи.
У некоторых в команде уже был опыт работы с xa-транзакциями.
Довольно удобный способ, когда у вас есть несколько ACID СУБД. Процесс состоит из двух фаз.
Первая фаза представляет собой атомарную операцию, которая проверяет возможность начала транзакции и блокировки участников коммита.
Во второй фазе происходит сбор ответов от всех участников и решение коммита или отката.
В данном алгоритме нет промежуточного состояния: либо все участники видят состояние до момента начала транзакции, либо после окончания транзакции.
Основной минус данного подхода — неустойчивость к сбоям. После первой фазе в случае сбоя координатора (узел, который выполняет транзакцию) у остальных участников нет информации: нужно фиксировать транзакцию или отменять, они должны ждать устранения сбоя.
Также при росте участников падает производительность. Чем больше мы горизонтально масштабируем нашу систему, тем больше будет падать производительность.
С saga мы были знакомы в теории из внешних источников, ни разу не сталкиваясь на практике.
Общее определение алгоритма звучит так: «Сага —это алгоритм, обеспечивающий согласованность данных в микросервисной архитектуре».
В классическом описании алгоритма суть метода — выполнение ACID-транзакций в локальных БД микросервисов, в случае ошибки в любом шаге инициируется откат с исполнением компенсационных транзакций. Но в ряде случаев мы хотели бы довести транзакцию до финала, и только если не можем этого сделать, то начать выполнять компенсационные действия. Также в случае падения приложения все операции должны быть обработаны корректно.
Фреймворки, имплементирующие паттерн Сага (Saga)
А вот примеры приложений с использованием саги:
Как видите, проблема не новая, и так или иначе люди ищут пути ее решения. Eventuate Tram завязан сильно на Kafka, Axon тянет очень много проприетарной логики, некоторые требовали поднятия своего сервера. Проблемы, которые несли данные фреймворки, мы не готовы были решать в тот момент времени, нам нужно было простое решение, удовлетворяющее требованиям и свободное от вендор-лока.
Решили написать свое решение. Первоначальный вариант в самом приложении представлял собой реализацию паттерна chain of responsibility,
каждый шаг имел свое состояние, сохраняемое в базе данных. Для кодов ошибок прописывали политику восстановления, в случае исключительной ситуации при выполнении шага, согласно заданной политике восстановления, мы обновляли состояние в БД. Планировщик восстанавливал контекст из базы данных и запускал на обработку нужный шаг.
В случае, если не получалось завершить процесс, мы писали отдельные цепочки действий на приведение системы в согласованное состояние, так как иногда нельзя было выполнить только компенсационные операции.
Данный вариант реализовался довольно быстро, закрывал большинство потребностей по бизнесу и снимал ряд ручных действий с разработчиков. Несмотря на всю простоту и пользу решения, в новом продукте мы заметили, что есть проблемы при попытках переиспользовать старое решение:
сложность настройки (введение нового шага или новой саги требовали ручных действий как в базе данных, так и в коде);
чтобы понять процесс, нужно было посмотреть в разных источниках — файлах и таблицах;
из-за отсутствия возможности описания сложных сценариев некоторые случаи приходилось разбирать вручную.
А еще нам хотелось бы иметь это решение в виде библиотеки для переиспользования в разных продуктах
Запланировали рефакторинг и поискали, что есть в сети по Saga,
Оказалось, что наиболее распространенные реализации Saga сделаны по следующим принципам.
Варианты саги
Хореография (Choreography)
Метод организации саги без центрального координатора, каждый микросервис, участвующий в саге, ответственен за управление сагой.
Достоинства:
Микросервисы, участвующие в саге, не должны знать обо всех участниках саги.
Обеспечивает принцип слабой связанности.
Недостатки:
Ни один микросервис не знает, как выполняется сага в целом
повышенная сложность управления состоянием саги.Возможность появления циклических зависимостей.
Трудно понять, на каком шаге выполнения находится сага.
Не может быть гарантирован порядок выполнения компенсационных (откатных) транзакций.
Ввиду указанных недостатков метод хореографии подходит для небольших систем с простыми транзакциями. В большинстве практических случаев предпочтителен метод оркестрации.
Оркестрация (Orchestration)
Централизованный подход к управлению сагой, сервис оркестратор управляет и координирует процесс выполнения саги.
Достоинства:
Позволяет устранить недостатки, присущие методу хореографии
Гораздо проще понять, на каком шаге выполнения находится сага
Гораздо проще поменять порядок действий в саге
Гораздо проще код в микросервисах-участниках саги
Недостатки:
Есть единый центр координации саги и, как следствие, единая точка отказа
Метод оркестрации по нашему опыту хорошо показал себя для большинства случаев практического применения саги.
class ApplicationSagaDefinition () : SagaDefinition() {
init {
sagaDefinition {
step {
state = ApplicationSagaState.PREFILL
transitions {
transition {
event = ApplicationSagaEventType.APP_CREATED
action = creditAppPrefillAction
transitionAction = TransitionAction { _, _ ->
ApplicationSagaState.APP_VALIDATE
}
}
transition {
event = ApplicationSagaEventType.APP_FAILED
action = failApplicationAction
transitionAction = TransitionAction { _, _ ->
ApplicationSagaState.FINAL
}
}
}
catch {
withCondition {
errorCondition = { it.isRetrievable() }
recoveryPolicy = defaultRecoveryProgressivePolicy()
recoveryAction = failedApplicationRecoveryAction
}
withCondition {
errorCondition = { !it.isRetrievable() }
recoveryPolicy = RecoveryPolicy.None
recoveryAction = failedApplicationRecoveryAction
}
}
}
step {
state = ApplicationSagaState.APP_VALIDATE
transitions {
transition {
event = ApplicationSagaEventType.CREDIT_APP_PREFILLED
action = validateApplicationAction
transitionAction = TransitionAction { _, _ ->
ApplicationPrefillingSagaState.SCORING_APPLICATION
}
}
transition {
event = ApplicationSagaEventType.APP_FAILED
action = failApplicationAction
transitionAction = TransitionAction { _, _ ->
ApplicationSagaState.FINAL
}
}
}
catch {
withCondition {
errorCondition = { it.isRetrievable() }
recoveryPolicy = defaultRecoveryProgressivePolicy()
recoveryAction = failedApplicationRecoveryAction
}
withCondition {
errorCondition = { !it.isRetrievable() }
recoveryPolicy = RecoveryPolicy.None
recoveryAction = failedApplicationRecoveryAction
}
}
}
Параллельное выполнение саг
Саги не гарантируют изоляцию (I в ACID) и, как следствие, действия, выполненные в шагах саги, становятся тут же видны другим сагам, которые могут выполняться параллельно. Для предотвращения потенциальных проблем, связанных с отсутствием свойства изоляции в сагах, нужно предпринимать дополнительные меры.
Возможные варианты методов по обеспечению изоляции в сагах
Замыкания (Short-circuiting)
Предотвращение запуска саги в случае, если объект в базе данных модифицируется другой сагой. В этом случае саги, которые затрагивают одни и те же объекты в БД, будут фактически выполняться последовательно. Этот метод наиболее прост в реализации, но ограничивает параллельность выполнения саг.
Блокировки (Locking)
Сага устанавливает блокировки на те объекты, которые она изменяет в БД, чтобы предотвратить доступ других саг к ним. Возникает опасность возникновения дедлоков.
Прерывание (Interruption)
Прерывание выполнения саги в случае обнаружения, что другая сага (которая еще не закончилась) уже сделала изменения в объектах БД. В отличие от метода блокировок, тут нет опасности возникновения дедлоков.
Предотвращение аномалий в транзакциях саги
Возможные проблемы/аномалии при параллельном выполнении саг:
Lost update. Одна сага переписывает изменения, внесенные другой, не читая их.
Dirty read. Сага читает незавершенные обновления другой саги
Unrepeatable read. Разные этапы саги читают одни и те же данные, но получают разные результаты, так как были внесены изменения другой сагой.
Для предотвращения подобных аномалий необходимо принимать контрмеры на уровне приложения. Фреймворки, реализующие паттерн саг, не предотвращают эти аномалии.
Рекомендованные контрмеры
Семантическое блокирование на уровне приложения. Например, введение статусной модели.
Коммутативные транзакции — по возможности проектировать транзакции в саге так, чтобы их можно было выполнять в любом порядке и получать одинаковый результат.
Пессимистичное представление — по возможности задавать такой порядок транзакций в саге, который минимизирует вероятность dirty read.
Повторное чтение значений при выполнении транзакции — дополнительная проверка, чтобы убедиться, что данные не изменились другой транзакцией и не будет dirty read. Если данные изменились, то сагу надо откатить.
Идемпотентность
Идемпотентность компенсирующих (откатывающих изменений) транзакций.
Компенсирующая транзакция не может быть не выполнена, она должна повторяться, пока не выполнится успешно. Компенсирующие транзакции должны быть коммутативными с операциями, то есть их выполнение в разном порядке должно приводить к одному результату.
Необходимо проектировать сагу так, чтобы минимизировать случаи, когда необходимо выполнять компенсирующие транзакции или минимизировать количество операций в случае, если все же компенсирующая понадобится.
Например:
первыми выполнять операции, которые имеют наибольшую вероятность отказа;
выполнять операции, которые труднее всего откатить, в последнюю очередь.
Для обеспечения атомарности транзакции в базе данных и отправки сообщения могут применяться паттерны:
Transactional Outbox pattern — запись сообщения как часть основной транзакции в базе данных. Далее отдельный процесс перемещает сообщение в брокер сообщений.
Event Sourcing pattern — апдейт базы и сохранение сообщения выполняются как одна операция.
В итоге для нашего продукта мы решили остановиться на следующем :
библиотека на основе оркестратора;
процесс описывается императивно, состоящим из отдельных шагов;
при выполнении шага сохраняется результат выполнения для дальнейшего воспроизведения;
при проблемах с шагом можем его повторить, есть поддержка resilience pattern (retry);
при падении приложения повторяем весь процесс;
если не смогли завершить основную ветку, выполняем действия для обеспечения консистентности, можем также писать свои шаги для приведения системы в консистентное состояние;
результат выполнения шага может быть использован в других шагах;
есть механизм ручного запуска экземпляра с нужного статуса.
На текущий момент поддерживается БД Postgres
Пример сценария создания заявки на кредит:
statefulWorkflowFactory.createAndRegister { workFlowMaker ->
workFlowMaker.body<CreateApplicationRequest, Unit>(CREATION_APPLICATION_WORKFLOW_NAME) { request->
//шаг создания заявки с стандартными настройками политики восстановления шага
val application = step(1, RecoveryPolicyDefinition.defaultProgressivePolicy()) {
applicationService.createApplication(request)
}
/*
шаг создания клиента на основе заявки, полученной на предыдущем шаге
в случае исключительной ситуации повторять выполнение логики в шаге не более 5 раз с интервалом в 15 секунд
*/
val client = step(2, RecoveryPolicy(
maxAttempts = 5,
waitDuration = Duration.ofSeconds(15))
) {
clientService.createClient(application)
.also{applicationService.mergeApplicationClientUid(application.uid, client.uid)}
}
step(3, RecoveryPolicyDefinition.defaultProgressivePolicy()) {
applicationService.markNew(application)
}
// если не смогли завершить успешно все шаги, можем выполнить нужную логику в данном блоке
}.onError {
log.error("Failed process create application, [applicationUid = ${application.uid}].", it)
step(1, RecoveryPolicyDefinition.defaultProgressivePolicy()) {
applicationService.failApplication(applicationUid, it)
}
}
.make()
// асинхронно выполняем наш процесс
}.async(applicationUid.toString(), applicationUid)
Бизнес-логика находится в одном месте. Код пишется в императивном стиле, его несложно читать и понимать, что происходит в нужном сценарии.
Стандартное решение resilience4j позволяет задавать различные варианты логики восстановления шага при сбоях. Для использования нужно только подключить библиотеку и настроить подключение к БД.
В случае если нужно добавить еще один шаг, возможны разные варианты:
Если логика шага не зависит от других шагов, при выполнении кода система увидит, что шаг не выполнялся, и выполнит его.
Предусмотрели сервис для выполнения логики с нужного шага, например, со второго (результат других шагов отклоняется).
Удалить результат выполнения шагов, вызвав нужную процедуру в БД.
Возможность версионирования.