Kafka-консьюмеры не всегда работают так стабильно, как хотелось бы. Иногда они просто зависают — без ошибок, без падений, но и без обработки сообщений. LivenessProbe в Kubernetes помогает автоматически перезапускать зависшие сервисы, но с Kafka-консьюмерами всё не так просто: стандартного решения для них нет. В этой статье разберём, как правильно реализовать livenessProbe для консьюмеров с помощью паттерна Heartbeat, чтобы не перезапускать их вручную.
Содержание
Зачем нужен livenessProbe
В Kubernetes livenessProbe периодически опрашивает под (приложение) на предмет работоcпособности и перезапускает его, если что-то не работает. Если бы это был контейнер с API на FastAPI, то мы бы удостоверились, что контейнер возвращает правильный HTTP код 200 с одного из эндпоинтов. Допустим, приложение зависло и перестало отдавать ответ клиенту – livenessProbe перезапустил бы контейнер.

В чем проблема livenessProbe для Kafka-консьюмера
Для Kafka Consumer нет единого описанного подхода к реализации такого механизма, из-за чего разработчики сходу не могут реализовать эту функциональность правильно. Зачастую можно наткнуться на статьи, где предлагают следить за Сonsumer lag, что делает такой подход некорректным, так-как наличие лага не говорит, есть ли проблема у конкретного консьюмера или он просто не справляется с нагрузкой.
В моей голове родилось несколько подходов к проверке (увы, не все из них жизнеспособны):
Через consumer lag (неточный);
Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);
Через внутренние метрики кафки (сложно);
Через кастомный механизм heartbeat в консьюмере (оптимально и точно);
Через отправку сообщения в health топики и его вычитку в livenessProbe (точно, но громоздко)
Реализация livenessProbe через паттерн HeartBeat ❤️
Простым решением проверки работы консьюмера может стать периодическая отправка хартбитов в любое хранилище, а затем сверка текущего времени и времени последнего бита.
Допустим, мы примерно знаем, что если задача выполняется больше 10 секунд, то скорее всего консьюмер завис и нам стоит его перезапустить. Каждый цикл получения и обработки сообщений из топика сохраняем данные по времени в некое хранилище. Самый простой вариант — в файл. Kubernetes затем может посмотреть в файл, сравнить время последнего бита с текущим и перезапустить консьюмер, если разница больше 10 секунд.

Реализация на Python
Для использования этого подхода необходимы изменения на стороне консьюмера, а для Kubernetes необходим скрипт.
Абстрактный класс HeartBeat
Опишем абстрактный класс HeartBeat, наследуя и реализуя который мы сможем вызывать методы save для сохранения heartbeat консьюмера. Это позволит нам в дальнейшем подменять реализацию – сохранять данные в файл, в редис или БД.

import abc class HeartBeat(abc.ABC): @abc.abstractmethod def save(self) -> None: """Save timestamp data to mark that worker is alive.""" @abc.abstractmethod def is_alive(self) -> bool: """Check that worker is alive."""
Структура данных
Опишем структуру данных о heartbeat. В нашем случае важно хранить идентификатор хоста и время. И должно это сохраняться в JSON.
{ "host_id": "00000000-0000-0000-0000-0242ac160007", "last_beat": "2023-12-08T09:38:01.255909+00:00" }
class HeartBeatState(BaseModel): host_id: str last_beat: datetime
FileHeartBeat – сохранение в файл
Для простоты сохраним данные в файл – напишем реализацию FileHeartBeat. Придумаем файлу имя, соберем к нему путь, затем сериализуем HeartBeat в JSON и сохраним в файле.

HEART_BEAT_FILE_NAME = "heartbeat.json" logger = logging.getLogger(__name__) class FileHeartBeat(HeartBeat): """ Save heartbeats to a file to check if consumer is alive by livenessProbe. """ def save(self): """Save heartbeat timestamp data to indicate that worker is alive.""" heartbeat_file_path = self._retrieve_heartbeat_file_path() host_id = uuid.UUID(int=uuid.getnode()) last_beat = now() state = HeartBeatState( host_id=str(host_id), last_beat=last_beat, ) self._save_state_to_file(heartbeat_file_path, state) def _save_state_to_file( self, heartbeat_file_path: Path, state: HeartBeatState, ) -> None: """Save heartbeat timestamp and other information in a file.""" logger.info("Saving heart beat to file: %s, %s", heartbeat_file_path, str(state.json())) with open(heartbeat_file_path, "w+") as worker_heartbeat_file: worker_heartbeat_file.write(state.json()) def _retrieve_heartbeat_file_path(self) -> Path: """Construct file path where heartbeat info is located.""" base_dir = Path(settings.BASE_DIR) heartbeat_file_name = HEART_BEAT_FILE_NAME heartbeat_file_path = base_dir.joinpath(heartbeat_file_name) return heartbeat_file_path
FileHeartBeat – проверка, жив ли консьюмер
Для проверки, что консьюмер жив, опишем метод is_alive. Прочтем файл, десериализуем данные из JSON в нашу модель и сравним, сколько прошло секунд с момент последнего бита. Если больше, чем мы ожидаем, то считаем консьюмер мертвым.

class FileHeartBeat(HeartBeat): """ Saves heartbeats to a file to check if consumer is alive by livenessProbe. """ def is_alive(self, max_time_after_worker_dead: int = 15) -> bool: """ Main livenessProbe method to check if consumer is healthy. """ heartbeat_file_path = self._retrieve_heartbeat_file_path() try: logger.info("Trying to check heartbeat file: %s", heartbeat_file_path) heart_beat_stats = self._read_heartbeat_file(heartbeat_file_path) except FileNotFoundError: logger.error("Heartbeat file not found: %s", heartbeat_file_path) return False logger.info("Got last heartbeat info: %s", str(heart_beat_stats.json())) time_after_last_beat = now() - heart_beat_stats.last_beat if time_after_last_beat.seconds > max_time_after_worker_dead: logger.error( "Worker is dead. Time since last beat %s > %s", time_after_last_beat.seconds, time_offset, ) return False logger.info( "Worker is alive. Time since last beat %s < %s", time_after_last_beat.seconds, time_offset, ) return True def _read_heartbeat_file(self, heartbeat_file_path: Path) -> HeartBeatState: """Read heartbeat timestamp and other information from a file.""" with open(heartbeat_file_path) as worker_heartbeat_file: heart_beat_stats_json: dict = json.load(worker_heartbeat_file) heart_beat_state = HeartBeatState(**heart_beat_stats_json) return heart_beat_state def _retrieve_heartbeat_file_path(self) -> Path: """Construct file path where heartbeat info is located.""" base_dir = Path(settings.BASE_DIR) heartbeat_file_name = HEART_BEAT_FILE_NAME heartbeat_file_path = base_dir.joinpath(heartbeat_file_name) return heartbeat_file_path
FileHeartBeat – интегрируем в консьюмер
Интегрируем этот код в код консьюмера. В данном случае используется датакласс для возможности подмены реализации. В начале каждого цикла сохраняем данные, что живы.

@dataclasses.dataclass class ConfluentKafkaConsumer: heart_beat: HeartBeat def run(self): consumer = ... .... while True: message: ConfluentKafkaMessage = consumer.poll(2.0) self.heart_beat.save() if not message: # No message received continue self._process_message(message) confluent_kafka_consumer = ConfluentKafkaConsumer( heart_beat=FileHeartBeat(), )
FileHeartBeat - интегрируем в CLI
Дальше необходимо написать CLI-скрипт, который сообщит Kubernetes о том, что воркер жив или умер. Для этого необходима подобная функция, в зависимости от вашего фреймворка:
ERROR_CODE = 1 ... # Your CLI def health_check(max_time: Optional[int]): heart_beat = FileHeartBeat() is_alive = heart_beat.is_alive(max_time_after_worker_dead=max_time) if not is_alive: sys.exit(ERROR_CODE)
FileHeartBeat – интегрируем в livenessProbe
В helm пропишем livenessProbe проверять состояние консьюмера каждые 30 секунд. У нас используется Django, поэтому запускаем через него.
livenessProbe: exec: command: - ./manage.py - consumer - is_alive - --max_time - "30"
На этом всё. Мы готовы проверять консьюмеры. Код приведен концептуальный.
Что дальше?
Вы можете реализовать сохранение Heartbeat в БД или Redis, а затем в админке смотреть состояние ваших консьюмеров.
Вы можете пойти ещё дальше и написать команды для приостановки вашего консьюмера прямо в коде через подобную концепцию проверки состояния в цикле перед обработкой сообщения. Это может быть удобно, когда нужно сбросить оффсеты в критических случаях (консьюмер группа должна быть остановлена)
Помимо вышеописанного, можно попробовать выделить код в отдельную библиотеку и переиспользовать.
Выводы ✍️
Из опыта использования этого подхода могу выделить следующее:
После реализации механизма Heartbeat мы перестали перезапускать консьюмеры руками и полностью автоматизировали этот процесс.
Среди возможных реализаций данный способ оказался наиболее простым и точным.
Данный подход позволяет расширять функциональность по отслеживанию и управлению консьюмерами – если использовать Redis, то можно отследить в админке консьюмеры, а также реализовать функциональность по управлению без перезапуска и девопсов прямо из админки.
Дополнительные материалы 📚
Вот, что ещё можно почитать на эту тему:
