Как стать автором
Обновить

Как сделать livenessProbe для Kafka-консьюмеров и перестать перезапускать их вручную

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров1.5K
Всего голосов 7: ↑7 и ↓0+11
Комментарии8

Комментарии 8

О каком сигнале идет речь?

  • Через unix сигналы (проверяет, не завис ли процесс ОС, неточный);

Я пробовал реализовывать свою логику через SIGUSR1


Было предположение, что завис процесс консьюмера и проблема именно в процессе. После посылки сигнала писалось нечто в файл. Если файл появлялся, то все ок. Если нет - не ок. В итоге консьюмер зависал не только по этой причине, но и по другим, а livenessProbe думал, что все в порядке

Я не встречался с зависанием Kafka consumer, хотя идея отслеживать их живучесть мне понравилась. Сначала подумал, что можно было бы пойти другой дорогой: запускать consumer в отдельном приложении fastapi в spiner до yield в вечном while True, а уже в самом fastapi сделать единственную ручку check —-> 200. И ее держать на health check, но видно у вас зависание происходит не вашего кода, а где-то далее в подключении к Kafka. У вас видно на ваш SIGN приложение также и отзывалось .

А вот еще идея, перед вашим стартом consumer сделать вечную asyncio таску, в которой каждые 10 секунд пытаться подключится к Kafka (например , писать сообщение ping и считывать его, если не считалось, то exit уже.

Да, тут тоже вариант, как раз другие разработчики такую схему делали с отдельной очередью.

Здесь только есть проблема -- если основная таска обработки сообщений внутри застряла, а периодичесая, которая пишет пинг, нет. Допустим, сообщение(не ping) обрабатывается реально долго, то asyncio таска будет писать ping дальше. Тут какой-то такой же механизм нужно прикрутить, чтобы понимать, прогресс обработки основного сообщения

зависание происходит не вашего кода, а где-то далее в подключении к Kafka.

Да, все так, судя по тому, что откопали - местами висело подключение к кафке, после перезапуска все ок. В добавок там могло висеть подключение к БД по какой-то причине. А еще aiokafka имел проблему с зависаниями.

> запускать consumer в отдельном приложении fastapi
Тоже была мысль какой-то эндпоинт прикрутить. У нас, кажется, такое делала другая команда. Сам не использовал, так-как параллельно пришлось прикручивать свое решение.

Не совсем понятно, почему у вас они зависают. У нас один из сервисов кушает 300 000 в минуту, другой 2-3. И оба за год с имплементации еще не зависали. Тоже питон, аиокафка асинхронный.

Мы отслеживаем только количество асинхронных тасок listener. Если вдруг становится на одну меньше - перегружаем под(у нас их 12)

А как вообще выглядит зависание? у aiokafka два потока async(вернее 1+количество консумеров). оба живые остаются?

session_timeout/heartbeat_interval_ms выставлены?

Насчет количества потоков async не могу сказать - вообще не проверял, что там и как. Была задача написать какой-то health check для консьюмеров, но вот причины зависания очень сложно было понять, локально не воспроизвести.

Зависание выглядит так, будто консьюмер перестал обрабатывать сообщения - мы об этом узнаем, когда, например, кто-то кликнул на кнопку в одном сервисе, а в нашем сервисе ничего не появляется. Более глубоких причин не удалось выяснить, разве что кроме зависаний коннекшена к кафке, но это для продьюсера.

session_timeout/heartbeat_interval_ms - вроде стояли дефолтные, тоже не могу точно сказать.

Предполагаю, что дело все-таки в кластере кафки, у нас кластер кафки достаточно нагружен в совокупости. Сами топики по отдельности - нет. А так интересно, конечно, что у вас все хорошо работает, а тут нет.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий