Зимой 2024 года мне довелось поучаствовать в разработке проекта на Camunda 8. Сразу оговорюсь: проект в итоге реализовали на другом движке. Тем не менее, команда успела сделать стенд, прогнать тесты и замерить производительность.
В этой статье я расскажу об одном эпизоде, когда мне пришлось исправить библиотеку spring-zeebe из Camunda 8, отвечающую за обвязку Job Worker на Spring. Одной из моих задач было отладить процесс, используя локальные интеграционные тесты с Docker и библиотекой Testcontainers. Мы создали простой процесс, вызывающий наш Job Worker, подлежавший отладке. Сразу столкнулись с плавающей ошибкой: иногда тест проходил успешно, иногда — нет. Несколько тасков работали корректно, однако первый периодически игнорировался без видимых ошибок. Изначально подозревали проблему в собственном коде, проверяли конфигурацию Job Worker, благодаря чему узнали много нового о Camunda 8.
Весь день ушёл на локализацию проблемы. Пришлось внимательно изучить стек вызовов при выполнении Job в библиотеке spring-zeebe, конкретно класс JobWorkerImpl. К слову, последние версии spring-zeebe переехали в camunda-spring-boot-starter и camunda-client-java, больше не являясь открытым проектом.
В итоге выяснилось, что в классе JobWorkerImpl обрывается обработка не успев начаться:
private void handleActivatedJob(final ActivatedJob job, final Runnable finalizer) {
metrics.jobActivated(1); // Здесь обрывалась обработка
try {
executor.execute(jobHandlerFactory.create(job, finalizer));
} catch (final RejectedExecutionException e) {
if (isClosed()) {
return;
}
if (scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
LOG.warn("Underlying executor was closed before the worker. Closing the worker now.", e);
close();
return;
}
LOG.warn(ERROR_MSG, job.getKey(), e);
}
}
Программист, писавший этот код не верил, что в метриках может быть ошибка, он не внес metrics.jobActivated(1); в try блок. Но ошибка как раз там и случилась. Найти такую ошибку можно было только поставив точки останова. Но в чем же была причина ошибки в недрах этого вызова? Спустившись на несколько уровней вызова, я добрался до класса MicrometerMetricsRecorder.
Вот здесь-то и выявился источник ��роблемы — исключение типа NullPointerException, возникавшее при попытке получить значение по ключу в структуре HashMap. Дело в том, что другой поток иногда опережал основной, и ситуация складывалась такая: ключ уже присутствовал, однако само значение ещё не успело записаться в соответствующую ячейку.
Ниже привожу этот файл с моими исправлениями:
diff --git spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
index 3045cfc..2695dc7 100644
--- spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
+++ spring-boot-starter-camunda/src/main/java/io/camunda/zeebe/spring/client/actuator/MicrometerMetricsRecorder.java
@@ -1,54 +1,52 @@
package io.camunda.zeebe.spring.client.actuator;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MicrometerMetricsRecorder implements MetricsRecorder {
private static final Logger LOGGER =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MeterRegistry meterRegistry;
- private final Map<String, Counter> counters = new HashMap<>();
+ private final Map<String, Counter> counters = new ConcurrentHashMap<>();
public MicrometerMetricsRecorder(final MeterRegistry meterRegistry) {
LOGGER.info("Enabling Micrometer based metrics for spring-zeebe (available via Actuator)");
this.meterRegistry = meterRegistry;
}
protected Counter newCounter(String metricName, String action, String jobType) {
List<Tag> tags = new ArrayList<>();
if (action != null && !action.isEmpty()) {
tags.add(Tag.of("action", action));
}
if (jobType != null && !jobType.isEmpty()) {
tags.add(Tag.of("type", jobType));
}
return meterRegistry.counter(metricName, tags);
}
@Override
public void increase(String metricName, String action, String type, int count) {
String key = metricName + "#" + action + '#' + type;
- if (!counters.containsKey(key)) {
- counters.put(key, newCounter(metricName, action, type));
- }
- counters.get(key).increment(count); // Здесь NullPointerException
+ Counter counter = counters.computeIfAbsent(key, k -> newCounter(metricName, action, type));
+ counter.increment(count);
}
@Override
public void executeWithTimer(String metricName, String jobType, Runnable methodToExecute) {
Timer timer = meterRegistry.timer(metricName, "type", jobType);
timer.record(methodToExecute);
}
}
Поясню, этот код. Для того, чтобы по каждому типу воркера создавались свои метрики, они тут создаются динамически, при первом обращении к методу increase, за что отвечает метод newCounter.
Я внес исправления в этот код заменив HashMap на потокобезопасный ConcurrentHashMap и использовал удобный атаомарный вызов computeIfAbsent, который возвращает существующий или созданный объект, что исключает возникновение race condition.
После этого наши тесты стали работать без ошибок. Я завел issue в проекте spring-zeebe, сделал fork по правилам проекта и отдал на модерацию. Через две недели мой код попал в основную ветку spring-zeebe.
Вот мой небольшой вклад в Camunda 8:
https://github.com/camunda-community-hub/spring-zeebe/commit/61795fc9274283fba2ad3158e41cc3be8c48f158
Почему разработчики spring-zeebe не нашли эту ошибку?
Во-первых, эта ошибка проявлялась лишь тогда, когда мы включали actuator в Spring. Пока мы её искали, не знали, что отключение актуатора решает проблему. А во всех тестах и примерах от Camunda8 actuator выключен. Мы же писали микросервис для нашего kuber и сразу включили нужные нам опции, что и включило кусок кода с ошибкой.
Во-вторых, ошибка возникала исключительно при первом запуске задания (Job) и лишь один раз из трёх т��кая ситуация приводила к сбою.
Разбираясь с этой проблемой, я перекопал кучу материала и пересмотрел кучу кода. Мне запомнилось видео, в котором разрабы показывают, как они работают над кодом на каком-то митапе:
Falko & Josh: Zeebe Java client deep-dive
В этом видео рассказывается принцип вычитывания и исполнения Job, что это 1 поток, который вычитывает несколько Job-ов, а затем он же их и исполняет. Понятно, что при такой организации выполнения Job-ов, это не может быть оптимальным по скорости решением, но имеет преимущества при масштабировании.
Впоследствии тот проект, который планировалось реализовать на Camunda 8, переделали на другом движке. Вот несколько причин:
Производительность, измеренная в ходе тестов, оказалась недостаточной для платежного приложения.
Обнаруженные ошибки и проблемы с документацией выявили низкую зрелость проекта на тот момент (2024 год).
Юридические ограничения, невозможность получения поддержки законным способом в России.
Выводы:
Разработывая приложения на Spring, важно помнить, что это многопоточное приложение и размщение данных в полях Bean классов, например, как в описанном случае
private final Map<String, Counter> counters = new HashMap<>();должно быть синхронизировано между потоками при доступе к этому полю или иметь потокобезопасную реализацию, например, ConcurrentHashMap.Перспективы Camunda 8 в России туманны, в отличие от Camunda 7, которая имеет множество успешных внедрений.
