Реализация обмена сообщениями через MassTransit
Всем привет! Меня зовут Валерия, я backend-разработчик компании Bimeister. В этой статье я хочу вам рассказать про базовую реализацию работы с MassTransit.
Рано или поздно многие проекты сталкиваются с задачей обмена сообщениями. Часто сообщениями должны обмениваться элементы распределенных систем, использующих разные подходы и технологии. В современных системах для решения данной задачи, как правило, используются шины сообщений, позволяющие абстрагировать работу с сообщениями для различных компонентов системы. Поэтому, когда перед нами стала задача реализации такого подхода с нуля, за нами оставался только выбор подходящего решения. Мы решили поделиться этой базовой реализацией и описать начало работы с шиной сообщений на платформе .NET Core.
Исходные данные были просты: работа под .NET и работа с RabbitMQ, так как именно эти инструменты мы используем на большинстве наших проектов.
Одним из наиболее популярных решений для .NET является MassTransit. Именно на него пал наш выбор. Библиотека представляет собой сервисную шину сообщений, которая является абстракцией над большинством популярных брокеров сообщений (Azure Service Bus, RabbitMQ, Kafka и тд), что позволяет разработчикам не тратить много времени на конкретную реализацию, а сосредоточиться на задачах функциональности.
Для начала, рассмотрим что же такое MassTransit, и как он вообще работает.
Для этого попробуем реализовать простейший обмен сообщениями с использованием библиотеки.
В нашем случае стояла задача получения технических объектов через шину сообщений. Посмотрим, как можно это реализовать с использованием MassTransit.
Для работы наше приложение должно содержать несколько обязательных элементов:
Message. Определенно, нам понадобится само сообщение, которое мы захотим отправить и доставить по нужному адресу.
Сообщения в MassTransit представлены двумя основными типами: команды (Commands) и события (Events) и реализуются с помощью ссылочных типов .NET: записей, интерфейсов или классов.
Команды - это сообщения, которые говорят сервису что-то сделать и, как правило, для одной команды существует один получатель (Consumer).
// Пример сообщения-команды для оценки технического объекта
public record CreateTechnicalObjectAssessment(
string RiskCategoryName,
uint SortingOrder,
bool IsApplicable,
string Justification);
События - сообщения, которые оповещают о том, что что-то произошло, и которые могут быть отправлены сразу нескольким подписанным получателям.
// Сообщение - событие о получении технического объекта
public record TechnicalObjectUploaded(
Guid Id,
string Name,
string ManufacturerName,
string ManufacturerModel,
string Code,
Guid? ParentId,
DateTime UploadedAt);
Producer (он же издатель, он же продюсер по устоявшейся в русской локализации терминологии). Сообщение должен кто-то отправить. И MassTransit предоставляет два основных способа отправки: Send и Publish.
Сообщение, отправленное с помощью Send, доставляется по определенному адресу. Обычно, с помощью Send отправляются команды.
Сообщение, опубликованное через Publish, распространяется ко всем подписчикам, которые подписывались на сообщение данного типа. Например, как оповещение о событии
public class MessagePublisher : BackgroundService
{
// Подключаем интерфейс IBus для публикации сообщений
private readonly IBus _bus;
public MessagePublisher(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Публикуем сообщение
await _bus.Publish(new TechnicalObjectUploaded(
Guid.NewGuid(),
"Установка Л-24/6",
"Цех №1",
"КМ-24521-Т",
"25м",
Guid.NewGuid(),
DateTime.Now), stoppingToken);
await Task.Delay(10000);
}
}
}
Consumer. Ну и наконец, было бы неплохо, чтобы сообщение до кого-то дошло. Иначе, зачем оно все? В MassTransit можно подписаться на сообщения одного или сразу нескольких типов. Для этого класс-подписчик должен унаследовать интерфейс IConsumer<TMessage> и реализовать его метод Task Consume(ConsumeContext<TMessage> context).
// Подписчик на сообщения типа TechnicalObjectUploaded
public class TechnicalObjectsConsumer : IConsumer<TechnicalObjectUploaded>
{
private readonly ILogger<TechnicalObjectsConsumer> _logger;
public TechnicalObjectsConsumer(ILogger<TechnicalObjectsConsumer> logger)
{
_logger = logger;
}
// При получении сообщения типа TechnicalObjectUploaded, выводим информацию о техничeском объекте
public Task Consume(ConsumeContext<TechnicalObjectUploaded> context)
{
_logger.LogInformation("New technical object {Id} was uploaded at {Time}",
context.Message.Id,
context.Message.UploadedAt);
return Task.CompletedTask;
}
}
Подключаем MassTransit в файле Program.cs. Для данного примера используем механизм передачи In-Memory, который вполне подойдет для демонстрационного варианта, так как не требует использования конкретного брокера сообщений. И, при запуске приложения, видим, что сообщения, отправленные через Publish, успешно получаются MessageConsumer и выводятся в консоль.
// Добавляем MassTransit
builder.Services.AddMassTransit(x =>
{
// Указываем, что нам подходят все подписчики данной сборки
x.AddConsumers(typeof(Program).Assembly);
// В данном примере используем InMemory реализацию
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
//Добавляем издателя сообщений
builder.Services.AddHostedService<MessagePublisher>();
Выглядит неплохо, MassTransit действует, глаз радуется. Но для решения нашей задачи (как, пожалуй, и большинства реальных задач разработки) придется, все-таки, кое-что видоизменить.
Основной проблемой этой реализации является то, что In-Memory в MassTransit может работать только внутри одного процесса. То есть, во-первых, время жизни сообщений ограничено работой процесса, а, во-вторых, если мы захотим разделить отправку и получение сообщений между процессами (а мы захотим), этот инструмент нам не подойдет. Это очень ограничивает область применения данного подхода.
Поэтому на нашем проекте понадобится приправить реализацию реальным брокером сообщений и небольшими улучшениями. В качестве брокера будем использовать RabbitMq, так как работа с ним уже идет на других наших проектах. Посмотрим, что нужно будет изменить для работы с ним.
Реализуем механизм обмена сообщениями через MassTransit c RabbitMQ.
Работа с RabbitMQ через MassTransit осуществляется с помощью пакета MassTransit.RabbitMQ.
Для подключения RabbitMQ через MassTransit незначительно изменим Startup.cs:
// Добавляем MassTransit
builder.Services.AddMassTransit(x =>
{
// По умолчанию MassTransit использует Pascal case.
// Так как в большинстве случае в эндпоинтах будет использоваться Kebab case,
// укажем на это явно.
x.SetKebabCaseEndpointNameFormatter();
// Меняем InMemory на RabbitMq и указываем данные для подключение к серверу.
// В остальном подключение похоже на InMemory, так реализация его в MassTransit
// намеренно реплицирует поведение именно RabbitMq
x.UsingRabbitMq((context, cfg) =>
{
// Здесь указаны стандартные настройки для подключения к RabbitMq
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
При реализации работы с брокером сообщений, отправка и получение сообщений будут разделены и будут обрабатываться разными приложениями. В данных приложениях отдельно поместим контракты для сообщений и консьюмеры.
В проект Contracts, который представляет из себя библиотеку классов, перенесем наши подготовленные сообщения:
// Сообщение, оповещающее нас о получении нового технического объекта
public record TechnicalObjectUploaded(
Guid Id,
string Name,
string ManufacturerName,
string ManufacturerModel,
string Code,
Guid? ParentId,
DateTime UploadedAt);
Добавим в наш основной проект механизм публикации сообщения. Сделаем это с помощью контроллера, работающего с техническими объектами:
[ApiController]
[Authorize]
public class TechnicalObjectsController : Controller
{
// Добавляем механизм публикации сообщений
// Обратите внимание, что в этот раз мы используем интерфейс IPublishEndpoint
// В отличие от IBus, используемого в предыдущем примере, IPublishEndpoint
// позволяет обработать scope запроса контроллера
// В то же время, IBus реализован как Singleton, что не подходит для запроса контроллера
private readonly IPublishEndpoint _publishEndpoint;
private readonly IMediator _mediator;
private readonly IMapper _mapper;
public OrdersController(IPublishEndpoint publishEndpoint, IMediator mediator, IMapper mapper)
{
_publishEndpoint = publishEndpoint;
_mediator = mediator;
_mapper = mapper;
}
/// <summary>
/// Создать технический объект.
/// </summary>
[HttpPost("create-technical-object")]
public async Task CreateTechnicalObject([FromBody] CreateTechnicalObjectCommandInfo commandInfo, CancellationToken cancellationToken)
{
var command = _mapper.Map<CreateTechnicalObjectCommand>(commandInfo);
var createdObject = await _mediator.Send(command, cancellationToken);
// Публикуем сообщение о создании через метод Publish
await _publishEndpoint.Publish<TechnicalObjectUploaded>(new
{
Id = createdObject.Id,
Name = createdObject.Name,
ManufacturerName = createdObject.ManufacturerName,
ManufacturerModel = createdObject.ManufacturerModel,
Code = createdObject.Code,
ParentId = createdObject.ParentId,
UploadedAt = DateTime.Now
});
}
}
RabbitMQ поддерживает несколько типов распределительных механизмов (Exchange) между очередью и отправителем. MassTransit по умолчанию использует Fanout Exchange. Данный вид Exchange распределяет все полученные сообщения между всеми очередями, которые подписаны на данный тип сообщений. В документации MassTransit указано, что данный тип обмена сообщениями был выбран, потому что он является наиболее производительным для RabbitMQ.
Данный вариант вполне подходит для нашей задачи, осталось создать подписчика на сообщения и подписаться на сообщения типа TechnicalObjectUploaded. Соответственно, заключительный этап обмена сообщениями через RabbitMQ будет заключаться в реализации получения сообщений.
Для этого обратимся к отдельно созданному проекту, который хранит в себе наши Consumers. Поместим в него класс-подписчик на сообщения типа TechnicalObjectUploaded:
// Подписчик на сообщения типа TechnicalObjectUploaded
public class TechnicalObjectUploadedConsumer : IConsumer<TechnicalObjectUploaded>
{
private readonly ILogger<TechnicalObjectUploadedConsumer> _logger;
public MessageConsumer(ILogger<TechnicalObjectUploadedConsumer> logger)
{
_logger = logger;
}
// При получении сообщения типа TechnicalObjectUploadedConsumer, выводим данные о новом техническом объекте
public Task Consume(ConsumeContext<TechnicalObjectUploaded> context)
{
_logger.LogInformation("New technical object {Id} was uploaded at {Time}",
context.Message.Id,
context.Message.UploadedAt);
return Task.CompletedTask;
}
}
Также, необходимо добавить подключение RabbitMQ в Startup.cs аналогично проекту с издателями сообщений, с той разницей, что в данном случае нам нужно указать, что в проекте есть консьюмеры:
// Добавляем MassTransit
builder.Services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
// Добавляем консьюмеров
var assembly = typeof(Program).Assembly;
x.AddConsumers(assembly);
x.UsingRabbitMq((context, cfg) =>
{
//Здесь указаны стандартные настройки для подключения к RabbitMq
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
Готово, вы (и мы!) восхитительны! При запуске проектов и отправке сообщений по запросу CreateTechnicalObject, мы увидим, что сообщения помещаются в соответствующую очередь RabbitMQ, откуда в дальнейшем будут получены консьюмером из проекта Consumers.
А если что-то пойдет не так?
Но что будет, если во время получения сообщения в консьюмере возникнет исключение?
В таком случае MassTransit предоставляет сообщение Fault<T>, которое представляет собой обобщенный контракт и включает в себя информацию об ошибке. Если в Fault определен FaultAddress, то ошибка отправляется напрямую по этому адресу, в противном случае она публикуется для всех консьюмеров, подписанных на сообщение.
Как и обычное сообщение, мы можем его получить и обработать. Сделаем это для наших технических объектов, выделив для такого случая отдельный консьюмер:
// Создаем отдельный консьюмер для получения сообщений об ошибке создания тех. объекта
public class TechnicalObjectsFaultConsumer : IConsumer<Fault<TechnicalObjectUploaded>>
{
private readonly ILogger<TechnicalObjectsFaultConsumer> _logger;
public TechnicalObjectsFaultConsumer(ILogger<TechnicalObjectsFaultConsumer> logger)
{
_logger = logger;
}
// При возникновении события Fault<T> перехватываем сообщение об ошибке
public Task Consume(ConsumeContext<Fault<TechnicalObjectUploaded>> context)
{
var error = context.Message.Exceptions[0];
_logger.LogError(error.Message);
return Task.CompletedTask;
}
}
По умолчанию, при перехвате исключения, такие сообщения складываются в отдельную очередь, которая имеет постфикс _error. То есть, при отправке сообщения в my-queue ошибка будет добавлена в очередь my-queue_error. При желании, это поведение может быть кастомизировано для каждого конкретного случая.
Можно резюмировать, что MassTransit дал нам возможность создать реализацию двух подходов к обмену сообщениями с минимальными изменениями при смене подхода. Такая базовая реализация позволит нам пользоваться основным механизмом обмена сообщениями, добавляя необходимое для дальнейших задач число контрактов и получателей данных контрактов.