RabbitMQ — Отложенные сообщения, часть 2

    image В предыдущей статье про отложенные сообщения я рассмотрел вариант организации отложенных сообщений для простого случая, при котором все отложенные сообщения имеют одинаковое время задержки. Однако, тут же в комментариях мне указали на то, что этот вариант организации отложенных сообщений создаст проблемы при попытке использовать его для сообщений с различающимся временем задержки.

    В связи с этим хочу привести более универсальное (но чуть более сложное) решение, позволяющее организовать отложенные сообщения с произвольным временем задержки.

    В чем, собственно, проблема


    Очереди в RabbitMQ устроены так, что ни одно сообщение не может выйти из очереди раньше, чем из нее выйдут предыдущие сообщения. Собственно, на то она и очередь — вперед лезть нельзя. Даже если для сообщения установлен параметр «expiration», и время жизни этого сообщения истекло — это всё-равно не дает права сообщению выйти из очереди, пока перед ним есть другие сообщения. Устаревшее сообщение будет висеть в очереди, пока не придет его очередь (каламбур, ага).

    В тот момент, когда его очередь, наконец, придет, произойдет одно из двух:

    1. Если для очереди не установлен параметр «x-dead-letter-exchange», то сообщение будет просто удалено, без доставки кому-либо
    2. Если же параметр «x-dead-letter-exchange» установлен, то сообщение будет переложено в указанный обменник

    В связи с этим, в рассмотренном в предыдущей статье варианте происходило следующее:

    Если все сообщения имели одинаковый «expiration», то все они выстраивались одно за другим в очередь и ждали, пока из очереди выйдут все предыдущие сообщения. У первого сообщения истекало время хранения, это сообщение выходило из очереди и доставлялось в указанный обменник, освобождая выход из очереди всем остальным сообщениям. У остальных сообщений значение «expiration» был такое же, поэтому они всегда устаревали как минимум одновременно с предыдущим, или позднее, соответственно, к моменту их устаревания они уже оказывались первыми в очереди и никто их не задерживал.

    Другое дело, если в очереди оказывалось сообщение со значением параметра «expiration» большим, чем у других сообщений. В этом случае сообщение с большим «expiration» доходило до выхода из очереди и «застревало» там до истечения своего срока жизни. А за ним начинали скапливаться сообщения с маленьким «expiration», которые не могли выйти из очереди, даже если устаревали. Потом сообщение с большим «expiration» выходило из очереди и сразу за ним вываливались все скопившиеся сообщения, у которых «expiration» уже давно устарел.

    Короче говоря, при отправлении сообщений с задержками 30, 20, 10 порядок вывода был именно такой, а не ожидаемый 10, 20, 30.

    Как это победить


    Там же, в предыдущей статье, я предложил простое решение: создавать не одну общую отложенную очередь, а несколько — на каждую задачу своя отложенная очередь. Предполагается, что для одной задачи будет достаточно одной задержки. Но, если вам нужна возможность задавать разные задержки даже в пределах одной задачи — давайте сделаем универсальный способ создания отложенных сообщений.

    Главная идея будет такая — если сообщения с разными задержками мешают друг-другу в одной очереди, значит, создадим для каждой задержки свою персональную очередь!

    image

    Ключевые моменты:

    1. Создаем очереди для отложенных сообщений, различающиеся по задержке — своя очередь для каждой задержки
    2. Обычную очередь переплетаем с обменником по ключу «название_очереди.*» — чтобы в нее попадали сообщения вне зависимости от задержки

    Получатель


    Рассмотрим скрипт consumer_dlx.pl:

    #!/usr/bin/perl
    
    use strict;
    use warnings;
    
    use Net::AMQP::RabbitMQ;
    
    my $mq = Net::AMQP::RabbitMQ->new();
    
    my $user     = 'guest';
    my $password = 'guest';
    
    # Подключение
    $mq->connect("localhost", {user => $user, password => $password});
    
    # Канал
    $mq->channel_open(1);
    
    # Цикл для создания нескольких комплектов обменник-очередь-переплет-подписка
    for my $i (1..2) {
        my $exchange = 'myexchange' . $i;
    
        # Обменник
        $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});
    
        for my $j (1..2) {
            my $queue = 'myqueue' . $j;
    
            # Очередь
            my $queue_full = "$exchange.$queue";
            $mq->queue_declare(1, $queue_full, {auto_delete => 0});
    
            # Переплет
            my $routing_key = $queue . '.*';
            $mq->queue_bind(1, $queue_full, $exchange, $routing_key);
    
            # Подписка
            $mq->consume(1, $queue_full);
        }
    }
    
    # Получение сообщений (бесконечный цикл)
    while ( my $msg = $mq->recv() ) {
        print "$msg->{body} ($msg->{routing_key})\n";
    }
    

    Как и в предыдущей статье, в скрипте получателя нет никаких особенностей, связанных непосредственно с отложенными очередями.

    Обратите внимание только на один важный момент — обменник тут создается с типом 'topic'. Это связано с тем, что routing_key, который мы будем использовать, будет содержать два параметра — название очереди и время желаемой доставки.

    Причем, в самом получателе время доставки не учитывается, о чем говорит следующая строка:

    my $routing_key = $queue . '.*';
    

    Как видите, второй параметр задается как '*', из-за чего в обычную очередь будут доставляться все сообщения, без учета времени (а для обычной очереди именно это и требуется).

    Отправитель


    Теперь рассмотрим скрипт producer_dlx.pl:

    #!/usr/bin/perl
    
    use strict;
    use warnings;
    
    use Net::AMQP::RabbitMQ;
    
    my $mq = Net::AMQP::RabbitMQ->new();
    
    my $user     = 'guest';
    my $password = 'guest';
    
    my $message     = $ARGV[0] || 'mymessage';
    my $exchange    = $ARGV[1] || 'myexchange1';
    my $queue       = $ARGV[2] || 'myqueue1';
    my $delay       = $ARGV[3] || 0;
    
    # Подключение
    $mq->connect("localhost", {user => $user, password => $password});
    
    # Канал
    $mq->channel_open(1);
    
    # Обменник
    $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'});
    
    # Обменник dlx
    my $exchange_dlx = $exchange . '.dlx';
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
    
    # Очередь dlx
    my $endtime = time() + $delay;
    my $queue_full = "$exchange.$queue.$endtime";
    $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
    
    # Переплет
    my $routing_key = "$queue.$endtime";
    $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
    
    # Публикуем сообщение
    $mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx});
    

    Здесь важны следующие моменты:

    # Обменник dlx
    my $exchange_dlx = $exchange . '.dlx';
    $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
    

    Обменник для временных сообщений так же, как и обычный обменник, должен иметь тип 'topic'.

    # Очередь dlx
    my $endtime = time() + $delay;
    my $queue_full = "$exchange.$queue.$endtime";
    $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
    

    Вычисляем желаемое время доставки сообщения $endtime путем прибавления заданной задержки к текущему времени (используются значения в секундах, unixtime).

    Затем создаем персональную очередь для указанной задержки и вписываем время доставки прямо в название очереди. Вписывание времени в название очереди не является технической необходимостью, название очереди можно дать какое угодно, но для обеспечения наглядности и избежания путаницы удобнее всего сделать именно так.

    При создании очереди нужно передать три аргумента:

    1. 'x-message-ttl' — это время жизни сообщений. Как вы помните, у нас отдельная очередь для каждой задержки, поэтому мы можем задать задержку сразу на все сообщения в очереди, вместо указания одного и того же значение 'expiration' для каждого отдельного сообщения.
    2. 'x-dead-letter-exchange' — это название обменника, куда будут переложены сообщения из этой очереди.
    3. 'x-expires' — это время жизни самой очереди. Поскольку мы на каждую задержку создаем новую отложенную очередь, то эти очереди будут постоянно копиться. Чтобы они не мешались почем зря, зададим им время жизни, по истечении которого они будут автоматически удалены. Важно! Время жизни очереди должно быть больше времени жизни сообщений. Если задать время жизни очереди равным времени жизни сообщений, то доставка сообщений не гарантируется — очередь может быть грохнута раньше, чем будут доставлены сообщения из нее. В этом примере время жизни очереди установлено на 10 секунд больше времени жизни сообщений.

    # Переплет
    my $routing_key = "$queue.$endtime";
    $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
    

    Ключ routing_key задается в виде «название_очереди.желаемое_время_доставки». Точка в середине делит ключ на два параметра, именно эти параметры разбирает обменник типа «topic».

    Как видите, здесь в переплет включены оба параметра. Соответственно, обменник для временных сообщений будет отправлять сообщения с таким ключом в одну конкретную очередь с конкретным временем доставки.

    Запуск


    Сначала нужно запустить consumer_dlx.pl, потом можно запускать producer_dlx.pl с разными параметрами.

    Параметры: [сообщение] [обменник] [очередь] [задержка в секундах].

    image

    Как видите, сначала были отправлены сообщения с большей задержкой, но доставлены сообщения были в правильном порядке — сначала те, у которых задержка была меньше.
    Поделиться публикацией

    Комментарии 10

      +1
      Да, мсье знает толк в извращениях ) Сдается мне этот пост всплыл после мысли товарища chEbba в прошлом топике.

      Решение в ActiveMQ
      Хочу замолвить слово за ActiveMQ — в нем эта проблема отсутствует как класс, так как там есть отдельный JMS-шедулер, которому можно спокойно скормить задержку, количество повторов и интервал — все остальное будет сделано самим брокером.

      За деталями можно обратиться к мануалу ActiveMQ: Delay and Schedule Message Delivery

        0
        Нет, товарищ chEbba там изложил еще более извращенный вариант, с проверкой времени жизни на получателе:)
          0
          Кстати говоря, я вот сейчас поискал и не нашел на Хабре ни одной статьи по IronMQ. Чуете намек? :)
            0
            Насчет IronMQ — это же вроде SaaS-решение? Если да, то я таким решениям доверяю только частично, свое железо с сервисами приятней и надежнее. В целом, заслуживают внимания: RabbitMQ — для больших нагрузок (частично и Enterprise), ZeroMQ — для быстрых обсчетов и ActiveMQ — суровый Enterprise (используется в ЦЕРН-е для коллайдера, и будет в РФ электронное правительство.)

            Остальные — либо облачные, либо еще в активной разработке.
              0
              Тьфуты, я перепутал:) Я хотел сказать именно ActiveMQ:)
                0
                ZeroMQ же в ЦЕРНе для коллайдера, не?
                  0
                  Возможно, для некоторых проектов, но я знаю об анализе данных с БАК — там точно ActiveMQ
                    0
                    А, так презентация от 2010-го, а вот презентация от середины 2013-го, где они рассказывают, что всё, что у них было — устарело, выбрасывается и ZeroMQ будет основным транспортом всех проектов.
            0

            Почему ж я только сейчас в гугле сюда зашёл??


            Более узкое решение для RabbitMQ (плагин): https://github.com/rabbitmq/rabbitmq-delayed-message-exchange


            Параметром x-delay задаём задержку и уже сам exchange объявленный с типом x-delayed-message следит за тем, чтобы сообщения шли далее после задержки x-delay.

              0
              Главное, что таки зашли и написали свой комментарий, большое спасибо, Вы сберегли мне немного времени)

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое