Как стать автором
Поиск
Написать публикацию
Обновить

Как сделать livenessProbe для Kafka-консьюмеров и перестать перезапускать их вручную

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров1.6K

Kafka-консьюмеры не всегда работают так стабильно, как хотелось бы. Иногда они просто зависают — без ошибок, без падений, но и без обработки сообщений. LivenessProbe в Kubernetes помогает автоматически перезапускать зависшие сервисы, но с Kafka-консьюмерами всё не так просто: стандартного решения для них нет. В этой статье разберём, как правильно реализовать livenessProbe для консьюмеров с помощью паттерна Heartbeat, чтобы не перезапускать их вручную.

Содержание

Зачем нужен livenessProbe

В Kubernetes livenessProbe периодически опрашивает под (приложение) на предмет работоcпособности и перезапускает его, если что-то не работает. Если бы это был контейнер с API на FastAPI, то мы бы удостоверились, что контейнер возвращает правильный HTTP код 200 с одного из эндпоинтов. Допустим, приложение зависло и перестало отдавать ответ клиенту – livenessProbe перезапустил бы контейнер.

Kubernetes с помощью livenessProbe периодически опрашивает контейнер и перезапускает его в случае необходимости
Kubernetes с помощью livenessProbe периодически опрашивает контейнер и перезапускает его в случае необходимости

В чем проблема livenessProbe для Kafka-консьюмера

Для Kafka Consumer нет единого описанного подхода к реализации такого механизма, из-за чего разработчики сходу не могут реализовать эту функциональность правильно. Зачастую можно наткнуться на статьи, где предлагают следить за Сonsumer lag, что делает такой подход некорректным, так-как наличие лага не говорит, есть ли проблема у конкретного консьюмера или он просто не справляется с нагрузкой.

В моей голове родилось несколько подходов к проверке (увы, не все из них жизнеспособны):

  • Через consumer lag (неточный);

  • Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);

  • Через внутренние метрики кафки (сложно);

  • Через кастомный механизм heartbeat в консьюмере (оптимально и точно);

  • Через отправку сообщения в health топики и его вычитку в livenessProbe (точно, но громоздко)

Реализация livenessProbe через паттерн HeartBeat ❤️

Простым решением проверки работы консьюмера может стать периодическая отправка хартбитов в любое хранилище, а затем сверка текущего времени и времени последнего бита.

Допустим, мы примерно знаем, что если задача выполняется больше 10 секунд, то скорее всего консьюмер завис и нам стоит его перезапустить. Каждый цикл получения и обработки сообщений из топика сохраняем данные по времени в некое хранилище. Самый простой вариант — в файл. Kubernetes затем может посмотреть в файл, сравнить время последнего бита с текущим и перезапустить консьюмер, если разница больше 10 секунд.

Концептуальная работа livenessProbe для кафка консьюмера
Концептуальная работа livenessProbe для кафка консьюмера

Реализация на Python

Для использования этого подхода необходимы изменения на стороне консьюмера, а для Kubernetes необходим скрипт.

Абстрактный класс HeartBeat

Опишем абстрактный класс HeartBeat, наследуя и реализуя который мы сможем вызывать методы save для сохранения heartbeat консьюмера. Это позволит нам в дальнейшем подменять реализацию – сохранять данные в файл, в редис или БД.

import abc

class HeartBeat(abc.ABC):
    @abc.abstractmethod
    def save(self) -> None:
        """Save timestamp data to mark that worker is alive."""
        
    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Check that worker is alive."""        

Структура данных

Опишем структуру данных о heartbeat. В нашем случае важно хранить идентификатор хоста и время. И должно это сохраняться в JSON.

{
    "host_id": "00000000-0000-0000-0000-0242ac160007", 
    "last_beat": "2023-12-08T09:38:01.255909+00:00"
}
class HeartBeatState(BaseModel):
    host_id: str
    last_beat: datetime

FileHeartBeat – сохранение в файл

Для простоты сохраним данные в файл – напишем реализацию FileHeartBeat. Придумаем файлу имя, соберем к нему путь, затем сериализуем HeartBeat в JSON и сохраним в файле.

HEART_BEAT_FILE_NAME = "heartbeat.json"

logger = logging.getLogger(__name__)


class FileHeartBeat(HeartBeat):
    """
    Save heartbeats to a file to check if consumer is alive by livenessProbe.
    """

    def save(self):
        """Save heartbeat timestamp data to indicate that worker is alive."""
        heartbeat_file_path = self._retrieve_heartbeat_file_path()
        host_id = uuid.UUID(int=uuid.getnode())
        last_beat = now()

        state = HeartBeatState(
            host_id=str(host_id),
            last_beat=last_beat,
        )
        self._save_state_to_file(heartbeat_file_path, state)

    def _save_state_to_file(
            self, 
            heartbeat_file_path: Path, 
            state: HeartBeatState,
    ) -> None:
        """Save heartbeat timestamp and other information in a file."""
        logger.info("Saving heart beat to file: %s, %s", heartbeat_file_path, str(state.json()))

        with open(heartbeat_file_path, "w+") as worker_heartbeat_file:
            worker_heartbeat_file.write(state.json())


    def _retrieve_heartbeat_file_path(self) -> Path:
        """Construct file path where heartbeat info is located."""

        base_dir = Path(settings.BASE_DIR)

        heartbeat_file_name = HEART_BEAT_FILE_NAME
        heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)

        return heartbeat_file_path

FileHeartBeat – проверка, жив ли консьюмер

Для проверки, что консьюмер жив, опишем метод is_alive. Прочтем файл, десериализуем данные из JSON в нашу модель и сравним, сколько прошло секунд с момент последнего бита. Если больше, чем мы ожидаем, то считаем консьюмер мертвым.

class FileHeartBeat(HeartBeat):
    """
    Saves heartbeats to a file to check if consumer is alive by livenessProbe.
    """

    def is_alive(self, max_time_after_worker_dead: int = 15) -> bool:
        """ Main livenessProbe method to check if consumer is healthy. """
        heartbeat_file_path = self._retrieve_heartbeat_file_path()

        try:
            logger.info("Trying to check heartbeat file: %s", heartbeat_file_path)
            heart_beat_stats = self._read_heartbeat_file(heartbeat_file_path)
        except FileNotFoundError:
            logger.error("Heartbeat file not found: %s", heartbeat_file_path)

            return False

        logger.info("Got last heartbeat info: %s", str(heart_beat_stats.json()))

        time_after_last_beat = now() - heart_beat_stats.last_beat
        if time_after_last_beat.seconds > max_time_after_worker_dead:
            logger.error(
                "Worker is dead. Time since last beat %s > %s",
                time_after_last_beat.seconds,
                time_offset,
            )
            return False

        logger.info(
            "Worker is alive. Time since last beat %s < %s",
            time_after_last_beat.seconds,
            time_offset,
        )
        return True

    def _read_heartbeat_file(self, heartbeat_file_path: Path) -> HeartBeatState:
        """Read heartbeat timestamp and other information from a file."""
        with open(heartbeat_file_path) as worker_heartbeat_file:
            heart_beat_stats_json: dict = json.load(worker_heartbeat_file)
            heart_beat_state = HeartBeatState(**heart_beat_stats_json)

            return heart_beat_state

    def _retrieve_heartbeat_file_path(self) -> Path:
        """Construct file path where heartbeat info is located."""

        base_dir = Path(settings.BASE_DIR)

        heartbeat_file_name = HEART_BEAT_FILE_NAME
        heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)

        return heartbeat_file_path

FileHeartBeat – интегрируем в консьюмер

Интегрируем этот код в код консьюмера. В данном случае используется датакласс для возможности подмены реализации. В начале каждого цикла сохраняем данные, что живы.

@dataclasses.dataclass
class ConfluentKafkaConsumer:
    heart_beat: HeartBeat

    def run(self):
        consumer = ...
        ....
        
        while True:
            message: ConfluentKafkaMessage = consumer.poll(2.0)
            self.heart_beat.save()

            if not message:
                # No message received
                continue

            self._process_message(message)


confluent_kafka_consumer = ConfluentKafkaConsumer(
    heart_beat=FileHeartBeat(),
)

FileHeartBeat - интегрируем в CLI

Дальше необходимо написать CLI-скрипт, который сообщит Kubernetes о том, что воркер жив или умер. Для этого необходима подобная функция, в зависимости от вашего фреймворка:

ERROR_CODE = 1

...
# Your CLI 
def health_check(max_time: Optional[int]):
    heart_beat = FileHeartBeat()

    is_alive = heart_beat.is_alive(max_time_after_worker_dead=max_time)

    if not is_alive:
        sys.exit(ERROR_CODE)

FileHeartBeat – интегрируем в livenessProbe

В helm пропишем livenessProbe проверять состояние консьюмера каждые 30 секунд. У нас используется Django, поэтому запускаем через него.

    livenessProbe:
      exec:
        command:
        - ./manage.py
        - consumer
        - is_alive
        - --max_time
        - "30"

На этом всё. Мы готовы проверять консьюмеры. Код приведен концептуальный.

Что дальше?

Вы можете реализовать сохранение Heartbeat в БД или Redis, а затем в админке смотреть состояние ваших консьюмеров.

Вы можете пойти ещё дальше и написать команды для приостановки вашего консьюмера прямо в коде через подобную концепцию проверки состояния в цикле перед обработкой сообщения. Это может быть удобно, когда нужно сбросить оффсеты в критических случаях (консьюмер группа должна быть остановлена)

Помимо вышеописанного, можно попробовать выделить код в отдельную библиотеку и переиспользовать.

Выводы ✍️

Из опыта использования этого подхода могу выделить следующее:

  • После реализации механизма Heartbeat мы перестали перезапускать консьюмеры руками и полностью автоматизировали этот процесс.

  • Среди возможных реализаций данный способ оказался наиболее простым и точным.

  • Данный подход позволяет расширять функциональность по отслеживанию и управлению консьюмерами – если использовать Redis, то можно отследить в админке консьюмеры, а также реализовать функциональность по управлению без перезапуска и девопсов прямо из админки.

Дополнительные материалы 📚

Вот, что ещё можно почитать на эту тему:

Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+11
Комментарии8

Публикации

Информация

Сайт
career.samolet.ru
Дата регистрации
Дата основания
Численность
5 001–10 000 человек
Местоположение
Россия
Представитель
Илья