Зимой 2024 года мне довелось поучаствовать в разработке проекта на Camunda 8. Сразу оговорюсь: проект в итоге реализовали на другом движке. Тем не менее, команда успела сделать стенд, прогнать тесты и замерить производительность.

В этой статье я расскажу об одном эпизоде, когда мне пришлось исправить библиотеку spring-zeebe из Camunda 8, отвечающую за обвязку Job Worker на Spring. Одной из моих задач было отладить процесс, используя локальные интеграционные тесты с Docker и библиотекой Testcontainers. Мы создали простой процесс, вызывающий наш Job Worker, подлежавший отладке. Сразу столкнулись с плавающей ошибкой: иногда тест проходил успешно, иногда — нет. Несколько тасков работали корректно, однако первый периодически игнорировался без видимых ошибок. Изначально подозревали проблему в собственном коде, проверяли конфигурацию Job Worker, благодаря чему узнали много нового о Camunda 8.

Весь день ушёл на локализацию проблемы. Пришлось внимательно изучить стек вызовов при выполнении Job в библиотеке spring-zeebe, конкретно класс JobWorkerImpl. К слову, последние версии spring-zeebe переехали в camunda-spring-boot-starter и camunda-client-java, больше не являясь открытым проектом.

В итоге выяснилось, что в классе JobWorkerImpl обрывается обработка не успев начаться:


  private void handleActivatedJob(final ActivatedJob job, final Runnable finalizer) {
    metrics.jobActivated(1); // Здесь обрывалась обработка
    try {
      executor.execute(jobHandlerFactory.create(job, finalizer));
    } catch (final RejectedExecutionException e) {
      if (isClosed()) {
        return;
      }

      if (scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
        LOG.warn("Underlying executor was closed before the worker. Closing the worker now.", e);
        close();
        return;
      }

      LOG.warn(ERROR_MSG, job.getKey(), e);
    }
  }

Программист, писавший этот код не верил, что в метриках может быть ошибка, он не внес metrics.jobActivated(1); в try блок. Но ошибка как раз там и случилась. Найти такую ошибку можно было только поставив точки останова. Но в чем же была причина ошибки в недрах этого вызова? Спустившись на несколько уровней вызова, я добрался до класса MicrometerMetricsRecorder.

Вот здесь-то и выявился источник ��роблемы — исключение типа NullPointerException, возникавшее при попытке получить значение по ключу в структуре HashMap. Дело в том, что другой поток иногда опережал основной, и ситуация складывалась такая: ключ уже присутствовал, однако само значение ещё не успело записаться в соответствующую ячейку.

Ниже привожу этот файл с моими исправлениями:

diff --git spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
index 3045cfc..2695dc7 100644
--- spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
+++ spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
@@ -1,54 +1,52 @@
 package io.camunda.zeebe.spring.client.actuator;
 
 import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tag;
 import io.micrometer.core.instrument.Timer;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MicrometerMetricsRecorder implements MetricsRecorder {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MeterRegistry meterRegistry;
-  private final Map<String, Counter> counters = new HashMap<>();
+  private final Map<String, Counter> counters = new ConcurrentHashMap<>();
 
   public MicrometerMetricsRecorder(final MeterRegistry meterRegistry) {
     LOGGER.info("Enabling Micrometer based metrics for spring-zeebe (available via Actuator)");
     this.meterRegistry = meterRegistry;
   }
 
   protected Counter newCounter(String metricName, String action, String jobType) {
     List<Tag> tags = new ArrayList<>();
     if (action != null && !action.isEmpty()) {
       tags.add(Tag.of("action", action));
     }
     if (jobType != null && !jobType.isEmpty()) {
       tags.add(Tag.of("type", jobType));
     }
     return meterRegistry.counter(metricName, tags);
   }
 
   @Override
   public void increase(String metricName, String action, String type, int count) {
     String key = metricName + "#" + action + '#' + type;
-    if (!counters.containsKey(key)) {
-      counters.put(key, newCounter(metricName, action, type));
-    }
-    counters.get(key).increment(count); // Здесь NullPointerException
+    Counter counter = counters.computeIfAbsent(key, k -> newCounter(metricName, action, type));
+    counter.increment(count);
   }
 
   @Override
   public void executeWithTimer(String metricName, String jobType, Runnable methodToExecute) {
     Timer timer = meterRegistry.timer(metricName, "type", jobType);
     timer.record(methodToExecute);
   }
 }


Поясню, этот код. Для того, чтобы по каждому типу воркера создавались свои метрики, они тут создаются динамически, при первом обращении к методу increase, за что отвечает метод newCounter.
Я внес исправления в этот код заменив HashMap на потокобезопасный ConcurrentHashMap и использовал удобный атаомарный вызов computeIfAbsent, который возвращает существующий или созданный объект, что исключает возникновение race condition.

После этого наши тесты стали работать без ошибок. Я завел issue в проекте spring-zeebe, сделал fork по правилам проекта и отдал на модерацию. Через две недели мой код попал в основную ветку spring-zeebe.

Вот мой небольшой вклад в Camunda 8:
https://github.com/camunda-community-hub/spring-zeebe/commit/61795fc9274283fba2ad3158e41cc3be8c48f158

Почему разработчики spring-zeebe не нашли эту ошибку?

  • Во-первых, эта ошибка проявлялась лишь тогда, когда мы включали actuator в Spring. Пока мы её искали, не знали, что отключение актуатора решает проблему. А во всех тестах и примерах от Camunda8 actuator выключен. Мы же писали микросервис для нашего kuber и сразу включили нужные нам опции, что и включило кусок кода с ошибкой.

  • Во-вторых, ошибка возникала исключительно при первом запуске задания (Job) и лишь один раз из трёх т��кая ситуация приводила к сбою.

Разбираясь с этой проблемой, я перекопал кучу материала и пересмотрел кучу кода. Мне запомнилось видео, в котором разрабы показывают, как они работают над кодом на каком-то митапе:
Falko & Josh: Zeebe Java client deep-dive
В этом видео рассказывается принцип вычитывания и исполнения Job, что это 1 поток, который вычитывает несколько Job-ов, а затем он же их и исполняет. Понятно, что при такой организации выполнения Job-ов, это не может быть оптимальным по скорости решением, но имеет преимущества при масштабировании.

Впоследствии тот проект, который планировалось реализовать на Camunda 8, переделали на другом движке. Вот несколько причин:

  • Производительность, измеренная в ходе тестов, оказалась недостаточной для платежного приложения.

  • Обнаруженные ошибки и проблемы с документацией выявили низкую зрелость проекта на тот момент (2024 год).

  • Юридические ограничения, невозможность получения поддержки законным способом в России.

Выводы:

  • Разработывая приложения на Spring, важно помнить, что это многопоточное приложение и размщение данных в полях Bean классов, например, как в описанном случае private final Map<String, Counter> counters = new HashMap<>(); должно быть синхронизировано между потоками при доступе к этому полю или иметь потокобезопасную реализацию, например, ConcurrentHashMap.

  • Перспективы Camunda 8 в России туманны, в отличие от Camunda 7, которая имеет множество успешных внедрений.