В этой статье мы расскажем о библиотеке оркестрации обработки данных, которая использует RabbitMQ как децентрализованный механизм передачи объектов между микросервисами. Ее задача - создание единого автоматизированного рабочего бизнес-процесса.

Описание архитектуры

Речь пойдет о системе, которая ежедневно обрабатывает большой объем данных, получаемых как единовременно в виде файлов скачиваемых с различных FTP обменников в установленные регламентом промежутки времени, так и данных, получаемых в онлайн режиме в виде асинхронного стриминга информации посредством Kafka\RabbitMQ.

 

Для начала опишем текущую архитектуру приложения, для которой потребовалось усовершенствовать взаимодействия ее компонентов:

Приложение имеет микросервисную архитектуру с четырьмя десятками выделенных сервисов. Каждый отвечает за свой аспект обработки, и большая часть из них объединена в единую цепочку обработки входного объекта.

В цепочке обработки есть как последовательное исполнение подзадач, так и различные ветвления по условиям, параллельный запуск независимых процессов, синхронное\асинхронное ожидание ответа и прочее. Реализация последовательности обработки на заре разработки приложения была сделана с использованием Apache Сamel (https://camel.apache.org/) фреймворка, где на Java DSL были описаны различные маршруты передачи объекта между бинамионвертациями. Схематично эту модель можно отобразить следующим образом:

Описание процесса содержится в одном микросервисе, который руководит обработкой объекта и передает управление посредством REST запросов во все смежные части системы. Спустя время из-за расширения системы, добавления этапов обработки, повышения производительности и увеличения количества параллельных потоков возникли предпосылки к разрешению узких мест этого решения, в первую очередь для обеспечения надежности процесса.

При любом рестарте микросервиса с Camel-обработкой текущий процесс теряется. Каждый такой сбой требует детального анализа и ручного перезапуска процесса по всем затронутым объектам.

Но даже если не брать в расчет сбойные ситуации, это накладывает определенные ограничения на время релиза плановых обновлений или фиксов. Другой момент — это невозможность масштабировать сервис из-за прочей содержащейся в нем логики и огромного объема ресурсов, со всеми параллельными потоками. Они создаются в Camel для обеспечения всех маршрутов, необходимых для его деплоя.

Решение проблемы

Мы решили реализовать библиотеку, которая обеспечивает децентрализованное управление процессом обработки. В качестве транспорта асинхронных событий между сервисами используется кластер RabbitMQ с кворумными очередями. Также мы выделили сервис инициализации процесса, который можно горизонтально масштабировать.

В таком варианте исполнения решается главный вопрос с сохранением консистентности обработки, где за сохранение состояния при рестарте приложения отвечает брокер сообщений и при возобновлении системы процесс обработки пойдет с того момента, на котором был прерван, без какого-либо ручного вмешательства.

Реализация библиотеки

Вся библиотека состоит из следующих основных сущностей:

  • RouteStepInfo– Информация об исполняемом шаге процесса.

    public class RouteStepInfo {
       private final String routeName;
       private final String stepId;
     
       public RouteStepInfo(String routeName, String stepId) {
            this.routeName = routeName;
            this.stepId = stepId;
       }
    }
  • StepExecutor – Исполнитель шага обработки процесса.

    public interface StepExecutor {
       String getStepName();
       void runAsync(RouteStepInfo routeStepInfo, Long objectId);
       void runSync(RouteStepInfo routeStepInfo, Long objectId);
       void runDetached(RouteStepInfo routeStepInfo, Long objectId);
       void appendListener(StepExecutionFinishedListener listener);
    }
  • StepExecutionFinishedListener — Исполнитель события завершения обработки шага процесса.

    public interface StepExecutionFinishedListener {
       void onStepExecutionFinished(RouteStepInfo routeStepInfo, Long objectId);
    }
  • RouteStep – Шаг процесса, с возможными реализациями к выполнению в async\sync\detached режиме. Также может представлять собой ранее сконфигурированный процесс.

    public interface RouteStep {
       String getRouteName();
       String getStepId();
       String getStepName();
       ExecutionMode getExecutionMode();
       void run(Long objectId);
       void appendListener(StepExecutionFinishedListener listener);
     
       enum ExecutionMode {
            ASYNC,
            SYNC,
           DETACHED
       }
    }
  • ExecutionRoute – Исполняемый процесс обработки сущностей, состоящий из набора RouteStep.

    @Slf4j
    public class ExecutionRoute {
       @Getter
       private final String routeName;
       private final List<RouteStep> routeSteps;
       private final Map<String, Integer> stepsIndex;
     
       public ExecutionRoute(String routeName, List<RouteStep> routeSteps) {
            this.routeName = routeName;
            this.routeSteps = routeSteps;
            this.stepsIndex = new HashMap<>();
            for (int i = 0; i < routeSteps.size(); i++) {
               stepsIndex.put(routeSteps.get(i).getStepId(), i);
            }
       }
    }
  • RoutesConfigBuilder – Конфигуратор процессов обработки сущностей.

    public class RoutesConfigBuilder {
       private final Map<String, StepExecutor> executorsMap;
       private final Map<String, List<RouteStep>> configuredRoutes;
     
       private String currentRouteName;
       private List<RouteStep> currentRouteSteps;
    }
  • RouterConfigProvider - Поставщик процессов обработки сущностей.

    public interface RouterConfigProvider {
       List<ExecutionRoute> getRoutes();
    }
  • ProcessRouter – Исполнитель процессов.

    public class ProcessRouter implements StepExecutionFinishedListener {
       private final Map<String, ExecutionRoute> routes;
     
       public ProcessRouter(RouterConfigProvider configProvider) {
            routes = new HashMap<>();
     
            for (ExecutionRoute route : configProvider.getRoutes()) {
                route.appendListener(this);
                routes.put(route.getRouteName(), route);
            }
       }
     
       public void runProcess(String routeName, long objectId) {
       }
     
       @Override
       public void onStepExecutionFinished(RouteStepInfo routeStepInfo, Long objectId) {
       }
     
    }

В результате анализа текущих цепочек обработки и взаимодействия процессов через Camel мы выделили следующие базовые сценарии работы:

Асинхронная обработка (Async):

Ситуация, при которой можно запустить последовательную обработку N этапов процесса в фоновом режиме. После завершения обработки одного шага процесс переходит к выполнению следующего этапа цепочки.

Синхронная обработка (Sync):

Ситуация, при которой необходимо ожидать возвращения управления после выполнения в заданной последовательности всех этапов цепочки процесса.

Изолированная обработка (Detached):

Ситуация, при которой этап обработки запускается асинхронно без ожидания завершения процесса и управление после запуска сразу переходит к выполнению следующего этапа цепочки.

Таким образом с помощью разных способов запуска можно создать разные ветвления и последовательности обработки этапов процесса.
 

По примеру Route 1 – полностью синхронный вызов, при котором последовательно будут выполнены шаги 1, 2, 3 каждый следующий после завершения предыдущего и будет получено управление после окончания последнего.

По примеру Route 2 – полностью асинхронный вызов, при котором последовательно будут выполнены шаги 1, 2, 3 каждый следующий после завершения предыдущего и будет получено управление сразу после запуска шага 1 без окончания обработки цепочки.

По примеру Route 3 – асинхронный запуск с промежуточным изолированным detached этапом. Шаг 3 будет запущен с минимальной задержкой без ожидания окончания процесса по шагу  2.

По примеру Route 5 – здесь порождается запуск параллельного подпроцесса Route 4. Шаг 2.2 будет запущен с минимальной задержкой без ожидания окончания подпроцесса Route 4.

Описанный способ оркестрации обработки процессов после реализации стал применяться для всех новых блоков при разработке системы. Было проведено нагрузочное тестирование, после которого мы приступили непосредственно к замещению отдельных частей Camel обработки с использованием этой библиотеки.

Если коснуться темы внедрения и замещения одного роутинга шагов процесса на другой, то здесь сложность обусловлена тем, что текущая интеграция взаимодействия сервисов приложения друг с другом посредством Camel уже много лет находится в промышленной эксплуатации, и обросла где-то не самыми очевидными зависимостями процессов. Поэтому замещение производится поэтапно небольшими частями с сохранением и возможностью переключения на старый способ роутинга обработки.

Заключение

Разработанная библиотека позволила нам:

  • Повысить отказоустойчивость - при рестартах сервисов обработка продолжается с места остановки без ручного вмешательства.

  • Обеспечить горизонтальное масштабирование - сервис инициализации теперь можно масштабировать независимо.

  • Упростить внедрение новых процессов - конфигурация через RoutesConfigBuilder стала прозрачной.

Замещение Camel идет поэтапно, с сохранением возможности отката на старую логику. Это снижает риски при миграции и позволяет постепенно переводить систему на новую архитектуру.

Опыт показал, что децентрализованная оркестрация на базе RabbitMQ — это надежный и масштабируемый подход для систем с высокими требованиями к отказоустойчивости.

Если у вас похожие проблемы с потерей состояния при рестартах или сложностью масштабирования, возможно, стоит посмотреть в сторону такого решения.