В статье Новые идеи по АПИ RabbitMQ AMQP для PHP был опубликован набросок по PHP-AMQP API
В продолжении ранее опубликованных идей представляю их реализацию, которая более ООПешнее первой версии.
код расширения можно найти здесь описание проекта и svn пока старые версии (1.0), переводится на анг язык
краткое описание версии 2.0:
APMQConection::APMQConection([array params])
Параметры (все необязательные):
Исключение — нет логическое или физического соединения.
Пример:
создание обмена, если задано имя иначе инициализация класса
AMQPExchange::AMQPExchange(APMQConection cnn, string [name])
name — имя обмена
Пример:
proto bool AMQPEexchange::declare( [string name], [string type=direct], [ bit params ] );
name — имя обмена
type — тип обмена, разрешены типы: direct, topic & fanout
params — параметры:
возвращает результат выполнения операции
Пример:
proto bool AMQPExchange::delete( [string name], [ bit params ] );
name — имя обмена
params — параметры:
возвращает результат выполнения операции
Пример:
proto bool AMQPExchange::bind( string queueName, string routingKey );
queueName — имя очереди
key — routing-key, маршрутный ключ, строка
возвращает результат выполнения операции
Пример:
proto bool AMQPExchange::publish( string msg, [string key] ,bit [parms] );
Публикация сообщения с ключем key для типа обмена topic или direct
msg — сообщение, строка
key — routing-key, маршрутный ключ, строка
params — параметры:
возвращает результат выполнения операции
Пример:
AMQPQueue::AMQPQueue( AMQPConnection cnn, string [name] )
name — имя очереди
proto int AMQPQueue::declare( string [name], bit [params] )
name — имя очереди
params — параметры:
возвращает кол-во элементов в очереди если очередь уже существует.
Пример:
proto bool AMQPQueue::delete( string [name], bit [params] )
name — имя очереди
params — параметры:
возвращает результат выполнения операции
Пример:
proto bool AMQPQueue::bind( string exchangeName, string routingKey );
name — имя обмена
routingkey — маршрутный ключ
Пример:
proto array AMQPQueue::consume( int n );
получить массив n-сообщений из очереди (все прочие сбрасываются )
n — кол-во полученных сообщений
params — параметры:
Внимание!
кол-во полученных сообщений не может превышать общего кол-во сообщений в очереди, иначе АПИ будет ждать от брокера приема всех сообщений.
Если указать кол-во сообщений, менее чем находится в текущей момент в очереди, то все не выбранные сообщения пометятся как выбранные, т.е потеряются при повторном чтении из очереди, если не установлен флаг AMQP_NOACK.
Пример:
proto bool AMQPQueue::unbind( string exchangeName, string routingKey );
отсоединяет текущую очередь от обмана exchangeName для маршрутного ключа routingKey
name — имя обмена
routingkey — маршрутный ключ
возвращает результат выполнения операции
proto bool AMQPQueue::purge( string [name] )
Все сообщения в очереди сбрасываются, сама очередь остается.
name — имя очереди
возвращает результат выполнения операции
Пример:
proto array AMQPQueue::get( string [name], bit [params])
name — имя очереди
params — параметры:
возвращает ассоциативный массив:
В продолжении ранее опубликованных идей представляю их реализацию, которая более ООПешнее первой версии.
код расширения можно найти здесь описание проекта и svn пока старые версии (1.0), переводится на анг язык
краткое описание версии 2.0:
Класс AMQPConnection — открытие логического соединения, включая канальное соединение.
Конструктор:
APMQConection::APMQConection([array params])
Параметры (все необязательные):
- host=[localhost]
- port=[5672]
- login=[guest]
- psw=[guest]
- vhost=[/]
Исключение — нет логическое или физического соединения.
Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1', 'vhost'=>'v1' ) );
Класс Обмена AMQPExchange
конструктор:
создание обмена, если задано имя иначе инициализация класса
AMQPExchange::AMQPExchange(APMQConection cnn, string [name])
name — имя обмена
Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
Объявление обмена:
proto bool AMQPEexchange::declare( [string name], [string type=direct], [ bit params ] );
name — имя обмена
type — тип обмена, разрешены типы: direct, topic & fanout
params — параметры:
- AMQP_PASSIVE
- AMQP_DURABLE
- AMQP_AUTODELETE
- AMQP_INTERNAL
возвращает результат выполнения операции
Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex_name', 'topic',AMQP_DURABLE );
Удаление обмена:
proto bool AMQPExchange::delete( [string name], [ bit params ] );
name — имя обмена
params — параметры:
- AMQP_IFUNUSED
возвращает результат выполнения операции
Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->delete();
// если имя не задано - удаляется с текущим именем, объявленного в конструкторе класса.
Привязка обмена к очереди:
proto bool AMQPExchange::bind( string queueName, string routingKey );
queueName — имя очереди
key — routing-key, маршрутный ключ, строка
возвращает результат выполнения операции
Пример:
$msg = "моя новость, раздел СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$exchange->bind('mylogin','spb.news')
$res = $exchange->publish( $msg, 'spb.news');
Публикация:
proto bool AMQPExchange::publish( string msg, [string key] ,bit [parms] );
Публикация сообщения с ключем key для типа обмена topic или direct
msg — сообщение, строка
key — routing-key, маршрутный ключ, строка
params — параметры:
- AMQP_MANDATORY
- AMQP_IMMEDIATE
возвращает результат выполнения операции
Пример:
$msg = "новости из СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->publish( $msg, 'spb.news');
Класс Очереди AMQPQueue
конструктор — инициализация класса
AMQPQueue::AMQPQueue( AMQPConnection cnn, string [name] )
name — имя очереди
Объявление очереди
proto int AMQPQueue::declare( string [name], bit [params] )
name — имя очереди
params — параметры:
- AMQP_AUTODELETE (default)
- AMQP_DURABLE
- AMQP_PASSIVE
- AMQP_EXCLUSIVE
возвращает кол-во элементов в очереди если очередь уже существует.
Пример:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$queue = new AMQPQueue($cnn,'chat_12');
$queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);
Удаление очереди
proto bool AMQPQueue::delete( string [name], bit [params] )
name — имя очереди
params — параметры:
- AMQP_IFUNUSED
- AMQP_IFEMPTY
возвращает результат выполнения операции
Пример:
$queue = new AMQPQueue(new APMQConection(),'chat_12');
$queue->delete();
Привязка очереди к обмену
:proto bool AMQPQueue::bind( string exchangeName, string routingKey );
name — имя обмена
routingkey — маршрутный ключ
Пример:
// Привязка очереди 'mylogin' к обмену 'ex_estate' через ключ '*.spb'
$queue = new AMQPQueue(APMQConection(), 'mylogin');
$queue->declare();
$queue->bind('ex_estate','*.spb');
Подписаться
proto array AMQPQueue::consume( int n );
получить массив n-сообщений из очереди (все прочие сбрасываются )
n — кол-во полученных сообщений
params — параметры:
- AMQP_NOLOCAL
- AMQP_NOACK
- AMQP_EXCLUSIVE
Внимание!
кол-во полученных сообщений не может превышать общего кол-во сообщений в очереди, иначе АПИ будет ждать от брокера приема всех сообщений.
Если указать кол-во сообщений, менее чем находится в текущей момент в очереди, то все не выбранные сообщения пометятся как выбранные, т.е потеряются при повторном чтении из очереди, если не установлен флаг AMQP_NOACK.
Пример:
$i=0;
$queue = new AMQPQueue(APMQConection());
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume( $n );
foreach($queueMessages as $item){
$i++;
echo "$i.$item";
}
Отписаться
proto bool AMQPQueue::unbind( string exchangeName, string routingKey );
отсоединяет текущую очередь от обмана exchangeName для маршрутного ключа routingKey
name — имя обмена
routingkey — маршрутный ключ
возвращает результат выполнения операции
Сброс очереди
proto bool AMQPQueue::purge( string [name] )
Все сообщения в очереди сбрасываются, сама очередь остается.
name — имя очереди
возвращает результат выполнения операции
Пример:
$queue = new AMQPQueue(new APMQConection());
$queue->purge('chat_12');
Получить элемент очереди
proto array AMQPQueue::get( string [name], bit [params])
name — имя очереди
params — параметры:
- AMQP_NOASK (default)
возвращает ассоциативный массив:
- msg — текущее сообщение
- count — кол-во оставшихся в очереди сообщений, если:
- count = 0 — данное сообщение последнее в очереди
- count = -1 очередь пуста, ключ msg — отсутствует
Если сравнивать быстродействие, то метод consume() работает быстрее, чем get()
но метод get более надежный.