При разработке программных продуктов иногда может появиться задача разработки многосоставного ступенчатого процесса. На каждом этапе которого нужно предусмотреть обработку ошибок, повторное выполнение операции, откат с начальному состоянию в случае ошибки и тд. Возникают вопросы о том, как реализовать обработку критических ситуаций, как хранить состояние процесса и тд. Если процесс сложный и длительный по времени, то реализация этой задачи может быть непростой для разработки и тестирования всех вариантов развития процесса.

В этом случае можно подумать о применении оркестраторов. В этой статье я расскажу про temporal и покажу пример кода. В конце приложу ссылку на исходный код, так что можно будет скачать и самому поиграться с настройками, процессами и тд.

Краткое инфо

Чем temporal не является:

  • Это не планировщик задач (cron). Хотя он может запускать задачи по расписанию, но он не заменяет cron напрямую, потому что запускать сервер temporal слишком избыточно; нет встроенной визуализации расписаний и есть ограничения в гибкости cron выражениях

  • Это не очередь сообщений. Temporal - это не kafka, rabbit и тд. Он не предназначен для простой передачи сообщений между сервисами.

  • Это не база данных. Хотя temporal и хранит состояние процессов, но он не предназначен для использования как СУБД.

А что же это такое:

Temporal - это оркестратор, который управляет вашим кодом. Он гарантирует надежное выполнение долгоживущих операций в распределенных системах. В нем есть встроенная поддержка ретраев с настраиваемыми таймаутами и сохранение состояния процесса с возможностью запуска с места остановки.

Важная особенность - это описание оркестрируемого процесса в коде с небольшим количеством добавок в виде аннотаций и интерфейсов.

На официальном сайте можно найти документацию по инструменту для многих языков (Go, Java, Python ...). Кроме этого на официальном сайте есть ссылка на слак, где можно задать вопросы разработчикам и другим пользователям Temporal. В этом канале еще есть ИИ, которая обучена на документации.

Преимущества temporal

  • Отказоустойчивость: Выдерживает падения сервисов, сетевые сбои, перезапуски инфраструктуры.

  • Долгоживущие процессы: Можно легко управлять процесс��ми, которые длятся дни или месяцы (например, пробная подписка или онбординг).

  • Простота разработки:  Вся логика пишется как простой последовательный код. Исчезает необходимость вручную управлять очередями, повторными попытками (retries) и компенсирующими транзакциями (sagas).

  • Визуализация и отладка: UI Temporal показывает полную историю выполнения любого workflow, что сильно упрощает отладку.

Несколько важных терминов

Вначале нужно упомянуть несколько терминов, чтобы потом все понятно было:

Рабочий процесс (Workflow):

Workflow - это процесс, состоящий из нескольких элементарных шагов. Это может быть, например, процесс модерации поста, процесс перевода денег с одного счета на другой, процесс бронирования жилья, билетов и тд.

Этот процесс мы описываем в коде, так что визуально сложно отличить от обычной функции, в которой есть последовательные вызовы методов.

Детерминированный: При повторных запусках с теми же данными должен давать одинаковый результат, рекомендации ниже раскроют этот пункт.

Рекомендации для написания workflow:

  • Не используйте глобальные изменяемые переменные в своих имплементациях Workflow. Это гарантирует, что различные Wokrflow изолированы.

  • Код workflow должен быть детерминирован. Не вызывайте недетерминированные функции (например UUID.randomUUID()) напрямую из кода Workflow. Temporal SDK предоставляет API для вызова недетерминированного кода.

  • Не используйте конструкции языка для получения системного времени, вместо этого используйте только Workflow.currentTimeMillis() для получения текущего времени в коде Workflow.

  • Не используйте Thread или другие классы многопоточности как ThreadPoolExecutior. Вместо этого для выполнения асинхронного кода используйте Async.funtion или Async.procedure предоставляемые Temporal SDK

  • Не используйте синхронизацию, локи или другие стандартные классы для синхронизации кода кроме тех, что поставляются классом Workflow. Нет явной необходимости в синхронизации кода, потому что код Workflow выполняется в одном потоке под глобальным локом.

  • Вызывайте Workflow.sleep вместо Thread.sleep

  • Используйте Promise и CompletablePromise вместо Future и CompetableFuture

  • Используйте WorflowQueue вместо BlockingQueue

  • Используйте Workflow.getVersion когда изменяете Workflow. Без этого старые процессы могут упасть из-за несоответствия процесса

Деятельность (Activity):

Это конкретный шаг нашего процесса. Для примера можно назвать: вызов внешнего API, сохранение состояния в базу, отправка нотификации пользователю и тд

Activity уже может содержать недетерминированный код, а кроме того в активити мы можем внедрять наши бины.

Здесь еще нужно упомянуть, что входные и выходные объекты для Activity должны быть сериализуемыми, чтобы Temporal мог их сохранить в базу. Кроме этого, есть ограничение на объем.

Кроме этого, activity должен быть идемпотентым, те корректно выполняться вне зависимости от количества вызовов. Если мы в самом начале шага создаём пользователя в базе, то при нескольких ретраях этого шага создадим дубли в базе.

Namespace

Namespace - это пространство для разделения разных Workflow. Разработчики Temporal предлагают использовать этот механизм для разделения процессов в dev и prod средах. Сомнительно, но окей.

В итоге получается вот такая схема из этих компонентов:

Схема компонентов
Схема компонентов

Некоторые детали реализации Temporal

  • В Temporal реализован подход Event Sourcing. Temporal не хранит текущее состояние вашего workflow. Вместо этого он хранит историю событий (запущен, вызвана активность X, активность X завершена успешно, и т.д.). При перезапуске воркер «проигрывает» эту историю, пропуская шаги, которые уже были выполнены, чтобы восстановить последнее состояние. По этой причине большое количество активностей вызывает большое количество записей в бд. И именно поэтому изменение Workflow может вызывать ошибку, если сделать это неправильно.

  • В Temporal используется версионирование процессов, поэтому при модификации с нарушением совместимости можно выбрать подходящий процесс, чтобы не поломать старое и добавить новое. Это стоит применять, если есть активные workflow со старым алгоритмом.

  • Гарантия «ровно-один раз»: Благодаря сохранению состояний и ретраям, каждая деятельность будет выполнена ровно один раз, даже если для этого потребуется несколько попыток. Кроме этого, у каждого workflow есть идентификатор, поэтому запустить дублирующий workflow со второго инстанса вашего приложения не получитчся.

Пример кода

Идея проекта

Напишем пример на java для модер��ции поста. Этот процесс будет состоять из 3х шагов:

  • Проверка поста на количество символов

  • Проверка поста на содержание запрещенных слов

  • Проверка на везение Если какой-то из этап возвращает отрицательный результат, то мы заканчиваем процесс модерации и сохраняем отрицательный результат модерации. В дальнейшем можно сюда добавить еще функциональности, чтобы показать разные возможности temporal.

Полный код проекта можно посмотреть в git репозитории, ссылку смотрите в конце.

Развертывание temporal

Temporal состоит из нескольких компонентов, которые можно развернуть в докере или кубере. Для простоты напишем docker compose для поднятия всех необходимых компонентов. В первом приближении есть такие компоненты:

  • Сервер temporal

  • UI temporal

  • База данных для temporal

Docker compose
services:
 postgresql:
 container_name: temporal-postgresql
 environment:
 POSTGRES_PASSWORD: temporal
 POSTGRES_USER: temporal
 image: postgres:15
 networks:
 - temporal-network
 ports:
 - 5454:5432
 volumes:
 - /var/lib/postgresql/data

temporal:
 container_name: temporal
 depends_on:
 - postgresql
 environment:
 - DB=postgres12
 - DB_PORT=5432
 - POSTGRES_USER=temporal
 - POSTGRES_PWD=temporal
 - POSTGRES_SEEDS=postgresql
 - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CLI_ADDRESS=temporal:7233
 - TEMPORAL_NAMESPACE_DEFAULT=default
 - TEMPORAL_DEFAULT_NAMESPACE=default
 image: temporalio/auto-setup:1.29.0
 networks:
 - temporal-network
 ports:
 - 7233:7233
 volumes:
 - ./dynamicconfig:/etc/temporal/config/dynamicconfig

temporal-admin-tools:
 container_name: temporal-admin-tools
 depends_on:
 - temporal
 environment:
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CLI_ADDRESS=temporal:7233
 image: temporalio/admin-tools:1.29
 networks:
 - temporal-network
 stdin_open: true
 tty: true

temporal-ui:
 container_name: temporal-ui
 depends_on:
 - temporal
 environment:
 - TEMPORAL_ADDRESS=temporal:7233
 - TEMPORAL_CORS_ORIGINS=http://localhost:3000
 image: temporalio/ui:2.41.0
 networks:
 - temporal-network
 ports:
 - 8888:8080
 networks:
 temporal-network:
 driver: bridge
 name: temporal-network

Конфигурация

Для начала нам нужно настроить конфигурацию для temporal в нашем приложении.

В этом проекте я буду использовать стартер спринга для простоты настройки, но можно использовать голый jdk для temporal. Ниже приведу пример настройки для обоих вариантов :

Конфигурация через Spring starter

Ниже дан пример yaml конфигурации для стартера:

spring:  
  application:  
    name: temporal-demo  
  temporal:
    connection:  
      target: 127.0.0.1:7233  
      target.namespace: default  
    workers:  
      - task-queue: MODERATION_TASK_QUEUE  
        capacity:  
          max-concurrent-workflow-task-pollers: 6  
          max-concurrent-activity-task-pollers: 6  
        rate-limits:  
          max-worker-activities-per-second: 2  
          max-task-queue-activities-per-second: 2  
    workflow-cache:  
      max-instances: 10  
      max-threads: 10 
    workersAutoDiscovery:  
      packages: com.moderation

В секции connection мы указываем параметры подключения к temporal и наш namespace.
В секции workers мы указываем параметры воркеров для нашего workflow. Worker - это внутренний компонент в temporal, который выполняет наши activity, иными словами - рабочая лошадка.
В секции workersAutoDiscovery мы указываем пакет с activity и workflow, которые будут автоматически сканироваться спрингом и регистрироваться в воркерах.

Конфигурация через голый jdk

Для начала нам нужно создать бин WorkflowClient, который позволит нам взаимодействовать с temporal.

@Configuration  
public class TemporalConfig {  
  
  @Bean  
  public WorkflowClient workflowClient() {  
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();  
    return WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder()  
        .setNamespace("default")  
        .build());  
  }  
}

Создание воркера

Голый jdk ничего не знает про бины, поэтому приходится самому регистрировать activity в воркере, чтобы он мог их выполнять. Если мы этого не сделаем, то в ui будет висеть уведомление, что подходящего воркера не найдено:

@Component  
@RequiredArgsConstructor  
public class TemporalWorker implements CommandLineRunner {  
  
  private final WorkflowClient workflowClient;  
  private final Moderationservice moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public void run(String... args) {  
    WorkerFactory factory = WorkerFactory.newInstance(workflowClient);  
    Worker worker = factory.newWorker("MODERATION_TASK_QUEUE");  
  
    worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);  
    
    worker.registerActivitiesImplementations(new SymbolCountModerationActivityImpl(moderationService, moderationdao));  
    worker.registerActivitiesImplementations(new ForbiddenWordModerationActivityImpl(moderationService, moderationdao));  
    worker.registerActivitiesImplementations(new LuckTestModerationActivityImpl(moderationService, moderationdao));  
  
    factory.start();  
  }  
}

Теперь реализуем все шаги модерации. Я буду каждый шаг реализовать в отдельном activity для возможности разной настройки таймаутов каждого шага. Если в этом нет необходимости, то можно и в одном интерфейсе написать три метода для каждого из трех шагов.

Реализуем первый шаг

В первом шаге нужно проверить пост на количество символов.

В temporal для каждого activity нужно создать интерфейс с определенными аннотациями:

@ActivityInterface  
public interface SymbolCountModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus symbolCountModerate(Moderation moderation);  
}

Ну а далее мы пишем реализацию этого интерфейса:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class SymbolCountModerationActivityImpl implements SymbolCountModerationActivity {  
  
  private final ModerationDao moderationDao;  
  private final ModerationService moderationService;  
  
  @Override  
  public ModerationStatus symbolCountModerate(Moderation moderation) {  
    ModerationStatus status = moderationService.symbolCountModeration(moderation);  
    if (!status.isApproved()) {  
      moderationDao.updateFinalStatus(moderation.id(), true, false);  
    }  
    return status;  
  }  
}

Далее представлен код сервиса модерации:

// Методы из ModerationService
public ModerationStatus symbolCountModeration(Moderation moderation) {  
  log.info("Started first stage for {}", moderation);  
  simulateLongExecution();  
  var isApproved = moderation.text().length() < CONTENT_LENGTH;  
  return new ModerationStatus(true, isApproved);
}
  
private void simulateLongExecution() {  
  try {  
    sleep(10_000L);  
  } catch (InterruptedException ignored) {  
  }
}

// Dto статуса модерации 
public record ModerationStatus(  
  boolean isFinished,  
  boolean isApproved  
) {}

Важный нюанс состоит в том, что в сам workflow мы не можем внедрять бины, а в активити мы можем это делать.

Реализуем второй шаг

На втором шаге будем проверять наличие в тексте поста запрещенных слов.

Далее представлен интерфейс второго активити (шага).

@ActivityInterface  
public interface ForbiddenWordModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus forbiddenWordModerate(Moderation moderation);  
}

Далее представлена реализация второго шага:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class ForbiddenWordModerationActivityImpl implements ForbiddenWordModerationActivity {  
  
  private final ModerationService moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public ModerationStatus forbiddenWordModerate(Moderation moderation) {  
    ModerationStatus status = moderationService.forbiddenWordModeration(moderation);  
    if (!status.isApproved()) {  
      moderationDao.updateFinalStatus(moderation.id(), true, false);  
    }  
    return status;  
  }  
}

Ну и сам код модерации:

private static final List forbiddenFords = List.of("box", "men", "noigram", "metan");

public ModerationStatus forbiddenWordModeration(Moderation moderation) {  
  log.info("Started second stage for {}", moderation);  
  simulateLongExecution();  
  var content = moderation.text();  
  boolean isApproved = forbiddenFords.stream()  
      .noneMatch(content::contains);  
  return new ModerationStatus(true, isApproved);  
}

Реализуем третий шаг

На третьем этапе у нас будет проверка на везение: с некоторой вероятностью будем выбрасывать ошибку. Эта ошибка будет означать недоступность внешнего сервиса или какую-то другую нештатную ситуацию в ходе работы нашей программы.

Это поможет нам увидеть поведение temporal в случае ошибки в activity и настроить политику ретраев и таймаутов.

Интерфейс activity:

@ActivityInterface  
public interface LuckTestModerationActivity {  
  
  @ActivityMethod  
  ModerationStatus luckyModerate(Moderation moderation);  
}

Реализация activity:

@Component  
@ActivityImpl(workers = "MODERATION_TASK_QUEUE")  
@RequiredArgsConstructor  
public class LuckTestModerationActivityImpl implements LuckTestModerationActivity {  
  
  private final ModerationService moderationService;  
  private final ModerationDao moderationDao;  
  
  @Override  
  public ModerationStatus luckyModerate(Moderation moderation) {  
    try {  
      ModerationStatus status = moderationService.luckyModeration(moderation);  
      moderationDao.updateFinalStatus(moderation.id(), true, true);  
      return status;  
    } catch (Exception e) {  
      if (Activity.getExecutionContext().getInfo().getAttempt() == 5){  
        moderationDao.updateFinalStatus(moderation.id(), true, false);  
      }  
      throw e;  
    }  
  }  
}

Код модерации:

public ModerationStatus luckyModeration(Moderation moderation) {  
  log.info("Started third stage for {}", moderation);  
  checkLuck();  
  return new ModerationStatus(true, true);  
}

private void checkLuck() {  
  if (Math.random() * 10 > 4) {  
    throw new RuntimeException("You are not lucky!");  
  }  
}

Собираем наш workflow

После того как мы описали отдельно каждый из шагов модерации самое время собрать и запустить наш процесс.

Для этого нужно создать интерфейс для процесса с определенными аннотациями:

@WorkflowInterface  
public interface ModerationWorkflow {  
  
  @WorkflowMethod  
  void moderate(Moderation moderation);  
}

Далее представлена реализация workflow:

@WorkflowImpl(taskQueues = "MODERATION_TASK_QUEUE")  
public class ModerationWorkflowImpl implements ModerationWorkflow {  
  
  /*
	Объяв��ение activity
	...
  */
  
  @Override  
  public void moderate(Moderation moderation) {  
    var symbolModerationStatus = symbolCountModerationActivity.symbolCountModerate(moderation);  
    if (!symbolModerationStatus.isApproved()) {  
      return;  
    }  
  
    var forbiddenWordsModerationStatus = forbiddenWordModerationActivity.forbiddenWordModerate(moderation);  
    if (!forbiddenWordsModerationStatus.isApproved()) {  
      return;  
    }  
  
    luckTestModerationActivity.luckyModerate(moderation);  
  }  
}

Как видим, в этом нет ничего сложного. Визуально это просто просто код с последовательным вызовом методов, но в реальности это набор атомарных шагов с ретраями и сохранением состояния каждого шага.

Важно задавать id workflow при его создании, потому что если такой процесс уже есть, то повторно он запускаться не будет. В качестве id можно выбирать идентификатор записи в бд, чтобы потом было легко понять, какую запись смотреть.
Кроме этого можно для идентификатора использовать более читабельные названия. Например, что-то вроде customer1_order_1. Этот идентификатор дает информацию о конкретном заказчике, текущем процессе (заказе) и идентификаторе заказа. В этом случае в UI temporal получать информацию о процессе сможет не только разработчик, но и аналитик или тестировщик.

Этот процесс можно запускать из любой точки входа в приложение. Выберем рест запрос для простоты. Workflow процесс может быть длительный, даже в нашем случае, если мы будем проверять картинки в посте на предмет чего-то запрещённого с помощью llm, ml или вручную. Чтобы не зависеть от длительности выполнения процесса в рест запросе асинхронно запустим процесс.

В реальности нужно подумать о том, чтобы сохранение в бд и запуск workflow не аффектили друг друга в случае ошибки, а в pet проекте можно на это забить.

Ниже представлен пример запуска процесса модерации:

public Integer startModeration(Moderation moderation) {  
  Moderation createdModeration = moderationDao.create(moderation);  
  
  ModerationWorkflow workflow = workflowClient.newWorkflowStub(  
      ModerationWorkflow.class,  
      WorkflowOptions.newBuilder()  
          .setTaskQueue(MODERATION_TASK_QUEUE)  
          .setWorkflowId(createdModeration.id().toString())  
          .build()  
  );  
  WorkflowClient.start(workflow::moderate, createdModeration);  
  
  moderationDao.updateInProgress(createdModeration.id());  
  
  return createdModeration.id();  
}

Просмотр информации этапов в GUI

У temporal есть UI, в котором можно смотреть статус и состояния текущих или завершенных процессов.

Окно с информацией о текущем workflow
Окно с информацией о текущем workflow

На следующем скрине видно, как третий этап модерации ретраится

Retry activity
Retry activity

В UI есть подробный лог событий процесса

Полный лог событий workflow
Полный лог событий workflow

Тестирование

Для тестового проекта напишем один интеграционный тест для случая успешного прохождения процесса модерации.

В temporal jdk есть необходимые инструменты для тестирования. Создаем тестовое окружение, чтобы не использовать реальный temporal.

Ниже представлена эта конфигурация:

@BeforeEach  
void setUp() {  
  applicationContext.start();  
  Worker worker = testWorkflowEnvironment.newWorker(MODERATION_TASK_QUEUE);  
  worker.registerWorkflowImplementationTypes(ModerationWorkflowImpl.class);  
  
  worker.registerActivitiesImplementations(symbolCountModerationActivity);  
  worker.registerActivitiesImplementations(forbiddenWordModerationActivity);  
  worker.registerActivitiesImplementations(luckTestModerationActivity);  
  testWorkflowEnvironment.start();  
}

Далее представлен сам тест:

@Test  
void shouldSuccessWorkflow() {  
  ModerationWorkflow workflow = workflowClient.newWorkflowStub(  
      ModerationWorkflow.class,  
      WorkflowOptions.newBuilder().setTaskQueue(MODERATION_TASK_QUEUE).build()  
  );  
  Moderation moderation = new Moderation(0, "Text", false, false);  
  moderationHandler.startModeration(moderation);  
  workflow.moderate(moderation);  
  
  await()  
      .atMost(10, SECONDS)  
      .pollInterval(1, SECONDS)  
      .until(() -> {  
        Moderation actual = moderationDao.getById(0);  
        assertThat(actual.isApproved()).isTrue();  
        return true;  
      });  
}

Кроме этого можно отдельно тестировать каждое activity, использовать моки и все, что захотите.

Заключение

В этой статье рассмотрели важные базовые моменты, связанные с temporal, а также пример проекта с этим инструментом. Этот проект можно скачать и самому потыкать: в нем есть swagger для отправки запроса, вся необходимая информация для запуска проекта есть в Readme. Скачивайте и запускайте.

Подводя итог, можно сказать, что temporal — это не просто еще одна библиотека для разработчика, а подход к построению надежных систем. Он абстрагирует всю сложность распределенных систем — обработку сбоев, повторные попытки, масштабирование — давая возможность разработчику сосредоточиться на написании бизнес-логики.

Ссылки:

P.S.
У меня есть репозиторий с полезными ссылками видео, статей и прочего. Можете найти там что-то полезное:
https://github.com/RinatBeybutov/Links-and-materials