Меня зовут Павел Плетнев, я разработчик в команде кредитных карт в Тинькофф. Хочу поделиться, как можно заранее оптимизировать работу с историей в Camunda или решить проблемы, если вдруг они появились.

Зачем сохранять историю

Одно из преимуществ Camunda — возможность наблюдать за ходом выполнения процесса. Мы используем админку Excamad:

История вымышленного процесса в Excamad

Механизм реализован из коробки: события истории — старт и завершение процесса, история выполнения активити и так далее — сохраняются в базу данных.

Примерная схема хранения данных в Camunda

Спустя время в таком процессе начинаются проблемы: объем исторических таблиц увеличивается, нагрузка на процессор возрастает и время обработки всех запросов растет.

Нужно искать баланс между временем жизни событий истории и уровнем истории в зависимости от количества запускаемых процессов. Чем больше процессов запускается, тем меньше должно быть событий истории и меньше глубина хранения истории.

История в Camunda — ценный источник информации, но со временем деградация производительности из-за истории будет только расти.И тогда нужно уменьшать глубину хранения или отключать ее полностью.

Попробуем сохранить истории и не потерять в производительности. У нас три цели:

  1. Ничего не менять с точки зрения API для пользователей

  2. Уменьшить нагрузку на БД, очистить диск от исторических данных и освободить процессорное время

  3. Потратить минимум ресурсов на разработку нового решения

    Три этапа, которые помогут сохранить истории и не потерять производительность

Экспорт исторических данных в другую систему

В Camunda есть стандартный механизм для настройки обработки событий истории через интерфейс — HistoryEventHandler. Поэтому просто напишем нужную реализацию.

Механизм экспорта может быть любым — http-запросы, брокеры сообщений и так далее. Почему мы используем Kafka:

  1. Kafka поддерживает отказоустойчивость, когда недоступен сервис camunda_history_collector.

  2. Kafka поддерживает асинхронную отправку событий и благодаря этому нет влияния на процесс.

  3. Kafka позволяет контролировать нагрузку на сервис и БД camunda_history_collector за счет ограничения количества партиций.

  4. Kafka упорядочивает сообщения, поэтому сообщения можно обрабатывать в порядке их создания.

Совет: если в событии указать полное имя класса, это поможет при десериализации. Формат данных JSON, в котором будет всего два поля/объекта — название класса и сама сущность.

Пример обработчика истории:

class CamundaHistoryEventHandler(
    private val kafkaTemplate: KafkaTemplate<String, HistoryEventDto>
) : HistoryEventHandler {
     override fun handleEvent(historyEvent: HistoryEvent) {
        kafkaTemplate.sendDefault(
            historyEvent.processInstanceId,
            HistoryEventDto(
                historyEvent.javaClass.canonicalName,
                historyEvent
            )
        )
    }
 
    override fun handleEvents(historyEvents: MutableList<HistoryEvent>) {
        historyEvents.forEach {
            handleEvent(it)
        }
    }
}

Возможны два варианта реализации: с поддержкой транзакций и без. Работа с транзакциями реализуется через механизм сессий Camunda — Session. Выбор решения зависит от потребностей. Лучше работать с поддержкой транзакций. Но если есть длинные транзакции в самом процессе, подойдет второй вариант.

Можно сразу добавить возможность отключить сохранение истории в БД camunda_runtime. Для этого используйте плагин Camunda:

class KafkaHistoryEventEnginePlugin : ProcessEnginePlugin {
    override fun preInit(processEngineConfiguration: ProcessEngineConfigurationImpl) {
        processEngineConfiguration.isEnableDefaultDbHistoryEventHandler = false
    }
 
    override fun postInit(processEngineConfiguration: ProcessEngineConfigurationImpl?) = Unit
 
    override fun postProcessEngineBuild(processEngine: ProcessEngine?) = Unit
}

Совет: отключить сохранение истории на стороне camunda_runtime можно позже, если нужен плавный переход.

Схема экспорта исторических данных, результат первого этапа

Обработка исторических данных

Данные после экспорта можно записать в логи, сделать отдельное хранилище или отправлять их на почту. Но эти варианты противоречат первой и третьей целям, которые мы закрепили выше.

В Camunda есть готовый механизм для сохранения данных в БД, и его можно переиспользовать — нужно поднять еще одну Camunda и реализовать в ней нужную функциональность. За обработку событий истории отвечает класс DbHistoryEventHandler.

Через Jackson можно десериализовать событие и передать его в обработчик. 

Пример десериализации:

fun <T : HistoryEvent> readHistoryEvent(
        historyEvent: HistoryEventDto
): T {
     val clazz = Class.forName(historyEvent.eventClass) as Class<T>
     return objectMapper.treeToValue(historyEvent.eventDto, clazz)
}

Camunda работает через паттерн «Команда». Чтобы вызвать обработчик, нужно обернуть его в отдельную команду и вызывать ее выполнение через CommandExecutor.

class ProcessHistoryEventCommand(
    private val historyEvent: HistoryEvent,
    private val eventHandler: DbHistoryEventHandler
) : Command<Unit>, Serializable {
 
    override fun execute(
        commandContext: CommandContext
    ) {
        eventHandler.handleEvent(historyEvent)
    }
}
 
commandExecutor.execute(
    ProcessHistoryEventCommand(
       event,
       eventHandler
    )
)
Схема процесса обработки исторических данных, результат второго этапа

Настройка редиректа запросов для REST API Camunda

На предыдущих этапах мы настроили экспорт данных и сохранение их в базу. Остается реализовать последнюю цель — ничего не менять для пользователей со стороны API.

Создаем еще один источник данных — отдельный сервис с историей. При этом нужно агрегировать данные из разных систем автоматически. С этой задачей справится почти каждый сервис API gateway. Нужно просто перенаправить исторические запросы из camunda_runtime в новый сервис camunda_history_collector — и тогда для пользователей ничего не изменится.

Схема настройки редиректа исторических запросов для REST API Camunda, результат третьего этапа

Вместо заключения

Вместо подведения итогов расскажу о преимуществах выбранного решения:

  1. В основной БД camunda_runtime хранятся только runtime-данные. Благодаря этому снижается нагрузка на железо — процессор не обрабатывает исторические запросы, и объем занятого пространства на диске не возрастает.

  2. Поддерживается агрегация истории в одном сервисе — camunda_history_collector для разных camunda_runtime.

  3. В camunda_runtime можно управлять временем жизни для событий истории, но ttl задается глобально или на уровне определенного процесса. В camunda_history_collector можно указывать ttl для разных типов событий, когда не вся история нужна в дальнейшем. Например, историю выполнения job`ов хранить пять дней, а историю выполнения процесса — 30 дней.

  4. Работоспобность camunda_history_collector не влияет на работу camunda_runtime.


Мы опубликовали нашу библиотеку на Github для написания делегатов в декларативном стиле camunda-delegator-lib. Она помогает писать меньше бойлерплейта и делает код более читаемым.

Если вы хотите задать вопрос или предложить тему для обсуждения — добро пожаловать в комментарии или в наш GitHub.