RabbitMQ. Часть 2. Разбираемся с Exchanges

  • Tutorial

Exchange — обменник или точка обмена. В него отправляются сообщения. Exchange распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей (bindings) между ним и очередью.


Exchange не является Erlang-процессом. Из соображений масштабируемости exchange — это строка (ссылка на модуль с кодом, где лежит логика маршрутизации) во встроенной базе данных mnesia. 1 тысяч обменников будут потреблять всего 1МБ памяти.


Оглавление



Direct Exchange


Direct exchange — используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность.


Графическое представление потока сообщений:


rabbitmq_4


В rabbitmq существует понятие обменник по умолчанию. Это direct exchange без имени. Если применяется обменник по умолчанию, то сообщение будет маршрутизироваться в очередь с именем равным ключу маршрутизации сообщения.


Topic Exchange


Topic exchange – аналогично direct exchange дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0 или более слов (буквы AZ и az и цифры 0-9), разделенных точкой, а также символы * и #.


  • * — может быть заменен на ровно 1 слово
  • # — может быть заменен на 0 или более слов

Графическое представление потока сообщений:


rabbitmq_6


Начиная с версии RabbitMQ 2.4.0 алгоритм маршрутизации для topic exchange стал работать до 145 раз быстрее. Добились они этого путем внедрения подхода trie implementation, который подразумевает представление шаблонов в виде структуры дерева. Например шаблоны a.b.c, a.*.b.c, a.#.c и b.b.c будут представлены следующей структурой:


trie-example


Поиск соответствия шаблону осуществляется, начиная с корня и следуя сверху вниз.


Особенности:


  • применение этого обменника может стать хорошим выбором для возможного будущего развития приложения, т.к. шаблоны всегда можно настроить так, чтобы сообщение публиковалось аналогично direct exchange или fanout exchange
  • шаблоны, которые используют * намного быстрее, чем шаблоны, которые используют #.
  • topic exchange медленнее direct exchange

Fanout Exchange


Fanout exchangeвсе сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.


Особенности:


  • RabbitMQ не работает с ключами маршрутизации и шаблонами что положительно влияет на производительность. Это самый быстрый exchange;
  • все потребители должны иметь возможность обрабатывать все сообщения;

Графическое представление потока сообщений:


rabbitmq_5


Headers Exchange


Headers exchange — направляет сообщения в связанные очереди на основе сравнения пар (ключ, значение) свойства headers привязки и аналогичного свойства сообщения. headers представляет собой Dictionary<ключ, значение>.


Если в словарь добавить специальный ключ x-match со значением any, то сообщение маршрутизируется при частичном совпадении пар (ключ, значение). Данное поведение аналогично оператору or.


var bindingArguments = new Dictinary<String, Object>();
bindingArguments.add("x-match", "any");

По умолчанию ключ x-match содержит значение all. Это означает, что сообщение маршрутизируется при полном совпадении пар (ключ, значение). Данное поведение аналогично оператору and.


Графическое представление потока сообщений:


rabbitmq_7_a


Особенности:


  • дополнительная гибкость
  • дополнительные накладные расходы на вычисление. Все пары (ключ, значение) атрибута headers должны сортироваться по имени ключа перед вычислением значений маршрутизации сообщения. Медленнее, чем прочие типы exchange.

Consistent-Hashing Exchange


Данный обменник является плагином и не встроен в RabbitMQ.


Consistent-hashing exchange (exchange с согласованным хешированием) – используется, когда есть несколько очередей, являющихся потенциальными получателями сообщения, и когда нужно сбалансировать нагрузку между ними. Связь сообщения с очередью происходит по весу (условное строковое значение от 0 - n).


Эквивалентный вес очередей – говорит о том, что в каждую очередь придет примерно одинаковое количество сообщений (каждое сообщение будет помещено только в одну очередь). Полной гарантии равномерного распределения сообщений нет.


Графическое представление потока сообщений:


rabbitmq_7


Hash вычисляется на основе ключа маршрутизации или свойства headers сообщения. Если все публикуемые сообщения имели разные ключи маршрутизации или headers, то распределение будет происходить по весу. Иначе будет использоваться ключ маршрутизации или headers.


Должен помогать, когда пропускная способность потребителя нуждается в росте более высоком чем решение с несколькими потребителями, использующими одну очередь.


Комбинирование обменников (E2E)


Поведение всех обменников можно комбинировать при помощи связи Exchange-to-Exchange (комбинирование обменников не входит в спецификацию AMQP. Это расширение протокола со стороны RabbitMQ).


Графическое представление потока сообщений:


rabbitmq_8


За счет E2E мы можем найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям.


Создание Exchange


Создание обменника происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Exchange.Declare, вызываемого с параметрами:


  • название обменника
  • тип обменника
  • другие параметры

Пример создания exchange при помощи RabbitMQ.Client:


//...
channel.ExchangeDeclare(
    exchange: "my_exchange",
    type: "direct",
    durable: "false",
    autoDelete: "false",
    arguments: null
);
//...

  • exchange — название обменника, который мы хотим создать. Название должно быть уникальным
  • type — тип обменника
  • durable — если установить true, то exchange будет являться постоянным. Он будет храниться на диске и сможет пережить перезапуск сервера/брокера. Если значение false, то exchange является временным и будет удаляться, когда сервер/брокер будет перезагружен
  • autoDelete — автоматическое удаление. Exchange будет удален, когда будут удалены все связанные с ним очереди
  • arguments — необязательные аргументы. Чаще всего, через аргументы задают alternative exchange (альтернативный обменник). Если сообщение не может пройти по первоначальному маршруту, ее можно отправить в альтернативный обменник для маршрутизации по другому пути.

rabbitmq_9


Если создание exchange возможно, то сервер отправит клиенту синхронный RPC ответ Exchange.DeclareOk. Если создание невозможно (произошел отказ по запросу Exchange.Declare), то канал закроется сервером при помощи асинхронной команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.


Обменник должен быть создан перед публикацией сообщений. Если вы опубликуете сообщение в какой-то не существующий обменник — RabbitMQ тихо удалит его.


Создание Exchange через графический интерфейс


Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Exchanges и нажмем на Add a new exchange. Заполняем свойства:


rabbitmq_18


Большая часть свойств была описана выше. Здесь отметим, что если задать Internal, то обмен можно будет использовать только для E2E. Producer не сможет отправлять сообщения на такой обмен.


Заключение


При разработке системы удобно описывать топологию маршрутизации при помощи графа. Но прежде чем начать строить граф стоит выделить пути с большим трафиком, т.к. именно они требуют более высокую пропускную способность (производительность). Далее можно классифицировать трафик. И уже потом приступить к построению.


Если в построенном графе существует конечное множетсво ключей маршрутизации, то, стоит посмотреть в сторону нескольких fanout exchange, которые 1:1 связаны с ключом маршрутизации. Помним, что fanout exchange самый быстрый.


Если число маршрутов стремится к бесконечности, то стоит обратить внимание на topic exchange или, если шаблон не нужен, то можно выбрать direct exchnge, т.к. он быстрее topic exchange.


Комбинации различных exchange должна помочь найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям системы.


Количество exchange и очередей должно быть минимально по сравнению с количеством маршрутов.


В следующей статье начнем разбираться подробнее с Queues и Bindings.


Code


В данном разделе опишем обменник кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.


public interface IExchange
{
    /// <summary>
    ///     Название обменника, который мы хотим создать. 
    ///     Название должно быть уникальным.
    /// </summary>
    string Name { get; }

    /// <summary>
    ///     Тип обменника
    /// </summary>
    string Type { get; }

    /// <summary>
    ///     Если установить true, то exchange будет 
    ///     являться постоянным. 
    ///     Он будет храниться на диске и сможет 
    ///     пережить перезапуск сервера/брокера. 
    ///     Если значение false, то exchange является 
    ///     временным и будет удаляться, 
    ///     когда сервер/брокер будет перезагружен
    /// </summary>
    bool IsDurable { get; }

    /// <summary>
    ///     Автоматическое удаление. Exchange будет удален, 
    ///     когда будут удалены все связанные с ним очереди
    /// </summary>
    bool IsAutoDelete { get; }

    /// <summary>
    ///     Необязательные аргументы
    /// </summary>
    IDictionary<string, object> Arguments { get; }
}

/// <summary>
///     Типы обменников
/// </summary>
public static class ExchangeType
{
    public const string Direct = "direct";
    public const string Topic = "topic";
    public const string Fanout = "fanout";
    public const string Header = "headers";
}

/// <summary>
///     Обменник
/// </summary>
public class Exchange : IExchange
{
    public Exchange(
           string name, 
           string type = ExchangeType.Direct, 
           bool durable = true, 
           bool autoDelete = false, 
           IDictionary<string, object> arguments = null)
    {
        Name = name ??
            throw new ArgumentNullException(name, $"{name} must not be null");

        Type = type;
        IsDurable = durable;
        IsAutoDelete = autoDelete;
        Arguments = arguments ?? new Dictionary<string, object>();
    }

    public string Name { get; }
    public string Type { get; }
    public bool IsDurable { get; }
    public bool IsAutoDelete { get; }
    public IDictionary<string, object> Arguments { get; }

    /// <summary>
    ///     Возвращает Exchange по умолчанию. 
    ///     Exchange по умолчанию - direct exchnge без имени.
    ///     Если применяется обменник по умолчанию, то сообщение 
    ///     будет маршрутизироваться в очередь 
    ///     с именем равным ключу маршрутизации сообщения.
    /// </summary>        
    public static IExchange GetDefault()
    {
        return new Exchange("");
    }
}

Ссылки


Похожие публикации

Средняя зарплата в IT

120 000 ₽/мес.
Средняя зарплата по всем IT-специализациям на основании 8 965 анкет, за 1-ое пол. 2021 года Узнать свою зарплату
Реклама
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее

Комментарии 10

    +1
    Спасибо! Вторая замечательная статья! Очень подробно и понятно.
      0

      В каких случаях может использоваться fanout exchange? Какой реальный usecase использования доставки одного сообщения в несколько очередей? Возможно, вопрос покажется глупым, но мне, к сожалению (или к счастью) не приходилось сталкиваться с такими задачами. Всегда хватало direct exchange.

        0
        Применение fanout exchange можно представить в контексте микросервисов. Например, бессмысленное вещание события на которое должны реагировать определенные потребители. Producer определяет только обменник. Consumers зная обменник создают временные очереди и получают только актуальные сообщения.

        Direct exchange можно настроить так, чтобы он работал как fanout exchange, но такая реализация должна быть медленнее из-за ключа маршрутизации. Также и producer и consumer должны знать и про обменник и про очередь, что окажет отрицательное влияние на масштабируемость решения. Фрейм заголовка сообщения также должен содержать ключ маршрутизации.

        Если вещание не должно быть бессмысленным (требуется дополнительная фильтрация), то стоит использовать direct exchange.
          0
          Знать про очередь необходимо при обмене по умолчанию
            0
            И все же я не увидел ответ на вопрос о реальном примере такого использования. Например, на событие X нужно отправить два разных письма пользователю. Это тот случай, когда к одному exchange будут подключены две очереди; в каждую очередь попадет одно и то же сообщение, но будет два разных потребителя, которые по разной логике обработают это сообщение. Я правильно понял?
              0
              Да. Все верно
                0
                Всё зависит от той логики, которая вам нужна. То что ниже можно и в одном сервисе делать, но в разных архитектура чище и управление более гибкое.
                1. подсчет сообщений определенного типа
                2. разная логика обработки сообщения
                3. логгирование
                4. обновление последнего актуального состояния
                5. передача сообщения «дальше», интеграция
                6. накапливание сообщений (среднее значение, за интервал времени)
                7. Репликация
                8. повышение скорости передачи нескольким слушателям. Fanout работает быстрее чем topic в несколько раз.
                9. Несколько конечных устройств должны получить одно и то же сообщение (пример-чат, пользователь залогинен на 5ти разных устройствах)
                10. Или после топика отдать сразу «всем», так как эксчейнджи биндятся к эксчейнджам.

                А статьи очень похожи на перевод официальной документации. Если хотите поближе понять очереди и профит, поищите как был реализован инстаграмм на первых этапах, там связь реббита с редисом есть, видео было где то на ютубе
          0
          В случае x-match=all, должно ли совпадение быть полным или хедеры привязки должны только быть подмножеством хедеров обменника?
            +1
            Годная статья!

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое