Привет! С вами снова писатель-программист из компании Simpl Group (да, без e).
Совсем недавно я выступала на нашем внутреннем Meet Up — уже 6-м, между прочим, — и рассказала своим коллегам занимательную историю, которую поведаю сегодня и вам. Не про ведьм и демонов, конечно, как в моей книге. А про цирк — цифровой цирк, в котором задачи прыгают через обручи, катаются на велосипедах и не падают.
Или, по крайней мере, мы стараемся, чтобы не падали.
(К слову, книгу тоже можете почитать: «Пороки», Ingini)
Представим ситуацию
У вас есть очередь, которая умеет выполнять только один трюк — например, отправлять задачи в расчёт. Всё, как в старом цирке: один артист, один номер.
Но однажды появляется новая задача. И за ней ещё одна. И вот уже зрители хотят видеть не только прыжки через обруч, но и медведя на велосипеде, жонглёров, акробатов и даже кибердракона с машинным обучением.
Значит, мы должны перестроить всё шоу. Так, чтобы:
Добавлять новых артистов минимальными усилиями;
Не перестраивать манеж каждый раз;
Всё работало: надёжно, масштабируемо и эффектно.
Кто в нашем цирке — участники шоу
Представим, что у нас есть две задачи:
Одна — это лошадки, прыгающие через обруч;
Вторая — это мишка на велосипеде, проезжающий то в одну, то в другую сторону.
Что есть общего в этих двух выступлениях?
Контроллеры — дрессировщики, подающиеся голосом: «Вперёд!».
Репозитории — реквизиторы, вытаскивающие снаряжение из склада (БД).
Очереди — манеж, где артисты ждут, пока их объявят.
Экзекьютор — тёмный коридор, ведущий от кулис к свету рампы.
Менеджеры — двери между закулисьем и сценой.
Другие сервисы — собственно сцена, где и происходит номер.
Но есть нюанс:
Лошадки прыгают по одной.
Медведи катаются по трое.
Кто-то выходит через Kafka, кто-то — через HTTP.
По сути мы имеем парочку небольших различий, для которых будет неверно создавать второй сервис или писать почти аналогичный код. Поэтому представим, что кулисы и дрессировщики у мишек и лошадок одни и те же, а двери, актеры и сцены разные.
Как устроен наш манеж — Архитектура
Теперь, когда мы поняли, кто у нас выступает, давайте посмотрим, как работает наш цирк изнутри. Что там в кулисах, и почему никто не спотыкается?
Идея проста:
Сделать единый цирк, куда можно легко впустить любого нового артиста: хоть медведя, хоть жонглёра, ну и да, кибердракона тоже.
В базе
У каждой задачи есть два слоя костюма:
Общий: ID, тип, статус, время постановки, ошибки.
Специфичный: параметры конкретного артиста.
Получается:
task -- общий склад задач task_{taskType}_parameters -- гардероб для костюмов
В коде
Самое главное наше оружие — абстрактный дженерик класс на всё, что скорее всего будет использовано не только для одного типа задач.
Покажу вам, как может выглядеть примерный код.
1. Модельки
public interface ITaskParameters { } public interface ITaskDto { } public record TaskOneParameters(int Value) : ITaskParameters; public record TaskTwoParameters(string Data) : ITaskParameters; public record TaskOneDto(int Value) : ITaskDto; public record TaskTwoDto(string Data) : ITaskDto; public class QueueTask where TParam : ITaskParameters { public QueueTask(TParam parameters) { Parameters = parameters; TaskInfo = new QueueTaskInfo(); } public TParam Parameters { get; } public QueueTaskInfo TaskInfo { get; } } public class QueueTaskInfo { public Guid Id { get; set; } public DateTime QueueTime { get; set; } public QueueTaskStatus Status { get; set; } public QueueTaskType Type { get; set; } } public enum QueueTaskStatus { ReadyForExecution, InProgress, Completed, Failed } public enum QueueTaskType { TaskOne, TaskTwo }
2. Контроллеры
/// <summary> /// Базовый контроллер для постановки задач в очередь /// </summary> [Route("api/[controller]")] [ApiController] public abstract class AbstractQueueTasksController : ControllerBase where TParam : ITaskParameters { protected AbstractQueueTasksController(IMediator mediator) { _mediator = mediator; } protected IMediator _mediator { get; } /// <summary> /// Общий метод для всех типов задач /// </summary> [HttpGet("GetTasks")] public Task<...> GetAsync(CancellationToken cancellationToken = default) { return _mediator.Send(new AbstractGetQueueTasksRequest(), cancellationToken); } } /// <summary> /// Контроллер для задач типа "TaskOne" /// </summary> public class TaskOneController : AbstractQueueTasksController { public TaskOneController(IMediator mediator) : base(mediator) { } /// <summary> /// Постановка задачи, которая пришла из другого сервиса, а значит дажнные уже обработаны /// </summary> [HttpPost("Enqueue")] public Task EnqueueAsync(TaskOneDto dto, CancellationToken cancellationToken = default) { return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken); } } /// <summary> /// Контроллер для задач типа "TaskTwo" /// </summary> public class TaskTwoController : AbstractQueueTasksController { public TaskTwoController(IMediator mediator) : base(mediator) { } /// <summary> /// Постановка задачи, которая пришла с фронта /// </summary> [HttpPost("Enqueue")] public Task EnqueueAsync(TaskTwoDto dto, CancellationToken cancellationToken = default) { ... // тут какая-то обратка и валидация данных return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken); } }
3. Команда и обработчик
/// <summary> /// Команда постановки задачи в очередь /// </summary> public class EnqueueTaskCommand : IRequest where TDto : ITaskDto { public EnqueueTaskCommand(TDto dto) => TaskDto = dto; public TDto TaskDto { get; } } /// <summary> /// Базовый обработчик постановки задач /// </summary> public abstract class EnqueueTaskCommandHandler : IRequestHandler> where TParam : ITaskParameters where TDto : ITaskDto { private readonly AbstractDataflowQueue _queue; protected EnqueueTaskCommandHandler(AbstractDataflowQueue queue) { _queue = queue; } public async Task Handle(EnqueueTaskCommand request, CancellationToken cancellationToken) { if (request is null) throw new ArgumentNullException(nameof(request)); var param = Map(request.TaskDto); await _queue.EnqueueAsync(param, cancellationToken); } /// <summary> /// Просто какая-то работа с данными /// </summary> protected abstract TParam Map(TDto dto); }
4. Очередь
public abstract class AbstractDataflowQueue where TParam : ITaskParameters { private readonly SemaphoreSlim _locker = new(1, 1); // Нужен для защиты от одновременной постановки нескольких задач protected AbstractQueueTaskRepository _repository { get; } protected AbstractBackgroundExecutingTask _executor { get; } protected AbstractDataflowQueue( AbstractBackgroundExecutingTask executor, AbstractQueueTaskRepository repository) { _executor = executor; _repository = repository; } public async Task EnqueueAsync(QueueTask item, CancellationToken cancellationToken = default) { if (item is null) throw new ArgumentNullException(nameof(item)); await _locker.WaitAsync(cancellationToken); try { item.TaskInfo.QueueTime = DateTime.Now; item.TaskInfo.Status = QueueTaskStatus.ReadyForExecution; await _repository.SaveAsync(item, cancellationToken); await _executor.TrySendQueueTask(item.Id); } finally { _locker.Release(); } } }
5. Экзекьютер
/// <summary> /// Базовый экзекьютер: достаёт задачу из очереди и отправляет её в менеджер /// </summary> public abstract class AbstractBackgroundExecutingTask where TParam : ITaskParameters { protected AbstractBackgroundExecutingTask( IManager manager, AbstractQueueTaskRepository repository, int defaultMaxParallelism = 1) { _manager = manager; _repository = repository; var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = defaultMaxParallelism, BoundedCapacity = DataflowBlockOptions.Unbounded }; _block = new ActionBlock(HandleAsync, options); } protected IManager _manager { get; } protected AbstractQueueTaskRepository _repository { get; } protected ActionBlock _block { get; } public bool TrySendQueueTask(Guid taskId) { return _block.Post(taskId); } private async Task HandleAsync(Guid taskId) { var task = await _repository.GetTask(taskId); if (task == null) return; task.TaskInfo.Status = QueueTaskStatus.InProgress; await _manager.TransferTask(task); task.TaskInfo.Status = QueueTaskStatus.Completed; await _repository.UpdateAsync(task); } } /// <summary> /// Лошадки прыгают по одной /// </summary> public class TaskOneExecutor : AbstractBackgroundExecutingTask { public TaskOneExecutor( IManager manager, AbstractQueueTaskRepository repository) : base(manager, repository, defaultMaxParallelism: 1) { } } /// <summary> /// Медведи катаются втроём /// </summary> public class TaskTwoExecutor : AbstractBackgroundExecutingTask { public TaskTwoExecutor( IManager manager, AbstractQueueTaskRepository repository) : base(manager, repository, defaultMaxParallelism: 3) { } }
6. Репозиторий
public abstract class AbstractQueueTaskRepository where TParam : ITaskParameters { // Простое хранилище в памяти protected readonly Dictionary<Guid, QueueTask<TParam>> _storage = new(); public virtual Task SaveAsync(QueueTask task, CancellationToken cancellationToken = default) { _storage[task.TaskInfo.Id] = task; return Task.CompletedTask; } public virtual Task UpdateAsync(QueueTask task, CancellationToken cancellationToken = default) { if (_storage.ContainsKey(task.TaskInfo.Id)) { _storage[task.TaskInfo.Id] = task; } return Task.CompletedTask; } public virtual QueueTask? GetTask(Guid taskId) { _storage.TryGetValue(taskId, out var task); return task; } ... }
+реализации, сохранение в бд и другая логика
7. Менеджеры
public interface IManager where TParam : ITaskParameters { Task TransferTask(QueueTask task); } /// <summary> /// Тут у нас кафка /// </summary> public class TaskOneManager : IManager { private readonly ITaskOneProducer _producer; private readonly ITaskOneConsumer _consumer; public TaskOneManager( ITaskOneProducer producer, ITaskOneConsumer consumer) { _producer = producer; _consumer = consumer; } public async Task TransferTask(QueueTask queueTask) { // Отправка задачи через продюсера await _producer.PublishAsync(queueTask); // Ожидаем результат через консюмера await _consumer.GetResult(queueTask.TaskInfo.Id); } } /// <summary> /// Тут у нас Refit клиент /// </summary> public class TaskTwoManager : IManager { private readonly ITaskTwoClient _client; public TaskTwoManager(ITaskTwoClient client) { _client = client; } public async Task TransferTask(QueueTask task) { await _client.SendTaskTwoAsync(task); } }
Разумеется, код самый примитивный, который просто показывает, как можно сделать.
И не забудьте зарегистрировать реализации как синглтон объекты (иначе вся ваша очередь потеряется). Только Менеджеры можно сделать Transient.
Итоговая архитектура:

Как бы мы приручили разношёрстных артистов — расширяемость
Теперь представим, что завтра к нам заходят:
Слоны, которые будут делать запросы по SOAP.
Пингвины, которые будут танцевать параллельно в 10 потоков. Наша архитектура говорит: «Да не вопрос». Вот как мы добавляем нового зверя в наш цирк:
В базу:
Новая таблица параметров.
В код:
Реализация абстрактного контроллера, необходимых команд и запросов, модельки
Реализация репозитория, очереди, экзекьютора и менеджера.
DI-регистрация в
Program.cs. (то есть подписываем, что наши животные могут пользоваться любыми нашими рельсами)
И всё. Весь путь — по накатанной. Никто не мешает мишкам, лошадям и слонам выступать одновременно.
Слова автора
Спасибо большое, что прочитали статью мини-мидла. Надеюсь, что вам понравились метафоры) Моим коллегам на Meet Up очень понравились! А там, между прочим, не только разработчики были, но и аналитики, тестеры и даже медийщики!
Если вам интересны такие мероприятия, то заглядывайте к нам. Возможно мы даже скоро выйдем на более глобальный уровень с нашим митапом)
Ну и, конечно же, если вам есть что сказать, то милости прошу в комментарии. Я всегда рада конструктивной критике!
