Hangfire — это библиотека для .net (core), позволяющая асинхронно выполнять некоторый код по принципу "fire and forget". Примером такого кода может быть отправка E-Mail, обработка видео, синхронизация с другой системой и т.д. Помимо "fire and forget" есть поддержка отложенных задач, а также задач по расписанию в формате Cron.
В настоящее время существует масса подобных библиотек. Несколько преимуществ, говорящих в пользу Hangfire:
- Простая конфигурация, удобный API
- Надежность. Hangfire гарантирует, что созданная задача будет выполнена хотя бы один раз
- Возможность параллельного выполнения задач и отличная производительность
- Расширяемость (вот ей-то мы и воспользуемся ниже)
- Достаточно полная и понятная документация
- Dashboard, на котором можно видеть всю статистику о задачах
Не буду слишком вдаваться в детали, поскольку существует немало хороших статей о Hangfire и способах его применения. В этой статье я разберу, как воспользоваться поддержкой нескольких очередей (или пулов задач), как починить стандартную retry-функциональность и сделать так, чтобы каждая очередь имела индивидуальную конфигурацию.
Существующая поддержка (псевдо)-очередей
Важное замечание: в заголовке я использовал термин псевдо-очередь, потому что Hangfire не гарантирует выполнение задач в определенном порядке. Т.е. принцип "First In First Out" не действует и мы не будем на него опираться. Более того автор библиотеки рекомендует делать задачи идемпотентными, т.е. устоичивыми к непредвиденному многократному выполнению. Далее я буду использовать просто слово "очередь", т.к. в Hangfire используется термин "Queue".
В Hangfire заложена простая поддержка очередей. Хотя он и не предлагает гибкости Message Queue Systems, таких как rabbitMQ или Azure Service Bus, но этого часто вполне достаточно д��я решения широкого спектра задач.
Каждая задача имеет свойство "Queue", то есть имя очереди, в которой она должна выполняться. По умолчанию, задача отправляется в очередь с именем "default", если не указано иное. Поддержка нескольких очередей нужна для того, чтобы раздельно управлять выполнением задач разных типов. Например, мы можем захотеть, чтобы задачи по обработке видео попадали в очередь "video_queue", а рассылка E-Mail'ов в очередь "email_queue". Таким образом мы получаем возможность независимо выполнять эти два типа задач. Если мы захотим вынести обработку видео на выделенный сервер, то мы легко сможем это сделать, запустив отдельный Hangfire-сервер как консольное приложение, которое будет обрабатывать очередь "video_queue".
Перейдем к практике
Настройка Hangfire-сервера в asp.net core выглядит следующим образом:
public void Configure(IApplicationBuilder app)
{
app.UseHangfireServer(new BackgroundJobServerOptions
{
WorkerCount = 2,
Queues = new[] { "email_queue", "video_queue" }
});
}Проблема 1 — Задачи при повторе попадают в очередь "default"
Как я уже упоминал выше, в Hangfire существует очередь по умолчанию, которая называется "default". Если задача, положенная в очередь, например, "video_queue", завершилась с ошибкой и нуждается в повторе, то на повторное выполнение она будет отправлена в очередь "default", а не "video_queue" и, как следствие, наша задача будет выполняться совсем не тем экземпляром Hangfire-сервера, которым нам бы хотелось, если вообще будет. Такое поведение было мной установлено опытным путем и возможно является багом в самом Hangfire.
Job Filters
Hangfire предоставляет нам возможность расширения функционала с помощью так называемых фильтров (Job Filters), которые по принципу работы похожи на Actions Filters в ASP.NET MVC. Дело в том, что внутренняя логика Hangfire реализована как State Machine. Это движок, который поочередно переводит имеющиеся в пуле задачи из одного состояния в другое (например, created -> enqueued -> processing -> succeeded), а фильтры позволяют нам "перехватывать" выполняемую задачу при каждом изменении её состояния и производить манипуляции с ней. Фильтр реализуется как аттрибут, который может быть применен к отдельному методу, классу или глобально.
Job Parameters
В качестве аргумента в метод фильтра передается объект ElectStateContext. Этот объект содержит полную информацию о выполняемой в данный момент задаче. Среди прочего он имеет методы GetJobParameter<>(...) и SettJobParameter<>(...). Job Parameters позволяют сохранять связанную с задачей информацию в базе данных. Именно в Job Parameters и хранится имя очереди, в которую была изначально отправлена задача, только почему-то эта информация игнорируется при последующем повторе.
Решение
Итак, у нас есть задача, которая завершилась с ошибкой и должна быть отправлена на повторное выполнение в нужную очередь (в ту самую, которая была ей присвоена в момент первоначального создания). Повторение завершившейся с ошибкой задачи — это переход из состояния "failed" в состояние "enqueued". Для решения проблемы создадим фильтр, который при переходе задачи в состояние "enqueued", будет проверять в какую очередь задача была отправлена изначально и проставлять параметр "QueueName" в нужное значение:
public class HangfireUseCorrectQueueFilter
: JobFilterAttribute, IElectStateFilter
{
public void OnStateElection(ElectStateContext context)
{
if (context.CandidateState is EnqueuedState enqueuedState)
{
var queueName = context.GetJobParameter<string>("QueueName");
if (string.IsNullOrWhiteSpace(queueName))
{
context.SetJobParameter("QueueName", enqueuedState.Queue);
}
else
{
enqueuedState.Queue = queueName;
}
}
}
}Для того, чтобы применть фильтр по умолчанию ко всем задачам (то есть глобально), добавим следующий код в нашу конфигурацию:
GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });Еще одна небольшая загвоздка заключается в том, что коллекция GlobalJobFilters по умолчанию содержит экземпляр класса AutomaticRetryAttribute. Это стандартный фильтр, который отвечает за повторное выполнение неудачно завершенных задач. Он же и отправляет задачу в очередь "default", игнорируя изначальную очередь. Для того, чтобы наш велосипед поехал нужно удалить этот фильтр из коллекции и позволить нашему фильтру взять на себя ответственность за повторное выполнение задач. В результате конфигурационный код будет выглядеть так:
var defaultRetryFilter = GlobalJobFilters.Filters
.FirstOrDefault(f => f.Instance is AutomaticRetryAttribute);
if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{
GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
}
GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });Необходимо отметить, что AutomaticRetryAttribute реализует логику автоматического увеличения интервала между попытками (с каждой последующей попыткой интервал увеличивается), и удаляя AutomaticRetryAttribute из коллекции GlobalJobFilters, мы отказываемся от этой функциональности (см. реализацию метода ScheduleAgainLater)
Итак, мы добились того, что наши задачи могут выполняться в разных очередях и это позволяет нам независимо управлять их выполнением, в том числе обрабатывать разные очереди на разных машинах. Только теперь мы не знаем сколько раз и с каким интервалом наши задачи будут повторяться в случае ошибки, поскольку удалили AutomaticRetryAttribute из коллекции фильтров.
Проблема 2 — Индивидуальные настройки для каждой очереди
Мы хотим иметь возможность конфигурировать интервал и количество повторений отдельно для каждой очереди, а также, если для какой-то очереди мы не указали значения явно, то хотим, чтобы применялись значения по умолчанию. Для этого мы реализуем еще один фильтр и назовем его HangfireRetryJobFilter.
В идеале, конфигурационный код должен выглядть примерно так:
GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{
Order = 2,
["email_queue"] = new HangfireQueueSettings
{
DelayInSeconds = 120,
RetryAttempts = 3
},
["video_queue"] = new HangfireQueueSettings
{
DelayInSeconds = 60,
RetryAttempts = 5
}
});Решение
Для этого сначала добавим класс HangfireQueueSettings, который будет служить контейнером для наших настроек.
public sealed class HangfireQueueSettings
{
public int RetryAttempts { get; set; }
public int DelayInSeconds { get; set; }
}Затем добавим реализацию самого фильтра, который при повторном выполнении задач после ошибки будет применять настройки в зависимости от конфигурации очереди и следить за количеством повторов:
public class HangfireRetryJobFilter
: JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{
private readonly HangfireQueueSettings _defaultQueueSettings =
new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };
private readonly IDictionary<string, HangfireQueueSettings> _settings
= new Dictionary<string, HangfireQueueSettings>();
public HangfireQueueSettings this[string queueName]
{
get
{
return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings)
? queueSettings
: _defaultQueueSettings;
}
set
{
_settings[queueName] = value;
}
}
public void OnStateElection(ElectStateContext context)
{
if (!(context.CandidateState is FailedState failedState))
{
// This filter accepts only failed job state.
return;
}
var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1;
var queueName = context.GetJobParameter<string>("QueueName");
if (retryAttempt <= this[queueName].RetryAttempts)
{
ScheduleAgainLater(context, retryAttempt, failedState, queueName);
}
else
{
TransitionToDeleted(context, failedState, queueName);
}
}
public void OnStateApplied(
ApplyStateContext context,
IWriteOnlyTransaction transaction)
{
if (context.NewState is ScheduledState &&
context.NewState.Reason != null &&
context.NewState.Reason.StartsWith("Retry attempt"))
{
transaction.AddToSet("retries", context.BackgroundJob.Id);
}
}
public void OnStateUnapplied(
ApplyStateContext context,
IWriteOnlyTransaction transaction)
{
if (context.OldStateName == ScheduledState.StateName)
{
transaction.RemoveFromSet("retries", context.BackgroundJob.Id);
}
}
private void ScheduleAgainLater(
ElectStateContext context,
int retryAttempt,
FailedState failedState,
string queueName)
{
context.SetJobParameter("RetryCount", retryAttempt);
var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds);
const int maxMessageLength = 50;
var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength
? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…"
: failedState.Exception.Message;
// If attempt number is less than max attempts, we should
// schedule the job to run again later.
var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}";
context.CandidateState = delay == TimeSpan.Zero
? (IState)new EnqueuedState { Reason = reason }
: new ScheduledState(delay) { Reason = reason };
}
private void TransitionToDeleted(
ElectStateContext context,
FailedState failedState,
string queueName)
{
context.CandidateState = new DeletedState
{
Reason = this[queueName].RetryAttempts > 0
? "Exceeded the maximum number of retry attempts."
: "Retries were disabled for this job."
};
}
}Примечание к коду: при реализации классаHangfireRetryJobFilterбыл взят за основу классAutomaticRetryAttributeиз Hangfire, поэтому реализация некоторых методов частично совпадает с соответствующими методами этого класса.
Проблема 3 — Как отправить задачу на выполнение в конкретную очередь?
Мне удалось найти два способа присвоить задаче очередь: задокументированный и — нет.
1-й способ — повесить на метод соответствующий атрибут
[Queue("video_queue")]
public void SomeMethod() { }
BackgroundJob.Enqueue(() => SomeMethod());http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html
2-й способ (незадокументированный) — использовать класс BackgroundJobClient
var client = new BackgroundJobClient();
client.Create(() => MyMethod(), new EnqueuedState("video_queue"));Преимущество второго способа в том, что он не создаёт лишних зависимостей от Hangfire и позволяет в процессе выполнения решать, в какую очередь должна отправиться задача. К сожалению, в официальной документации я не нашел упоминания о классе BackgroundJobClient и о том, как его применять. Второй способ я использовал в своем решении, так что он проверен на практике.
Заключение
В этой статье мы воспользовались поддержкой нескольких очередей в Hangfire для разделения обработки разных типов задач. Мы реализовали свой механизм повторения неудачно завершившихся задач с возможностью индивидуальной конфигурации для каждой очереди, расширив функциональность Hangfire с помощью Job Filters, а также научились отправлять задачи на выполнение в нужную нам очередь.
Надеюсь, эта статья окажется кому-нибудь полезной. Буду рад комментариям.
Полезные ссылки
Документация Hangfire
Исходный код Hangfire
Scott Hanselman — How to run Background Tasks in ASP.NET
