
Привет, Хабр!
Меня зовут Илья Вязников, я инженер сопровождения СОФРОС. Это моя первая статья из серии публикаций, где я буду делиться практическими кейсами, полезными настройками и реальными примерами из эксплуатации платформы.
В интеграционных решениях сообщения неизбежно периодически попадают в архив: из-за сетевых ошибок, таймаутов, временной недоступности получателя или проблем валидации.
Разовый ручной возврат сообщений в обработку удобен для диагностики, но плохо масштабируется. Если проблема временная (например, недоступна внешняя система), инженеру приходится либо ждать восстановления и запускать возврат вручную, либо держать ситуацию под постоянным контролем.
В одном из проектов мы автоматизировали этот сценарий: настроили периодическую повторную обработку архивных сообщений по расписанию с ограничением числа попыток и фильтрацией по типам данных.
В статье покажу, как реализовать такой механизм в DATAREON Platform с помощью сервисного алгоритма и ArchiveFacade.
Когда это особенно полезно
Обмены с внешними системами с настроенными блокировками(1С, СУБД, и т.д.).
Высокая нагрузка и нестабильные интеграции.
Принцип работы решения
По триггеру (задание по расписанию) запускается бизнес-процесс, который:
Выбирает сообщения из архива по заданным типам данных.
Проверяет количество уже выполненных попыток восстановления.
Если лимит не исчерпан - восстанавливает сообщение и увеличивает счётчик попыток.
Логирует результат.
Решение работает через сервисный алгоритм и специальный фасад для работы с архивами ArchiveFacade.
Работу с архивом в DATAREON Platform можно осуществлять только в сервисном алгоритме.
Решение тестировалось на версии 3.1.2.4, но подход применим и для других версий платформы.
Шаг 1. Создание бизнес-процесса
Перейдите в Центр настройки → Обработка данных → Схемы обработки.
Создайте новый бизнес-процесс, например: BP_ПовторнаяОбработкаИзАрхива.

На вкладке Схема добавьте шаг «Сервисный алгоритм».

Заполните необходимые поля для сервисного алгоритма:
Укажите время ожидания - обязательное поле, диапазон допустимых значений 0 мс - 1 800 000 мс (30 мин)
Включите опцию Только на текущем сервере если используется один сервер в конфигурации.
Либо укажите конкретный сервер в соответствующем поле (Необходимо в случае настройки в кластере)

Вставьте код обработки архива в сервисный алгоритм:
// Восстановление из архива по списку типов с лимитом 3 попытки // Настройка списка типов данных для повторной обработки. var dataTypes = new List<string> { "Тип1", "Тип2" }; // Настройка максимального количества попыток повторной обработки и наименование свойства для их отслеживания. const int MAX_ATTEMPTS = 3; const string ATTEMPT_PROPERTY = "ArchiveRestoreAttempts"; // Обработка Logger.Info($"Запуск восстановления из архива (лимит {MAX_ATTEMPTS} попыток)"); int totalRestored = 0; int totalSkipped = 0; foreach (var typeName in dataTypes) { var filter = new DataStorageFilter { TypeName = typeName, Count = 0, WithBody = true }; var archivedMessages = ArchiveFacade.Select(filter); Logger.Info($"Тип '{typeName}': найдено {archivedMessages.Count} сообщений в архиве"); foreach (var archMsg in archivedMessages) { int attempts = 0; if (archMsg.OriginalMessage?.Properties != null) { var props = archMsg.OriginalMessage.Properties; if (props.Has(ATTEMPT_PROPERTY)) { attempts = props.GetValue<int>(ATTEMPT_PROPERTY); } } if (attempts >= MAX_ATTEMPTS) { Logger.Info($"Пропущено (лимит исчерпан): ID сообщения ={archMsg.Data.Id}, попыток={attempts}"); totalSkipped++; continue; } // Подготавливаем свойства для восстановления (только счётчик) var restoreProps = new PropertiesCollection(); restoreProps.AddValue(ATTEMPT_PROPERTY, attempts + 1); // Восстанавливаем сообщение и передаём обновлённое свойство ArchiveFacade.Restore(archMsg.Data.Id, archMsg.TargetId, restoreProps); totalRestored++; Logger.Info($"Восстановлено: ID={archMsg.Data.Id}, попытка {attempts + 1}/{MAX_ATTEMPTS}"); } } Logger.Info($"Обработка архива завершена. ИТОГО за запуск: восстановлено — {totalRestored}, пропущено (лимит исчерпан) — {totalSkipped}");
Рекомендации по настройке:
В списке dataTypes - указывайте типы сообщений, для которых необходима повторная обработка.
В константе MAX_ATTEMPTS укажите количество попыток повторной отправки.
Осторожно с Count = 0 - При большом архиве это может дать тяжёлую выборку. При большом количестве сообщений в архиве можно ограничиться меньшим объемом сообщений за заход, например 200.
В коде используется логирование в раздел журнала “пользовательские данные” используя методы Logger. В продуктивной среде рекомендуется закомментировать большую часть логов чтобы не нагружать систему. Например, вы можете оставить только первый и последний лог о запуске и конце обработки архива.
Сохраните процесс и примените изменения.
Шаг 2. Создание задания по расписанию
Перейдите в Обработка данных → Задания по расписанию.
Создайте новое задание, например: TG_ПовторнаяОбработкаИзАрхива.

Настройте удобное расписание. В примере настроено расписание на запуск процесса каждый час, рекомендуется запускать не чаще, чем раз в 10 минут, чтобы не перегружать систему.
На вкладке Схемы обработки добавьте в «Выбранные схемы» созданный бизнес-процесс BP_ПовторнаяОбработкаИзАрхива.

Сохраните и примените конфигурацию.
Ожидаемый результат
После успешного выполнения этих шагов каждый час по триггеру будет выполняться повторная обработка сообщений из архива.
После запуска задания, вы сможете отследить выполнение в Центре мониторинга в журнале сервера в разделе Пользовательские данные.
Вы увидите примерно следующее:

При исчерпании лимита попыток сообщения пропускаются и логируются отдельно. Это защищает систему от зацикливания на «проблемных» сообщениях.

Дополнительные варианты использования
Учитывая гибкую настройку фильтров для работы с архивом, можно настроить обработку сообщений для различных кейсов, например:
1. Удаление сообщений из архива по событию
Можно автоматически удалять сообщения с определенным событием. В данном случае по событию “ОбработкаОтменена”. Такой кейс актуален для конфигураций на Platform до версии 3.2.0.0, т.к. начиная с этой версии в Platform появилась возможность не отправлять сообщения с этим событием в архив:
//Удаление сообщений с отменой обработки Logger.Info($"Удаление из архива отменённых сообщений (EventType = Cancel)"); int deletedCount = 0; var cancelFilter = new DataStorageFilter { EventType = MessageEventType.Cancel, Count = 0, WithBody = false }; var cancelArchivedMessages = ArchiveFacade.Select(cancelFilter); Logger.Info($"Найдено сообщений с EventType = Cancel: {cancelArchivedMessages.Count}"); foreach (var archMsg in cancelArchivedMessages) { // string msgInfo = $"ID={archMsg.Data.Id}, TargetId={archMsg.TargetId}, InsertDate={archMsg.InsertDate:yyyy-MM-dd HH:mm:ss}"; // Для тестирования try { ArchiveFacade.Remove(archMsg.Data.Id, archMsg.TargetId); deletedCount++; // Logger.Info($"Удалено: {msgInfo}"); } catch (Exception ex) { Logger.Error($"Ошибка при удалении сообщения {archMsg.Data.Id}: {ex.Message}"); } }
2. Повторная обработка всех сообщений из архива
Для случаев когда фильтрация по типам не нужна и необходимо переотправлять абсолютно все сообщения из архива, можно использовать фильтр:
var restoreAllFilter = new DataStorageFilter { Count = 0, WithBody = true };
3. Работа в кластере
Для кластера, в случае если необходимо восстанавливать сообщения из архивов разных серверов, необходимо добавить столько шагов сервисного алгоритма, сколько серверов в кластере.
В настройках каждого шага сервисного алгоритма необходимо указать соответствующий сервер на котором он будет выполняться.
На схеме связать шаги сервисных алгоритмов для их параллельного выполнения (Из выхода ↓). Например:

Заключение
Такой механизм особенно полезен в интеграциях с нестабильными внешними системами, где большая часть ошибок носит временный характер.
Автоматическая переотправка позволяет сократить ручные операции поддержки и уменьшить время доставки сообщений без вмешательства инженера.
Решение гибкое: легко добавлять новые типы данных, менять лимиты и расширять логику.
Для тех кто хочет подробнее ознакомиться с функционалом описанным в статье - полезные ссылки:
