Exchange
— обменник или точка обмена. В него отправляются сообщения. Exchange
распределяет сообщение в одну или несколько очередей. Он маршрутизирует сообщения в очередь на основе созданных связей (bindings
) между ним и очередью.
Exchange
не является Erlang-процессом. Из соображений масштабируемости exchange
— это строка (ссылка на модуль с кодом, где лежит логика маршрутизации) во встроенной базе данных mnesia. 1 тысяч обменников будут потреблять всего 1МБ памяти.
Оглавление
- RabbitMQ. Часть 1. Introduction. Erlang, AMQP и RPC
- RabbitMQ. Часть 2. Разбираемся с Exchanges
- RabbitMQ. Часть 3. Разбираемся с Queues и Bindings
- RabbitMQ. Часть 4. Подробно про Connection и Chanel
- RabbitMQ. Часть 5. Разбираемся с тем, что-такое сообщения и фреймы
- RabbitMQ. Часть 6. Производительность публикации и потребления сообщений
- RabbitMQ. Часть 7. Обзор модулей Federation и Shovel
- RabbitMQ. Часть 8. RabbitMQ в .NET
- RabbitMQ. Часть 9. Мониторинг
Direct Exchange
Direct exchange
— используется, когда нужно доставить сообщение в определенные очереди. Сообщение публикуется в обменник с определенным ключом маршрутизации и попадает во все очереди, которые связаны с этим обменником аналогичным ключом маршрутизации. Ключ маршрутизации — это строка. Поиск соответствия происходит при помощи проверки строк на эквивалентность.
Графическое представление потока сообщений:
В rabbitmq
существует понятие обменник по умолчанию. Это direct exchange
без имени. Если применяется обменник по умолчанию, то сообщение будет маршрутизироваться в очередь с именем равным ключу маршрутизации сообщения.
Topic Exchange
Topic exchange
– аналогично direct exchange
дает возможность осуществления выборочной маршрутизации путем сравнения ключа маршрутизации. Но, в данном случае, ключ задается по шаблону. При создании шаблона используются 0
или более слов (буквы AZ
и az
и цифры 0-9
), разделенных точкой, а также символы *
и #
.
*
— может быть заменен на ровно1
слово#
— может быть заменен на0
или более слов
Графическое представление потока сообщений:
Начиная с версии RabbitMQ 2.4.0
алгоритм маршрутизации для topic exchange
стал работать до 145
раз быстрее. Добились они этого путем внедрения подхода trie implementation, который подразумевает представление шаблонов в виде структуры дерева. Например шаблоны a.b.c
, a.*.b.c
, a.#.c
и b.b.c
будут представлены следующей структурой:
Поиск соответствия шаблону осуществляется, начиная с корня и следуя сверху вниз.
Особенности:
- применение этого обменника может стать хорошим выбором для возможного будущего развития приложения, т.к. шаблоны всегда можно настроить так, чтобы сообщение публиковалось аналогично
direct exchange
илиfanout exchange
- шаблоны, которые используют
*
намного быстрее, чем шаблоны, которые используют#
. topic exchange
медленнееdirect exchange
Fanout Exchange
Fanout exchange
– все сообщения доставляются во все очереди даже если в сообщении задан ключ маршрутизации.
Особенности:
RabbitMQ
не работает с ключами маршрутизации и шаблонами что положительно влияет на производительность. Это самый быстрыйexchange
;- все потребители должны иметь возможность обрабатывать все сообщения;
Графическое представление потока сообщений:
Headers Exchange
Headers exchange
— направляет сообщения в связанные очереди на основе сравнения пар (ключ, значение) свойства headers
привязки и аналогичного свойства сообщения. headers
представляет собой Dictionary<ключ, значение>
.
Если в словарь добавить специальный ключ x-match
со значением any
, то сообщение маршрутизируется при частичном совпадении пар (ключ, значение). Данное поведение аналогично оператору or
.
var bindingArguments = new Dictinary<String, Object>();
bindingArguments.add("x-match", "any");
По умолчанию ключ x-match
содержит значение all
. Это означает, что сообщение маршрутизируется при полном совпадении пар (ключ, значение). Данное поведение аналогично оператору and
.
Графическое представление потока сообщений:
Особенности:
- дополнительная гибкость
- дополнительные накладные расходы на вычисление. Все пары (ключ, значение) атрибута
headers
должны сортироваться по имени ключа перед вычислением значений маршрутизации сообщения. Медленнее, чем прочие типы exchange.
Consistent-Hashing Exchange
Данный обменник является плагином и не встроен в RabbitMQ
.
Consistent-hashing exchange
(exchange с согласованным хешированием) – используется, когда есть несколько очередей, являющихся потенциальными получателями сообщения, и когда нужно сбалансировать нагрузку между ними. Связь сообщения с очередью происходит по весу (условное строковое значение от 0 - n
).
Эквивалентный вес очередей – говорит о том, что в каждую очередь придет примерно одинаковое количество сообщений (каждое сообщение будет помещено только в одну очередь). Полной гарантии равномерного распределения сообщений нет.
Графическое представление потока сообщений:
Hash
вычисляется на основе ключа маршрутизации или свойства headers
сообщения. Если все публикуемые сообщения имели разные ключи маршрутизации или headers
, то распределение будет происходить по весу. Иначе будет использоваться ключ маршрутизации или headers
.
Должен помогать, когда пропускная способность потребителя нуждается в росте более высоком чем решение с несколькими потребителями, использующими одну очередь.
Комбинирование обменников (E2E)
Поведение всех обменников можно комбинировать при помощи связи Exchange-to-Exchange (комбинирование обменников не входит в спецификацию AMQP
. Это расширение протокола со стороны RabbitMQ
).
Графическое представление потока сообщений:
За счет E2E
мы можем найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям.
Создание Exchange
Создание обменника происходит при помощи синхронного RPC
запроса к серверу. Запрос осуществляется при помощи метода Exchange.Declare
, вызываемого с параметрами:
- название обменника
- тип обменника
- другие параметры
Пример создания exchange
при помощи RabbitMQ.Client:
//...
channel.ExchangeDeclare(
exchange: "my_exchange",
type: "direct",
durable: "false",
autoDelete: "false",
arguments: null
);
//...
exchange
— название обменника, который мы хотим создать. Название должно быть уникальнымtype
— тип обменникаdurable
— если установитьtrue
, тоexchange
будет являться постоянным. Он будет храниться на диске и сможет пережить перезапуск сервера/брокера. Если значениеfalse
, тоexchange
является временным и будет удаляться, когда сервер/брокер будет перезагруженautoDelete
— автоматическое удаление.Exchange
будет удален, когда будут удалены все связанные с ним очередиarguments
— необязательные аргументы. Чаще всего, через аргументы задаютalternative exchange
(альтернативный обменник). Если сообщение не может пройти по первоначальному маршруту, ее можно отправить в альтернативный обменник для маршрутизации по другому пути.
Если создание exchange
возможно, то сервер отправит клиенту синхронный RPC
ответ Exchange.DeclareOk
. Если создание невозможно (произошел отказ по запросу Exchange.Declare
), то канал закроется сервером при помощи асинхронной команды Channel.Close
и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.
Обменник должен быть создан перед публикацией сообщений. Если вы опубликуете сообщение в какой-то не существующий обменник — RabbitMQ
тихо удалит его.
Создание Exchange через графический интерфейс
Заходим в панель администратора RabbitMQ
под пользователем guest
(username: guest
и password: guest
). Обратите внимание, что пользователь guest
может подключаться только с локального хоста. Теперь перейдем на вкладку Exchanges
и нажмем на Add a new exchange
. Заполняем свойства:
Большая часть свойств была описана выше. Здесь отметим, что если задать Internal
, то обмен можно будет использовать только для E2E
. Producer
не сможет отправлять сообщения на такой обмен.
Заключение
При разработке системы удобно описывать топологию маршрутизации при помощи графа. Но прежде чем начать строить граф стоит выделить пути с большим трафиком, т.к. именно они требуют более высокую пропускную способность (производительность). Далее можно классифицировать трафик. И уже потом приступить к построению.
Если в построенном графе существует конечное множетсво ключей маршрутизации, то, стоит посмотреть в сторону нескольких fanout exchange
, которые 1:1 связаны с ключом маршрутизации. Помним, что fanout exchange
самый быстрый.
Если число маршрутов стремится к бесконечности, то стоит обратить внимание на topic exchange
или, если шаблон не нужен, то можно выбрать direct exchnge
, т.к. он быстрее topic exchange
.
Комбинации различных exchange
должна помочь найти правильную масштабируемую конфигурацию, которая отвечает как текущим, так и растущим требованиям системы.
Количество exchange
и очередей должно быть минимально по сравнению с количеством маршрутов.
В следующей статье начнем разбираться подробнее с Queues и Bindings.
Code
В данном разделе опишем обменник кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.
public interface IExchange
{
/// <summary>
/// Название обменника, который мы хотим создать.
/// Название должно быть уникальным.
/// </summary>
string Name { get; }
/// <summary>
/// Тип обменника
/// </summary>
string Type { get; }
/// <summary>
/// Если установить true, то exchange будет
/// являться постоянным.
/// Он будет храниться на диске и сможет
/// пережить перезапуск сервера/брокера.
/// Если значение false, то exchange является
/// временным и будет удаляться,
/// когда сервер/брокер будет перезагружен
/// </summary>
bool IsDurable { get; }
/// <summary>
/// Автоматическое удаление. Exchange будет удален,
/// когда будут удалены все связанные с ним очереди
/// </summary>
bool IsAutoDelete { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}
/// <summary>
/// Типы обменников
/// </summary>
public static class ExchangeType
{
public const string Direct = "direct";
public const string Topic = "topic";
public const string Fanout = "fanout";
public const string Header = "headers";
}
/// <summary>
/// Обменник
/// </summary>
public class Exchange : IExchange
{
public Exchange(
string name,
string type = ExchangeType.Direct,
bool durable = true,
bool autoDelete = false,
IDictionary<string, object> arguments = null)
{
Name = name ??
throw new ArgumentNullException(name, $"{name} must not be null");
Type = type;
IsDurable = durable;
IsAutoDelete = autoDelete;
Arguments = arguments ?? new Dictionary<string, object>();
}
public string Name { get; }
public string Type { get; }
public bool IsDurable { get; }
public bool IsAutoDelete { get; }
public IDictionary<string, object> Arguments { get; }
/// <summary>
/// Возвращает Exchange по умолчанию.
/// Exchange по умолчанию - direct exchnge без имени.
/// Если применяется обменник по умолчанию, то сообщение
/// будет маршрутизироваться в очередь
/// с именем равным ключу маршрутизации сообщения.
/// </summary>
public static IExchange GetDefault()
{
return new Exchange("");
}
}