Неочевидный RabbitMQ в Yii2 или почему RabbitMQ пишет во все очереди сразу


    Хочу поделиться практической проблемой конфигурирования брокера очередей RabbitMQ в Yii2. Предупреждаю читателя, что не обладаю экспертным мнением по работе с данным брокером очередей, однако очень хочется восполнить пробелы документации Yii2 и закрепить результат собственных мучений. Итак, если вы когда либо сталкивались с проблемой, что посылаемые сообщения попадают во все очереди, которые есть на сервере очередей, вы согласны, что это проблема и не понимаете почему так происходит, тогда прошу под кат.

    Почему вы можете столкнуться с таким поведением? Например, если вы, как и я раньше не работали с RabbitMQ, но работали например с Gearman. Сам Gearman, простой как железная дорога (позаимствовано у известного и уважаемого в интеренете персонажа). Вы создаете очередь с неким именем, кладете туда данные. Воркер читает из очереди с этим же именем. Всё просто. Теперь настала пора использовать модные технологии, сами не понимая зачем вы выбираете RabbitMQ. Потом радуетесь, что в Yii2 есть готовая абстракция для многих популярных брокеров очередей. Скудная документация подсказывает дефолтную конфигурацию чтобы всё завелось:

    return [
        'bootstrap' => [
            'queue', // The component registers its own console commands
        ],
        'components' => [
            'queue' => [
                'class' => \yii\queue\amqp_interop\Queue::class,
                'port' => 5672,
                'user' => 'guest',
                'password' => 'guest',
                'queueName' => 'queue',
                'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,
    
                // or
                'dsn' => 'amqp://guest:guest@localhost:5672/%2F',
    
                // or, same as above
                'dsn' => 'amqp:',
            ],
        ],
    ];
    

    Казалось бы всё до безумия просто, вот знакомая нам строка с queueName, копипастим, исправляем, запускаем — работает! Делаем очереди для других компонентов системы. Распараллеливаем чем можем наш php. Commit, Push довольные кладем таску в QA и идем читать хабр (в обеденный перерыв).

    Тут на самом интересном нас прерывает QA в чате, и говорит, что происходит что-то странное. Почему-то данные которые пишут воркеры (косьюмеры) дублируются. Что что? Не может такого быть. Идем проверять логи. Видим, что сообщение пишется в очередь, при чем очередь выбрана правильно, hash jobId получен. Не-не, у нас нет никаких ошибок. Пишем довольные QA, проверь ещё раз, не может такого быть, у нас всё хорошо.

    Буквально через пол часа нас отвлекают снова, ошибка повторилась и тут же на мыло упало уведомление — задача снова переведена в работу. Ну что, я ж программист, сейчас разберемся. У RabbitMQ, в отличие от того же Gearman есть веб интерфейс, в котором есть много информации о работе сервера. Выглядит сие чудо вот так:



    Кидаем ещё пару сообщений в очередь, видим в вебморде, что наши сообщения доходят и обрабатываются воркером. Случайный взгляд замечает, что когда мы кидаем сообщение в очередь, во всех очередях подскакивает график «Message rates».



    Ещё сто раз проверяем конфигурацию, перечитываем скудную документациюю Yii. Мы всё сделали правильно. Идем читать документацию на сайт rabbit'a. После пары десятков минут блуждания в темноте натыкаемся на туториал. Сразу после первого абзаца знакомимся с причиной наших непоняток — Exchanges. Повторять документацию не буду, очень коротко.

    В RabbitMQ мы не пишем сообщение в очередь, мы пишем его в exchange, этакий прокси, который одним концом принимает наши сообщения, а другим концом общается с очередями на сервере. В его власти принять решения в какую очередь положить наши данные. Занятно, что в документации Yii об этом нет ни строчки. С первого взгляда непонятно как сконфигуровать exchange, ныряем в кишочки и в файле vendor/yiisoft/yii2-queue/src/drivers/amqp_interop/Queue.php:176 находим заветное свойство, которое можно засетить. Тут надо сказать, что драйверов для RabbitMQ много, в моем случае используется enqueue/amqp-lib. Выставляем exchangeName, тестируем, ничего не меняется. Мы ж как настоящий русский инженер, сначала пробуем, а потом идем вдумчиво читать документацию ещё раз. Читаем ещё раз внимательно, потом идем в веб морду rabbit'a и видим следующее:



    Несколько наших очередей связаны с одним и тем же exchange. Бинго! Вот она причина, одно «но», я их не связывал. Идем снова в кишочки драйвера, находим метод setupBroker строчку 392

    $this->context->bind(new AmqpBind($queue, $topic));
    

    Вот это неуправляемое связывание.

    И так, недолго поразмыслив, приходим к выводу, что для каждой очереди должен быть объявлен свой exchange, тогда связь будет верной и у одного exchange будет всего одна связанная очередь. таким образом мы добьемся поведения аналогичного Gearman. Кстати, в документации подробно описано для чего сделан exchange, и как я понял, одна из причин это возможность связать с exchange несколько очередей. Вот только я не придумал, что за кейс такой когда это может понадобиться, ребята пишите в комментариях. И пишите, сталкивались ли вы с описанной выше ситуацией или я всё делаю неправильно?
    Поделиться публикацией

    Комментарии 7

      +4
      Хорошая практика при смене инструмента — почитать в документации как работает данный инструмент и какие best practices рекомендуют его разработчики. В случае с кроликом сразу бы бросилось в глаза, что по мимо queue в нём есть и exchange. Это бы сэкономило время и сразу дало бы направление для поиска решения в Yii.
        +3
        Немного странная реализация плагина.

        Если кролик нужен на уровне «отправки почты или смсок», то ничего об обменниках (exchange) знать и не нужно. Так как rabbit, для очередей без обменников, создает обменники автоматически (если не ошибаюсь, то с type=direct и с точно таким же именем), поэтому код
        $queue = $this->context->createQueue($this->queueName);
        $this->context->declareQueue($queue);

        Создаст очередь, в которую можно сразу же писать без обменника.

        Исходя из кода либы, в конфиге, вместе с `queueName` можно указывать `exchangeName` (не утверждаю, но судя по коду это должно быть именно так), хотя об этом в доке ни слова не сказано

        Схема с обменниками очень удобна, при распределенной архитектуре. Например когда есть 2 очереди — одна отправляет почту, а другая пишет об этом в лог. Тогда подписываем их на один обменник и отправляем только 1 сообщение, которое попадет в 2 очереди. Пример, конечно, притянут за уши, но думаю вполне понятен
          0
          Спасибо, кейс с логом и отправкой имеет право на жизнь.
          +2
          В свое время тоже использовал кролика в Yii2, но без плагинов, а с библиотекой из документации. Вообще rabbitmq такой комбайн, который может все, что хочешь: queue, event-subscriber, rpc и п.р. И используемый метод будет зависеть от умелого жонглирования конфигами. И для правильного использования комбайна нужно иметь знания, как для комбайна. А типичный для нас подход plugin first в итоге обычно заставляет нас вникать в детали, потому что не работает оно так, как нужно
            +1
            Если класть сообщение в default exchange, оно не попадет во все очереди. Очень странная реализация плагина (или драйвера?)
              +1
              Кстати, в документации подробно описано для чего сделан exchange, и как я понял, одна из причин это возможность связанть с exchange несколько очередей. Вот только я не придумал, что за кейс такой когда это может понадобиться, ребята пишите в комметариях.

              Например, для автоматического сохранения истории всего отправленного в очередь. Или для федерации, которую многие используют вместо кластера, когда одна из очередей вычитывается на другом сервере кролика. Возможно, для того и другого одновременно

                0
                Для первого плагина вы конечно молодец, что завели эту машину. Но про exchanges рассказывается уже в третьей статье 'get started' на их официальном сайте. Документация у кролика конечно не самая дружелюбная, но за полдня осилить можно все гайды :)

                Вот только я не придумал, что за кейс такой когда это может понадобиться

                Да, для самых простых плюшек (отправил и забыл) достаточно и просто очередей. А вот exchanges, да еще вместе с routing_key дают свободу для построения очень сложных структур. www.rabbitmq.com/tutorials/tutorial-four-php.html

                Можно использовать связки сразу нескольких exchanges. Могу привести хороший пример, который сам недавно реализовывал — это «отложенные сообщения» aka dead-letter exchange. Например когда вам нужен или просто сервис «отложенной отправки» или когда чей-то API прямо сейчас недоступен, и вы пытаетесь через N минут ожидания повторить запрос. Вот тут связка нескольких обменников и роут-ключей будет очень кстати.

                Ссыль на лучшее исполнение этого подхода (на ноде) — github.com/ria-com/rabbit-mq-learning
                Даже схема есть.

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                Самое читаемое