Как стать автором
Обновить

RabbitMQ. Часть 3. Разбираемся с Queues и Bindings

Время на прочтение8 мин
Количество просмотров140K

Queue (очередь) — структура данных на диске или в оперативной памяти, которая хранит ссылки на сообщения и отдает их копии consumers (потребителям). Queue представляет собой Erlang-процесс с состоянием (где могут кэшироваться и сами сообщения). 1 тысяча очередей может занимать порядка 80Mb.


Binding (привязка) — правило, которое сообщает обменнику в какую из очередей должны попадать сообщения.


Оглавление



Временные очереди


Если создание очереди происходит с установленным параметром autoDelete, то такая очередь обретает способность автоматически удалять себя. Такие очереди обычно создаются в момент подключения первого клиента и удаляются в момент, когда все клиенты отсоединились.


Если создание очереди происходит с установленным параметром exclusive, то такая очередь разрешает подключаться только одному потребителю и удаляется если закроется канал. До тех пор пока канал не закроется, клиент может отключаться/подключаться, но только в рамках того же самого соединения. Если параметр exclusive установлен, то параметр autoDelete не имеет никакого эффекта.


Особенности:


  • при кратковременном разрыве связи мы будем терять сообщения, которые ещё не успели дойти до потребителя
  • можно поймать феномен binding churn. Феномен возникает, когда количество операций по созданию/удалению очередей и привязок достигает очень больших значений. В кластерном режиме такой поток операций будет расползаться по всем узлам и создаст большую нагрузку. Данный процесс можно оптимизировать за счет контроля количества подписок

Постоянные очереди


Если создание очереди происходит с установленным параметром durable, то такая очередь сохраняет свое состояние и восстанавливается после перезапуска сервера/брокера. Данная очередь будет существовать до тех пор пока не будет вызвана команду Queue.Delete.


Highly Available очереди


Очереди HA требуют кластерной среды RabbitMQ. В кластерном режиме вся информация об обменниках, очередях, привязках и потребителях будет скопирована на все узлы.


Когда сообщение публикуется в какую-то HA очередь, оно хранится на каждом узле, относящемуся к HA очереди. После того как сообщение потребляется на каком-то из узлов, все копии этого сообщения будут удалены на других узлах.


Очереди HA могут распространяться на все узлы в некотором кластере или только на индивидуальные.


rabbitmq_26


Особенности:


  • использование HA очередей приводит к наказаниям в производительности. При помещении сообщения в некую HA очередь или при потреблении сообщения из HA очереди RabbitMQ должен выполнять координацию по всем узлам (2-3 узла обычно достаточно)

Создание очереди


Создание очереди происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Declare, вызываемого с параметрами:


  • название очереди
  • другие параметры

Пример создания очереди при помощи RabbitMQ.Client:


// ...
channel.QueueDeclare(
    queue: "my_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
);
// ...

  • queue — название очереди, которую мы хотим создать. Название должно быть уникальным и не может совпадать с системным именем очереди
  • durable — если true, то очередь будет сохранять свое состояние и восстанавливается после перезапуска сервера/брокера
  • exclusive — если true, то очередь будет разрешать подключаться только одному потребителю
  • autoDelete — если true, то очередь обретает способность автоматически удалять себя
  • arguments — необязательные аргументы. Ниже разберем подробнее.

arguments

  • x-message-ttl(x-message-time-to-live) — позволяет установить время истечения срока жизни сообщения в миллисекундах. Если создание очереди происходит с установленным значением аргумента x-message-ttl, то такая очередь будет автоматически исключать сообщения, у которых истек срок действия. Установка значения аргумента x-message-ttl задает максимальный возраст для всех сообщений в данной очереди. Создание такой очереди позволяет предотвратить получение устаревшей информации. Это можно использовать в системах реального времени. Если у очереди для которой задан обменник для отклоненных сообщений установить значение аргумента x-message-ttl, то отклоненные сообщения в данной очереди начнут обладать сроком жизни.
  • x-expires — задает значение в миллисекундах по истечению которого происходит удаление очереди. Очередь может израсходовать срок своего действия только если она не имеет никаких подписчиков. Если к очереди подключены подписчики, она сможет автоматически удалиться только тогда, когда все подписчики вызовут Basic.Cancel или отсоединятся. Срок жизни очереди может завершиться только в том случае, если к ней не было запроса Basic.Get. Иначе текущее значение установки времени жизни обнуляется, и очередь больше не будет автоматически удаляться. Также нет гарантий того, насколько быстро происходит удаление очереди после истечения её срока жизни.
  • x-max-length — задает максимальное число сообщений в очереди. Если число сообщений в очереди начинает превышать макимальное чило, то начинают удаляться самые старые

rabbitmq_25


  • x-max-lenght-bytes — задает максимально допустимый суммарный размер полезной нагрузки сообщений в очереди. При превышении установленного значения (возникло переполнение очереди при очередной публикации сообщения) самые старые сообщения начнут удаляться
  • x-overflow — данный аргумент используется для настройки поведения в результате переполнения очереди. Доступны два значения: drop-head (значение по умолчанию) и reject-publish. Если выбрать drop-head, то самые старые сообщения будут удаляться. Если выбрать reject-publish, то прием сообщений будет приостановлен
  • x-dead-letter-exchange — задает exchange, в который направляются отвергнутые сообщения, которые не поставлены повторно в очередь
  • x-dead-letter-routing-key — задает не обязательный ключ маршрутизации для отвергнутых сообщений
  • x-max-priority — разрешает сортировку по приоритетам в очереди с максимальным значением приоритета 255 (RabbitMQ версий 3.5.0 и выше). Число указывает максимальный приоритет, который будет поддерживать очередь. Если аргумент не установлен, очередь не будет поддерживать приоритет сообщений
  • x-queue-mode — позволяет перевести очередь в ленивый режим. В таком режиме как можно больше сообщений будет храниться на диске. Использование оперативной памяти будет минимально. В случае, если он не установлен, очередь будет хранить сообщения в памяти, чтобы доставлять сообщения максимально быстро
  • x-queue-master-locator — если у нас кластер, то можно задать мастер очередь
  • x-ha-policy — используется при создании HA очередей и определяет как сообщение будет распространяться по узлам. Если установлено значение all, то сообщение будет сохраняться на всех узлах. Если установлено значение nodes, то сообщение будет сохраняться на определенных узлах кластера
  • x-ha-nodes — задает узлы, к которым будет относиться некая очередь HA

rabbitmq_10


Если создание очереди возможно, то сервер отправит клиенту синхронный RPC ответ Queue.DeclareOk. Если создание очереди невозможно (произошел отказ по запросу Queue.Declare), то канал закроется сервером при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.


Повторный вызов Queue.Declare с аналогичными параметрами вернет полезную информацию об этой очереди. Например, общее число сообщений, находящихся в ожидании в данной очереди, и общее число подписанных на неё потребителей.


Вызов Queue.Declare под учетными данными пользователя, которому не назначены необходимые права закроет канал при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки 403 и ее описание.


После того, как очередь простаивает в течении >= 10 секунд, она впадает в спящий режим, вызывая GC в очереди, что приводит к значительному сокращению памяти, необходимой для этой очереди.


Создание Queue через графический интерфейс


Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и нажмем на Add a new queue. Заполняем свойства:


rabbitmq_21


Как только мы введем все необходимые данные и нажмете на Add queues, очередь появится в общем списке.


rabbitmq_22


Щелчок по имени очереди покажет ее детальную информацию. Здесь можно настроить привязку между обменом и очередью, посмотреть список consumers, опубликовать/получить сообщения, удалить очередь и посмотреть статистику.


Создание Binding


Создание привязки происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Bind, вызываемого с параметрами:


  • название очереди
  • название точки обмена
  • другие параметры

Пример создания привязки при помощи RabbitMQ.Client:


//...
channel.QueueBind(
    queue: queueName,
    exchange: "my_exchange",
    routingKey: "my_key",
    arguments: null
);
//...

  • queue — имя очереди
  • exchange — имя обменника
  • routingKey — ключ маршрутизации
  • arguments — необязательные аргументы

rabbitmq_11


Если создание привязки возможно, то сервер отправит клиенту синхронный RPC ответ Queue.BindOk.


Создание Binding через графический интерфейс


Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и жмем на очередь my_queue. Заполняем поля раздела bindings:


rabbitmq_23


Как только мы введем все необходимые данные и нажмем на Bind, привязка отобразится в общем списке:


rabbitmq_24


Code


В данном разделе опишем очередь и привязку кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.


public interface IQueue
    {        
        string Name { get; }

        /// <summary>
        ///     Если установить true, то queue будет являться постоянным. 
        ///     Она будет храниться на диске и сможет 
        ///     пережить перезапуск сервера/брокера. 
        ///     Если значение false, то queue является временной и будет удаляться, 
        ///     когда сервер/брокер будет перезагружен
        /// </summary>
        bool IsDurable { get; }

        /// <summary>
        ///     Если значение равно true, то 
        ///     такая очередь будет разрешать подключаться 
        ///     только одному consumer-у
        /// </summary>
        bool IsExclusive { get; }

        /// <summary>
        ///     Автоматическое удаление. 
        ///     Очередь будет удалена, когда все клиенты отсоединятся.
        /// </summary>
        bool IsAutoDelete { get; }

        /// <summary>
        ///     Необязательные аргументы
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Queue : IQueue
    {
        public Queue(
             string name, 
             bool isDurable = true, 
             bool isExclusive = false, 
             bool isAutoDelete = false, 
             IDictionary<string, object> arguments = null)
        {
            Name = name ??
                throw new ArgumentNullException(name, $"{name} must not be null");

            IsDurable = isDurable;
            IsExclusive = isExclusive;
            IsAutoDelete = isAutoDelete;
            Arguments = arguments ?? new Dictionary<string, object>();
        }

        public string Name { get; }
        public bool IsDurable { get; }
        public bool IsExclusive { get; }
        public bool IsAutoDelete { get; }
        public IDictionary<string, object> Arguments { get; }
    }

public static class QueueMode
    {       
        public const string Default = "default";
        /// <summary>
        ///     Ленивый режим. Ленивый режим заставит сохранять 
        ///     как можно больше сообщений на диске, чтобы уменьшить 
        ///     использование оперативной памяти
        /// </summary>
        public const string Lazy = "lazy";
    }

public interface IBinding
    {
        /// <summary>
        ///     Обменник, который будет связываться привязкой
        /// </summary>
        IExchange Exchange { get; }

        /// <summary>
        ///     Ключ маршрутизации
        /// </summary>
        string RoutingKey { get; }

        /// <summary>
        ///     Необязательные аргументы
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Binding : IBinding
    {
        public Binding(
             IExchange exchange, 
             string routingKey, 
             IDictionary<string, object> arguments)
        {
            Exchange = exchange;
            RoutingKey = routingKey;
            Arguments = arguments;
        }

        public IExchange Exchange { get; }
        public string RoutingKey { get; }
        public IDictionary<string, object> Arguments { get; }
    }
Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
Всего голосов 8: ↑8 и ↓0+8
Комментарии9

Публикации

Истории

Работа

Ближайшие события