MassTransit. Сервисная шина для обмена сообщениями на основе сервера очередей RabbitMQ (MSMQ) для .Net

MassTransit
Впервые я услышал о библиотеке MassTransit (MT) около года назад от экс-коллеги, зашедшего в наш офис для обмена опытом. Компания, в которую он устроился, использовала MT для уменьшения связности между модулями разрабатываемого ими сервиса и, поскольку высокая связность начала превращаться в проблему и для нас, чужой опыт оказался нам очень кстати. Помимо уменьшения связности путем перехода на событийную модель взаимодействия между модулями, MT пригодился нам и для распределения ресурсоемких задач между несколькими процессами.



Что такое MassTransit.


MassTransit — это реализация хорошо известного паттерна DataBus. Основная задача этого паттерна — организовать взаимодействие нескольких объектов, не подозревающих о существовании друг друга, через обмен сообщениями между ними. Библиотека была написана Dru Sellers и Chris Patterson как бесплатный аналог проекта NServiceBus, способный использовать в качестве транспорта сервера сообщений RabbitMQ или MSMQ на выбор. В своем проекте мы предпочли использовать RabbitMQ, поэтому здесь будет описан опыт работы и подводные камни, поджидающие при конфигурации шины на этом сервере очередей. Несмотря на то, что MassTransit является слоем абстракции над протоколом AMQP и разработчики старались скрыть детали реализации так, что знания об устройстве сервера очередей и протокола AMQP для использования библиотеки практически не требуются, для понимания статьи и успешного обхождения граблей при конфигурации шины общее представление об устройстве сервера RabbitMQ желательно иметь. Это плохая новость, но есть и хорошая — знаний необходим самый минимум, вполне будет достаточно прочесть первые четыре урока отсюда. Уроки небольшие и понятные, изучение основ работы с RabbitMQ не займет много времени, но способно принести много пользы. К слову, на хабре даже была переведена первая пара уроков. Урок один и урок два.

К делу.


Перейдем от теории к практике и попытаемся с помощью библиотеки MassTransit выполнить задачу, похожую по своему функционалу на первый пример из туториала к RabbitMQ. Мы напишем простое консольное приложение, в котором будут взаимодействовать два объекта Publisher и Subscriber. Publisher, при нажатии на любую клавишу, будет посылать в шину сообщение “KeyWasPressed” и код нажатой клавиши. Subscriber будет захватывать это сообщение из шины и выводить его на экран.

Для начала нам придется
1) Установить Erlang
2) Установить RabbitMQ
3) Установить MassTransit.RabbitMQ в тестовое приложение, выполнив команду PM> Install-Package MassTransit.RabbitMQ.

Перейдем к непосредственно к коду. Сообщения, отправляемые в шину, это обычные DTO объекты. В первую очередь нам потребуется создать класс самого сообщения, пересылаемого от издателя к подписчику. Назовем его KeyWasPressed.
public class KeyWasPressed
{
   //в сообщении будем передавать нажатую пользователем клавишу
   public ConsoleKey PressedKey { get; set; }
}


Теперь перейдем к написанию простеньких издателя (publisher) и подписчика (subscriber). Ключевым элементом библиотеки является ServiceBus. ServiceBus в MassTransit — это среда для обмена сообщениями, в которой за транспорт сообщений отвечает сервер очередей RabbitMQ (или MSMQ). Наши подписчик и издатель будут представлять собой объекты этого самого типа — ServiceBus.

Подписчик.

 IServiceBus subscriber = ServiceBusFactory.New(sbc =>
{
     //указываем что в качестве транспорта мы будем использовать rabbitMq
     sbc.UseRabbitMq();
     //указываем очередь из которой мы будем получать сообщения на которые мы подписались
     sbc.ReceiveFrom("rabbitmq://localhost/subscriber");
      //подписываемся на сообщение KeyWasPressed. При поступлении
     //соответствующего сообщения выводим его на экран
     sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg =>
     Console.WriteLine("{0}{1}{2}{3}",Environment.NewLine,"Key  '", msg.PressedKey, "' was pressed")
     ));
});


Издатель.

IServiceBus publisher = ServiceBusFactory.New(sbc =>
{
    //указываем, что в качестве транспорта мы будем использовать rabbitMq
     sbc.UseRabbitMq();
     //указываем очередь, из которой мы будем получать сообщения             
     sbc.ReceiveFrom("rabbitmq://localhost/publisher");
});


MassTransit не делит подключенные к серверу сообщений экземпляры ServiceBus на издателей и подписчиков, через каждый подключенный экземпляр можно как публиковать так и обрабатывать сообщения. Поэтому нам всегда необходимо указывать очередь для получения сообщений, хотя иногда, как и в случае с объектом publisher, мы не собираемся ничего получать.
Теперь напишем бесконечный цикл, в котором каждая нажатая клавиша будет отправляться в шину.
while (true)
{
   publisher.Publish(new KeyWasPressed() { PressedKey = Console.ReadKey().Key });
}

Все готово, запускаем наше приложение и жмем на клавиши.


Что происходит на сервере очередей.


Давайте посмотрим — что происходит на сервере очередей при запуске нашего приложения. По умолчанию установщик RabbitMQ регистрирует RabbitMQ как службу Windows, так что мы всегда можем смотреть что присходит в данный момент времени на сервере очередей через утилиты командной строки. Но удобнее пользоваться веб плагином, также входящим в стандартную поставку дистрибутива.

Для его установки нам придется выполнить следующие несколько шагов
1) В командной строке перейдем в папку sbin из каталога установки сервера (например, %PROGRAMFILES%\RabbitMQ Server\rabbitmq_server_2.7.1\sbin\)

2) Далее выполним следующую команду.
rabbitmq-plugins.bat enable rabbitmq_management

3) Наконец, чтобы включить плагин управления мы должны переустановить службу RabbitMQ. Выполним следующую последовательность команд для установки службы:
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
Чтобы убедиться, что плагин управления сервером RabbitMQ установлен и запущен, запустим браузер и перейдем на следующую страницу (для версии 3.0 порт по умолчанию 55672). Если все прошло нормально, появится экран, аналогичный следующему:

Логин/пароль по умолчанию guest/guest. Заходим внутрь и нажимаем на список точек обмена.

Для каждого сообщения, на которое существует хотя бы один подписчик, MT создает точку обмена (exchange) с именем по шаблону Namespace:ClassName и привязывает к ней очереди подписчиков. В нашем приложении точка обмена только одна, с названием MT:KeyWasPressed и эта точка обмена привязана к одной очереди — subscriber.

В новых версиях MT очереди и точки обмена по умолчанию стали постоянными (durable), я так и не понял баг это или фича, но раньше при создании каждой подписки требовалось явно вызывать метод Permanent(), чтобы закрепить соответствующую ей точку обмена на сервере очередей. Устойчивые точки обмена и очереди удобны тем, что, если в момент публикации сообщения издателем подписчик отключен от сервера очередей, опубликованное сообщение все равно не пройдёт мимо подписчика, а встанет в очередь и тихонько дождется его (подписчика) подключения.

Добавление новых подписчиков.


Давайте изменим наше приложение добавив в него еще одного подписчика на сообщение KeyWasPressed. В отличии от первого он будет подписан на очередь под названием anothersubscriber и будет выводить на экран числовое представление каждой нажимаемой пользователем клавиши.

IServiceBus anotherSubscriber = ServiceBusFactory.New(sbc =>
{
   sbc.UseRabbitMq();
   sbc.ReceiveFrom("rabbitmq://localhost/anothersubscriber");
   sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg => 
      Console.WriteLine("{0}{1}{2}{3}", Environment.NewLine, "Key with code  ", (int) msg.PressedKey, " was pressed")
   ));
});

Запустим приложение и понажимаем на клавиши.

Теперь при нажатии на каждую клавишу на экране появляется две строки — с цифровым и символьным кодом каждой нажатой клавиши. Если зайти в панель управления сервером очередей, можно увидеть, что теперь к точке обмена MT:KeyWasPressed привязано уже две очереди subscriber и anothersubscriber. И каждое полученное сообщение типа MT.KeyWasPressed сервер очередей RabbitMQ отправляет в обе очереди.

Распределение ресурсоемких задач.


Теперь давайте посмотрим, как с помощью связки MassTransit + RabbitMQ можно распределять ресурсоемкие задач между несколькими процессами.
Представим, что перед нами стоит задача создать сервис для конвертации видео файлов. Под эту задачу у нас есть два сервера. Опытным путем мы установили, что оптимальная нагрузка для сервера под номером один — это три паралельно конвертируемых видеофайла, для сервера номер два — количество одновременно конвертируемых видеофайлов не должно превышать пяти. Процесс конвертации мы, само собой, будем эмулировать. Представим, что у нас есть очередь под названием filesToConvert, в которую поступают файлы для конвертации. Каждый файл будет представлять у нас объект типа VideoFile.

public class VideoFile
{
  public int Num { get; set; }
  //Время, требующееся для конвертации файла в мс
  public int TimeToConvert { get; set; }
}

Подписчик, получив такое сообщение, по правилам игры должен будет заснуть на количество миллисекунд, заданных в поле TimeToConvert пришедшего сообщения.
Код, по легенде выполняющийся на первом сервере.
int firstServerFilesCount = 0;
IServiceBus firstServer = ServiceBusFactory.New(sbc =>
{
     sbc.UseRabbitMq();
     //указываем количество паралельных потоков, получающих сообщения с сервера очередей
     sbc.SetConcurrentConsumerLimit(3);
     sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
       {
       firstServerFilesCount++; 
       Thread.Sleep(msg.TimeToConvert);
       Console.WriteLine("Сервер 1. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 3. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, firstServerFilesCount, Thread.CurrentThread.ManagedThreadId); 
        firstServerFilesCount--;
       }));
       //prefetch=3. Сообщаем серверу очередей, что мы готовы разбирать до трех сообщений одновременно 
       sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=3");
 });

По легенде мы на первом сервере решили ограничить число одновременно разбираемых сообщений тремя. Поэтому, мы вызываем метод SetConcurrentConsumerLimit с аргументом 3. Это означает, что при подключении объекта firstServer к серверу сообщений, MassTransit будет держать наготове пул из трех потоков, предназначенных для обработки сообщений с сервера. Но надо помнить, что распределением сообщений занимается RabbitMQ, и он никак не может знать того факта, что объект firstServer готов разбирать до трех сообщений одновременно. Передать ему эту информацию мы можем указав параметр prefetch в Uri, по которому firstServer подключается к серверу сообщений.

Код, по легенде выполняющийся на втором сервере.
int secondServerFilesCount = 0;
IServiceBus secondServer = ServiceBusFactory.New(sbc =>
{
   sbc.UseRabbitMq();
   //указываем количество паралельных потоков, получающих сообщения с сервера очередей
   sbc.SetConcurrentConsumerLimit(5);
   sbc.Subscribe(subs => subs.Handler<VideoFile>(msg =>
   {
     secondServerFilesCount++;
     Thread.Sleep(msg.TimeToConvert);
     Console.WriteLine("Сервер 2. {0}Файл {1} обработан за {2} мс. Потоков: {3} из 5. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, secondServerFilesCount, Thread.CurrentThread.ManagedThreadId);
     secondServerFilesCount--;
    }));
    //prefetch=3. Сообщаем серверу очередей, что мы готовы разбирать до пяти сообщений одновременно 
    sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=5");
});

Отличия, как можно было догадаться, есть только в количестве потоков в пуле, призванных разбирать сообщения, и значении prefetch в Uri. Куда важнее отметить тот факт, что мы подключили secondServer к той же очереди, куда был подключен firstServer, тем самым создавая конкуренцию между подписчиками за сообщения, появляющиеся в этой очереди. Если объекты firstServer и secondServer будут подключены к разным очередям, то мы столкнемся с тем, что каждый файл будет сконвертирован дважды, по разу на каждом сервере.

Теперь напишем код, наполняющим очередь filesToConvert сотней “видеофайлов”, с заданным рандомом временем конвертации.
Random rnd = new Random();
for (int i = 1; i <= 100; i++)
{
  publisher.Publish(new VideoFile() {Num = i, TimeToConvert = rnd.Next(100, 5000)});
}

Запускаем и убеждаемся, что наши сервера-подписчики работают паралельно, используя назначенное нами число потоков.


Какие еще возможности может предложить нам MassTransit.


В рамках одной статьи рассмотреть все возможности, предлагаемые библиотекой MassTransit, невозможно. Но можно перечислить (чем я и займусь).

  • Sagas. Механизм для координации распределенных процессов


  • Scheduling. Интеграция с библиотекой Quartz.net позволяет отправлять сообщения в очереди по расписанию






  • Encryption. Шифрование отправляемых сообщений. Для шифрования используется блочный шифр Rijndael


  • Unit Testability. Для целей тестирования, MassTransit может использовать встроенный транспорт (Loopback), не требующий внешних MQ серверов


Код примеров из статьи можно взять здесь.
Ads
AdBlock has stolen the banner, but banners are not teeth — they will be back

More

Comments 9

    +2
    А вы сравнивали ее с EasyNetQ? Мы во всех проектах только EasyNetQ используем или «чистый» API от RabbitMQ. Интересны плюсы и минусы этих библиотек относительно друг друга.
      0
      Интересная библиотека. А вы не знаете, как обстоят дела с переподключениями в случае потери связи с сервером RabbitMQ?
        0
        Дела обстоят хорошо) Когда тестировал MassTransit изучал и этот вопрос. Выдержка из документации
        If your connection to the message broker or queue server goes down, MassTransit takes care of trying to reconnect and deal with those failures, so that you don’t have to.

        Перевод.
        Если соединение с сервером очередей будет потеряно, MassTransit сам позаботится о переподключении и корректной обработкой подобных ситуаций, так что вы можете не беспокоиться об этом.
        +5
        Александр, про EasyNetQ я впервые услышал на вашем блоге и немного почитал про нее. Ее автор, Mike Hadlow, решил написать EasyNetQ вдохновившись как раз MassTransit, чьи авторы в свою очередь вдохновлялись NServiceBus. Если вкратце, MT пытается усидеть на двух стульях (MSMQ и RabbitMQ) и в ряде моментов менее интеллектуальная система роутинга сообщений в MSMQ тянет ее вниз.
        Здесь в комментариях Майк сравнивает как раз EasyNetQ, MassTransit и NServiceBus. Меня MassTransit привлекает хорошей поддержкой от авторов в посвященной библиотеке
        группе.
        А так библиотеки очень похожи, да. В чем то лучше MassTransit (есть полноценная поддержка Sagas), в чем то EasyNetQ (Scheduling не такой ограниченный как в MassTransit).
        Вот еще аналоги:
        Burrow.NET
        RabbitBus
        Chinchilla
          0
          Спасибо, в будущем посмотрим на MT как один из вариантов
            +1
            Не вполне корректное сравнение. ENQ — это простая обертка над API, вкусный роутинг. MT — не просто роутинг, и Саги, как Вы заметили, но еще и сохранение стейта объектов. Как следствие, возможность построить нормальные workflow.
              0
              Под сохранением стейта вы подразумеваете Saga State Machine?
                0
                Не совсем, сами по себе саги и стейт-машин саги (которые, кстати, вроде уже заменены на magnum от того же автора) — это просто обвязка. А так, каждый инстанс саги может хранится в базе, например.
                  0
                  Ок, я понял что вы имели в виду.

          Only users with full accounts can post comments. Log in, please.