Как стать автором
Обновить

PHP-AMQP версия 2

Разработка веб-сайтов *
В статье Новые идеи по АПИ RabbitMQ AMQP для PHP был опубликован набросок по PHP-AMQP API

В продолжении ранее опубликованных идей представляю их реализацию, которая более ООПешнее первой версии.

код расширения можно найти здесь описание проекта и svn пока старые версии (1.0), переводится на анг язык

краткое описание версии 2.0:

Класс AMQPConnection — открытие логического соединения, включая канальное соединение.


Конструктор:

APMQConection::APMQConection([array params])

Параметры (все необязательные):
  • host=[localhost]
  • port=[5672]
  • login=[guest]
  • psw=[guest]
  • vhost=[/]

Исключение — нет логическое или физического соединения.

Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1', 'vhost'=>'v1' ) );

Класс Обмена AMQPExchange


конструктор:

создание обмена, если задано имя иначе инициализация класса
AMQPExchange::AMQPExchange(APMQConection cnn, string [name])
name — имя обмена

Пример:

$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');


Объявление обмена:

proto bool AMQPEexchange::declare( [string name], [string type=direct], [ bit params ] );

name — имя обмена
type — тип обмена, разрешены типы: direct, topic & fanout
params — параметры:
  • AMQP_PASSIVE
  • AMQP_DURABLE
  • AMQP_AUTODELETE
  • AMQP_INTERNAL

возвращает результат выполнения операции

Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex_name', 'topic',AMQP_DURABLE );


Удаление обмена:

proto bool AMQPExchange::delete( [string name], [ bit params ] );

name — имя обмена
params — параметры:
  • AMQP_IFUNUSED

возвращает результат выполнения операции

Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->delete();
// если имя не задано - удаляется с текущим именем, объявленного в конструкторе класса.


Привязка обмена к очереди:

proto bool AMQPExchange::bind( string queueName, string routingKey );

queueName — имя очереди
key — routing-key, маршрутный ключ, строка

возвращает результат выполнения операции

Пример:
$msg = "моя новость, раздел СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$exchange->bind('mylogin','spb.news')
$res = $exchange->publish( $msg, 'spb.news');


Публикация:


proto bool AMQPExchange::publish( string msg, [string key] ,bit [parms] );

Публикация сообщения с ключем key для типа обмена topic или direct

msg — сообщение, строка
key — routing-key, маршрутный ключ, строка
params — параметры:
  • AMQP_MANDATORY
  • AMQP_IMMEDIATE


возвращает результат выполнения операции

Пример:
$msg = "новости из СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->publish( $msg, 'spb.news');


Класс Очереди AMQPQueue


конструктор — инициализация класса

AMQPQueue::AMQPQueue( AMQPConnection cnn, string [name] )

name — имя очереди

Объявление очереди

proto int AMQPQueue::declare( string [name], bit [params] )

name — имя очереди
params — параметры:
  • AMQP_AUTODELETE (default)
  • AMQP_DURABLE
  • AMQP_PASSIVE
  • AMQP_EXCLUSIVE

возвращает кол-во элементов в очереди если очередь уже существует.

Пример:$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$queue = new AMQPQueue($cnn,'chat_12');
$queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);


Удаление очереди

proto bool AMQPQueue::delete( string [name], bit [params] )

name — имя очереди
params — параметры:
  • AMQP_IFUNUSED
  • AMQP_IFEMPTY

возвращает результат выполнения операции

Пример:$queue = new AMQPQueue(new APMQConection(),'chat_12');
$queue->delete();


Привязка очереди к обмену
:
proto bool AMQPQueue::bind( string exchangeName, string routingKey );

name — имя обмена
routingkey — маршрутный ключ

Пример:
// Привязка очереди 'mylogin' к обмену 'ex_estate' через ключ '*.spb'
$queue = new AMQPQueue(APMQConection(), 'mylogin');
$queue->declare();
$queue->bind('ex_estate','*.spb');


Подписаться

proto array AMQPQueue::consume( int n );
получить массив n-сообщений из очереди (все прочие сбрасываются )
n — кол-во полученных сообщений
params — параметры:
  • AMQP_NOLOCAL
  • AMQP_NOACK
  • AMQP_EXCLUSIVE


Внимание!
кол-во полученных сообщений не может превышать общего кол-во сообщений в очереди, иначе АПИ будет ждать от брокера приема всех сообщений.
Если указать кол-во сообщений, менее чем находится в текущей момент в очереди, то все не выбранные сообщения пометятся как выбранные, т.е потеряются при повторном чтении из очереди, если не установлен флаг AMQP_NOACK.

Пример:$i=0;
$queue = new AMQPQueue(APMQConection());
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume( $n );
foreach($queueMessages as $item){
$i++;
echo "$i.$item";
}


Отписаться

proto bool AMQPQueue::unbind( string exchangeName, string routingKey );
отсоединяет текущую очередь от обмана exchangeName для маршрутного ключа routingKey

name — имя обмена
routingkey — маршрутный ключ

возвращает результат выполнения операции

Сброс очереди

proto bool AMQPQueue::purge( string [name] )
Все сообщения в очереди сбрасываются, сама очередь остается.

name — имя очереди
возвращает результат выполнения операции

Пример:$queue = new AMQPQueue(new APMQConection());
$queue->purge('chat_12');


Получить элемент очереди

proto array AMQPQueue::get( string [name], bit [params])

name — имя очереди
params — параметры:
  • AMQP_NOASK (default)


возвращает ассоциативный массив:
  • msg — текущее сообщение
  • count — кол-во оставшихся в очереди сообщений, если:
    • count = 0 — данное сообщение последнее в очереди
    • count = -1 очередь пуста, ключ msg — отсутствует



  • Если сравнивать быстродействие, то метод consume() работает быстрее, чем get()
    но метод get более надежный.
Теги:
Хабы:
Всего голосов 8: ↑5 и ↓3 +2
Просмотры 9.1K
Комментарии Комментарии 6