Привет! В этой статье я немного объясню важность graceful shutdown и расскажу как сделать плавное завершение работы твоего Spring Boot приложения, которое взаимодействует с Kafka.
Представь, что ты сидишь в кафе, пьешь свой раф на кокосовом, ожидаешь свой ролл и круассан, и решаешь срочно уйти, потому что позвонили с работы. Но тут появляется официант и приносит заказанный тобой круассан. Ты же не встанешь и не уйдешь сразу? Конечно нет - сначала попросишь отменить ролл, съешь круассан, затем допьеш�� раф и только потом уйдешь. Примерно такая же история должна происходить с твоим приложением: оно должно перестать принимать новые дела, закончить старые, а только потом уже спокойно выключаться.

Правильная настройка graceful shutdown важна не только для того, чтобы ты всегда мог допить кофе, но и повышает целостность данных и стабильность всей системы. Перейдем к делу!
Перестаем принимать запросы
Для этого нам будет необходимо пробежаться по всем запущенным в контейнерах листенерам кафки и завершить их работу. Изменить количество листенеров нам поможет класс для динамического взаимодействия с ними - KafkaListenerEndpointRegistry. (Подробнее можно почитать здесь)
log.info("Starting shutdown kafka components!"); kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> { c.stop(); log.info("Container {} has stopped", c.getListenerId()); }); log.info("Finish stopping kafka listeners");
Дожидаемся конца обработки вычитанных запросов
Чтобы не завершить работу приложения раньше, чем оно закончит обрабатывать уже считанные из кафки запросы, необходимо вести учет количества запросов в обработке. Это можно сделать, например, через потокобезопасные коллекции, но у меня многопоточная обработка вычитанных сообщений реализована с помощью ThreadPoolTaskExecutor, который предоставляет методы учета недообработанных запросов.
Вычитанные запросы могут обрабатываться бесконечно долго, поэтому давайте определим две переменные, которые будут отвечать за наше терпение:
tpCheckMaxTimes - максимальное количество проверок на наличие недообработанных запросов, которые мы можем сделать, прежде, чем принудительно завершим программу;
tpCheckDelayMs- задержка между проверками в миллисекундах.
Инкапсулируем эти переменные и вынесем их в стендозависимые проперти:
public class ShutDownProperties { private final long tpCheckDelayMs; // Задержка между проверок тасок у ThreadPool private final long tpCheckMaxTimes; // Максимальное количество проверок тасок }
Код проверки моего ThreadPoolTaskExecutor с учетом логики принудительной остановки будет выглядеть так:
var counter = shutDownProperties.getTpCheckMaxTimes(); while (flowExecutor.isRunning() && flowExecutor.getActiveCount() > 0 && counter-- > 0) { log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}", shutDownProperties.getTpCheckDelayMs(), flowExecutor.getActiveCount()); Thread.sleep(shutDownProperties.getTpCheckDelayMs()); } if (counter == -1) { throw new RuntimeException("Can't wait for end of processing consumed messages!"); }
Таким образом, сервис, который отвечает за функционал плавной остановки приложения будет выглядеть так:
@Slf4j @Service @AllArgsConstructor public class ShutdownService { private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; private final ThreadPoolTaskExecutor flowPprbExecutor; private final ShutDownProperties shutDownProperties; public void turnOffConsumers() throws InterruptedException { log.info("Starting shutdown kafka components!"); kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> { c.stop(); log.info("Container {} has stopped", c.getListenerId()); }); log.info("Finish stopping kafka listeners"); var counter = shutDownProperties.getTpCheckMaxTimes(); while (flowPprbExecutor.isRunning() && flowPprbExecutor.getActiveCount() > 0 && counter-- > 0) { log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}", shutDownProperties.getTpCheckDelayMs(), flowPprbExecutor.getActiveCount()); Thread.sleep(shutDownProperties.getTpCheckDelayMs()); } if (counter == -1) { flowPprbExecutor.shutdown(); throw new RuntimeException("Can't wait for end of processing consumed messages!"); } log.info("No active executors!"); }
Мы реализовали сервис, чтобы наше приложение на Spring Boot могло плавно завершить свою работу, не оставляя за собой бардак в виде потерянных сообщений в Kafka. Осталось только настроить свою облачную среду на взаимодействие с разработанным сервисом и все, ты можешь спать спокойно, зная, что твои сервисы умеют грамотно завершать свои дела, даже если что-то пошло не так :-)
