Kafka-консьюмеры не всегда работают так стабильно, как хотелось бы. Иногда они просто зависают — без ошибок, без падений, но и без обработки сообщений. LivenessProbe в Kubernetes помогает автоматически перезапускать зависшие сервисы, но с Kafka-консьюмерами всё не так просто: стандартного решения для них нет. В этой статье разберём, как правильно реализовать livenessProbe для консьюмеров с помощью паттерна Heartbeat, чтобы не перезапускать их вручную.

Содержание

Зачем нужен livenessProbe

В Kubernetes livenessProbe периодически опрашивает под (приложение) на предмет работоcпособности и перезапускает его, если что-то не работает. Если бы это был контейнер с API на FastAPI, то мы бы удостоверились, что контейнер возвращает правильный HTTP код 200 с одного из эндпоинтов. Допустим, приложение зависло и перестало отдавать ответ клиенту – livenessProbe перезапустил бы контейнер.

Kubernetes с помощью livenessProbe периодически опрашивает контейнер и перезапускает его в случае необходимости

В чем проблема livenessProbe для Kafka-консьюмера

Для Kafka Consumer нет единого описанного подхода к реализации такого механизма, из-за чего разработчики сходу не могут реализовать эту функциональность правильно. Зачастую можно наткнуться на статьи, где предлагают следить за Сonsumer lag, что делает такой подход некорректным, так-как наличие лага не говорит, есть ли проблема у конкретного консьюмера или он просто не справляется с нагрузкой.

В моей голове родилось несколько подходов к проверке (увы, не все из них жизнеспособны):

  • Через consumer lag (неточный);

  • Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);

  • Через внутренние метрики кафки (сложно);

  • Через кастомный механизм heartbeat в консьюмере (оптимально и точно);

  • Через отправку сообщения в health топики и его вычитку в livenessProbe (точно, но громоздко)

Реализация livenessProbe через паттерн HeartBeat ❤️

Простым решением проверки работы консьюмера может стать периодическая отправка хартбитов в любое хранилище, а затем сверка текущего времени и времени последнего бита.

Допустим, мы примерно знаем, что если задача выполняется больше 10 секунд, то скорее всего консьюмер завис и нам стоит его перезапустить. Каждый цикл получения и обработки сообщений из топика сохраняем данные по времени в некое хранилище. Самый простой вариант — в файл. Kubernetes затем может посмотреть в файл, сравнить время последнего бита с текущим и перезапустить консьюмер, если разница больше 10 секунд.

Концептуальная работа livenessProbe для кафка консьюмера

Реализация на Python

Для использования этого подхода необходимы изменения на стороне консьюмера, а для Kubernetes необходим скрипт.

Абстрактный класс HeartBeat

Опишем абстрактный класс HeartBeat, наследуя и реализуя который мы сможем вызывать методы save для сохранения heartbeat консьюмера. Это позволит нам в дальнейшем подменять реализацию – сохранять данные в файл, в редис или БД.

import abc

class HeartBeat(abc.ABC):
    @abc.abstractmethod
    def save(self) -> None:
        """Save timestamp data to mark that worker is alive."""
        
    @abc.abstractmethod
    def is_alive(self) -> bool:
        """Check that worker is alive."""        

Структура данных

Опишем структуру данных о heartbeat. В нашем случае важно хранить идентификатор хоста и время. И должно это сохраняться в JSON.

{
    "host_id": "00000000-0000-0000-0000-0242ac160007", 
    "last_beat": "2023-12-08T09:38:01.255909+00:00"
}
class HeartBeatState(BaseModel):
    host_id: str
    last_beat: datetime

FileHeartBeat – сохранение в файл

Для простоты сохраним данные в файл – напишем реализацию FileHeartBeat. Придумаем файлу имя, соберем к нему путь, затем сериализуем HeartBeat в JSON и сохраним в файле.

HEART_BEAT_FILE_NAME = "heartbeat.json"

logger = logging.getLogger(__name__)


class FileHeartBeat(HeartBeat):
    """
    Save heartbeats to a file to check if consumer is alive by livenessProbe.
    """

    def save(self):
        """Save heartbeat timestamp data to indicate that worker is alive."""
        heartbeat_file_path = self._retrieve_heartbeat_file_path()
        host_id = uuid.UUID(int=uuid.getnode())
        last_beat = now()

        state = HeartBeatState(
            host_id=str(host_id),
            last_beat=last_beat,
        )
        self._save_state_to_file(heartbeat_file_path, state)

    def _save_state_to_file(
            self, 
            heartbeat_file_path: Path, 
            state: HeartBeatState,
    ) -> None:
        """Save heartbeat timestamp and other information in a file."""
        logger.info("Saving heart beat to file: %s, %s", heartbeat_file_path, str(state.json()))

        with open(heartbeat_file_path, "w+") as worker_heartbeat_file:
            worker_heartbeat_file.write(state.json())


    def _retrieve_heartbeat_file_path(self) -> Path:
        """Construct file path where heartbeat info is located."""

        base_dir = Path(settings.BASE_DIR)

        heartbeat_file_name = HEART_BEAT_FILE_NAME
        heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)

        return heartbeat_file_path

FileHeartBeat – проверка, жив ли консьюмер

Для проверки, что консьюмер жив, опишем метод is_alive. Прочтем файл, десериализуем данные из JSON в нашу модель и сравним, сколько прошло секунд с момент последнего бита. Если больше, чем мы ожидаем, то считаем консьюмер мертвым.

class FileHeartBeat(HeartBeat):
    """
    Saves heartbeats to a file to check if consumer is alive by livenessProbe.
    """

    def is_alive(self, max_time_after_worker_dead: int = 15) -> bool:
        """ Main livenessProbe method to check if consumer is healthy. """
        heartbeat_file_path = self._retrieve_heartbeat_file_path()

        try:
            logger.info("Trying to check heartbeat file: %s", heartbeat_file_path)
            heart_beat_stats = self._read_heartbeat_file(heartbeat_file_path)
        except FileNotFoundError:
            logger.error("Heartbeat file not found: %s", heartbeat_file_path)

            return False

        logger.info("Got last heartbeat info: %s", str(heart_beat_stats.json()))

        time_after_last_beat = now() - heart_beat_stats.last_beat
        if time_after_last_beat.seconds > max_time_after_worker_dead:
            logger.error(
                "Worker is dead. Time since last beat %s > %s",
                time_after_last_beat.seconds,
                time_offset,
            )
            return False

        logger.info(
            "Worker is alive. Time since last beat %s < %s",
            time_after_last_beat.seconds,
            time_offset,
        )
        return True

    def _read_heartbeat_file(self, heartbeat_file_path: Path) -> HeartBeatState:
        """Read heartbeat timestamp and other information from a file."""
        with open(heartbeat_file_path) as worker_heartbeat_file:
            heart_beat_stats_json: dict = json.load(worker_heartbeat_file)
            heart_beat_state = HeartBeatState(**heart_beat_stats_json)

            return heart_beat_state

    def _retrieve_heartbeat_file_path(self) -> Path:
        """Construct file path where heartbeat info is located."""

        base_dir = Path(settings.BASE_DIR)

        heartbeat_file_name = HEART_BEAT_FILE_NAME
        heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)

        return heartbeat_file_path

FileHeartBeat – интегрируем в консьюмер

Интегрируем этот код в код консьюмера. В данном случае используется датакласс для возможности подмены реализации. В начале каждого цикла сохраняем данные, что живы.

@dataclasses.dataclass
class ConfluentKafkaConsumer:
    heart_beat: HeartBeat

    def run(self):
        consumer = ...
        ....
        
        while True:
            message: ConfluentKafkaMessage = consumer.poll(2.0)
            self.heart_beat.save()

            if not message:
                # No message received
                continue

            self._process_message(message)


confluent_kafka_consumer = ConfluentKafkaConsumer(
    heart_beat=FileHeartBeat(),
)

FileHeartBeat - интегрируем в CLI

Дальше необходимо написать CLI-скрипт, который сообщит Kubernetes о том, что воркер жив или умер. Для этого необходима подобная функция, в зависимости от вашего фреймворка:

ERROR_CODE = 1

...
# Your CLI 
def health_check(max_time: Optional[int]):
    heart_beat = FileHeartBeat()

    is_alive = heart_beat.is_alive(max_time_after_worker_dead=max_time)

    if not is_alive:
        sys.exit(ERROR_CODE)

FileHeartBeat – интегрируем в livenessProbe

В helm пропишем livenessProbe проверять состояние консьюмера каждые 30 секунд. У нас используется Django, поэтому запускаем через него.

    livenessProbe:
      exec:
        command:
        - ./manage.py
        - consumer
        - is_alive
        - --max_time
        - "30"

На этом всё. Мы готовы проверять консьюмеры. Код приведен концептуальный.

Что дальше?

Вы можете реализовать сохранение Heartbeat в БД или Redis, а затем в админке смотреть состояние ваших консьюмеров.

Вы можете пойти ещё дальше и написать команды для приостановки вашего консьюмера прямо в коде через подобную концепцию проверки состояния в цикле перед обработкой сообщения. Это может быть удобно, когда нужно сбросить оффсеты в критических случаях (консьюмер группа должна быть остановлена)

Помимо вышеописанного, можно попробовать выделить код в отдельную библиотеку и переиспользовать.

Выводы ✍️

Из опыта использования этого подхода могу выделить следующее:

  • После реализации механизма Heartbeat мы перестали перезапускать консьюмеры руками и полностью автоматизировали этот процесс.

  • Среди возможных реализаций данный способ оказался наиболее простым и точным.

  • Данный подход позволяет расширять функциональность по отслеживанию и управлению консьюмерами – если использовать Redis, то можно отследить в админке консьюмеры, а также реализовать функциональность по управлению без перезапуска и девопсов прямо из админки.

Дополнительные материалы 📚

Вот, что ещё можно почитать на эту тему: