Хабр Курсы для всех
РЕКЛАМА
Практикум, Хекслет, SkyPro, авторские курсы — собрали всех и попросили скидки. Осталось выбрать!
Что-то мне это болезненно напоминает модель акторов (см Akka.net или Orleans).
Данный подход использования очередей позволяет из коробки управлять задачами, то есть для того что бы задача запустилась на одном из сервисов, а потом релоцировалась от сервиса к сервису, в том случае если сервисы отказывают, не нужно писать вообще ни какой логики и кода.
Ну так вроде бы и в модели акторов то же самое: актор умирает, сообщение не обработано, оно попадает обратно в диспетчер, тот перезапускает актора, переотправляет сообщение. Ничего в акторе для этого писать не надо.
А акторная модель, в частности Orleans, предназначена больше для обеспечения statefull бизнес-логики в облачных серверах.
Ээээ… с чего бы вдруг? Модель акторов предназначена для обработки сообщений, а облака там вообще не при чем.
Ну то есть да, конечно, я с этим копался года два или больше назад, и мог все забыть и перепутать, врать не буду. Но для меня схожесть очевидна.
public class ObserverConsumer : IConsumer<IStartJob> //IConsumer - MassTransit
{
private readonly ObserverJob _observerJob;
public JobConsumer(observerJob ourJob)
{
_observerJob = observerJob;
}
public async Task Consume(ConsumeContext<IStartJob> context)
{
var jobArg = context.Message;
//Запуск (бесконечной) задачи.
//Важно сохранить токен отмены в памяти, тогда другим сообщением мы можем вызвать его и завершить задачу.
_observerJob.Execute(jobArg, new CancellationTokenSource().Token);
}
}cfg.ReceiveEndpoint("observer-queue-name", e =>
{
e.UseMessageRetry(r => r.Immediate(int.MaxValue));
//а тут мы задаем сколько сервис может обрабатывать задач.
e.PrefetchCount = serviceOptions.MaxWorkersCount;
//Подключаем обработчик сообщений.
e.ConfigureConsumer<ObserverConsumer>(provider);
});
Оркестратор бесконечных задач