company_banner

Повторная обработка событий, полученных из Kafka


    Привет, Хабр.


    Недавно я поделился опытом о том, какие параметры мы в команде чаще всего используем для Kafka Producer и Consumer, чтобы приблизиться к гарантированной доставке. В этой статье хочу рассказать, как мы организовали повторную обработку события, полученного из Kafka, в результате временной недоступности внешней системы.


    Современные приложения работают в очень сложной среде. Бизнес-логика, обернутая в современный технологический стек, работающая в Docker-образе, который управляется оркестратором вроде Kubernetes или OpenShift, и коммуницирующая с другими приложениями или enterprise-решениями через цепочку физических и виртуальных маршрутизаторов. В таком окружении всегда что-то может сломаться, поэтому повторная обработка событий в случае недоступности одной из внешних систем — важная часть наших бизнес-процессов.


    Как было до Kafka


    Ранее в проекте мы использовали IBM MQ для асинхронной доставки сообщений. При возникновении какой-либо ошибки в процессе работы сервиса полученное сообщение могло быть помещено в dead-letter-queue (DLQ) для дальнейшего ручного разбора. DLQ создавался рядом с входящей очередью, перекладывание сообщения происходило внутри IBM MQ.


    Если ошибка имела временный характер и мы могли это определить (например, ResourceAccessException при HTTP-вызове или MongoTimeoutException при запросе в MongoDb), то в силу вступала стратегия повторных вызовов. Вне зависимости от ветвления логики приложения, исходное сообщение перекладывалось или в системную очередь для отложенной отправки, или в отдельное приложение, которое когда-то давно было сделано для повторной отправки сообщений. При этом в заголовок сообщения записывается номер повторной отправки, который привязан к интервалу задержки или к концу стратегии на уровне приложения. Если мы достигли конца стратегии, но внешняя система все еще недоступна, то сообщение будет помещено в DLQ для ручного разбора.


    Поиск решения


    Поискав в интернете, можно найти следующее решение. Если коротко, то предлагается завести по топику на каждый интервал задержки и реализовать на стороне приложения Consumer’ы, которые будут вычитывать сообщения с необходимой задержкой.



    Несмотря на большое количество положительных отзывов, оно кажется мне не совсем удачным. В первую очередь потому, что разработчику, кроме реализации бизнес-требований, придется потратить много времени на реализацию описанного механизма.


    Кроме того, если на Kafka-кластере включено управление доступами, то придется потратить какое-то время на заведение топиков и обеспечение необходимых доступов к ним. В дополнение к этому нужно будет подбирать правильный параметр retention.ms для каждого из ретрай-топиков, чтобы сообщения успевали повторно отправляться и не пропадали из него. Реализацию и запрос доступов придется повторять для каждого существующего или нового сервиса.


    Давайте теперь посмотрим, какие механизмы для повторной обработки сообщения предоставляет нам spring в целом и spring-kafka в частности. Spring-kafka имеет транзитивную зависимость на spring-retry, который предоставляет абстракции для управления разными BackOffPolicy. Это довольно гибкий инструмент, но его значительным недостатком является хранение сообщений для повторной отправки в памяти приложения. Это значит, что перезапуск приложения из-за обновления или ошибки во время эксплуатации приведет к потере всех сообщений, ожидающих повторной обработки. Так как этот пункт критичен для нашей системы, мы не стали рассматривать его дальше.


    Сама spring-kafka предоставляет несколько реализаций ContainerAwareErrorHandler, например SeekToCurrentErrorHandler, с помощью которого можно, не смещая offset в случае возникновения ошибки, обработать сообщение позже. Начиная с версии spring-kafka 2.3 появилась возможность задавать BackOffPolicy.


    Этот подход позволяет повторно обрабатываемым сообщениям переживать рестарт приложения, но механизм DLQ по-прежнему отсутствует. Именно этот вариант мы выбрали в начале 2019 года, оптимистично полагая, что DLQ не понадобится (нам повезло и он действительно не понадобился за несколько месяцев эксплуатации приложения с такой системой повторной обработки). Временные ошибки приводили к срабатыванию SeekToCurrentErrorHandler. Остальные ошибки печатались в лог, приводили к смещению offset, и обработка продолжалась со следующим сообщением.


    Итоговое решение


    Реализация, основанная на SeekToCurrentErrorHandler, подтолкнула нас к разработке собственного механизма для повторной отправки сообщений.


    Прежде всего мы хотели использовать уже имеющийся опыт и расширить его в зависимости от логики приложения. Для приложения с линейной логикой оптимальным было бы прекратить считывание новых сообщений в течение небольшого промежутка времени, заданного в рамках стратегии повторных вызовов. Для остальных приложений хотелось иметь единую точку, которая бы обеспечивала выполнение стратегии повторных вызовов. В дополнение эта единая точка должна обладать функциональностью DLQ для обоих подходов.


    Сама стратегия повторных вызовов должна храниться в приложении, которое отвечает за получение следующего интервала при возникновении временной ошибки.


    Остановка Consumer’a для приложения с линейной логикой


    При работе с spring-kafka код для остановки Consumer’a может выглядеть примерно так:


    public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                       Instant retryAt) {
            if (nonNull(retryAt) && listenerContainer.isRunning()) {
                listenerContainer.stop();
                taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
                return;
            }
            // to DLQ
        }

    В примере retryAt — это время, когда нужно заново запустить MessageListenerContainer, если он еще работает. Повторный запуск произойдет в отдельном потоке, запущенном в TaskScheduler, реализацию которого тоже предоставляет spring.


    Значение retryAt мы находим следующим способом:


    1. Ищется значение счетчика повторных вызовов.
    2. В соответствии со значением счетчика ищется текущий интервал задержки в стратегии повторных вызовов. Стратегия объявляется в самом приложении, для ее хранения мы выбрали формат JSON.
    3. Найденный в JSON-массиве интервал содержит в себе количество секунд, через которое необходимо будет повторить обработку. Это количество секунд прибавляется к текущему времени, образуя значение для retryAt.
    4. Если интервал не найден, то значение retryAt равно null и сообщение отправится в DLQ для ручного разбора.

    При таком подходе остается только сохранить количество повторных вызовов для каждого сообщения, которое находится сейчас на обработке, например в памяти приложения. Сохранение счетчика попыток в памяти не критично для этого подхода, так как приложение с линейной логикой не может выполнять обработку в целом. В отличие от spring-retry перезапуск приложения приведет не к потере всех сообщений для повторной обработки, а просто к перезапуску стратегии.


    Этот подход помогает снять нагрузку с внешней системы, которая может быть недоступна из-за очень большой нагрузки. Другими словами, в дополнение к повторной обработке мы добились реализации паттерна circuit breaker.


    В нашем случае порог ошибки равен всего 1, а чтобы минимизировать простой системы из-за временного сетевого перебоя, мы используем очень гранулярную стратегию повторных вызовов с небольшими интервалами задержки. Это может не подойти для всех приложений группы компаний, поэтому соотношение между порогом ошибки и величиной интервала нужно подбирать, опираясь на особенности системы.


    Отдельное приложение для обработки сообщений от приложений с недетерминированной логикой


    Вот пример кода, отправляющего сообщение в такое приложение (Retryer), которое выполнит повторную отправку в топик DESTINATION при достижении времени RETRY_AT:


    
    public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                             Instant retryAt, String counter, String groupId, Exception e) {
            Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
            List<Header> arrayOfHeaders = 
                new ArrayList<>(Arrays.asList(headers.toArray()));
            updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
            updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
            updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                         () -> Integer.toString(record.partition()).getBytes());
            if (nonNull(retryAt)) {
                updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
                updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
                updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
            } else {
                updateHeader(arrayOfHeaders, REASON, 
                             ExceptionUtils.getStackTrace(e)::getBytes);
                updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
            }
            ProducerRecord<K, V> messageToSend =
                new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
            kafkaTemplate.send(messageToSend);
        }

    Из примера видно, что много информации передается в хедерах. Значение RETRY_AT находится так же, как и для механизма повтора через остановку Consumer’a. Помимо DESTINATION и RETRY_AT мы передаем:


    • GROUP_ID, по которому группируем сообщения для ручного анализа и упрощения поиска.
    • ORIGINAL_PARTITION, чтобы постараться сохранить тот же Consumer для повторной обработки. Этот параметр может быть равен null, в таком случае новая partition будет получена по ключу record.key() оригинального сообщения.
    • Обновленное значение COUNTER, чтобы следовать стратегии повторных вызовов.
    • SEND_TO — константа, показывающая, отправить ли сообщение на повторную обработку по достижении RETRY_AT или поместить в DLQ.
    • REASON — причина, по которой обработка сообщения была прервана.

    Retryer сохраняет сообщения для повторной отправки и ручного разбора в PostgreSQL. По таймеру запускается задача, которая находит сообщения с наступившим RETRY_AT и отправляет их обратно в партицию ORIGINAL_PARTITION топика DESTINATION с ключом record.key().


    После отправки сообщения удаляются из PostgreSQL. Ручной разбор сообщений происходит в простом UI, который взаимодействует с Retryer по REST API. Основными его особенностями являются переотправка или удаление сообщений из DLQ, просмотр информации об ошибке и поиск сообщений, например по имени ошибки.


    Так как на наших кластерах включено управление доступом, необходимо дополнительно запрашивать доступы к топику, который слушает Retryer, и дать возможность Retryer’у писать в DESTINATION топик. Это неудобно, но, в отличие от подхода с топиком на интервал, у нас появляется полноценная DLQ и UI для управления ею.


    Бывают случаи, когда входящий топик читают несколько разных consumer-групп, приложения которых реализуют разную логику. Повторная обработка сообщения через Retryer для одного из таких приложений приведет к дубликату на другом. Чтобы защититься от этого, мы заводим отдельный топик для повторной обработки. Входящий и retry-топик может читать один и тот же Consumer без каких-либо ограничений.



    По умолчанию этот подход не предоставляет возможности circuit breaker’a, однако его можно добавить в приложение с помощью spring-cloud-netflix или нового spring cloud circuit breaker, обернув места вызовов внешних сервисов в соответствующие абстракции. Кроме того, появляется возможность выбора стратегии для bulkhead паттерна, что тоже может быть полезно. Например, в spring-cloud-netflix это может быть thread pool или семафор.


    Вывод


    В результате у нас получилось отдельное приложение, которое позволяет повторить обработку сообщения при временной недоступности какой-либо внешней системы.


    Одним из главных преимуществ приложения является то, что им могут пользоваться внешние системы, работающие на том же Kafka-кластере, без значительных доработок на своей стороне! Такому приложению необходимо будет только получить доступ к retry-топику, заполнить несколько Kafka-заголовков и отправить сообщение в Retryer. Не нужно поднимать никакой дополнительной инфраструктуры. А чтобы уменьшить количество перекладываемых сообщений из приложения в Retryer и обратно, мы выделили приложения с линейной логикой и сделали в них повторную обработку через остановку Consumer.

    Tinkoff.ru
    IT’s Tinkoff.ru — просто о сложном

    Комментарии 5

      +1
      При возникновении какой-либо ошибки в процессе работы сервиса полученное сообщение могло быть помещено в dead-letter-queue (DLQ) для дальнейшего ручного разбора

      MongoTimeoutException при запросе в MongoDb


      Можно конкретизировать пример? Допустим, при нормальной обработке сообщения нужно работать с MongoDb, и оттуда пришел MongoTimeoutException. Какой смысл писать сообщение в DLQ, не проще/правильнее повторить обработку?
        +1
        Да, конечно. Сразу хочу отметить, что в тексте статьи я сначала описал общий случай с записью в DLQ, а затем частный с применением стратегии повторных вызовов.
        Если мы могли понять, что ошибка имеет временный характер, то мы начинали применять стратегию повторных вызовов. В этом случае сообщение попадало в DLQ только если наступил конец стратегии, не раньше.
        Во всех остальных случаях сообщение сразу попадало в DLQ.
        0
        Тем кто не использует специфические решения и кому достаточно стандартных технологий (Kafka Connect) может быть интересна статья "Kafka Connect Deep Dive – Error Handling and Dead Letter Queues".
          0
          1. Если остановить listener container не происходит ребалансировка consumer group?
          2. Насколько долго выполняется обработка сообщения? Если делать комит offset после обработки сообщения и добавить сюда spring retry то можно выскочить за timeout обработки сообщения.
          3. И еще всегда хотел спросить — какое количество партишенов для топика считается нормальным?
            Я понимаю что все зависит от размера кластера, но может есть какие-то рекомендации.
            0

            Спасибо за вопросы. Отвечу на них по порядку


            1. При остановке и поднятии listener container происходит остановка и поднятие consumer-a, а это приведет к ребалансировке. Хотя, при временной недоступности внешней системы скорее всего будут остановлены все consumer-ы в затронутой consumer group.
            2. Время обработки сообщения отличается от сервиса к сервису. На одном из наших контуров для приложений, которые ходят в несколько внешних источников за данными, оно не превышает 50 мс. Как я и писал, spring-retry мы не стали серьезно рассматривать. Он хранит сообщения в памяти, блокирует поток исполнения для выполнения стратегии и с ним у нас по прежнему отсутсвует DLQ. Кроме того, действительно, есть вероятность превысить max.poll.interval.ms интервал, что приведет к нежелательной ребалансировке consumer group.
            3. Я затрудняюсь ответить на этот вопрос. Мои наблюдения показывают, что количество партиций должно соответствовать количеству consumer-ов для эффективной утилизации. Если consumer-ов будет больше, чем партиций, то они будут простаивать. Если партиций будет больше, то какой-либо consumer подключится и будет читать из нескольких партиций сразу, что может негативно отразиться на производительности.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое