Pull to refresh

Библиотека amqpcpp. Часть 2 — Очереди

Reading time6 min
Views3.1K
В статье «Lib amqpcpp wrapper for librabbitmq» был обзор публикации сообщений по протоколу AMQP. Данная статья является ее продолжением, в которой ниже описывается как использовать Очереди.

Если Обмен предназначен для публикации сообщений, то Очередь предназначена для приема сообщений. Между Обменом и Очередью устанавливается Связь (Bind или я часто перевожу Привязка) через ключ Маршрутизации (routing_key).
Очередь, аналогично Обмену — надо объявлять:
    AMQP amqp;
    auto_ptr< AMQPQueue> qu (amqp.createQueue("q2"));
    qu->Declare();
// or
    auto_ptr< AMQPQueue> qu2 (amqp.createQueue());
    qu2->Declare("q2");

Очереди могут быть:
  • Самоудаляемые (AMQP_AUTODELETE), т.е. удаляются если они пустые и не используются (нет клиентских соединений)
  • Сохраняемые (AMQP_DURABLE) т.е. при перезапуске брокера Очереди сохраняют данные
  • Эксклюзисвные (AMQP_EXCLUSIVE) т.е. рассчитанные только на одно соединение
  • Пассивные (AMQP_PASSIVE) т.е. инициация идет со стороны клиента

Если ни один параметр не задан, то по умолчанию Очередь объявляется как самоудаляемая (AMQP_AUTODELETE). Ниже в примере Очередь объявлена как самоудаляемая и сохраняемая.
    AMQP amqp;
    auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
    qu3->Declare("q2", AMQP_AUTODELETE|AMQP_DURABLE); // durable and autodelete mode
Очередь можно:
  • Уладить AMQPQueue::Delete();
  • Отчистить AMQPQueue::Purge();
  • Сбросить подписку AMQPQueue::Cancel();

Очередь можно привязать к Обмену (Bind) или отвязать (unBind). Привязка осуществляется через ключ маршрутизации. Ключ может быть простой или составной. Для составных ключей используются паттерны. Например, мы подписываемся на все новости, которые публикуются в Обмене realty. Тогда ключ может быть "*.news" или мы подписываемся на все сообщения, которые относятся к Санкт-Петербургу: «spb.*». Как уже ранее в части 1 упоминалось, Обмены могут быть трех типов: direct, topic, fanout. Для Обменов типа topic могут использоваться паттрены, для обменов типа direct — используются только простые ключи, а тип fanout вообще не использует ключей, по этому значение ключа указывается чисто формально, задается пустая строка.
    AMQP amqp;
    auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
// очередь и обмен были объявлены ранее
    qu3->Bind("ex""news"); // durable and autodelete mode
Если Очередь привязана к обмену и в Обмене публикуются сообщения с ключом привязки, то эти сообщения перенаправляются в соответствующую првязки Очередь. Читать из Очереди Сообщения можно двумя способами:
  • асинронно, через метод AMQPQueu::Get();
  • синхронно, через метод AMQPQueu::Consume();
Метод AMQPQueu::Get() читает из Очереди одно Сообщение. При чтении сообщения, в заголовочном фрейме передается информация — сколько еще сообщений осталось в Очереди. Ниже на примере это пояснено.
 
    AMQP amqp;
    auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
 
    while (  1 ) {
        qu2->Get();
        auto_ptr<AMQPMessage> m (qu2->getMessage());
 
        cout << "count: "<<  m->getMessageCount() << endl;  
        if (m->getMessageCount() > -1) {
cout << "message\n"<< m->getMessage() << "\nmessage key: "<<  m->getRoutingKey() << endl;
cout << "exchange: "<<  m->getExchange() << endl;
        } else 
            break;
    }
 
У метода AMQPQueu::Get() может быть параметр AMQP_NOACK, который «говорит» брокеру, что данное сообщение не помечать как «прочитанное». Совместно с параметром AMQP_NOACK, используется метод AMQPQueu::Ack(), который подтверждает, что сообщение доставлено. Вся информация о сообщении инкапсулируется в объекте данных AMQPMessage. У Объекта «Сообщение» есть методы для доступа полей, названия говорят сами за себя: getMessage(), getExchange(), getRoutingKey(), get MessageCount(). Следует заострить внимание на методах getConsumerTag() и getDeliveryTag().

Таг подписчика (consumer_tag) — это индивидуальная неповторимая строка, назначается либо при публикации, либо автоматически назначается брокером, что-то типа имя сессии. Отказаться от подписки можно, послав команду Cancel с передачей в параметре данных consumer_tag, например AMQPQueu::Cancel( m->getConsumerTag() ).

Таг доставки — это числовое значение, равное счетчику доставленных сообщений данной сессии, для первого сообщения таг доставки равен — 1, для второго — 2, для третьего — 3 и тд. Для подтверждения приема сообщения, необходимо вызвать метод AMQPQueu::Ack(delivery_tag), где переменная delivery_tag имеет значение тага доставки.
    AMQP amqp;
    auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
 
    while (  1 ) {
        qu2->Get(AMQP_NOACK);
        auto_ptr<AMQPMessage> m (qu2->getMessage());
 
        if (m->getMessageCount() > -1) {
           qu2->Ack(m->getDeliveryTag());
        } else 
            break;
    }
 
В отличии от метода AMQPQueue::Get, метод AMQPQueue::Consume имеет синхронную схему приема, по этому тут используется событийная модель. Перед использованием метода Подписка (Соnsume) необходимо добавить событие AMQP_MESSAGE. Обработчик события — это функция, входным параметром которой является Данные сообщения. А выход, булевое значение: прекращать/нет прием данных. Более понятно на примере:
    AMQP amqp;
    auto_ptr< AMQPQueue> qu (amqp.createQueue("q2"));
    qu->Declare();
// or
    auto_ptr< AMQPQueue> qu2 (amqp.createQueue());
    qu2->Declare("q2");
static int  i=0;
 
int  onMessage( AMQPMessage * message  ) {
     char * data = message->getMessage();
     if (data)
          cout << data << endl;
     i++;
 
     cout << "#" << i << " tag="<< message->getDeliveryTag() <<endl;
     if (i>5)
          return 1;
     return 0;
};
что-то нолик не пропечатался ;)

Что сделать
Данная библиотека не претендует на законченность, конечно хочется развить событийную логику, добавить больше событий, например, onCancel, onSignal, onTimer.
Думаю сделать мультитредовость для подписки. Возможно надо будет переписать сетевую часть librabbitmq, сделать все на неблокируемых соскетах.
Ссылки по теме

AMQP по-русски
RabbitMQ: Введение в AMQP
AMQP. Отладка приложений
PHP-AMQP Что нового у Друзей?
AMQP-PHP чат
Вместо заключения
Пока статус бета, работает относительно стабильно. Лицензия MPL. Желающим помочь проекту всегда рады.
Баги прошу отписывать в трекере
Идеи по дальнейшему развитию можно обсудить здесь или в рассылке

PS. за русский прошу сильно не бить ногами, всегда был с ним не в ладах.
ошибки поправлю, пишите в личку.
Tags:
Hubs:
Total votes 5: ↑5 and ↓0+5
Comments2

Articles