Queue
(очередь) — структура данных на диске или в оперативной памяти, которая хранит ссылки на сообщения и отдает их копии consumers
(потребителям). Queue
представляет собой Erlang-процесс с состоянием (где могут кэшироваться и сами сообщения). 1 тысяча очередей может занимать порядка 80Mb.
Binding
(привязка) — правило, которое сообщает обменнику в какую из очередей должны попадать сообщения.
Оглавление
- RabbitMQ. Часть 1. Introduction. Erlang, AMQP и RPC
- RabbitMQ. Часть 2. Разбираемся с Exchanges
- RabbitMQ. Часть 3. Разбираемся с Queues и Bindings
- RabbitMQ. Часть 4. Подробно про Connection и Chanel
- RabbitMQ. Часть 5. Разбираемся с тем, что-такое сообщения и фреймы
- RabbitMQ. Часть 6. Производительность публикации и потребления сообщений
- RabbitMQ. Часть 7. Обзор модулей Federation и Shovel
- RabbitMQ. Часть 8. RabbitMQ в .NET
- RabbitMQ. Часть 9. Мониторинг
Временные очереди
Если создание очереди происходит с установленным параметром autoDelete
, то такая очередь обретает способность автоматически удалять себя. Такие очереди обычно создаются в момент подключения первого клиента и удаляются в момент, когда все клиенты отсоединились.
Если создание очереди происходит с установленным параметром exclusive
, то такая очередь разрешает подключаться только одному потребителю и удаляется если закроется канал. До тех пор пока канал не закроется, клиент может отключаться/подключаться, но только в рамках того же самого соединения. Если параметр exclusive
установлен, то параметр autoDelete
не имеет никакого эффекта.
Особенности:
- при кратковременном разрыве связи мы будем терять сообщения, которые ещё не успели дойти до потребителя
- можно поймать феномен
binding churn
. Феномен возникает, когда количество операций по созданию/удалению очередей и привязок достигает очень больших значений. В кластерном режиме такой поток операций будет расползаться по всем узлам и создаст большую нагрузку. Данный процесс можно оптимизировать за счет контроля количества подписок
Постоянные очереди
Если создание очереди происходит с установленным параметром durable
, то такая очередь сохраняет свое состояние и восстанавливается после перезапуска сервера/брокера. Данная очередь будет существовать до тех пор пока не будет вызвана команду Queue.Delete
.
Highly Available очереди
Очереди HA требуют кластерной среды RabbitMQ. В кластерном режиме вся информация об обменниках, очередях, привязках и потребителях будет скопирована на все узлы.
Когда сообщение публикуется в какую-то HA очередь, оно хранится на каждом узле, относящемуся к HA очереди. После того как сообщение потребляется на каком-то из узлов, все копии этого сообщения будут удалены на других узлах.
Очереди HA могут распространяться на все узлы в некотором кластере или только на индивидуальные.
Особенности:
- использование HA очередей приводит к наказаниям в производительности. При помещении сообщения в некую HA очередь или при потреблении сообщения из HA очереди RabbitMQ должен выполнять координацию по всем узлам (2-3 узла обычно достаточно)
Создание очереди
Создание очереди происходит при помощи синхронного RPC
запроса к серверу. Запрос осуществляется при помощи метода Queue.Declare
, вызываемого с параметрами:
- название очереди
- другие параметры
Пример создания очереди при помощи RabbitMQ.Client:
// ...
channel.QueueDeclare(
queue: "my_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// ...
queue
— название очереди, которую мы хотим создать. Название должно быть уникальным и не может совпадать с системным именем очередиdurable
— если true, то очередь будет сохранять свое состояние и восстанавливается после перезапуска сервера/брокераexclusive
— если true, то очередь будет разрешать подключаться только одному потребителюautoDelete
— если true, то очередь обретает способность автоматически удалять себяarguments
— необязательные аргументы. Ниже разберем подробнее.
arguments
x-message-ttl
(x-message-time-to-live
) — позволяет установить время истечения срока жизни сообщения в миллисекундах. Если создание очереди происходит с установленным значением аргументаx-message-ttl
, то такая очередь будет автоматически исключать сообщения, у которых истек срок действия. Установка значения аргументаx-message-ttl
задает максимальный возраст для всех сообщений в данной очереди. Создание такой очереди позволяет предотвратить получение устаревшей информации. Это можно использовать в системах реального времени. Если у очереди для которой задан обменник для отклоненных сообщений установить значение аргументаx-message-ttl
, то отклоненные сообщения в данной очереди начнут обладать сроком жизни.x-expires
— задает значение в миллисекундах по истечению которого происходит удаление очереди. Очередь может израсходовать срок своего действия только если она не имеет никаких подписчиков. Если к очереди подключены подписчики, она сможет автоматически удалиться только тогда, когда все подписчики вызовутBasic.Cancel
или отсоединятся. Срок жизни очереди может завершиться только в том случае, если к ней не было запросаBasic.Get
. Иначе текущее значение установки времени жизни обнуляется, и очередь больше не будет автоматически удаляться. Также нет гарантий того, насколько быстро происходит удаление очереди после истечения её срока жизни.x-max-length
— задает максимальное число сообщений в очереди. Если число сообщений в очереди начинает превышать макимальное чило, то начинают удаляться самые старые
x-max-lenght-bytes
— задает максимально допустимый суммарный размер полезной нагрузки сообщений в очереди. При превышении установленного значения (возникло переполнение очереди при очередной публикации сообщения) самые старые сообщения начнут удалятьсяx-overflow
— данный аргумент используется для настройки поведения в результате переполнения очереди. Доступны два значения:drop-head
(значение по умолчанию) иreject-publish
. Если выбратьdrop-head
, то самые старые сообщения будут удаляться. Если выбратьreject-publish
, то прием сообщений будет приостановленx-dead-letter-exchange
— задает exchange, в который направляются отвергнутые сообщения, которые не поставлены повторно в очередьx-dead-letter-routing-key
— задает не обязательный ключ маршрутизации для отвергнутых сообщенийx-max-priority
— разрешает сортировку по приоритетам в очереди с максимальным значением приоритета 255 (RabbitMQ версий 3.5.0 и выше). Число указывает максимальный приоритет, который будет поддерживать очередь. Если аргумент не установлен, очередь не будет поддерживать приоритет сообщенийx-queue-mode
— позволяет перевести очередь в ленивый режим. В таком режиме как можно больше сообщений будет храниться на диске. Использование оперативной памяти будет минимально. В случае, если он не установлен, очередь будет хранить сообщения в памяти, чтобы доставлять сообщения максимально быстроx-queue-master-locator
— если у нас кластер, то можно задать мастер очередьx-ha-policy
— используется при создании HA очередей и определяет как сообщение будет распространяться по узлам. Если установлено значениеall
, то сообщение будет сохраняться на всех узлах. Если установлено значениеnodes
, то сообщение будет сохраняться на определенных узлах кластераx-ha-nodes
— задает узлы, к которым будет относиться некая очередьHA
Если создание очереди возможно, то сервер отправит клиенту синхронный RPC
ответ Queue.DeclareOk
. Если создание очереди невозможно (произошел отказ по запросу Queue.Declare
), то канал закроется сервером при помощи команды Channel.Close
и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.
Повторный вызов Queue.Declare
с аналогичными параметрами вернет полезную информацию об этой очереди. Например, общее число сообщений, находящихся в ожидании в данной очереди, и общее число подписанных на неё потребителей.
Вызов Queue.Declare
под учетными данными пользователя, которому не назначены необходимые права закроет канал при помощи команды Channel.Close
и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки 403 и ее описание.
После того, как очередь простаивает в течении >= 10 секунд, она впадает в спящий режим, вызывая GC в очереди, что приводит к значительному сокращению памяти, необходимой для этой очереди.
Создание Queue через графический интерфейс
Заходим в панель администратора RabbitMQ
под пользователем guest
(username: guest
и password: guest
). Обратите внимание, что пользователь guest
может подключаться только с локального хоста. Теперь перейдем на вкладку Queues
и нажмем на Add a new queue
. Заполняем свойства:
Как только мы введем все необходимые данные и нажмете на Add queues
, очередь появится в общем списке.
Щелчок по имени очереди покажет ее детальную информацию. Здесь можно настроить привязку между обменом и очередью, посмотреть список consumers
, опубликовать/получить сообщения, удалить очередь и посмотреть статистику.
Создание Binding
Создание привязки происходит при помощи синхронного RPC
запроса к серверу. Запрос осуществляется при помощи метода Queue.Bind
, вызываемого с параметрами:
- название очереди
- название точки обмена
- другие параметры
Пример создания привязки при помощи RabbitMQ.Client:
//...
channel.QueueBind(
queue: queueName,
exchange: "my_exchange",
routingKey: "my_key",
arguments: null
);
//...
queue
— имя очередиexchange
— имя обменникаroutingKey
— ключ маршрутизацииarguments
— необязательные аргументы
Если создание привязки возможно, то сервер отправит клиенту синхронный RPC
ответ Queue.BindOk
.
Создание Binding через графический интерфейс
Заходим в панель администратора RabbitMQ
под пользователем guest
(username: guest
и password: guest
). Обратите внимание, что пользователь guest
может подключаться только с локального хоста. Теперь перейдем на вкладку Queues
и жмем на очередь my_queue
. Заполняем поля раздела bindings
:
Как только мы введем все необходимые данные и нажмем на Bind
, привязка отобразится в общем списке:
Code
В данном разделе опишем очередь и привязку кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.
public interface IQueue
{
string Name { get; }
/// <summary>
/// Если установить true, то queue будет являться постоянным.
/// Она будет храниться на диске и сможет
/// пережить перезапуск сервера/брокера.
/// Если значение false, то queue является временной и будет удаляться,
/// когда сервер/брокер будет перезагружен
/// </summary>
bool IsDurable { get; }
/// <summary>
/// Если значение равно true, то
/// такая очередь будет разрешать подключаться
/// только одному consumer-у
/// </summary>
bool IsExclusive { get; }
/// <summary>
/// Автоматическое удаление.
/// Очередь будет удалена, когда все клиенты отсоединятся.
/// </summary>
bool IsAutoDelete { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}
public class Queue : IQueue
{
public Queue(
string name,
bool isDurable = true,
bool isExclusive = false,
bool isAutoDelete = false,
IDictionary<string, object> arguments = null)
{
Name = name ??
throw new ArgumentNullException(name, $"{name} must not be null");
IsDurable = isDurable;
IsExclusive = isExclusive;
IsAutoDelete = isAutoDelete;
Arguments = arguments ?? new Dictionary<string, object>();
}
public string Name { get; }
public bool IsDurable { get; }
public bool IsExclusive { get; }
public bool IsAutoDelete { get; }
public IDictionary<string, object> Arguments { get; }
}
public static class QueueMode
{
public const string Default = "default";
/// <summary>
/// Ленивый режим. Ленивый режим заставит сохранять
/// как можно больше сообщений на диске, чтобы уменьшить
/// использование оперативной памяти
/// </summary>
public const string Lazy = "lazy";
}
public interface IBinding
{
/// <summary>
/// Обменник, который будет связываться привязкой
/// </summary>
IExchange Exchange { get; }
/// <summary>
/// Ключ маршрутизации
/// </summary>
string RoutingKey { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}
public class Binding : IBinding
{
public Binding(
IExchange exchange,
string routingKey,
IDictionary<string, object> arguments)
{
Exchange = exchange;
RoutingKey = routingKey;
Arguments = arguments;
}
public IExchange Exchange { get; }
public string RoutingKey { get; }
public IDictionary<string, object> Arguments { get; }
}