Привет, Хабр!

Меня зовут Илья Вязников, я инженер сопровождения СОФРОС. Это моя первая статья из серии публикаций, где я буду делиться практическими кейсами, полезными настройками и реальными примерами из эксплуатации платформы.

В интеграционных решениях сообщения неизбежно периодически попадают в архив: из-за сетевых ошибок, таймаутов, временной недоступности получателя или проблем валидации.

Разовый ручной возврат сообщений в обработку удобен для диагностики, но плохо масштабируется. Если проблема временная (например, недоступна внешняя система), инженеру приходится либо ждать восстановления и запускать возврат вручную, либо держать ситуацию под постоянным контролем.

В одном из проектов мы автоматизировали этот сценарий: настроили периодическую повторную обработку архивных сообщений по расписанию с ограничением числа попыток и фильтрацией по типам данных.

В статье покажу, как реализовать такой механизм в DATAREON Platform с помощью сервисного алгоритма и ArchiveFacade.

Когда это особенно полезно

  • Обмены с внешними системами с настроенными блокировками(1С, СУБД, и т.д.).

  • Высокая нагрузка и нестабильные интеграции.

Принцип работы решения

По триггеру (задание по расписанию) запускается бизнес-процесс, который:

  1. Выбирает сообщения из архива по заданным типам данных.

  2. Проверяет количество уже выполненных попыток восстановления.

  3. Если лимит не исчерпан - восстанавливает сообщение и увеличивает счётчик попыток.

  4. Логирует результат.

Решение работает через сервисный алгоритм и специальный фасад для работы с архивами ArchiveFacade.
Работу с архивом в DATAREON Platform можно осуществлять только в сервисном алгоритме.

Решение тестировалось на версии 3.1.2.4, но подход применим и для других версий платформы. 

Шаг 1. Создание бизнес-процесса

  1. Перейдите в Центр настройки → Обработка данных → Схемы обработки.

  2. Создайте новый бизнес-процесс, например: BP_ПовторнаяОбработкаИзАрхива.

  3. На вкладке Схема добавьте шаг «Сервисный алгоритм».

  4. Заполните необходимые поля для сервисного алгоритма:

    • Укажите время ожидания - обязательное поле, диапазон допустимых значений 0 мс - 1 800 000 мс (30 мин)

    • Включите опцию Только на текущем сервере если используется один сервер в конфигурации.

    • Либо укажите конкретный сервер в соответствующем поле (Необходимо в случае настройки в кластере)

  5. Вставьте код обработки архива в сервисный алгоритм:

// Восстановление из архива по списку типов с лимитом 3 попытки

// Настройка списка типов данных для повторной обработки.
var dataTypes = new List<string>
{
    "Тип1",
    "Тип2"
};

// Настройка максимального количества попыток повторной обработки и наименование свойства для их отслеживания.
const int MAX_ATTEMPTS = 3;
const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts";

// Обработка
Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)");

int totalRestored = 0;
int totalSkipped = 0;

foreach (var typeName in dataTypes)
{
    var filter = new DataStorageFilter
    {
        TypeName = typeName,
        Count = 0,
        WithBody = true
    };

    var archivedMessages = ArchiveFacade.Select(filter);

    Logger.Info($"Тип '{typeName}': найдено {archivedMessages.Count} сообщений в архиве");

    foreach (var archMsg in archivedMessages)
    {
        int attempts = 0;

        if (archMsg.OriginalMessage?.Properties != null)
        {
            var props = archMsg.OriginalMessage.Properties;
            if (props.Has(ATTEMPT_PROPERTY))
            {
                attempts = props.GetValue<int>(ATTEMPT_PROPERTY);
            }
        }

        if (attempts >= MAX_ATTEMPTS)
        {
            Logger.Info($"Пропущено (лимит исчерпан): ID сообщения ={archMsg.Data.Id}, попыток={attempts}");
            totalSkipped++;
            continue;
        }

        // Подготавливаем свойства для восстановления (только счётчик)
        var restoreProps = new PropertiesCollection();
        restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1);

        // Восстанавливаем сообщение и передаём обновлённое свойство
        ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps);

        totalRestored++;
        Logger.Info($"Восстановлено: ID={archMsg.Data.Id}, попытка {attempts + 1}/{MAX_ATTEMPTS}");
    }
}

Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");

Рекомендации по настройке:

  • В списке dataTypes - указывайте типы сообщений, для которых необходима повторная обработка.

  • В константе MAX_ATTEMPTS укажите количество попыток повторной отправки.

  • Осторожно с Count = 0 - При большом архиве это может дать тяжёлую выборку. При большом количестве сообщений в архиве можно ограничиться меньшим объемом сообщений за заход, например 200.

  • В коде используется логирование в раздел журнала “пользовательские данные” используя методы Logger. В продуктивной среде рекомендуется закомментировать большую часть логов чтобы не нагружать систему. Например, вы можете оставить только первый и последний лог о запуске и конце обработки архива.

Сохраните процесс и примените изменения.

Шаг 2. Создание задания по расписанию

  1. Перейдите в Обработка данных → Задания по расписанию.

  2. Создайте новое задание, например: TG_ПовторнаяОбработкаИзАрхива.

  3. Настройте удобное расписание. В примере настроено расписание на запуск процесса каждый час, рекомендуется запускать не чаще, чем раз в 10 минут, чтобы не перегружать систему.

  4. На вкладке Схемы обработки добавьте в «Выбранные схемы» созданный бизнес-процесс BP_ПовторнаяОбработкаИзАрхива.

  5. Сохраните и примените конфигурацию.

Ожидаемый результат

После успешного выполнения этих шагов каждый час по триггеру будет выполняться повторная обработка сообщений из архива.

После запуска задания, вы сможете отследить выполнение в Центре мониторинга в журнале сервера в разделе Пользовательские данные. 

Вы увидите примерно следующее:

При исчерпании лимита попыток сообщения пропускаются и логируются отдельно. Это защищает систему от зацикливания на «проблемных» сообщениях.

Дополнительные варианты использования

Учитывая гибкую настройку фильтров для работы с архивом, можно настроить обработку сообщений для различных кейсов, например:

1. Удаление сообщений из архива по событию

Можно автоматически удалять сообщения с определенным событием. В данном случае по событию “ОбработкаОтменена”. Такой кейс актуален для конфигураций на Platform до версии 3.2.0.0, т.к. начиная с этой версии в Platform появилась возможность не отправлять сообщения с этим событием в архив:

//Удаление сообщений с отменой обработки

Logger.Info($"Удаление из архива отменённых сообщений (EventType = Cancel)");

int deletedCount = 0;

var cancelFilter = new DataStorageFilter
{
    EventType = MessageEventType.Cancel,
    Count = 0,
    WithBody = false
};

var cancelArchivedMessages = ArchiveFacade.Select(cancelFilter);

Logger.Info($"Найдено сообщений с EventType = Cancel: {cancelArchivedMessages.Count}");

foreach (var archMsg in cancelArchivedMessages)
{
    // string msgInfo = $"ID={archMsg.Data.Id}, TargetId={archMsg.TargetId}, InsertDate={archMsg.InsertDate:yyyy-MM-dd HH:mm:ss}"; // Для тестирования
    try
    {
        ArchiveFacade.Remove(archMsg.Data.Id, archMsg.TargetId);
        
        deletedCount++;
        // Logger.Info($"Удалено: {msgInfo}");
    }
    catch (Exception ex)
    {
        Logger.Error($"Ошибка при удалении сообщения {archMsg.Data.Id}: {ex.Message}");
    }
}

2. Повторная обработка всех сообщений из архива

Для случаев когда фильтрация по типам не нужна и необходимо переотправлять абсолютно все сообщения из архива, можно использовать фильтр:

var restoreAllFilter = new DataStorageFilter
{
    Count = 0,
    WithBody = true
};

3. Работа в кластере

Для кластера, в случае если необходимо восстанавливать сообщения из архивов разных серверов, необходимо добавить столько шагов сервисного алгоритма, сколько серверов в кластере.

В настройках каждого шага сервисного алгоритма необходимо указать соответствующий сервер на котором он будет выполняться.

На схеме связать шаги сервисных алгоритмов для их параллельного выполнения (Из выхода ↓). Например:

Заключение

Такой механизм особенно полезен в интеграциях с нестабильными внешними системами, где большая часть ошибок носит временный характер.

Автоматическая переотправка позволяет сократить ручные операции поддержки и уменьшить время доставки сообщений без вмешательства инженера.

Решение гибкое: легко добавлять новые типы данных, менять лимиты и расширять логику.

Для тех кто хочет подробнее ознакомиться с функционалом описанным в статье - полезные ссылки: