Привет, Хабр! Наше подразделение занимается автоматизацией бизнес-процессов обслуживания клиентов в офисах и отделениях банка. Такие бизнес-процессы специфичны для конкретной организации, связаны с соблюдением внутренних регламентов и нормативных документов банка, должны полностью соответствовать часто меняющимся требованиям законодательства и всем инструкциям и правилам ЦБ РФ. Так, например, за процедурой открытия вклада стоит множество взаимодействий различных классов систем и компонентов, начиная от сбора и хранения согласий клиента на обработку персональных данных, интеграций с госсервисами и системами сбора сведений и заканчивая формированием приходно-кассовых ордеров.
Подобные процессы мы реализуем с помощью движка Camunda, который позволяет прототипировать и визуализировать сложную последовательность шагов и моделировать flow совместно с бизнес-заказчиком, а также обладает множеством инструментов для реализации интеграций, оркестрации, мониторинга, сбора бизнес-метрик и тестирования. + быстро и относительно незатратно менять процессы (можно привести пример с запретом перевода валюты в другие банки). Вдобавок Camunda является open-source решением, что крайне важно в текущих реалиях и т.д.
Впервые в наше подразделение Camunda попала в рамках закупки некоторого функционала у вендора. Реализация вендора была закрытой, сам движок инкапсулировался в каждый отдельный микросервис и был окружён различными самописными расширениями для централизованной разработки bpmn-процессов в браузере и сбора данных от всех запущенных инстансов.
В рамках рефакторинга и развития закупленного функционала с подобным подходом мы поняли, что плюсов у предложенного/перенятого решения примерно столько же, сколько ограничений и минусов, и пришли к необходимости перехода на открытый чистый движок без доработок вендора.
Архитектура
Мы провели сравнение Camunda 8 и Camunda 7 по нашим внутренним критериям, проанализировали варианты развёртывания Camunda 7 (https://docs.camunda.org/manual/7.19/introduction/architecture/) и остановились на кластерной модели Camunda 7 с рекомендованной разработчиками Camunda архитектурой развёртывания c использованием удаленного движка, а точнее Camunda Run в качестве workflow-движка.
Далее мы развернули отдельный инстанс СУБД Postgres только под задачи Camunda, несколько экземпляров Camunda Run (воркеры) в кластере за балансировщиком для исполнения процессов, а также отдельный независимый экземпляр Camunda Run исключительно для задач администрирования и мониторинга.
Настройки Camunda Run для администрирования и мониторинга процессов:
camunda.bpm:
job-execution:
enabled: false
Настройки узлов Camunda Run (воркеров) за балансировщиком
camunda.bpm:
job-execution:
max-wait: 10000
backoff-time-in-millis: 50
max-backoff: 250
backoff-decrease-threshold: 100
wait-increase-factor: 2
Из коробки Camunda не умеет работать с Kafka, но большая часть наших процессов и внутренних систем базируются на event-driven паттерне, поэтому после первого этапа выбора архитектуры и развёртывания системы нам было необходимо подружить Camunda с Kafka:
для возможности автоматического запуска определённых бизнес-процессов по мере появления потребности
для запроса действий в других системах в рамках исполнения различных бизнес-задач
для обеспечения интеграционных взаимодействий
Укрупнённо план выглядел следующим образом:
Где...
Микросервисы активации процессов:
являются держателями спроектированных bpmn-диаграмм
могут вести учёт и версионирование диаграмм
взаимодействуют с Kafka и умеют соотносить нужный процесс с тем или иным событием
обеспечивают идемпотентность на случай дублей или сбоев
Микросервисы обработки внешних задач :
используются вместо делегатов
обращаются к Camunda с заданной частотой для получения задач
могут исполнять сложную логику (например, сравнения данных из различных систем)
могут запускать другие bpmn-процессы (через Kafka и взаимодействия с микросервисами активации процессов)
позволяют извлечь необходимые процессу данные из других систем
Часть бизнес-процесса открытия вклада, завязанная на асинхронный обмен событиями по Kafka, после этапа прототипирования выглядела следующим образом:
Здесь стоит сказать, что для взаимодействия с Kafka мы используем модель, предложенную Крисом Ричардсоном,
Асинхронное API наших сервисов состоит из каналов и соответствующих типов сообщений, таких как:
входящие команды,
ответы на командные запросы и
события
Схема асинхронного API сервиса из книги Ричардсона приведена ниже:
Таким образом, возвращаясь к bpmn-схеме, кубик «Отправка запроса открытия вклада» должен быть реализован отправкой командного события в топик Kafka.
Внешняя задача для данного шага была реализована с использованием базового класса обработки задач, наследующего интерфейс библиотеки Camunda
org.camunda.bpm.client.task.ExternalTaskHandler
с преобразованием атрибутов объекта ExternalTask в расширенный под наши потребности DTO:
/**
* Обобщенное выполнение задачи.
*
* @param externalTask внешняя задача
* @param externalTaskService сервис внешних задач
*/
@Override
public void execute(@NonNull final ExternalTask externalTask,
@NonNull final ExternalTaskService externalTaskService) {
final TaskBO task = ExternalTaskToTaskBOMapper.EXTERNAL_TASK_TO_TASK_BO_MAPPER.map(externalTask);
Для сквозной и межсистемной трассировки мы передаём в хедеры сообщения в Kafka корреляционный ключ инстанса работающего бизнес-процесса (correlationId), и этот же идентификатор вернётся нам в хедерах reply-события для связки ответа микросервиса открытия вклада с конкретным исполняющимся процессом.
/**
* Отправка сообщения в топик
*
* @param task DTO-представление внешней задачи
* @param message DTO для отправки в топик Kafka
*/
public void sendCommand(@NonNull final TaskBO task,
@NonNull final Object message) {
kafkaProducer.sendCommand(task.getBusinessKey(), message, task.getHeaders());
}
Кубик получения результата открытия депозита из топика Kafka также реализован внешней задачей.
Мы реализовали подписку на заданный reply-топик микросервиса открытия депозитов и ожидали в нём появления ответного события по первоначальному командному запросу.
String processCorrelationId = externalTask.getVariable(CORRELATION_ID);
correlationIds.add(processCorrelationId);
executor.execute(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>
(defaultConsumerProperties)) {
consumer.subscribe(Collections.singletonList(externalTask.getVariable(KAFKA_TOPIC)));
...
}
...
Пока инстанс микросервиса обработки внешних задач был один – всё работало отлично, каждое входящее событие из reply-топика Kafka фильтровалось consumer-ом по бизнес-ключу и хедерам, и в случае совпадения значения correlationId в хедере с любым элементом внутреннего списка микросервиса correlationIds, задача успешно завершалась. Однако, как только мы пытались масштабироваться и увеличить количество инстансов микросервисов обработки задач – часть reply-событий для одного инстанса-подписчика попадали в обработку другого инстанса-подписчика, correlationId входящего события отсутствовал во внутреннем списке инстанса correlationIds, и исполнение внешних задач в процессе отваливалось по таймауту.
Для исправления данной ситуации потребовалось переопределить свойства подписчика и помещать его в уникальную группу для того чтобы каждый инстанс микросервиса исполнения внешних задач получал каждое событие из нашего reply-топика Kafka:
Properties consumerProperties = new Properties(defConsumerProperties);
consumerProperties.putAll(defConsumerProperties);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_TASK_GROUP_ID + UUID.randomUUID());
И далее применить данные свойства к consumer:
executor.execute(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties))
{
String kafkaTopic = externalTask.getVariable(KAFKA_TOPIC);
consumer.subscribe(Collections.singletonList(kafkaTopic));
В результате всех перечисленных изысканий укрупнённая схема взаимодействия компонентов при необходимости масштабирования выглядела следующим образом:
Собственно, данных артефактов и манипуляций было достаточно для реализации и исполнения приближенного к реальному бизнес-процесса открытия вклада и проведения итерации нагрузочного тестирования (для того чтобы убедиться в жизнеспособности схемы под нагрузкой на проде).
Первая итерация нагрузочного тестирования включала в себя один инстанс воркера (Camunda Run), 1 инстанс сервиса активации процесса и один инстанс сервиса обработки внешних задач и стартовые показатели ресурсов:
Настройки деплоймента для микросервиса обработки внешних задач:
resources:
limits:
cpu: '1'
memory: 512Mi
requests:
cpu: '1'
memory: 512Mi
Настройки деплоймента для микросервиса обработки активации процессов:
resources:
limits:
cpu: '1'
memory: 512Mi
requests:
cpu: '1'
memory: 512Mi
Настройки деплоймента воркера:
resources:
limits:
cpu: '4'
memory: 2G
requests:
cpu: '1'
memory: 512Mi
В течение 30 минут на данной конструкции и ограничениях были запущены и успешно завершены 300 экземпляров одного и того же прототипа процесса открытия вклада, состоящего более чем из 50 отдельных блоков-шагов, без достижения экстремума. НТ продолжается.