Привет, Хабр! Меня зовут Евгений, я backend-разработчик SimbirSoft. В этой статье я разберу два варианта решения нетривиальной задачи создания RPC через брокер сообщений RabbitMQ и библиотеку MassTransit. Подробно разберём подключение MassTransit и работу с Saga. Тема будет полезна как для начинающих, так и опытных backend-разработчиков .NET.
Введение
В микросервисной архитектуре общение между отдельными сервисами как правило делится на два типа:
Синхронное (Request-Response) применяется, когда на запрос нужен незамедлительный ответ (таймаут соединения в расчёт не берётся).
Асинхронное (AMQP). Применяется, когда не нужен незамедлительный ответ.
![](https://habrastorage.org/getpro/habr/upload_files/64d/728/040/64d728040d31876eb31990bacb88d61e.png)
Мы рассмотрим работу брокера RabbitMQ и асинхронную модель взаимодействия сервисов. Подробнее о его особенностях в сравнении с его конкурентом Apache Kafka можно почитать здесь.
Итак, официальная документация RabbitMQ гласит, что брокер сообщений — это сервер промежуточного ПО между автором (Producer) и получателем сообщений (Consumer), который выполняет функции получения сообщения от автора, маршрутизации и установки сообщений в очередь получателя сообщения.
Для наглядности ниже приведена принципиальная схема устройства брокера сообщений.
![Источник: официальная документация RabbitMQ Источник: официальная документация RabbitMQ](https://habrastorage.org/getpro/habr/upload_files/81b/9fe/156/81b9fe1568440e892ce75a137db53b1f.png)
При использовании AMQP асинхронность достигается за счет очередей сообщений. То есть если получатель сообщений (Consumer) занят, то ему не нужно всё бросать и хвататься обрабатывать сообщение, он сможет обработать сообщение из очереди, когда будет к этому готов.
Также стоит отметить, что брокер сообщения выполняет функцию маршрутизации. На рисунке ниже приведена схема взаимодействия сервисов без брокера сообщений, легко заметить, что тут царит хаос:
![](https://habrastorage.org/getpro/habr/upload_files/3df/2da/1a7/3df2da1a77d564bbb20b7611e24e0114.png)
Применение брокера сообщений упорядочивает и упрощает взаимодействие сервисов между собой.
А вот представление схемы взаимодействия с картинки выше, если бы использовался брокер сообщений:
![](https://habrastorage.org/getpro/habr/upload_files/cc1/a1e/cce/cc1a1ecce7ec0470d94f0d5760a7d78e.png)
Существует две основные архитектуры брокеров сообщений:
1. Push-архитектура. Если у сервера (брокера) есть сообщения для получателя (Consumer), то он по своей инициативе отправляет сообщение получателю.
![Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo](https://habrastorage.org/getpro/habr/upload_files/4c6/69f/e88/4c669fe8857843be16bd4f3d851ccfd9.png)
2. Pull-архитектура. Клиент (он же Consumer) по своей инициативе запрашивает у сервера (брокера) сообщения, и если у брокера есть сообщения для клиента, то отправляет эти сообщения.
![Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo Источник: https://dev.to/anubhavitis/push-vs-pull-api-architecture-1djo](https://habrastorage.org/getpro/habr/upload_files/4e4/f33/756/4e4f337567d6b83a372a658cf46019a9.png)
В данной статье будет рассмотрена Push-архитектура, наиболее распространенная при использовании брокера сообщений RabbitMQ.
От слов к делу!
Реализуем RPC c помощью MassTransit
Наиболее популярные библиотеки для работы с RabbitMQ в .NET:
RabbitMQ.Client
MassTransit.RabbitMQ
EasyNetQ
NServiceBus.RabbitMQ
В качестве примера работы с RabbitMQ рассмотрим нестандартную задачу построения RPC (Remote Procedure Call, удаленный вызов процедур) c использованием библиотеки MassTransit.
Условия задачи:
Программа генерирует число, которое пользователь должен угадать. При каждом вводе числа программа пишет результат: больше или меньше отгадываемого. Количество попыток угадывания и диапазон чисел должны задаваться из настроек.
Рассмотрим два варианта решения:
без использования Saga
с использованием Saga
Saga – это долгоживущая транзакция, управляемая координатором. Саги инициируются событием, саги организуют события, и саги поддерживают состояние всей транзакции. Саги предназначены для управления сложностью распределенных транзакций без блокировки и немедленной согласованности. Они управляют состоянием и отслеживают любые компенсации, необходимые в случае частичного сбоя.
Краткое описание реализованной функциональности:
Консольный клиент принимает от пользователя число и отправляет его в брокер сообщений.
Сервер получает в сообщении из брокера сообщений число, отправленное пользователем, обрабатывает его и высылает в брокер сообщений результат обработки.
Клиент получает от брокера сообщений ответ сервера и выводит его пользователю.
В демонстрационных целях будем использовать «облачный» RabbitMQ, где после регистрации будет выдан бесплатный экземпляр RabbitMQ и информация для подлючения из нашего приложения.
Решение без использования Saga MassTransit
Принципиальная схема приложения без использования Saga:
![](https://habrastorage.org/getpro/habr/upload_files/e44/75c/2a8/e4475c2a83ee70c7f5b86d216845b29f.png)
Теперь переходим в Visual Studio и создаем наши проекты.
![](https://habrastorage.org/getpro/habr/upload_files/9b3/a5a/fa6/9b3a5afa6e2d23d8973a990521319df9.png)
Настройки подключения к облаку RabbitMQ указаны в appsettings.json файлах:
![](https://habrastorage.org/getpro/habr/upload_files/91c/247/f21/91c247f2187b749c9a0596c3a74e027a.png)
При старте Producer-приложения и Consumer-приложения происходит инициализация настроек для подключения к RabbitMQ.
Для Producer:
![](https://habrastorage.org/getpro/habr/upload_files/307/7b3/a5e/3077b3a5ece5169cb84c223cecbafd36.png)
И для Consumer:
![](https://habrastorage.org/getpro/habr/upload_files/8fe/599/dbe/8fe599dbe1543d1e20cfae9b87d9a146.png)
![](https://habrastorage.org/getpro/habr/upload_files/61b/574/83f/61b57483fd490b9ef7fda127bb255157.png)
Для примера работы с брокером сообщений мы используем библиотеку MassTransit.RabbitMQ v. 8.1.3
В данном проекте вся логика работы с брокером представлена в библиотеке классов «Infrastructure.MassTransit»:
![](https://habrastorage.org/getpro/habr/upload_files/3ea/17b/fb0/3ea17bfb0a4d1671b8312e2cc7402f47.png)
Процесс отправки сообщения из консольного клиента:
![](https://habrastorage.org/getpro/habr/upload_files/ef2/30a/c12/ef230ac1297c60efb4066b3da77f23d2.png)
Процесс получения сообщений из очереди ответов для консольного клиента:
![](https://habrastorage.org/getpro/habr/upload_files/8a7/7ec/8d0/8a77ec8d046f66a1fde2443516e47a7f.png)
Устанавливаем класс, который будет отвечать за получение сообщений для консольного клиента из очереди:
![](https://habrastorage.org/getpro/habr/upload_files/dd7/45e/414/dd745e414e213d25dc50939d3234e8ec.png)
Для прослушивания сообщений из очереди ответов для консольного клиента определен класс RabbitMqConsumer, который реализует интерфейс IConsumer библиотеки MassTransit.
Логика работы нашего консольного клиента лишь в отправке числа, которое ввел пользователь через консоль на сервер и получение ответа (более подробно можно посмотреть в github), поэтому класс RabbitMqConsumer лишь получает сообщения и выставляет флаг, отгадано ли число с сервера или нет:
![](https://habrastorage.org/getpro/habr/upload_files/400/797/820/40079782068038856bef37d0ee371d51.png)
Теперь рассмотрим request от клиента к серверу через брокер и реализацию responce от сервера к клиенту через брокер:
В классе Program проекта WebApi.Consumer указан метод расширения, внутри которого, помимо установки настроек подключения к облаку RabbitMQ, также указываются настройки класса, который будет прослушивать сообщения из очереди для сервера:
![](https://habrastorage.org/getpro/habr/upload_files/ea1/d7a/00a/ea1d7a00ad57451c0c8738c9b48d5224.png)
Метод AddBLLServicesWithoutSaga():
![](https://habrastorage.org/getpro/habr/upload_files/706/078/07e/70607807e7138488d89d7db623a75d62.png)
Настройка подключения к облаку RabbitMQ и настройка класса прослушивания входящих сообщений:
![](https://habrastorage.org/getpro/habr/upload_files/d1a/773/2fc/d1a7732fc5aa46c34d47e448203fc3f7.png)
Так как мы реализуем RPC, то класс, который получает сообщения из очереди от клиента, должен обрабатывать данные и возвращать ответ клиенту через очередь ответов. Это всё реализовано в классе RabbitMqConsumerWebApi:
![](https://habrastorage.org/getpro/habr/upload_files/a96/00c/7a5/a9600c7a5bcfb108738841652fb66705.png)
Этот класс реализует интерфейс IConsumer из библиотеки MassTransit.
В методе Consume мы получаем сообщение из очереди клиента, обрабатываем его и отправляем результат обработки в очередь ответов.
Более подробное описание работы программы указано в файле Readme на github.
Стоит отметить, что при старте приложений в облаке RabbitMQ с помощью подключенной ранее в наш проект библиотеки MassTransit.RabbitMQ будут созданы очереди сообщений, которые мы указали в коде (очередь для отправки на сервер и очередь для отправки клиенту). Для просмотра статистических данных по очередям необходимо открыть ресурс:
![](https://habrastorage.org/getpro/habr/upload_files/a8c/da5/ac2/a8cda5ac2b101536e5a511af46898bae.png)
Далее откроется страница управления RabbitMQ c довольно богатой функциональностью, если мы перейдем во вкладку очередей, то увидим наши очереди:
![](https://habrastorage.org/getpro/habr/upload_files/123/9da/ad0/1239daad040637e8eaef84430bee82b8.png)
Решение с использованием Saga от MassTransit
Библиотека MassTransit имеет возможность организовать и управлять серий событий.
Схема приложения с использованием Saga от MassTransit:
![](https://habrastorage.org/getpro/habr/upload_files/395/7c8/90d/3957c890d94105c041aeb07ed7bce280.png)
Для использования Saga изменим класс Program консольного клиента:
![](https://habrastorage.org/getpro/habr/upload_files/0f9/398/ccc/0f9398ccc3e91343f5456fd027b1c078.png)
Внутри метода расширения устанавливаем настройки типов запроса и настройки конфигурации:
![](https://habrastorage.org/getpro/habr/upload_files/176/8d4/1a2/1768d41a2b93a289c93af8d1ae9aad24.png)
Для отправки сообщений и получения ответа от саги необходимо использовать метод GetResponse интерфейса IRequestClient библиотеки MassTransit:
![](https://habrastorage.org/getpro/habr/upload_files/afd/c87/bf7/afdc87bf71a55b040f6798d4e966bb1c.png)
![](https://habrastorage.org/getpro/habr/upload_files/34a/5e2/105/34a5e21059fecc35562a42dc32fdcc5b.png)
Далее создаем проект ASP.NET Core c названием WebApi.Saga, внутри которого реализуем сагу для управления запросами и ответами наших микросервисов. В классе Program проекта ASP.NET Core добавим настройки конфигурации:
![](https://habrastorage.org/getpro/habr/upload_files/afd/4a6/7b3/afd4a67b38b1a5a18f30f31b5ac10177.png)
![](https://habrastorage.org/getpro/habr/upload_files/c62/33f/29d/c6233f29d0a1088e20e0f16b52dacb13.png)
Далее необходимо создать класс, описывающий состояние саги:
![](https://habrastorage.org/getpro/habr/upload_files/791/223/e42/791223e42cce20260f700cb35ef9e20c.png)
И класс, описывающий обработку событий, возникающих в саге:
![](https://habrastorage.org/getpro/habr/upload_files/a6d/e32/e92/a6de32e92b89aefc6ae8bdf2b89ec470.png)
Теперь отредактируем класс-обработчик сообщений от клиента:
![](https://habrastorage.org/getpro/habr/upload_files/739/8c2/826/7398c28260e95dbc8203d4e516c0dcf0.png)
Также стоит отметить, что при запуске программы в облаке RabbitMQ автоматически создадутся очереди, которыми будет управлять сага:
![](https://habrastorage.org/getpro/habr/upload_files/06b/855/ca2/06b855ca2d8d1d51332dd8c6dbf6995e.png)
Вывод
При взаимодействии ряда микросервисов между собой предпочтительным вариантом является использование Saga от MassTransit в силу того, что Saga представляет мощную функциональность управления распределенными транзакциями. Помимо этого имеется возможность создавать и обрабатывать события. Более подробно с Saga можно ознакомиться здесь.
Надеюсь, теперь вы получили представления о работе брокера сообщений RabbitMQ по Push-архитектуре в связке с библиотекой Masstransit. А реализовать это на практике помогут два шаблона проекта реализации RPC (с использованием Saga и без).
Спасибо за внимание!
Больше авторских материалов для backend-разработчиков от моих коллег читайте в соцсетях SimbirSoft – ВКонтакте и Telegram.