Как стать автором
Обновить

Обеспечение высокой доступности приложений с Kafka Streams

Время на прочтение10 мин
Количество просмотров14K
Автор оригинала: Levani Kokhreidze
Kafka Streams — это Java-библиотека для анализа и обработки данных, хранящихся в Apache Kafka. Как и в любой другой платформе потоковой обработки, она способна выполнять обработку данных с сохранением и/или без сохранения состояния в режиме реального времени. В этом посте я попытаюсь описать, почему достижение высокой доступности (99,99%) проблематично в Kafka Streams и что мы можем сделать для того, чтобы ее достичь.

Что нам нужно знать


Прежде чем описывать проблему и возможные решения, давайте рассмотрим основные концепции Kafka Streams. Если вы работали с API-интерфейсами Kafka для консьюмеров/продьюсеров, то большинство из этих парадигм вам уже знакомы. В следующих разделах я попытаюсь в нескольких словах описать хранение данных в партициях, перебалансировку групп консьюмеров и как основные концепции Kafka клиентов вписываются в библиотеку Kafka Streams.

Kafka: Партицирование данных


В мире Kafka приложения-продьюсеры отправляют данные в виде пар ключ-значение в определенный топик. Сам топик разделен на один или несколько партиций в Kafka брокерах. Kafka использует ключ сообщения, чтобы указать, в какую партицию следует записывать данные. Следовательно, сообщения с одинаковым ключом всегда оказываются в одной партиции.

Консьюмерские приложения организуются в группы консьюмеров, и каждая группа может иметь один или несколько экземпляров консьюмеров.
Каждый экземпляр консьюмера в группе консьюмеров отвечает за обработку данных из уникального набора партиций входного топика.

Экземпляры консьюмеров по сути являются средством масштабирования обработки в вашей группе консьюмеров.

Kafka: Ребалансировка группы консьюмеров


Как мы уже говорили ранее, каждый экземпляр группы консьюмеров получает набор уникальных партиций, из которых он потребляет данные. Всякий раз, когда новый консьюмер присоединяется к группе, должна происходить перебалансировка, чтобы он получил партицию. То же самое происходит когда консьюмер умирает, остальные консьюмеры должны забрать его партиции, чтобы гарантировать, что все партиции обрабатываются.

Kafka Streams: Потоки


В начале этого поста мы ознакомились с тем, что библиотека Kafka Streams построена на основе API-интерфейсов продьюсеров и консьюмеров и обработка данных организована точно так же, как стандартное решение на Kafka. В конфигурации Kafka Streams поле application.id эквивалентно group.id в API-интерфейса консьюмера. Kafka Streams предварительно создает определенное количество потоков и каждый из них выполняет обработку данных из одной или нескольких партиций входных топиков. Говоря в терминологии API консьмеров, потоки по существу совпадают с экземплярами консьюмеров из одной группы. Потоки являются основным способом масштабирования обработки данных в Kafka Streams, это можно сделать вертикально, увеличив число потоков для каждого приложения Kafka Streams на одной машине, или горизонтально, добавив дополнительную машину с тем же application.id.

image

Источник: kafka.apache.org/21/documentation/streams/architecture

В Kafka Streams есть еще много элементов, таких как задачи, топология обработки, threading model и т.д., которые мы не рассмотрим в этом посте. Более подробную информацию можно найти здесь.

Kafka Streams: Хранение состояния


В потоковой обработке существует операции с сохранением и без сохранения состояния. Состояние — это то, что позволяет приложению запоминать необходимую информацию, выходящую за рамки обрабатываемой в данный момент записи.

Операции с состоянием, такие как count, любой тип aggregation, joins и т.д., намного сложнее. Это связано с тем, что имея только одну запись нельзя определить последнее состояние (скажем, count) для данного ключа, поэтому необходимо хранить состояние вашего потока в вашем приложении. Как мы уже обсуждали ранее, каждый поток обрабатывает набор уникальных партиций, следовательно поток обрабатывает только подмножество всего набора данных. Это означает, что каждый поток приложения Kafka Streams с одним и тем же application.id поддерживает свое собственное изолированное состояние. Мы не будем вдаваться в подробности о том, как формируется состояние в Kafka Streams, но важно понимать, что состояния восстанавливается с помощью топика журнала изменений(change-log топик) и сохраняется не только на локальном диске, но и в Kafka Broker. Сохранение журнала изменений состояния в Kafka Broker в качестве отдельного топика сделано не только для отказоустойчивости, но и для того, чтобы вы могли легко развернуть новые экземпляры Kafka Streams с тем же application.id. Поскольку состояние хранится в виде change-log топика на стороне брокера, новый экземпляр может загрузить свое собственное состояние из этого топика.

Более подробную информацию о хранении состояния можно найти здесь.

Почему обеспечение высокой доступности проблематично с Kafka Streams?


Мы рассмотрели основные концепции и принципы обработки данных с Kafka Streams. Теперь давайте попробуем объединить все части вместе и проанализировать, почему достижение высокой доступности может быть проблематичным. Из предыдущих разделов мы должны помнить:

  1. Данные в Kafka топике разделяются на партиции, которые распределяются между потоками Kafka Streams.
  2. Приложения Kafka Streams с одним и тем же application.id — это, по сути, одна группа консьюмеров, и каждый из ее потоков представляет собой отдельный изолированный экземпляр консьюмера.
  3. Для операций с состоянием поток поддерживает свое собственное состояние, которое «резервируется» топиком Kafka в виде журнала изменений.
  4. Когда новый экземпляр консьюмера присоединяется или покидает группу, Kafka запускает ребалансировку и обработка событий останавливается, пока ребалансировка не закончится.

TransferWise SPaaS (Stream Processing as a Service)


Прежде чем осветить суть этого поста, позвольте мне сначала рассказать, что мы создали в TransferWise и почему высокая доступность очень важна для нас.

В TransferWise у нас работает несколько узлов для стриминговой обработки, и каждый узел содержит несколько экземпляров Kafka Streams для каждой продуктовой команды. Экземпляры Kafka Streams, которые предназначены для конкретной команды разработчиков, имеют специальный application.id и обычно имеют более 5 потоков. В целом команды обычно имеют 10-20 потоков (эквивалентно числу инстансов консьюмеров) по всему кластеру. Приложения, которые развернуты на узлах, прослушивают входные топики и выполняют несколько типов операций с состоянием и/или без состояния над входными данными и предоставляют обновления данных в реальном времени для последующих нисходящих микросервисов.

Продуктовые команды нуждаются в обновлении агрегированных данных в режиме реального времени. Это необходимо для того, чтобы предоставить нашим клиентам возможность мгновенного перевода денег. Наш обычный SLA:
В любой день 99,99% агрегированных данных должны быть доступны менее чем за 10 секунд.

Чтобы дать вам представление, во время стресс-тестирования приложение Kafka Streams смогло обрабатывать и агрегировать 20 085 входных сообщений в секунду. Таким образом 10 секунд SLA при нормальной нагрузке звучали вполне достижимыми. К сожалению, наш SLA не был достигнут во время выполнения cкользящего (rolling) обновления узлов, на которых разворачиваются приложения, и ниже я опишу почему это происходило.

Скользящее обновление узлов


В TransferWise мы сильно верим в непрерывную доставку нашего программного обеспечения и обычно выпускаем новые версии наших сервисов пару раз в день. Давайте рассмотрим пример простого непрерывного обновления сервиса и посмотрим, что происходит в процессе выпуска. Опять же, мы должны помнить, что:

  1. Данные в Kafka топике разделяются на партиции, которые распределяются между потоками Kafka Streams.
  2. Приложения Kafka Streams с одним и тем же application.id — это, по сути, одна группа консьюмеров, и каждый из ее потоков представляет собой отдельный изолированный экземпляр консьюмера.
  3. Для операций с состоянием поток поддерживает свое собственное состояние, которое «резервируется» топиком Kafka в виде журнала изменений.
  4. Когда новый экземпляр консьюмера присоединяется или покидает группу, Kafka запускает ребалансировку и обработка событий останавливается, пока ребалансировка не закончится.

Релизный процесс на одном узле обычно занимает от восьми до девяти секунд. Во время релиза экземпляры Kafka Streams на узле «мягко перезагружаются». Таким образом, для одного узла время, необходимое для корректной перезагрузки сервиса, составляет приблизительно восемь-девять секунд. Очевидно, что завершение работы экземпляра Kafka Streams на узле вызывает перебалансировку группы консьюмеров. Поскольку данные разбиты на партиции, все партиции, которые принадлежали парезагружаемому инстансу, должны быть распределены между активными Kafka Streams приложениями с таким же application.id. Это также касается агрегированных данных, которые были сохранены на диске. Пока этот процесс не завершится, данные не будут обрабатываются.

Резервные реплики(Standby replicas)


Чтобы сократить время ребалансировки для Kafka Streams приложений, существует концепция резервных реплик, которые определяются в конфиге как num.standby.replicas. Резервные реплики являются копиями локального хранилища состояния. Этот механизм дает возможность реплицировать хранилище состояний из одного экземпляра Kafka Streams в другой. Когда поток Kafka Streams по какой-либо причине умирает, продолжительность процесса восстановления состояния может быть минимизирована. К сожалению, по причинам, которые я объясню ниже, даже резервные реплики не помогут при скользящем обновлении сервиса.

Предположим, у нас есть два экземпляра Kafka Streams на двух разных машинах: node-a и node-b. Для каждого из экземпляров Kafka Streams на этих 2 узлах указано num.standby.replicas = 1. При такой конфигурации каждый экземпляр Kafka Streams поддерживает свою копию хранилища на другом узле. Во время скользящем обновлении мы имеем следующую ситуацию:

  1. Новая версия сервиса была развернута на node-a.
  2. Экземпляр Kafka Streams на node-a отключается.
  3. Началась ребалансировка.
  4. Хранилище от node-a уже было реплицировано на node-b, так как мы указали конфигурацию num.standby.replicas = 1.
  5. node-b уже имеет теневую копию node-a, поэтому процесс перебалансировки происходит практически мгновенно.
  6. node-a запускается снова.
  7. node-a присоединяется к группе потребителей.
  8. Брокер Kafka видит новый экземпляр Kafka Streams и запускает ребалансировку.

Как мы видим, num.standby.replicas помогает только в сценариях полного отключения узла. Это означает, что если бы node-a потерпел крах, то node-b мог бы продолжить корректную работу почти мгновенно. Но в ситуации скользящего обновления, после отключения node-a снова присоединится к группе, и этот последний шаг будет вызовет ребалансировку. Когда node-a присоединяется к группе консьюмеров после перезагрузки, он будет рассматриваться как новый экземпляр консьюмера. Опять же, мы должны помнить, что обработка данных в режиме реального времени останавливается до тех пор, пока новый инстанс не восстановит состояние из change-log топика.
Обратите внимание, что перебалансировка партиций во время присоединения нового экземпляра группе, не относится к API-интерфейсу Kafka Streams, поскольку именно так работает протокол группы консьюмеров Apache Kafka.

Достижение цели: Высокая доступность с Kafka Streams


Несмотря на то, что клиентские библиотеки Kafka не предоставляют встроенную функциональность для упомянутой выше проблемы, существуют некоторые приемы, которые можно использовать для достижения высокой доступности кластера во время скользящего обновления. Идея, лежащая в основе резервных реплик, остается в силе, и наличие резервных машин, когда наступит подходящее время, является хорошим решением, которое мы используем для обеспечения высокой доступности в случае отказа инстансов.

Проблема с нашей первоначальной настройкой заключалась в том, что у нас была одна группа консьюмеров для всех команд на всех узлах. Теперь вместо одной группы консьюмеров у нас есть две, и вторая действует как «горячий» кластер. В проде узлы имеют специальную переменную CLUSTER_ID, которая добавляется в application.id инстансов Kafka Streams. Вот пример конфигурации Spring Boot application.yml:
application.yml
spring.profiles: production
streaming-pipelines:
team-a-stream-app-id: "${CLUSTER_ID}-team-a-stream-app"
team-b-stream-app-id: "${CLUSTER_ID}-team-b-stream-app"


В один момент времени только один из кластеров находится в активном режиме, соответственно резервный кластер не отправляет сообщения в реальном времени на нисходящие микросервисы. Во время выпуска релиза, активным становится резервный класетр, что позволяет выполнить скользящее обновление на первом кластере. Поскольку это совершенно другая группа консьюмеров, наши клиенты даже не замечают каких-либо нарушений в обработке, а последующие сервисы продолжают получать сообщения из недавно активного кластера. Одним из очевидных недостатков использования резервной группы консьюмеров являются дополнительные накладные расходы и потребление ресурсов, но, тем не менее, такая архитектура обеспечивает дополнительные гарантии, контроль и отказоустойчивость нашей системы потоковой обработки.

Помимо добавления дополнительного кластера, есть еще приемы, которые позволяют смягчить проблему с частой ребалансировкой.

Увеличение group.initial.rebalance.delay.ms


Начиная с Kafka 0.11.0.0 была добавлена конфигурация group.initial.rebalance.delay.ms. Согласно документации, эта настройка отвечает за:
Количество времени в миллисекундах, на которое GroupCoordinator будет задерживать начальную ребалансировку консьюмер группы.

Например, если мы зададим в этой настройке 60000 миллисекунд, то при скользящем обновлении у нас может быть минутное окно для выпуска релиза. Если инстанс Kafka Streams успешно «перезапустится» в этом временном окне, ребалансировака не вызовится. Обратите внимание, что данные, за которые отвечал перезапускаемый инстанс Kafka Streams, будут по-прежнему недоступны до тех пор, пока узел не вернется в оперативный режим. Допустим, если перезагрузка инстанса занимает около восьми секунд, у вас будет восемь секунд простоя для данных, за которые отвечает этот инстанс.

Следует заметить, основной минус этой концепции заключается в том, что в случае сбоя узла вы получите дополнительную задержку в одну минуту при восстановлении с учетом текущей конфигурации.

Уменьшение размера сегмента в change-log топиках


Большая задержка при перебалансировке Kafka Stream связана с восстановлением хранилищ состояний из change-log топиков. Change-log топики являются сжатыми топиками, что позволяет хранить в топике последнюю запись по конкретному ключу. Я кратко опишу эту концепцию ниже.

Топики в Kafka Broker организованы в виде сегментов. Когда сегмент достигает настроенного порогового размера, создается новый сегмент, а предыдущий уплотняется. По умолчанию этот порог установлен на 1 ГБ. Как вы, возможно, знаете, основная структура данных, лежащая в основе топиков Kafka и их партиций, представляет собой структуру лога с упреждающей записью, то есть, когда сообщения передаются в топик, они всегда добавляются в последний «активный» сегмент, и уплотнение не происходит.
Поэтому большинство сохраняемых состояний хранилища в changelog всегда находятся в файле «активного сегмента» и никогда не уплотняются, что приводит к миллионам неуплотненных сообщений changelog. Для Kafka Streams это означает, что во время перебалансировки, когда инстанс Kafka Streams восстанавливает свое состояние из changelog топика, ему необходимо прочитать много избыточных записей из changelog топика. Учитывая, что хранилища состояний заботятся только о последнем состоянии, а не об истории, это время обработки тратится впустую. Уменьшение размера сегмента вызовет более агрессивное сжатие данных, поэтому новые инстансы приложений Kafka Streams могут восстанавливать состояние гораздо быстрее.

Заключение


Даже несмотря на то, что Kafka Streams не предоставляет встроенную возможность для обеспечения высокой доступности во время скользящего обновления сервиса, это все же можно сделать на уровне инфраструктуры. Мы должны помнить, что Kafka Streams не является «кластерным фреймворком» в отличие от Apache Flink или Apache Spark. Это легковесная библиотека на Java, которая позволяет разработчикам создавать легко масштабируемые приложения для потоковой обработки данных. Несмотря на это, он предоставляет необходимые строительные блоки для достижения таких амбициозных целей в потоковой обработке, как «99.99%» доступность.
Теги:
Хабы:
Всего голосов 4: ↑4 и ↓0+4
Комментарии0

Публикации

Истории

Работа

Data Scientist
79 вакансий
Java разработчик
356 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань