Как стать автором
Обновить

Событийный диспетчер отложенных задач на C#: консолидация и дедупликация данных в текущей инстанции

Уровень сложностиСредний
Время на прочтение4 мин
Количество просмотров2.7K

Всем привет, меня зовут Артур Богданов. Я занимаюсь проектированием, разработкой и оптимизацией высоконагруженных веб-приложений на ASP.NET Core.

Хочу поделиться с сообществом своей разработкой, которая позволяет использовать несколько фоновых задач (или «раннеров») для отложенной обработки консолидированных данных. Раннеры построены на шаблоне PubSub для асинхронного ожидания новых задач, что делает этот подход более реактивным, но менее ресурсоемким.

Отличительное преимущество и области применения

Решение позволяет производить консолидацию данных в текущей инстанции с возможностью вариативного проведения дедупликации или любых других операций на усмотрение разработчика, что может сократить ресурсы при дальнейшей передаче и обработке, а также увеличить быстродействие.

Оно было применено в моей работе по масштабированию WebSocket'ов в рамках микросервисной архитектуры, где консолидация и дедупликация событий перед их непосредственной отправкой оказали немаловажное влияние на быстродействие и потребление ресурсов за счет снижения накладных расходов.

В дальнейшем решение может использоваться в любых других подобных задач, поскольку получилось достаточно универсальным.

Пример использования

1️⃣ Внедряем Singleton зависимость с требуемым типом данных:

services.AddSingleton<IDeferredTaskManagerService<object>, DeferredTaskManagerService<object>>();

2️⃣Создаём фоновую службу с указанием необходимых параметров:

internal sealed class EventManagerService : BackgroundService
{
    private readonly IDeferredTaskManagerService<object> _deferredTaskManager;

    public EventManagerService(IDeferredTaskManagerService<object> deferredTaskManager)
    {
        _deferredTaskManager = deferredTaskManager ?? throw new ArgumentNullException(nameof(deferredTaskManager));
    }

    protected override Task ExecuteAsync(CancellationToken cancellationToken)
    {
        Func<List<object>, CancellationToken, Task> taskDelegate = (events, cancellationToken) =>
        {
            return Task.Delay(1000000, cancellationToken);
        };

        Func<List<object>, CancellationToken, Task> taskDelegateRetryExhausted = async (events, cancellationToken) =>
        {
            Console.WriteLine("Something went wrong...");
        };

        var dtmOptions = new DeferredTaskManagerOptions<string>
        {
            TaskFactory = taskDelegate,
            PoolSize = 1,
            CollectionType = CollectionType.Queue,
            SendDelayOptions = new SendDelayOptions()
            {
                MillisecondsSendDelay = 60000,
                ConsiderDifference = true
            },
            RetryOptions = new RetryOptions<string>
            {
                RetryCount = 3,
                MillisecondsRetryDelay = 10000,
                TaskFactoryRetryExhausted = taskDelegateRetryExhausted
            }
        };

        return Task.Run(() => _deferredTaskManager.StartAsync(dtmOptions, cancellationToken), cancellationToken);
    }
}

Вот некоторые пояснения к вышеизложенному коду:

⚪ TaskFactory — делегат для кастомной логики

Вся кастомная логика размещается в делегате TaskFactory, в который приходит коллекция консолидированных событий. Именно здесь можно осуществить необходимые операции над ними перед дальнейшей передачей/обработкой. Также в делегате можно обработать исключения (это актуально, если события обрабатываются по отдельности), отправив необработанные события на следующий заход после указанной в параметрах временной задержки MillisecondsRetryDelay.

try
{
    // Кастомная операция над полученными событиями
}
catch (Exception ex)
{
    events.RemoveRange(successEvents);

    // Можно выдать исключение после удаления успешно завершенных эвентов
    // или добавить собственные условия
    throw new Exception("Отправка на повторную попытку после исключения");
}

⚪ PoolSize — размер пула (количество доступных раннеров)

Размер пула вариативен и подбирается разработчиком для конкретного спектра задач, ориентируясь на скорость выполнения и количество потребляемых ресурсов.

⚪ CollectionType — тип коллекции

Можно указать тип коллекции хранения эвентов: «Bag» для неупорядоченной коллекции объектов (это работает быстрее) или «Queue» для упорядоченной коллекции объектов. Использовать «Queue» целесообразно только в том случае, если PoolSize = 1, в противном случае порядок выполнения не гарантирован.

⚪ SendDelayOptions — настройка отправки событий через временной интервал

Настраивает отправку добавленных событий на обработку через определенный промежуток времени с возможностью переменного вычета времени предыдущей операции. Имеет смысл указывать, когда для добавления событий используется метод AddWithoutSend, который добавляет события без отправки на обработку.

⚪ RetryOptions — настройка обработки исключений

Вы также можете передать делегат обработки ошибок, который сработает, когда будет исчерпано указанное количество повторных попыток.

3️⃣ Получаем внедренную зависимость и осуществляем добавление события(й):

_deferredTaskManager.Add(events);

Альтернативные варианты использования

DeferredTaskManager можно использовать как обычное хранилище событий, получая эвенты по требованию методом GetEventsAndClearStorage минуя раннеры, или осуществлять отправку доступных событий в делегат любому доступному раннеру по требованию — с помощью метода SendEvents.

Исходники и NuGet пакет

Мне интересно, насколько в целом актуален такой подход, имеет ли место быть? Возможно, что существуют альтернативные решения, которые были бы проще в реализации?

Теги:
Хабы:
Всего голосов 5: ↑1 и ↓4-1
Комментарии13

Публикации

Работа

Ближайшие события