В статье «Lib amqpcpp wrapper for librabbitmq» был обзор публикации сообщений по протоколу AMQP. Данная статья является ее продолжением, в которой ниже описывается как использовать Очереди.
Если Обмен предназначен для публикации сообщений, то Очередь предназначена для приема сообщений. Между Обменом и Очередью устанавливается Связь (Bind или я часто перевожу Привязка) через ключ Маршрутизации (routing_key).
Очередь, аналогично Обмену — надо объявлять:
Очереди могут быть:
Если ни один параметр не задан, то по умолчанию Очередь объявляется как самоудаляемая (AMQP_AUTODELETE). Ниже в примере Очередь объявлена как самоудаляемая и сохраняемая.
Очередь можно привязать к Обмену (Bind) или отвязать (unBind). Привязка осуществляется через ключ маршрутизации. Ключ может быть простой или составной. Для составных ключей используются паттерны. Например, мы подписываемся на все новости, которые публикуются в Обмене realty. Тогда ключ может быть "*.news" или мы подписываемся на все сообщения, которые относятся к Санкт-Петербургу: «spb.*». Как уже ранее в части 1 упоминалось, Обмены могут быть трех типов: direct, topic, fanout. Для Обменов типа topic могут использоваться паттрены, для обменов типа direct — используются только простые ключи, а тип fanout вообще не использует ключей, по этому значение ключа указывается чисто формально, задается пустая строка.
Таг подписчика (consumer_tag) — это индивидуальная неповторимая строка, назначается либо при публикации, либо автоматически назначается брокером, что-то типа имя сессии. Отказаться от подписки можно, послав команду Cancel с передачей в параметре данных consumer_tag, например AMQPQueu::Cancel( m->getConsumerTag() ).
Таг доставки — это числовое значение, равное счетчику доставленных сообщений данной сессии, для первого сообщения таг доставки равен — 1, для второго — 2, для третьего — 3 и тд. Для подтверждения приема сообщения, необходимо вызвать метод AMQPQueu::Ack(delivery_tag), где переменная delivery_tag имеет значение тага доставки.
Думаю сделать мультитредовость для подписки. Возможно надо будет переписать сетевую часть librabbitmq, сделать все на неблокируемых соскетах.
AMQP по-русски
RabbitMQ: Введение в AMQP
AMQP. Отладка приложений
PHP-AMQP Что нового у Друзей?
AMQP-PHP чат
Баги прошу отписывать в трекере
Идеи по дальнейшему развитию можно обсудить здесь или в рассылке
PS. за русский прошу сильно не бить ногами, всегда был с ним не в ладах.
ошибки поправлю, пишите в личку.
Если Обмен предназначен для публикации сообщений, то Очередь предназначена для приема сообщений. Между Обменом и Очередью устанавливается Связь (Bind или я часто перевожу Привязка) через ключ Маршрутизации (routing_key).
Очередь, аналогично Обмену — надо объявлять:
AMQP amqp;
auto_ptr< AMQPQueue> qu (amqp.createQueue("q2"));
qu->Declare();
// or
auto_ptr< AMQPQueue> qu2 (amqp.createQueue());
qu2->Declare("q2");
Очереди могут быть:
- Самоудаляемые (AMQP_AUTODELETE), т.е. удаляются если они пустые и не используются (нет клиентских соединений)
- Сохраняемые (AMQP_DURABLE) т.е. при перезапуске брокера Очереди сохраняют данные
- Эксклюзисвные (AMQP_EXCLUSIVE) т.е. рассчитанные только на одно соединение
- Пассивные (AMQP_PASSIVE) т.е. инициация идет со стороны клиента
Если ни один параметр не задан, то по умолчанию Очередь объявляется как самоудаляемая (AMQP_AUTODELETE). Ниже в примере Очередь объявлена как самоудаляемая и сохраняемая.
AMQP amqp;Очередь можно:
auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
qu3->Declare("q2", AMQP_AUTODELETE|AMQP_DURABLE); // durable and autodelete mode
- Уладить AMQPQueue::Delete();
- Отчистить AMQPQueue::Purge();
- Сбросить подписку AMQPQueue::Cancel();
Очередь можно привязать к Обмену (Bind) или отвязать (unBind). Привязка осуществляется через ключ маршрутизации. Ключ может быть простой или составной. Для составных ключей используются паттерны. Например, мы подписываемся на все новости, которые публикуются в Обмене realty. Тогда ключ может быть "*.news" или мы подписываемся на все сообщения, которые относятся к Санкт-Петербургу: «spb.*». Как уже ранее в части 1 упоминалось, Обмены могут быть трех типов: direct, topic, fanout. Для Обменов типа topic могут использоваться паттрены, для обменов типа direct — используются только простые ключи, а тип fanout вообще не использует ключей, по этому значение ключа указывается чисто формально, задается пустая строка.
AMQP amqp;Если Очередь привязана к обмену и в Обмене публикуются сообщения с ключом привязки, то эти сообщения перенаправляются в соответствующую првязки Очередь. Читать из Очереди Сообщения можно двумя способами:
auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
// очередь и обмен были объявлены ранее
qu3->Bind("ex", "news"); // durable and autodelete mode
- асинронно, через метод AMQPQueu::Get();
- синхронно, через метод AMQPQueu::Consume();
AMQP amqp;У метода AMQPQueu::Get() может быть параметр AMQP_NOACK, который «говорит» брокеру, что данное сообщение не помечать как «прочитанное». Совместно с параметром AMQP_NOACK, используется метод AMQPQueu::Ack(), который подтверждает, что сообщение доставлено. Вся информация о сообщении инкапсулируется в объекте данных AMQPMessage. У Объекта «Сообщение» есть методы для доступа полей, названия говорят сами за себя: getMessage(), getExchange(), getRoutingKey(), get MessageCount(). Следует заострить внимание на методах getConsumerTag() и getDeliveryTag().
auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
while ( 1 ) {
qu2->Get();
auto_ptr<AMQPMessage> m (qu2->getMessage());
cout << "count: "<< m->getMessageCount() << endl;
if (m->getMessageCount() > -1) {
cout << "message\n"<< m->getMessage() << "\nmessage key: "<< m->getRoutingKey() << endl;
cout << "exchange: "<< m->getExchange() << endl;
} else
break;
}
Таг подписчика (consumer_tag) — это индивидуальная неповторимая строка, назначается либо при публикации, либо автоматически назначается брокером, что-то типа имя сессии. Отказаться от подписки можно, послав команду Cancel с передачей в параметре данных consumer_tag, например AMQPQueu::Cancel( m->getConsumerTag() ).
Таг доставки — это числовое значение, равное счетчику доставленных сообщений данной сессии, для первого сообщения таг доставки равен — 1, для второго — 2, для третьего — 3 и тд. Для подтверждения приема сообщения, необходимо вызвать метод AMQPQueu::Ack(delivery_tag), где переменная delivery_tag имеет значение тага доставки.
AMQP amqp;В отличии от метода AMQPQueue::Get, метод AMQPQueue::Consume имеет синхронную схему приема, по этому тут используется событийная модель. Перед использованием метода Подписка (Соnsume) необходимо добавить событие AMQP_MESSAGE. Обработчик события — это функция, входным параметром которой является Данные сообщения. А выход, булевое значение: прекращать/нет прием данных. Более понятно на примере:
auto_ptr< AMQPQueue> qu3 (amqp.createQueue());
while ( 1 ) {
qu2->Get(AMQP_NOACK);
auto_ptr<AMQPMessage> m (qu2->getMessage());
if (m->getMessageCount() > -1) {
qu2->Ack(m->getDeliveryTag());
} else
break;
}
AMQP amqp;
auto_ptr< AMQPQueue> qu (amqp.createQueue("q2"));
qu->Declare();
// or
auto_ptr< AMQPQueue> qu2 (amqp.createQueue());
qu2->Declare("q2");
static int i=0;
int onMessage( AMQPMessage * message ) {
char * data = message->getMessage();
if (data)
cout << data << endl;
i++;
cout << "#" << i << " tag="<< message->getDeliveryTag() <<endl;
if (i>5)
return 1;
return 0;
};
что-то нолик не пропечатался ;)
Что сделать
Данная библиотека не претендует на законченность, конечно хочется развить событийную логику, добавить больше событий, например, onCancel, onSignal, onTimer.Думаю сделать мультитредовость для подписки. Возможно надо будет переписать сетевую часть librabbitmq, сделать все на неблокируемых соскетах.
Ссылки по теме
AMQP по-русски
RabbitMQ: Введение в AMQP
AMQP. Отладка приложений
PHP-AMQP Что нового у Друзей?
AMQP-PHP чат
Вместо заключения
Пока статус бета, работает относительно стабильно. Лицензия MPL. Желающим помочь проекту всегда рады.Баги прошу отписывать в трекере
Идеи по дальнейшему развитию можно обсудить здесь или в рассылке
PS. за русский прошу сильно не бить ногами, всегда был с ним не в ладах.
ошибки поправлю, пишите в личку.