RabbitMQ — отложенные сообщения

    image

    На Хабре имеется серия переводов официального руководства по RabbitMQ (1, 2, 3, 4, 5). К сожалению, в официальном руководстве не рассматривается вопрос организации отложенных сообщений, а я считаю этот вопрос весьма важным. Поэтому я решил сам написать такую статью.

    Примеры кода будут на Перле, но никаких специфических для Перла деталей в коде не будет, поэтому примеры могут быть сравнительно легко адаптированы для любого другого языка.

    Постановка задачи


    Иногда нужно выполнить какую-либо задачу не «вот-прям-сию-секунду», а спустя некоторое время.

    Например, у нас есть скрипт, который время от времени обращается к какому-нибудь API, и, если ответ не изменился, «зысыпает» на некоторое время, потом «просыпается» и снова проверяет.

    Или, к примеру, мы сохранили временный файл и нам нужно завести таймер, чтобы удалить файл по истечении указанного времени.

    В подобных случаях нам нужен механизм, позволяющий создать в RabbitMQ отложенное сообщение (если, конечно, мы хотим делать это средствами RabbitMQ).

    К сожалению, в самом RabbitMQ нет готового механизма для публикации отложенных сообщений. Сообщения, публикуемые отправителями в RabbitMQ, доставляются получателям мгновенно. Конечно, получатель может быть не подключен к RabbitMQ, в этом случае сообщение будет доставлено после подключения, но если получатель подключен — доставка сообщения производится сразу.

    Нельзя просто так опубликовать сообщение и сказать ему: «Полежи пока незаметно в уголочке, а через 10 минут вылезай и доставляйся получателю».

    Поэтому возникает задача — как, с помощью RabbitMQ, организовать отложенные сообщения?

    Решение


    Для этого придется сделать обходной маневр. Ключевая идея такая — если сообщение, отправленное в очередь, немедленно доставляется слушающему эту очередь получателю, значит, нужно отправить это сообщение в другую очередь!

    В общем и целом схема работы будет такая:

    image

    1. Создаем обменник, в который будут отправляться отложенные сообщения
    2. Создаем очередь, в которой будут храниться отложенные сообщения
    3. Делаем переплет между очередью и обменником
    4. Настраиваем очередь так, чтобы сообщения, полежав в ней некоторое заданное время, отправлялись в обычный обменник, для немедленной доставки получателю

    Получатель


    Рассмотрим скрипт consumer_dlx.pl:

    #!/usr/bin/perl
    
    use strict;
    use warnings;
    
    use Net::AMQP::RabbitMQ;
    
    my $mq = Net::AMQP::RabbitMQ->new();
    
    my $user     = 'guest';
    my $password = 'guest';
    
    my $exchange     = 'myexchange';
    my $queue        = 'myqueue';
    my $routing_key  = 'mykey';
    
    # Подключение
    $mq->connect("localhost", {user => $user, password => $password});
    
    # Канал
    $mq->channel_open(1);
    
    # Обменник
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    
    # Очередь
    $mq->queue_declare(1, $queue);
    
    # Переплет
    $mq->queue_bind(1, $queue, $exchange, $routing_key);
    
    # Подписка
    $mq->consume(1, $queue);
    
    # Второй комплект очередь-переплет-подписка
    $mq->queue_declare(1, $queue.'2');
    $mq->queue_bind(1, $queue.'2', $exchange, $routing_key.'2');
    $mq->consume(1, $queue.'2');
    
    # Получение сообщений (бесконечный цикл)
    while ( my $msg = $mq->recv() ) {
        print "$msg->{body} ($msg->{routing_key})\n";
    }
    

    Я не буду заострять внимание на каждой строке этого скрипта, так как тут нет ничего нового для человека, прочитавшего вышеупомянутые статьи из руководства. Это вполне обычный получатель сообщений, нет даже никакой специфики, связанной с рассматриваемой темой — отложенными сообщениями. Получатель нужен лишь для демонстрации, вся соль будет в отправителе.

    Отмечу лишь один момент:

    Обратите внимание на то, что получатель создает и слушает две разные очереди, переплетенные с обменником двумя разными routing_key. В принципе, достаточно и одной очереди, но с двумя будет нагляднее, плюс это поможет далее продемонстрировать одну полезную возможность.

    Отправитель


    Теперь рассмотрим скрипт producer_dlx.pl:

    #!/usr/bin/perl
    
    use strict;
    use warnings;
    
    use Net::AMQP::RabbitMQ;
    
    my $mq = Net::AMQP::RabbitMQ->new();
    
    my $user     = 'guest';
    my $password = 'guest';
    
    my $exchange     = 'myexchange';
    my $exchange_dlx = 'myexchange.dlx';
    my $queue_dlx    = 'myqueue.dlx';
    
    my $message     = $ARGV[0] || 'mymessage';
    my $routing_key = $ARGV[1] || 'mykey';
    my $expiration  = $ARGV[2] || 0;
    
    # Подключение
    $mq->connect("localhost", {user => $user, password => $password});
    
    # Канал
    $mq->channel_open(1);
    
    # Обменник
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    
    # Обменник dlx
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
    
    # Очередь dlx
    $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
    
    # Переплет
    $mq->queue_bind(1, $queue_dlx, $exchange_dlx, $routing_key);
    
    # Публикуем сообщение
    $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
    

    Разберем отдельные участки кода.

    # Обменник
    $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
    

    Это тот же самый обменник, который используется в получателе. Наш отправитель не отправляет сообщения напрямую в этот обменник, но создать обменник все равно нужно, так как он все-таки будет далее использоваться, хоть и косвенно.

    # Обменник dlx
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
    

    Это обменник, в который мы будем отправлять отложенные сообщения.

    Обратите внимание на тип создаваемого обменника — 'fanout', в отличие от первого обменника, имеющего типа 'direct'. Далее я объясню, почему именно 'fanout'.

    # Очередь dlx
    $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
    

    Здесь мы создаем очередь, в которую будет помещаться отложенные сообщения.

    Аргумент 'x-dead-letter-exchange' — это гвоздь, на котором держится весь механизм отложенный сообщений. Если для очереди указан этот аргумент, то сообщения, у которых истекло время хранения, будут автоматически перемещаться из этой очереди в тот обменник, который был указан в этом аргументе.

    Соответственно, в качестве обменника нужно указать обычный обменник, с которым работает получатель.

    На всякий случай, пометка для тех, что не знаком с Перлом: конструкция {} в третьем параметре означает, что в этом месте нужно передать ссылку на хеш с опциями, но, поскольку в данном конкретном случае никаких опций не требуется, то передается ссылка на пустой хеш.

    # Публикуем сообщение
    $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
    

    Отправляем сообщение в обменник для отложенных сообщений.

    Здесь важен параметр 'expiration'. Этот параметр задает время хранения сообщения в миллисекундах. По истечению этого времени сообщение будет удалено из очереди. Но, как говорилось выше, если для очереди задан аргумент 'x-dead-letter-exchange', то одновременно с удалением из очереди сообщение будет отправлено в указанный в аргументе обменник, а тот в свою очередь, отправит сообщение в переплетенную с ним обычную очередь для немедленной доставки.

    Тонкий момент с routing_key


    Как вы помните, в получателе мы создали один обменник типа 'direct' и две переплетенные с ним по разным ключам очереди. Такая схема может применяться для отправки сообщений на одну тему двум разным получателям, например, отправка лога в файл или на консоль, в зависимости от ситуации. За то, в какую очередь отправлять сообщение, отвечает ключ routing_key.

    А теперь представьте, что два сообщения с двумя разными ключами нужно сделать отложенными. Мы отправим их в обменник для отложенных сообщений, обменник должен будет решить, в какую очередь их отправлять. Но у них разные routing_key, поэтому обменник, если бы он был типа 'direct', не мог бы отправить их в одну и ту же очередь.

    Именно поэтому обменник для отложенных сообщений мы делаем типа 'fanout' — этот тип обменника игнорирует routing_key и отправляет сообщения во все очереди, которые с ним переплетены. В нашем случае переплетена только одна очередь — очередь для отложенных сообщений. Соответственно, все сообщения, с любыми routing_key, отправленные в обменник для отложенных сообщений, пойдут в эту очередь.

    Внимательный читатель в этом месте должен спросить: «А с каким routing_key сообщения будут отправлены в обычный обменник после истечения их срока хранения в очереди отложенных сообщений?»

    Они будут отправлены с тем же routing_key, который у них и был. Значение routing_key не меняется, если ничего для этого специально не делать (но при желании можно настроить очередь на изменение routing_key).

    Запуск


    Сначала нужно запустить consumer_dlx.pl, потом можно запускать producer_dlx.pl с разными параметрами.

    Параметры: [сообщение] [ключ mykey или mykey2] [задержка в миллисекундах].

    image

    На статичной картинке не видно, но после запуска producer_dlx.pl с указанием задержки происходит эта самая задержка, а потом consumer_dlx.pl выводит сообщение (в скобках выводится ключ).

    WARNING


    Как мне тут верно подсказал пользователь Tsyganov_Ivan, имеется проблема с сообщениями, имеющими разный expired. Дело в том, что сообщения «выходят» из очереди строго последовательно (на то она и очередь). Из-за этого возможна ситуация, когда сообщение с большим expired, идущее впереди, «запрёт» выход из очереди сообщениям, имеющим маленький expired, даже если этот маленький expired уже истек.

    Поэтому, если вдруг у вас есть необходимость для разных очередей указывать разные 'expired', то вместо одной общей отложенной очереди сделайте нескольких индивидуальных отложенных очередей — на каждую обычную очередь своя отложенная.

    Более универсальное решение для произвольных значений 'expired' описано во второй части статьи.
    Поделиться публикацией

    Комментарии 26

      0
      Когда реализовывал такую же схему у себя — столкнулся с проблемой.
      TTL всех сообщений очереди получался одинаковым — равным максимальному TTL в очереди. То есть при постоянном притоке новых сообщений на отложенную отправку — сообщения никогда не признавались мертвыми. И следовательно не переносились в очередь на немедленную отправку.
      У себя я решил эту проблему созданием именованных очередей с указанием времени отправки и устанавливал время жизни очереди.

      Вы не столкнулись с подобной проблемой?
        0
        «TTL» очереди и «expiration» сообщения — это разные вещи. Я не совсем понимаю, что означает TTL, как-то не приходилось сталкиваться. У меня используется «expiration».
          0
          Немножко не в тех терминах написал. TTL — time to live
          При указании expiration сообщения — обновлялись expiration для всех сообщений в этой очереди
            0
            Да, есть такое дело. Сейчас отмечу это в статье.
        0
        Почему у Вас в скрипте consumer_dlx.pl во втором комплекте указана очередь myqueue с постфиксом 2, а в скрипте producer_dlx.pl очередь уже называется myqueue.dlx?
          0
          Это разные очереди.
          0
          Что происходит, если я закладываю в очередь два отложенных сообщения с разным TTL? Например: Message 1/ TTL 1000, и сразу за ним Message 2/ TTL 500.

          P.S. Пардон, когда набирал, первого комментария ещё не было.
            0
            Указывайте сообщениям «expiration». TTL — это свойство очереди.
              0
              Поправил статью, смотрите абзац в конце. Спасибо за замечание.
              0
              Не хватает трёх слов: «Использую в продакшен»
                0
                Ммм?
                  0
                  Я использовал подобную схему, все хорошо, проблем не было)
                    0
                    Использую в продакшен. Подойдет? Связка Symfony2 + videlalvaro/RabbitMqBundle. В delayed откидываются сообщения, которые по какой-то причине не смогли быть обработаны в основной очереди. При наступлении expiration они поступают на повторную обработку. Количество повторов ограничено на уровне кода. Вполне рабочая схема ))
                    0
                    Кривовато оно всё. Нужна отложенная доставка — берите очереди Амазона или Azure (там это есть из коробки), ещё есть в IronMQ. Почему нету штатного механизма в RabbitMQ — непонятно, штука полезная.
                      0
                      Почему нету штатного механизма в RabbitMQ — непонятно, штука полезная.


                      Ну, типа концепция такая. Кролик предоставляет более низкоуровневый функционал — очереди и маршрутизацию между ними. Всё. Дальше можно накручивать что угодно, в том числе отложенные сообщения.
                      0
                      Мы делали похожую вещь, но без использования экспаира. У нас были отложенные сообщения фиксированных интервалов, например, 10 мин, 20, 40, 2 часа и т.д. Под каждый интервал была очередь и консьюмер, который доставал сообщение, смотрел наступила ли пора и либо отправлял в нужную очередь, либо слипался до наступления времени. В целом те же яйца, но использование штатных средств, конечно забавнее.
                        0
                        Под каждый интервал была очередь и консьюмер, который доставал сообщение, смотрел наступила ли пора и либо отправлял в нужную очередь, либо слипался до наступления времени.


                        Если проверять срок жизни сообщения на получателе, то и таймер нужно делать на получателе. Можно по простому — периодически запускать получателя кроном, можно сложнее — таймер встроить в самого получателя. А мне как-раз хотелось этого избежать, переложить эту заботу на кролика.
                          0
                          Крон не подойдет. Нужно рассасывать сообщения, которые приходят постоянно.
                        0
                        Не было времени вчитаться полностью, да и базы по RMQ не хватает. Так что извиняюсь, если ответ тупо пропустил на следующий вопрос.

                        А разьве нельзя прокручивать с определенной периодичностью отложенную очередь, проверяя каждый элемент очереди на экспайрэйшн? Если элемент не протух еще, то перекладывать его в конец очереди или в ее середину по определенному алгоритму.

                        ЗЫ: Еще раз извиняюсь за возможное нубство.
                          0
                          У очереди в RabbitMQ нет середины. Есть только вход и выход. На вход можно отправить, с выхода получить. В середину положить нельзя, и прочитать из середины нельзя.

                          Можно непрерывно читать выход, смотреть expiration, и, если он не истек, снова класть на вход очереди. Но это получится бешеная карусель — сообщения ведь доставляются сразу, поэтому как только положил — тут же получишь на выходе, снова положишь, снова получишь… и так до упора, пока expiration не истечет. Но скорее всего, получится просто 100% загрузка процессора, идущая на бесполезное перекладывание из выхода на вход.

                          Можно читать не постоянно, а периодически, раз в секунду или раз в минуту. Но в этом случае, во-первых, все-равно бесполезная трата сил, а во вторых можно пропустить сообщение — expiration закончится, а вы в этот момент его не получили — тю-тю, сообщение будет дропнуто.

                          Короче говоря, это неразумно.
                            0
                            А что если минимальный срок жизни (expiration или TTL? не уверен, что правильно понимаю термины) много больше периода опроса? Скажем, через 3 сек. нужно исполнить, а опрос каждые 0.1 сек или даже 0.01. При ограничении длины очереди со стороны бизнеса можно гарантировать обработку каждого.
                            И еще непонятно почему «тю-тю». После истечения expiration задание будет само удалено из очереди?
                              0
                              А что если минимальный срок жизни много больше периода опроса? Скажем, через 3 сек. нужно исполнить, а опрос каждые 0.1 сек или даже 0.01.


                              Ничего не меняется. Вы проверите в 2.95, в 3.00 сообщение устареет, в 3.05 вы снова проверите — а сообщения уже тю-тю.

                              И еще непонятно почему «тю-тю». После истечения expiration задание будет само удалено из очереди?


                              Да.
                                0
                                А нельзя внедриться в процесс «протухания» и очистки?
                                  0
                                  Нет, разве что исходники править.
                                    0
                                    Тогда все-таки надежнее «откладывание яиц» реализовывать снаружи, как было написано где-то выше. От RMQ получаем базовый функционал очередей, а хитрую высокоуровневую логику снаружи реализуем. << У меня такое впечатление сложилось в ходе дискуссии. Нужно найти время и поизучать поглубже тему. Пойду еще раз перечитаю Distributed Ruby.
                                      0
                                      В кролике вся эта «хитрая высокоуровневая логика» сводится к созданию еще одной стандартной очереди с двумя специальными параметрами. Если сможете реализовать «снаружи» что-то более надежное — пожалуйста:) Но если что — я предупреждал:)

                        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                        Самое читаемое