Новые идеи по АПИ RabbitMQ AMQP для PHP

    Недавно опубликовал разработанное ранее PHP API для RabbitMQ «AMQP теперь и для РНР»

    При обсуждении было предложено сделать PHP API более объектной моделью,
    более близкой к модели, предложенной в Протоколе AMQP.

    Код немного усложнится, но объектная модель будет более красивой,

    Прежде чем перейти к кодированию, представляю на обсуждение новое API

    Класс AMQPConnection

    открытие логического соединения, включая канальное соединение.
    $cnn = new APMQConection( [host=localhost],[port=5672],[login=guest],[psw=guest],[vhost=/] );
    $cnn->getResult(); — true/false результат выполнения соединения

    Класс Обмена AMQPExchange

    $exchange = new AMQPExchange($cnn, [name],[parms]) — создание обмена, если задано имя иначе инициализация класса
    $res = $exchange->declare([name]) — объявить обмен
    $res = $exchange->delete([name]) — удалить обмен, если имя не задано — удаляется с текущим именем
    $res = $exchange->publish( msg, routing_key, [parms])
    $res = $exchange->getResult(); — true/false результат выполнения последней операции

    $exchange = new AMQPExchange($cnn, name,[parms]) эквивалентно
    $exchange = new AMQPExchange($cnn);
    $res = $exchange->declare();

    Класс Очереди AMQPQueue

    $queue = new AMQPQueue(cnn, [name],[params]); — создание очереди, если задано имя иначе инициализация класса
    $res = $queue->delete([name]) — удаление очереди, возвращает результат выполнения операции
    $res = $queue->declaere([name],[params]) — объявление очереди, возвращает результат выполнения операции.
    $res = $queue->purge([name]) — удаление всех элементов очереди, возвращает результат выполнения операции
    $res =$queue->bind(exchange,routing_key, [parms]); -связь очереди и обмена
    $queueItem = $queue->getItem(); — получить один элемент из очереди
    $arrayOfQueueItems= $queue->consume([n]) — получить массив n-сообщений из очереди (все прочие сбрасываются ) если n не задано — выбираются все сообщения
    $queue->getResult(); — true/false результат выполнения последней операции, в основном предназначено для использования проверки операций в конструкторе

    $queue = new AMQPQueue(cnn, name,[params]); эквивалентно:
    $queue = new AMQPQueue(cnn)
    $queue->declaere(name,[params])

    Хоть объектная модель не совсем соответствует стандарту AMQP, но вместо одного монолитного класса стало уже четыре. Практически же функциональность не изменилась, добавлены только методы удаления очереди и обмена.

    Средняя зарплата в IT

    120 000 ₽/мес.
    Средняя зарплата по всем IT-специализациям на основании 6 051 анкеты, за 1-ое пол. 2021 года Узнать свою зарплату
    Реклама
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее

    Комментарии 3

      +2
      1. Что такое cnn в APMQConection.

      2. getResult — сомнительная функция. Если все прошло так как и должно было пройти — отлично, смысла вызывать еще метод что бы проверить — как оно там, я не вижу. Если произошла какая-то ошибка, что же, почему бы и не кинуть исключение например. Ведь так или иначе нам захочется узнать что же такого там произошло, если результат выполнения — false. Поэтому надо добавлять метод getLastError что приводит к нас к старому доброму процедурному программированию.

      3. Не нравиться мне вот что — при создании очереди или экченжда у нас появляется режимность, которой нет в соединении. Надо как-то определиться, или мы делаем инициализацию какого-то инстанса только в конструкторе (тогда по сути у нас не должно быть функций подобных declaere) или же расширенный конструктор у нас является просто методом объединяющим сборку (вот хз как в такое раннее время написать более грамотно) инстанса — тогда добавляем методы connect (в коннекшн), и set* для кучи различных параметров.

      3. И что такое params. Понятно что параметры, а что за параметры. И почему их нельзя менять уже в созданном инстансе какого-либо класса. Почему к примеру я не могу изменить к примеру таймауты различные или коннект у какой-то очереди или экченжа.

      4. метод delete и purge должны совершать действия только с текущими инстансом класса. Если вызываем у какой-то очереди, то изменяется только эта очередь. В другую лезть не надо. Сейчас эта функция очень и очень большой шаг в направлении процедурного программирования.

      5. Где класс сообщения? Судя по phpampglib там имеется куча различных параметров, которые хотелось бы хоть иногда изменять.

      6. Как насчет какого-нибудь ConnectionManager который бы умел обращаться с кучей различных соединений, которые бы в нем регистрировались при создании, и отложенного соединения, которое реально создается уже при отсылке первого сообщения в какую-либо сторону.

      7. Будет ли очередь поддерживать интерфейс Iterator и как я могу обратиться к сообщениям в очереди? getItem — это конечно хорошо, но что если мне необходимо получить пятое по счету сообщение, или сразу несколько? Как насчет кучки различных методов, что-то вроде getMessageById? или даже getMessagesByType?

        0
        Кстати, судить о системе могу только по ее интерфейсу, поэтому если есть какие-то разумные ограничения, которые мне кажутся очень странными и нелогичными — тогда… надо бы это как-то объяснить :)
          0
          >1. Что такое cnn в APMQConection.
          вкравшиеся ошибка от copy/paste, fixed

          >2. getResult — сомнительная функция. Если все прошло так как и должно было пройти — отлично, смысла вызывать еще метод что бы проверить — как оно там, я не вижу. Если произошла какая-то ошибка, что же, почему бы и не кинуть исключение например
          думал над исключением, но иногда нужно иметь результат выполнения операции. В принципе — эта функция надумана. Подумаю над исключением.

          > 3. Не нравиться мне вот что — при создании очереди или экченжда у нас появляется режимность, которой нет в соединении.

          не совсем понял что такое режимность, но сама идея вопроса понятна

          >Надо как-то определиться, или мы делаем инициализацию какого-то инстанса только в конструкторе (тогда по сути у нас не должно быть функций подобных declaere) или же расширенный конструктор у нас является просто методом объединяющим сборку (вот хз как в такое раннее время написать более грамотно) инстанса — тогда добавляем методы connect (в коннекшн), и set* для кучи различных параметров.

          А можно пример?
          Пусть необходимо опубликовать в незабинденную очередь сообщение:
          имеем

          $cnn = AMQPConnect();
          $ex = new AMQPExchange($cnn, 'php', NO_DECLARE); // объявляем обмен, но не отправляем объявление на сервер, так как по некоторым особенностям обмен был объявлен ранее
          $qu = new AMQPQueue($cnn, 'php.xXx', AMQP_NODECLARE ); // не отправлять объявление на сервер, по некоторым особенностям очередь была объявлена ранее
          $qu->bind( 'php', 'xXx' ); // привязка очереди и обмена
          $ex->publish('xXx вошел в чат', '*') // публикация

          функция Declare — это возможность объявления очереди/обмена в соотвествие объектной моделью Протокола. Решил ее реализовать. В принципе объявление — должно проходить автоматически в конструкторе, если не выставлен флаг AMQP_NODECLARE

          В соотвествии с Протоколом, есть рекомендация использовать класс BASIC, те вышеприведенный код должен быть таким:

          $cnn = AMQPConnect();
          $bs = new AMQPBasic($cnn);
          $bs->bind( 'php','xXx', 'xXx' ); // привязка очереди и обмена
          $bs->publish('php', 'xXx вошел в чат', '*') // публикация

          но мне показалось, что предоженный мною вариант удобнее.

          >3. И что такое params. Понятно что параметры, а что за параметры. И почему их нельзя менять уже в созданном инстансе какого-либо класса.

          Про параметры можно почитать в реализованном АПИ, см ссылку
          часть параметров задана по умолчанию.

          >Почему к примеру я не могу изменить к примеру таймауты различные или коннект у какой-то очереди или экченжа.

          а вот с этим сложнее, но у меня в планах пропатчить класс коннекта на предмет таймаутов. А вот коннект у очереди и эксченджа пока один. Можно еще намутить воды с номерами каналлов, но на практике то вполне и одного канала хватает. Будет потребность — напишем, делов-то всего на часов восемь.

          Как вариант возможно открыть два соединения — но зачем???? не вижу практической стороны применения.

          >4. метод delete и purge должны совершать действия только с текущими инстансом класса. Если вызываем у какой-то очереди, то изменяется только эта очередь. В другую лезть не надо. Сейчас эта функция очень и очень большой шаг в направлении процедурного программирования.

          методы delete и purge решил реализовать в соответствии с Протоколом, пока на практике использую только delete

          >5. Где класс сообщения? Судя по phpampglib там имеется куча различных параметров, которые хотелось бы хоть иногда изменять

          $res = $exchange->publish( msg, routing_key, [parms]) // задаем параметры и не так их там много. Приоритеты — пока только зарезервированны, но в брокере не реализованны. expiration задается, а также AUTODELETE & DURABLE
          но иногда при задачи некоторых нестандартных параметров мы имеем непредсказуемое поведение, например использование флага NOWAIT, по этому лучше сузить возможности и не давать пользователю убивать ситему.

          В phpamplib решили объять необъятное и работает криво.

          >6. Как насчет какого-нибудь ConnectionManager который бы умел обращаться с кучей различных соединений, которые бы в нем регистрировались при создании, и отложенного соединения, которое реально создается уже при отсылке первого сообщения в какую-либо сторону.

          на счет отложенных соединений я думал, — это в ближайших планах
          на счет нескольких соединений, думаю должно работать так:

          $cnn1 = AMQPConnect('rb1.project.local');
          $ex = new AMQPExchange($cnn1, 'php', NO_DECLARE);
          $ex->publish('xXx вошел в чат', '*') // публикация

          $cnn2 = AMQPConnect('rb2.project.local');
          $ex = new AMQPExchange($cnn2, 'mysql', NO_DECLARE);
          $ex->publish('xXx вошел в чат', '*') // публикация

          имеем по контексту на каждое соединение.

          >7. Будет ли очередь поддерживать интерфейс Iterator и как я могу обратиться к сообщениям в очереди?

          пока над этим не думал, спасибо за подсказку.

          >getItem — это конечно хорошо, но что если мне необходимо получить пятое по счету сообщение, или сразу несколько?

          ну пятое по счету не получится — система FIFO, последовательный доступ к каждому сообщению.

          возможен вариант — прочитать четыре сообщения с флагом NOASK (не прочитанные), а пятое — как обычное. Четыре сообщения будут висеть в очереди. Это хак. Несколько сразу — циклом.

          получить несколько

          $arrayOfQueueItems= $queue->consume([n]) — получить массив n-сообщений из очереди (все прочие сбрасываются ) если n не задано — выбираются все сообщения. так как метод consume привязан к очереди, то в целях защиты от зависания: если n>фактического кол-ва сообщений в очереди сделано ограничение, идет опрос состояния очереди.

          > Как насчет кучки различных методов, что-то вроде getMessageById? или даже getMessagesByType?

          тип пока один plain/text, пока большее не планируется

          getMessageById — если честно, то пока и не задумывался что это нужно. Вот приоритеты — нужны бывают;

          можно использовать consumer-tag, но как выбирать по нему из очереди не считывая все предыдущие — я не в курсе,

          хак: можно все считывать и проверять на id или consumer-tag с флагом NOACK, а потом выполнять удаление конкретных сообщений

          вообще брокер ориентирован на FIFO так что танцы с бубнами…

          спасибо за комментарий

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое