Отложенные ретраи силами RabbitMQ

    Меня зовут Алексей Казаков, я техлид команды Клиентских коммуникаций в ДомКлике. В этой статье я хочу поделиться с вами «рецептом», который позволил нам реализовать отложенные ретраи при использовании брокера сообщений RabbitMQ


    rabbit_retry

    Введение


    В ДомКлик существенная часть взаимодействия между сервисами реализована асинхронно за счет брокера сообщений RabbitMQ. Типичная схема взаимодействия выглядит так.


    schema_0


    • Сервис A на своём RabbitMQ-virtual_host (service_a_vh):
      • создаёт RabbitMQ-exchange (service_a_inner_exch), в который другие сервисы будут публиковать сообщения-задачи для сервиса A,
      • создаёт RabbitMQ-queue (service_a_input_q), из которой сообщения будут попадать в сервис A,
      • связывает service_a_input_q с service_a_inner_exch.
    • Сервис B, получив доступ к service_a_vh, публикует в service_a_inner_exch сообщения, которые должны быть обработаны сервисом А.

    Обычно сервису B нужны результаты выполнения опубликованных задач. Для этого создаётся обратный RabbitMQ-exchange, в который сервис A публикует результаты, а другие сервисы посредством RabbitMQ-routing_key получают только нужные им данные. Но для нашего «рецепта» это будет не нужно.


    Отличные руководства по RabbitMQ можно найти на их сайте.


    Постановка проблемы


    Наша команда занимается доставкой всевозможных СМС/пушей/писем до клиентов, и для этих целей мы используем сторонних провайдеров, которые не входят в зону нашей ответственности. В общем случае схема выглядит так. Сервис A синхронно взаимодействует по HTTP с внешним сервисом E. Иногда сервис E может испытывать проблемы и не отвечать/таймаутить/пятисотить. Если несколько HTTP-ретраев с возрастающей задержкой не помогают и сервис E по-прежнему отказывается корректно работать, то что делать с сообщением?


    RabbitMQ позволяет сделать reject with requeue, что вернет задачу в очередь и она не потеряется. Проблема заключается в том, что эта же задача очень быстро (~100 раз в секунду) снова попадет в consumer, и так мы будем порождать лишнюю нагрузку на сервис E (реальный случай из практики).


    Возможные решения


    1) Хранить сообщение в памяти приложения, продолжая ретраить.


    Недостатки:


    • Если consumer однопоточный, то таким образом мы блокируем выполнение других задач из очереди, а сервис E может испытывать проблемы именно с конкретной задачей.
    • Хранить задачу в памяти приложения, пока идут ретраи (а это могут быть десятки минут), не выглядит хорошей идеей.

    2) С помощью механизма RabbitMQ-dead_letter_exchange сохранять задачи до лучших времен в отдельной очереди мертвых задач и считывать их оттуда отдельным consumer-ом.


    Недостатки:


    • Ручной запуск дополнительного consumer требует вмешательства программиста.
    • Автоматический запуск и остановка — нетривиальная задача, которая требует лишнего кода.

    3) Сохранять таски в базе, откуда снова доставлять их в consumer по истечении таймаута.


    Недостатки:


    • Нужно писать код, который будет этим заниматься.

    Выбранное нами решение


    Последний вариант привлекателен тем, что тот же самый consumer будет заниматься обработкой задач. Вот бы ещё избавиться от необходимости работать с базой, ведь «Лучший код — не написанный код».


    К счастью, можно реализовать механизм отложенных ретраев исключительно средствами RabbitMQ.


    Для начала нужно узнать, что в RabbitMQ есть очереди с таймаутами: при создании очереди можно указать аргумент x-message-ttl, определяющий, сколько миллисекунд сообщение просуществует в очереди, прежде чем будет помечено «мертвым».


    Ниже приведу схему очередей, описание маршрута задачи и минимальный код на Python, который позволит вам воспроизвести схему.


    scheme_1


    Все элементы схемы уже описаны ранее за исключением пути от dead_letter_queue в service_a_inner_exch. Такая «петля» получается за счет того, что для dead_letter_queue в качестве dead letter exchange мы указываем service_a_inner_exch. В этом и заключается основная идея. Мы зацикливаем путь сообщения, отправляя его после таймаута из dead_letter_queue снова в исходный exchange.


    Путь задачи:


    • сервис B публикует сообщение в service_a_inner_exch,
    • сообщение попадает в очередь service_a_input_q,
    • сервис A не может обработать сообщение и делает reject,
    • сообщение попадает в dead_letter_exchange,
    • а оттуда сразу в dead_letter_queue,
    • в этой очереди сообщение проведет 5 минут и потом будет помечено «мертвым»,
    • «мертвое» сообщение попадает в dead letter exchange очереди dead_letter_queue, а это service_a_inner_exch.

    Количество «кругов», которые проходит одна задача, можно ограничить с помощью анализа заголовков, которые изменяются при прохождении dead letter exchange. Это будет показано в примере кода ниже.


    Код написан на Python 3.6.2 с использованием библиотеки pika==0.10.0.


    publisher.py
    import pika
    
    import settings
    
    def init_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=settings.RMQ_HOST,
            port=settings.RMQ_PORT,
            virtual_host=settings.RMQ_VHOST,
            credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
        ))
        channel = connection.channel()
    
        channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
    
        return channel, connection
    
    if __name__ == '__main__':
        channel, connection = init_rmq()
    
        channel.basic_publish(exchange=settings.RMQ_INPUT_EXCHANGE, routing_key='', body='message from rmq')
        connection.close()

    consumer.py
    import logging
    
    import pika
    
    import settings
    
    logger = logging.getLogger(__name__)
    
    def init_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=settings.RMQ_HOST,
            port=settings.RMQ_PORT,
            virtual_host=settings.RMQ_VHOST,
            credentials=pika.PlainCredentials(settings.RMQ_USERNAME, settings.RMQ_PASSWORD),
        ))
        channel = connection.channel()
    
        # создаем service_a_inner_exch
        channel.exchange_declare(exchange=settings.RMQ_INPUT_EXCHANGE, exchange_type='fanout')
    
        # создаем dead_letter_exchange
        channel.exchange_declare(exchange=settings.RMQ_DEAD_EXCHANGE, exchange_type='fanout')
    
        # создаем service_a_input_q
        channel.queue_declare(
            queue=settings.RMQ_INPUT_QUEUE,
            durable=True,
            arguments={
                # благодаря этому аргументу сообщения из service_a_input_q
                # при nack-е будут попадать в dead_letter_exchange
                'x-dead-letter-exchange': settings.RMQ_DEAD_EXCHANGE,
            }
        )
    
        # создаем очередь для "мертвых" сообщений
        channel.queue_declare(
            queue=settings.RMQ_DEAD_QUEUE,
            durable=True,
            arguments={
                # благодаря этому аргументу сообщения из service_a_input_q
                # при nack-е будут попадать в dead_letter_exchange
                'x-message-ttl': settings.RMQ_DEAD_TTL,
                # также не забываем, что у очереди "мертвых" сообщений
                # должен быть свой dead letter exchange
                'x-dead-letter-exchange': settings.RMQ_INPUT_EXCHANGE,
            }
        )
        # связываем очередь "мертвых" сообщений с dead_letter_exchange
        channel.queue_bind(
            exchange=settings.RMQ_DEAD_EXCHANGE,
            queue=settings.RMQ_DEAD_QUEUE,
        )
    
        # связываем основную очередь с входным exchange
        channel.queue_bind(settings.RMQ_INPUT_QUEUE, settings.RMQ_INPUT_EXCHANGE)
    
        return channel
    
    def callback(ch, method, properties, body):
        logger.info('Processing message `%s`', body)
        if can_retry(properties):
            logger.warning('Retrying message')
            # requeue=False отправит сообщение не в исходную очередь, а в dead letter exchange
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return
    
        logger.error('Can`t retry, drop message')
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def can_retry(properties):
        """
        Заголовок x-death проставляется при прохождении сообщения через dead letter exchange.
        С его помощью можно понять, какой "круг" совершает сообщение.
        """
        deaths = (properties.headers or {}).get('x-death')
        if not deaths:
            return True
        if deaths[0]['count'] >= settings.RETRY_COUNT:
            return False
        return True
    
    if __name__ == '__main__':
        channel = init_rmq()
    
        logger.info('Consuming.')
        channel.basic_consume(
            queue=settings.RMQ_INPUT_QUEUE, consumer_callback=callback,
        )
        channel.start_consuming()

    settings.py
    import logging.config
    
    RMQ_HOST = ''
    RMQ_PORT = 5672
    RMQ_VHOST = ''
    RMQ_USERNAME = ''
    RMQ_PASSWORD = ''
    RMQ_INPUT_EXCHANGE = ''
    RMQ_INPUT_QUEUE = ''
    RMQ_DEAD_EXCHANGE = ''
    RMQ_DEAD_QUEUE = ''
    RMQ_DEAD_TTL = 60 * 1000  # 1 секунда
    RETRY_COUNT = 2
    
    dict_config = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(levelname)s %(name)s: %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
                'formatter': 'detailed',
            },
        },
        'root': {
            'level': 'INFO',
            'handlers': ['console']
        },
    }
    
    logging.config.dictConfig(dict_config)

    Если в settings.py вы укажете необходимые данные для подключения к RabbitMQ, то последовательный запуск consumer.py и publisher.py выдаст следующий лог:


    ...
    2020-05-02 12:16:32,260 INFO __main__: Consuming.
    2020-05-02 12:16:35,233 INFO __main__: Processing message `b'message from rmq'`
    2020-05-02 12:16:35,233 WARNING __main__: Retrying message
    2020-05-02 12:17:35,241 INFO __main__: Processing message `b'message from rmq'`
    2020-05-02 12:17:35,241 WARNING __main__: Retrying message
    2020-05-02 12:18:35,249 INFO __main__: Processing message `b'message from rmq'`
    2020-05-02 12:18:35,250 ERROR __main__: Can`t retry, drop message
    ...

    Т.е. код создаст схему, показанную на рисунке, отправит одно сообщение в систему, попытается трижды его обработать и отбросит после двух ретраев.


    Возможные улучшения. Разные таймауты


    В качестве расширения функциональности предложенной схемы можно рассмотреть создание нескольких dead letter queue с разными таймаутами. После прохождения через через dead letter exchange:


    • routing key сохраняется, поэтому можно использовать topics-exchange для направления сообщений в разные dead letter queue в зависимости от исходного значения routing key,
    • заголовки дополняются, поэтому можно использовать headers-exchange для направления сообщений в разные dead letter queue в зависимости от исходных заголовков сообщения.

    Возможные улучшения. Несколько consumer-ов


    Если у вас с service_a_inner_exch связано несколько очередей, предназначенных для разных consumer-ов, то предложенная схема должна быть доработана. Например, у вас есть еще один сервис A_another, читающий из очереди service_a_another_input_q, связанной с service_a_inner_exch. Тогда текущая «петля» отправит сообщение повторно в обе очереди, и оба сервиса получат его повторно. Чтобы этого избежать, можно завести отдельный exchange dead_inner_exch, как показано на рисунке ниже.


    scheme_2


    Заключение


    Описанное решение позволяет реализовать произвольное количество отложенных ретраев с равными промежутками между попытками. Для этого требуется минимальный дополнительный код, а вся логика по задержке и повторной доставке выполняется кластером RabbitMQ.


    Эта схема успешно эксплуатируется примерно 7 месяцев, неоднократно спасала при проблемах с сервисом E и ни разу не потребовала ручного вмешательства в свою работу. Условия эксплуатации: RabbitMQ 3.6.12, 4 RPS в среднем, с пиками до 40 RPS.


    Надеюсь, эта статья поможет какому-нибудь программисту крепче спать по ночам.

    ДомКлик
    Место силы

    Комментарии 11

      +2

      Вроде это стандартный подход. Разве что я добавлял retryNo в payload, а не использовал заголовок.

        0
        Подход, действительно, не уникальный. Но, как показывает практика, не все о нём знают.
        Изменение payload — тоже вариант. Мы выбрали заголовок, потому что предпочли не изменять тело сообщения, и заголовки обновляются «кроликом» за нас.
        0
        Почему не использовали готовое решение: github.com/rabbitmq/rabbitmq-delayed-message-exchange или celery (если только python)?
          +2
          Честно говоря, мне кажется, мы не знали об этом плагине. Но думаю, мы бы не стали ставить «experimental yet fairly stable and potential suitable for production use» плагин на продовый кластер RabbitMQ.
          Celery используем в python-проектах, но с ним бывали внезапные утечки памяти. Предложенное решение реализовано для golang-проекта. А примеры сделал на python из-за его большей популярности.
          0
          habr.com/ru/company/dreamkas/blog/332798
          вот такое у меня было решение, совсем просто, без exchange
            0

            Если правильно понял, вы храните задачу в памяти приложения за счет


            await Bluebird.delay(wait - estimatedTime);

            Мы намеренно отказались от такого подхода.
            Можно узнать, как себя показывает ваше решение? Есть ли проблемы? Какие примерно RPS и количество откладываемых задач?

              0
              не совсем
              мы мы берем задачу в память приложения и ждем, когда наступит его время. в случае чего-то плохого, отказа, падения приложения — rabbit заберет задачу обратно к себе в очередь, поэтому не теряется
            0
            Разные таймауты можно выставлять не на уровне очереди, а на уровне отдельного сообщения: заголовок expiration.
            www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
              +2

              Да, можно. Но нужно держать в голове минусы такого подхода: по вашей ссылке чуть ниже есть Caveats. При разных таймаутах на сообщение может сложится ситуация, когда находящееся в голове очереди сообщение с бОльшим ttl блокирует уже "протухшие" сообщения, стоящие за ним.

              0
              Не описан вариант, когда сообщение попадаёт в dead_letter_queue из-за ошибки в формате самого сообщении. В таком случае оно будет утеряно после нескольких попыток возврата в Consumer?
                +1
                alexey_and_kazakov у нас в проекте через реббит ретраи организованы с помощью следущего механизма:
                если сообщение нужно исключить из очереди(возникла ошибка) — то оно попадёт в другую очередь с ttl и увеличенным на один счётчиком ретраев. через время из ttl очереди оно вернётся в основную. если же счётчик ретраев превышает какое то значение — то сообщение попадает в очередь отстойник для дальнейшего изучения. На очередь отстойник стоят алерты с нотификацией в телеграм

                т е разница с описанной выше схемой — что есть третья очередь для разбора почему сообщение не получается обработать консюмером(причина — либо долго лежат другие сервисы, либо ошибка в коде. в обоих случаях сообщение можно вернуть в основную очередь после фикса)

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                Самое читаемое