Hangfire — это библиотека для .net (core), позволяющая асинхронно выполнять некоторый код по принципу "fire and forget". Примером такого кода может быть отправка E-Mail, обработка видео, синхронизация с другой системой и т.д. Помимо "fire and forget" есть поддержка отложенных задач, а также задач по расписанию в формате Cron.


В настоящее время существует масса подобных библиотек. Несколько преимуществ, говорящих в пользу Hangfire:


  • Простая конфигурация, удобный API
  • Надежность. Hangfire гарантирует, что созданная задача будет выполнена хотя бы один раз
  • Возможность параллельного выполнения задач и отличная производительность
  • Расширяемость (вот ей-то мы и воспользуемся ниже)
  • Достаточно полная и понятная документация
  • Dashboard, на котором можно видеть всю статистику о задачах

Не буду слишком вдаваться в детали, поскольку существует немало хороших статей о Hangfire и способах его применения. В этой статье я разберу, как воспользоваться поддержкой нескольких очередей (или пулов задач), как починить стандартную retry-функциональность и сделать так, чтобы каждая очередь имела индивидуальную конфигурацию.


Существующая поддержка (псевдо)-очередей


Важное замечание: в заголовке я использовал термин псевдо-очередь, потому что Hangfire не гарантирует выполнение задач в определенном порядке. Т.е. принцип "First In First Out" не действует и мы не будем на него опираться. Более того автор библиотеки рекомендует делать задачи идемпотентными, т.е. устоичивыми к непредвиденному многократному выполнению. Далее я буду использовать просто слово "очередь", т.к. в Hangfire используется термин "Queue".


В Hangfire заложена простая поддержка очередей. Хотя он и не предлагает гибкости Message Queue Systems, таких как rabbitMQ или Azure Service Bus, но этого часто вполне достаточно д��я решения широкого спектра задач.


Каждая задача имеет свойство "Queue", то есть имя очереди, в которой она должна выполняться. По умолчанию, задача отправляется в очередь с именем "default", если не указано иное. Поддержка нескольких очередей нужна для того, чтобы раздельно управлять выполнением задач разных типов. Например, мы можем захотеть, чтобы задачи по обработке видео попадали в очередь "video_queue", а рассылка E-Mail'ов в очередь "email_queue". Таким образом мы получаем возможность независимо выполнять эти два типа задач. Если мы захотим вынести обработку видео на выделенный сервер, то мы легко сможем это сделать, запустив отдельный Hangfire-сервер как консольное приложение, которое будет обрабатывать очередь "video_queue".


Перейдем к практике


Настройка Hangfire-сервера в asp.net core выглядит следующим образом:


public void Configure(IApplicationBuilder app)
{
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
        WorkerCount = 2,
        Queues = new[] { "email_queue", "video_queue" }
    });
}

Проблема 1 — Задачи при повторе попадают в очередь "default"


Как я уже упоминал выше, в Hangfire существует очередь по умолчанию, которая называется "default". Если задача, положенная в очередь, например, "video_queue", завершилась с ошибкой и нуждается в повторе, то на повторное выполнение она будет отправлена в очередь "default", а не "video_queue" и, как следствие, наша задача будет выполняться совсем не тем экземпляром Hangfire-сервера, которым нам бы хотелось, если вообще будет. Такое поведение было мной установлено опытным путем и возможно является багом в самом Hangfire.


Job Filters


Hangfire предоставляет нам возможность расширения функционала с помощью так называемых фильтров (Job Filters), которые по принципу работы похожи на Actions Filters в ASP.NET MVC. Дело в том, что внутренняя логика Hangfire реализована как State Machine. Это движок, который поочередно переводит имеющиеся в пуле задачи из одного состояния в другое (например, created -> enqueued -> processing -> succeeded), а фильтры позволяют нам "перехватывать" выполняемую задачу при каждом изменении её состояния и производить манипуляции с ней. Фильтр реализуется как аттрибут, который может быть применен к отдельному методу, классу или глобально.


Job Parameters


В качестве аргумента в метод фильтра передается объект ElectStateContext. Этот объект содержит полную информацию о выполняемой в данный момент задаче. Среди прочего он имеет методы GetJobParameter<>(...) и SettJobParameter<>(...). Job Parameters позволяют сохранять связанную с задачей информацию в базе данных. Именно в Job Parameters и хранится имя очереди, в которую была изначально отправлена задача, только почему-то эта информация игнорируется при последующем повторе.


Решение


Итак, у нас есть задача, которая завершилась с ошибкой и должна быть отправлена на повторное выполнение в нужную очередь (в ту самую, которая была ей присвоена в момент первоначального создания). Повторение завершившейся с ошибкой задачи — это переход из состояния "failed" в состояние "enqueued". Для решения проблемы создадим фильтр, который при переходе задачи в состояние "enqueued", будет проверять в какую очередь задача была отправлена изначально и проставлять параметр "QueueName" в нужное значение:


public class HangfireUseCorrectQueueFilter 
    : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState enqueuedState)
        {
            var queueName = context.GetJobParameter<string>("QueueName");

            if (string.IsNullOrWhiteSpace(queueName))
            {
                context.SetJobParameter("QueueName", enqueuedState.Queue);
            }
            else
            {
                enqueuedState.Queue = queueName;
            }
        }
    }
}

Для того, чтобы применть фильтр по умолчанию ко всем задачам (то есть глобально), добавим следующий код в нашу конфигурацию:


GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Еще одна небольшая загвоздка заключается в том, что коллекция GlobalJobFilters по умолчанию содержит экземпляр класса AutomaticRetryAttribute. Это стандартный фильтр, который отвечает за повторное выполнение неудачно завершенных задач. Он же и отправляет задачу в очередь "default", игнорируя изначальную очередь. Для того, чтобы наш велосипед поехал нужно удалить этот фильтр из коллекции и позволить нашему фильтру взять на себя ответственность за повторное выполнение задач. В результате конфигурационный код будет выглядеть так:


var defaultRetryFilter = GlobalJobFilters.Filters
    .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute);

if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{
    GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
}

GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Необходимо отметить, что AutomaticRetryAttribute реализует логику автоматического увеличения интервала между попытками (с каждой последующей попыткой интервал увеличивается), и удаляя AutomaticRetryAttribute из коллекции GlobalJobFilters, мы отказываемся от этой функциональности (см. реализацию метода ScheduleAgainLater)


Итак, мы добились того, что наши задачи могут выполняться в разных очередях и это позволяет нам независимо управлять их выполнением, в том числе обрабатывать разные очереди на разных машинах. Только теперь мы не знаем сколько раз и с каким интервалом наши задачи будут повторяться в случае ошибки, поскольку удалили AutomaticRetryAttribute из коллекции фильтров.


Проблема 2 — Индивидуальные настройки для каждой очереди


Мы хотим иметь возможность конфигурировать интервал и количество повторений отдельно для каждой очереди, а также, если для какой-то очереди мы не указали значения явно, то хотим, чтобы применялись значения по умолчанию. Для этого мы реализуем еще один фильтр и назовем его HangfireRetryJobFilter.


В идеале, конфигурационный код должен выглядть примерно так:


GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{
    Order = 2,
    ["email_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 120,
        RetryAttempts = 3
    },
    ["video_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 60,
        RetryAttempts = 5
    }
});

Решение


Для этого сначала добавим класс HangfireQueueSettings, который будет служить контейнером для наших настроек.


public sealed class HangfireQueueSettings
{
    public int RetryAttempts { get; set; }
    public int DelayInSeconds { get; set; }
}

Затем добавим реализацию самого фильтра, который при повторном выполнении задач после ошибки будет применять настройки в зависимости от конфигурации очереди и следить за количеством повторов:


public class HangfireRetryJobFilter 
        : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{
    private readonly HangfireQueueSettings _defaultQueueSettings = 
        new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };

    private readonly IDictionary<string, HangfireQueueSettings> _settings 
        = new Dictionary<string, HangfireQueueSettings>();

    public HangfireQueueSettings this[string queueName]
    {
        get
        {
            return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) 
                ? queueSettings 
                : _defaultQueueSettings;
        }
        set
        {
            _settings[queueName] = value;
        }
    }

    public void OnStateElection(ElectStateContext context)
    {
        if (!(context.CandidateState is FailedState failedState))
        {
            // This filter accepts only failed job state.
            return;
        }

        var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1;
        var queueName = context.GetJobParameter<string>("QueueName");

        if (retryAttempt <= this[queueName].RetryAttempts)
        {
            ScheduleAgainLater(context, retryAttempt, failedState, queueName);
        }
        else
        {
            TransitionToDeleted(context, failedState, queueName);
        }
    }

    public void OnStateApplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.NewState is ScheduledState &&
            context.NewState.Reason != null &&
            context.NewState.Reason.StartsWith("Retry attempt"))
        {
            transaction.AddToSet("retries", context.BackgroundJob.Id);
        }
    }

    public void OnStateUnapplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.OldStateName == ScheduledState.StateName)
        {
            transaction.RemoveFromSet("retries", context.BackgroundJob.Id);
        }
    }

    private void ScheduleAgainLater(
        ElectStateContext context, 
        int retryAttempt, 
        FailedState failedState, 
        string queueName)
    {
        context.SetJobParameter("RetryCount", retryAttempt);

        var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds);

        const int maxMessageLength = 50;

        var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength
            ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…"
            : failedState.Exception.Message;

        // If attempt number is less than max attempts, we should
        // schedule the job to run again later.

        var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}";

        context.CandidateState = delay == TimeSpan.Zero
            ? (IState)new EnqueuedState { Reason = reason }
            : new ScheduledState(delay) { Reason = reason };
    }

    private void TransitionToDeleted(
        ElectStateContext context, 
        FailedState failedState, 
        string queueName)
    {
        context.CandidateState = new DeletedState
        {
            Reason = this[queueName].RetryAttempts > 0
                ? "Exceeded the maximum number of retry attempts."
                : "Retries were disabled for this job."
        };
    }
}

Примечание к коду: при реализации класса HangfireRetryJobFilter был взят за основу класс AutomaticRetryAttribute из Hangfire, поэтому реализация некоторых методов частично совпадает с соответствующими методами этого класса.

Проблема 3 — Как отправить задачу на выполнение в конкретную очередь?


Мне удалось найти два способа присвоить задаче очередь: задокументированный и — нет.


1-й способ — повесить на метод соответствующий атрибут


[Queue("video_queue")]
public void SomeMethod() { }

BackgroundJob.Enqueue(() => SomeMethod());

http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html


2-й способ (незадокументированный) — использовать класс BackgroundJobClient


var client = new BackgroundJobClient();
client.Create(() => MyMethod(), new EnqueuedState("video_queue"));

Преимущество второго способа в том, что он не создаёт лишних зависимостей от Hangfire и позволяет в процессе выполнения решать, в какую очередь должна отправиться задача. К сожалению, в официальной документации я не нашел упоминания о классе BackgroundJobClient и о том, как его применять. Второй способ я использовал в своем решении, так что он проверен на практике.


Заключение


В этой статье мы воспользовались поддержкой нескольких очередей в Hangfire для разделения обработки разных типов задач. Мы реализовали свой механизм повторения неудачно завершившихся задач с возможностью индивидуальной конфигурации для каждой очереди, расширив функциональность Hangfire с помощью Job Filters, а также научились отправлять задачи на выполнение в нужную нам очередь.


Надеюсь, эта статья окажется кому-нибудь полезной. Буду рад комментариям.


Полезные ссылки


Документация Hangfire
Исходный код Hangfire
Scott Hanselman — How to run Background Tasks in ASP.NET