Kafka-консьюмеры не всегда работают так стабильно, как хотелось бы. Иногда они просто зависают — без ошибок, без падений, но и без обработки сообщений. LivenessProbe в Kubernetes помогает автоматически перезапускать зависшие сервисы, но с Kafka-консьюмерами всё не так просто: стандартного решения для них нет. В этой статье разберём, как правильно реализовать livenessProbe для консьюмеров с помощью паттерна Heartbeat, чтобы не перезапускать их вручную.
Содержание
Зачем нужен livenessProbe
В Kubernetes livenessProbe периодически опрашивает под (приложение) на предмет работоcпособности и перезапускает его, если что-то не работает. Если бы это был контейнер с API на FastAPI, то мы бы удостоверились, что контейнер возвращает правильный HTTP код 200 с одного из эндпоинтов. Допустим, приложение зависло и перестало отдавать ответ клиенту – livenessProbe перезапустил бы контейнер.

В чем проблема livenessProbe для Kafka-консьюмера
Для Kafka Consumer нет единого описанного подхода к реализации такого механизма, из-за чего разработчики сходу не могут реализовать эту функциональность правильно. Зачастую можно наткнуться на статьи, где предлагают следить за Сonsumer lag, что делает такой подход некорректным, так-как наличие лага не говорит, есть ли проблема у конкретного консьюмера или он просто не справляется с нагрузкой.
В моей голове родилось несколько подходов к проверке (увы, не все из них жизнеспособны):
Через consumer lag (неточный);
Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);
Через внутренние метрики кафки (сложно);
Через кастомный механизм heartbeat в консьюмере (оптимально и точно);
Через отправку сообщения в health топики и его вычитку в livenessProbe (точно, но громоздко)
Реализация livenessProbe через паттерн HeartBeat ❤️
Простым решением проверки работы консьюмера может стать периодическая отправка хартбитов в любое хранилище, а затем сверка текущего времени и времени последнего бита.
Допустим, мы примерно знаем, что если задача выполняется больше 10 секунд, то скорее всего консьюмер завис и нам стоит его перезапустить. Каждый цикл получения и обработки сообщений из топика сохраняем данные по времени в некое хранилище. Самый простой вариант — в файл. Kubernetes затем может посмотреть в файл, сравнить время последнего бита с текущим и перезапустить консьюмер, если разница больше 10 секунд.

Реализация на Python
Для использования этого подхода необходимы изменения на стороне консьюмера, а для Kubernetes необходим скрипт.
Абстрактный класс HeartBeat
Опишем абстрактный класс HeartBeat, наследуя и реализуя который мы сможем вызывать методы save для сохранения heartbeat консьюмера. Это позволит нам в дальнейшем подменять реализацию – сохранять данные в файл, в редис или БД.

import abc
class HeartBeat(abc.ABC):
@abc.abstractmethod
def save(self) -> None:
"""Save timestamp data to mark that worker is alive."""
@abc.abstractmethod
def is_alive(self) -> bool:
"""Check that worker is alive."""
Структура данных
Опишем структуру данных о heartbeat. В нашем случае важно хранить идентификатор хоста и время. И должно это сохраняться в JSON.
{
"host_id": "00000000-0000-0000-0000-0242ac160007",
"last_beat": "2023-12-08T09:38:01.255909+00:00"
}
class HeartBeatState(BaseModel):
host_id: str
last_beat: datetime
FileHeartBeat – сохранение в файл
Для простоты сохраним данные в файл – напишем реализацию FileHeartBeat. Придумаем файлу имя, соберем к нему путь, затем сериализуем HeartBeat в JSON и сохраним в файле.

HEART_BEAT_FILE_NAME = "heartbeat.json"
logger = logging.getLogger(__name__)
class FileHeartBeat(HeartBeat):
"""
Save heartbeats to a file to check if consumer is alive by livenessProbe.
"""
def save(self):
"""Save heartbeat timestamp data to indicate that worker is alive."""
heartbeat_file_path = self._retrieve_heartbeat_file_path()
host_id = uuid.UUID(int=uuid.getnode())
last_beat = now()
state = HeartBeatState(
host_id=str(host_id),
last_beat=last_beat,
)
self._save_state_to_file(heartbeat_file_path, state)
def _save_state_to_file(
self,
heartbeat_file_path: Path,
state: HeartBeatState,
) -> None:
"""Save heartbeat timestamp and other information in a file."""
logger.info("Saving heart beat to file: %s, %s", heartbeat_file_path, str(state.json()))
with open(heartbeat_file_path, "w+") as worker_heartbeat_file:
worker_heartbeat_file.write(state.json())
def _retrieve_heartbeat_file_path(self) -> Path:
"""Construct file path where heartbeat info is located."""
base_dir = Path(settings.BASE_DIR)
heartbeat_file_name = HEART_BEAT_FILE_NAME
heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)
return heartbeat_file_path
FileHeartBeat – проверка, жив ли консьюмер
Для проверки, что консьюмер жив, опишем метод is_alive
. Прочтем файл, десериализуем данные из JSON в нашу модель и сравним, сколько прошло секунд с момент последнего бита. Если больше, чем мы ожидаем, то считаем консьюмер мертвым.

class FileHeartBeat(HeartBeat):
"""
Saves heartbeats to a file to check if consumer is alive by livenessProbe.
"""
def is_alive(self, max_time_after_worker_dead: int = 15) -> bool:
""" Main livenessProbe method to check if consumer is healthy. """
heartbeat_file_path = self._retrieve_heartbeat_file_path()
try:
logger.info("Trying to check heartbeat file: %s", heartbeat_file_path)
heart_beat_stats = self._read_heartbeat_file(heartbeat_file_path)
except FileNotFoundError:
logger.error("Heartbeat file not found: %s", heartbeat_file_path)
return False
logger.info("Got last heartbeat info: %s", str(heart_beat_stats.json()))
time_after_last_beat = now() - heart_beat_stats.last_beat
if time_after_last_beat.seconds > max_time_after_worker_dead:
logger.error(
"Worker is dead. Time since last beat %s > %s",
time_after_last_beat.seconds,
time_offset,
)
return False
logger.info(
"Worker is alive. Time since last beat %s < %s",
time_after_last_beat.seconds,
time_offset,
)
return True
def _read_heartbeat_file(self, heartbeat_file_path: Path) -> HeartBeatState:
"""Read heartbeat timestamp and other information from a file."""
with open(heartbeat_file_path) as worker_heartbeat_file:
heart_beat_stats_json: dict = json.load(worker_heartbeat_file)
heart_beat_state = HeartBeatState(**heart_beat_stats_json)
return heart_beat_state
def _retrieve_heartbeat_file_path(self) -> Path:
"""Construct file path where heartbeat info is located."""
base_dir = Path(settings.BASE_DIR)
heartbeat_file_name = HEART_BEAT_FILE_NAME
heartbeat_file_path = base_dir.joinpath(heartbeat_file_name)
return heartbeat_file_path
FileHeartBeat – интегрируем в консьюмер
Интегрируем этот код в код консьюмера. В данном случае используется датакласс для возможности подмены реализации. В начале каждого цикла сохраняем данные, что живы.

@dataclasses.dataclass
class ConfluentKafkaConsumer:
heart_beat: HeartBeat
def run(self):
consumer = ...
....
while True:
message: ConfluentKafkaMessage = consumer.poll(2.0)
self.heart_beat.save()
if not message:
# No message received
continue
self._process_message(message)
confluent_kafka_consumer = ConfluentKafkaConsumer(
heart_beat=FileHeartBeat(),
)
FileHeartBeat - интегрируем в CLI
Дальше необходимо написать CLI-скрипт, который сообщит Kubernetes о том, что воркер жив или умер. Для этого необходима подобная функция, в зависимости от вашего фреймворка:
ERROR_CODE = 1
...
# Your CLI
def health_check(max_time: Optional[int]):
heart_beat = FileHeartBeat()
is_alive = heart_beat.is_alive(max_time_after_worker_dead=max_time)
if not is_alive:
sys.exit(ERROR_CODE)
FileHeartBeat – интегрируем в livenessProbe
В helm пропишем livenessProbe
проверять состояние консьюмера каждые 30 секунд. У нас используется Django, поэтому запускаем через него.
livenessProbe:
exec:
command:
- ./manage.py
- consumer
- is_alive
- --max_time
- "30"
На этом всё. Мы готовы проверять консьюмеры. Код приведен концептуальный.
Что дальше?
Вы можете реализовать сохранение Heartbeat в БД или Redis, а затем в админке смотреть состояние ваших консьюмеров.
Вы можете пойти ещё дальше и написать команды для приостановки вашего консьюмера прямо в коде через подобную концепцию проверки состояния в цикле перед обработкой сообщения. Это может быть удобно, когда нужно сбросить оффсеты в критических случаях (консьюмер группа должна быть остановлена)
Помимо вышеописанного, можно попробовать выделить код в отдельную библиотеку и переиспользовать.
Выводы ✍️
Из опыта использования этого подхода могу выделить следующее:
После реализации механизма Heartbeat мы перестали перезапускать консьюмеры руками и полностью автоматизировали этот процесс.
Среди возможных реализаций данный способ оказался наиболее простым и точным.
Данный подход позволяет расширять функциональность по отслеживанию и управлению консьюмерами – если использовать Redis, то можно отследить в админке консьюмеры, а также реализовать функциональность по управлению без перезапуска и девопсов прямо из админки.
Дополнительные материалы 📚
Вот, что ещё можно почитать на эту тему: