PHP-AMQP версия 2

    В статье Новые идеи по АПИ RabbitMQ AMQP для PHP был опубликован набросок по PHP-AMQP API

    В продолжении ранее опубликованных идей представляю их реализацию, которая более ООПешнее первой версии.

    код расширения можно найти здесь описание проекта и svn пока старые версии (1.0), переводится на анг язык

    краткое описание версии 2.0:

    Класс AMQPConnection — открытие логического соединения, включая канальное соединение.


    Конструктор:

    APMQConection::APMQConection([array params])

    Параметры (все необязательные):
    • host=[localhost]
    • port=[5672]
    • login=[guest]
    • psw=[guest]
    • vhost=[/]

    Исключение — нет логическое или физического соединения.

    Пример:
    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1', 'vhost'=>'v1' ) );

    Класс Обмена AMQPExchange


    конструктор:

    создание обмена, если задано имя иначе инициализация класса
    AMQPExchange::AMQPExchange(APMQConection cnn, string [name])
    name — имя обмена

    Пример:

    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $exchange = new AMQPExchange($cnn, 'ex_name');


    Объявление обмена:

    proto bool AMQPEexchange::declare( [string name], [string type=direct], [ bit params ] );

    name — имя обмена
    type — тип обмена, разрешены типы: direct, topic & fanout
    params — параметры:
    • AMQP_PASSIVE
    • AMQP_DURABLE
    • AMQP_AUTODELETE
    • AMQP_INTERNAL

    возвращает результат выполнения операции

    Пример:
    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $exchange = new AMQPExchange($cnn);
    $exchange->declare('ex_name', 'topic',AMQP_DURABLE );


    Удаление обмена:

    proto bool AMQPExchange::delete( [string name], [ bit params ] );

    name — имя обмена
    params — параметры:
    • AMQP_IFUNUSED

    возвращает результат выполнения операции

    Пример:
    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $exchange = new AMQPExchange($cnn, 'ex_name');
    $res = $exchange->delete();
    // если имя не задано - удаляется с текущим именем, объявленного в конструкторе класса.


    Привязка обмена к очереди:

    proto bool AMQPExchange::bind( string queueName, string routingKey );

    queueName — имя очереди
    key — routing-key, маршрутный ключ, строка

    возвращает результат выполнения операции

    Пример:
    $msg = "моя новость, раздел СПб...";
    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $exchange = new AMQPExchange($cnn, 'ex_name');
    $exchange->bind('mylogin','spb.news')
    $res = $exchange->publish( $msg, 'spb.news');


    Публикация:


    proto bool AMQPExchange::publish( string msg, [string key] ,bit [parms] );

    Публикация сообщения с ключем key для типа обмена topic или direct

    msg — сообщение, строка
    key — routing-key, маршрутный ключ, строка
    params — параметры:
    • AMQP_MANDATORY
    • AMQP_IMMEDIATE


    возвращает результат выполнения операции

    Пример:
    $msg = "новости из СПб...";
    $cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $exchange = new AMQPExchange($cnn, 'ex_name');
    $res = $exchange->publish( $msg, 'spb.news');


    Класс Очереди AMQPQueue


    конструктор — инициализация класса

    AMQPQueue::AMQPQueue( AMQPConnection cnn, string [name] )

    name — имя очереди

    Объявление очереди

    proto int AMQPQueue::declare( string [name], bit [params] )

    name — имя очереди
    params — параметры:
    • AMQP_AUTODELETE (default)
    • AMQP_DURABLE
    • AMQP_PASSIVE
    • AMQP_EXCLUSIVE

    возвращает кол-во элементов в очереди если очередь уже существует.

    Пример:$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
    $queue = new AMQPQueue($cnn,'chat_12');
    $queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);


    Удаление очереди

    proto bool AMQPQueue::delete( string [name], bit [params] )

    name — имя очереди
    params — параметры:
    • AMQP_IFUNUSED
    • AMQP_IFEMPTY

    возвращает результат выполнения операции

    Пример:$queue = new AMQPQueue(new APMQConection(),'chat_12');
    $queue->delete();


    Привязка очереди к обмену
    :
    proto bool AMQPQueue::bind( string exchangeName, string routingKey );

    name — имя обмена
    routingkey — маршрутный ключ

    Пример:
    // Привязка очереди 'mylogin' к обмену 'ex_estate' через ключ '*.spb'
    $queue = new AMQPQueue(APMQConection(), 'mylogin');
    $queue->declare();
    $queue->bind('ex_estate','*.spb');


    Подписаться

    proto array AMQPQueue::consume( int n );
    получить массив n-сообщений из очереди (все прочие сбрасываются )
    n — кол-во полученных сообщений
    params — параметры:
    • AMQP_NOLOCAL
    • AMQP_NOACK
    • AMQP_EXCLUSIVE


    Внимание!
    кол-во полученных сообщений не может превышать общего кол-во сообщений в очереди, иначе АПИ будет ждать от брокера приема всех сообщений.
    Если указать кол-во сообщений, менее чем находится в текущей момент в очереди, то все не выбранные сообщения пометятся как выбранные, т.е потеряются при повторном чтении из очереди, если не установлен флаг AMQP_NOACK.

    Пример:$i=0;
    $queue = new AMQPQueue(APMQConection());
    $n = $queue->declare('mylogin');
    $queueMessages = $queue->consume( $n );
    foreach($queueMessages as $item){
    $i++;
    echo "$i.$item";
    }


    Отписаться

    proto bool AMQPQueue::unbind( string exchangeName, string routingKey );
    отсоединяет текущую очередь от обмана exchangeName для маршрутного ключа routingKey

    name — имя обмена
    routingkey — маршрутный ключ

    возвращает результат выполнения операции

    Сброс очереди

    proto bool AMQPQueue::purge( string [name] )
    Все сообщения в очереди сбрасываются, сама очередь остается.

    name — имя очереди
    возвращает результат выполнения операции

    Пример:$queue = new AMQPQueue(new APMQConection());
    $queue->purge('chat_12');


    Получить элемент очереди

    proto array AMQPQueue::get( string [name], bit [params])

    name — имя очереди
    params — параметры:
    • AMQP_NOASK (default)


    возвращает ассоциативный массив:
    • msg — текущее сообщение
    • count — кол-во оставшихся в очереди сообщений, если:
      • count = 0 — данное сообщение последнее в очереди
      • count = -1 очередь пуста, ключ msg — отсутствует



    • Если сравнивать быстродействие, то метод consume() работает быстрее, чем get()
      но метод get более надежный.
    Share post
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 6

      0
      Интересно, сейчас рассматриваю такую схему, но интересует возможность работы с Apache Qpid?
        0
        Протокол один — AMQP
        интересно а минусы за что???
        0
        RabbitMQ недавно стал поддерживать 0.9
        сейчас стабле 0.8

        Only users with full accounts can post comments. Log in, please.