Привет! В этой статье я немного объясню важность graceful shutdown и расскажу как сделать плавное завершение работы твоего Spring Boot приложения, которое взаимодействует с Kafka.
Представь, что ты сидишь в кафе, пьешь свой раф на кокосовом, ожидаешь свой ролл и круассан, и решаешь срочно уйти, потому что позвонили с работы. Но тут появляется официант и приносит заказанный тобой круассан. Ты же не встанешь и не уйдешь сразу? Конечно нет - сначала попросишь отменить ролл, съешь круассан, затем допьешь раф и только потом уйдешь. Примерно такая же история должна происходить с твоим приложением: оно должно перестать принимать новые дела, закончить старые, а только потом уже спокойно выключаться.

Правильная настройка graceful shutdown важна не только для того, чтобы ты всегда мог допить кофе, но и повышает целостность данных и стабильность всей системы. Перейдем к делу!
Перестаем принимать запросы
Для этого нам будет необходимо пробежаться по всем запущенным в контейнерах листенерам кафки и завершить их работу. Изменить количество листенеров нам поможет класс для динамического взаимодействия с ними - KafkaListenerEndpointRegistry. (Подробнее можно почитать здесь)
log.info("Starting shutdown kafka components!");
kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {
c.stop();
log.info("Container {} has stopped", c.getListenerId());
});
log.info("Finish stopping kafka listeners");
Дожидаемся конца обработки вычитанных запросов
Чтобы не завершить работу приложения раньше, чем оно закончит обрабатывать уже считанные из кафки запросы, необходимо вести учет количества запросов в обработке. Это можно сделать, например, через потокобезопасные коллекции, но у меня многопоточная обработка вычитанных сообщений реализована с помощью ThreadPoolTaskExecutor, который предоставляет методы учета недообработанных запросов.
Вычитанные запросы могут обрабатываться бесконечно долго, поэтому давайте определим две переменные, которые будут отвечать за наше терпение:
tpCheckMaxTimes - максимальное количество проверок на наличие недообработанных запросов, которые мы можем сделать, прежде, чем принудительно завершим программу;
tpCheckDelayMs- задержка между проверками в миллисекундах.
Инкапсулируем эти переменные и вынесем их в стендозависимые проперти:
public class ShutDownProperties {
private final long tpCheckDelayMs; // Задержка между проверок тасок у ThreadPool
private final long tpCheckMaxTimes; // Максимальное количество проверок тасок
}
Код проверки моего ThreadPoolTaskExecutor с учетом логики принудительной остановки будет выглядеть так:
var counter = shutDownProperties.getTpCheckMaxTimes();
while (flowExecutor.isRunning() && flowExecutor.getActiveCount() > 0 && counter-- > 0) {
log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",
shutDownProperties.getTpCheckDelayMs(), flowExecutor.getActiveCount());
Thread.sleep(shutDownProperties.getTpCheckDelayMs());
}
if (counter == -1) {
throw new RuntimeException("Can't wait for end of processing consumed messages!");
}
Таким образом, сервис, который отвечает за функционал плавной остановки приложения будет выглядеть так:
@Slf4j
@Service
@AllArgsConstructor
public class ShutdownService {
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
private final ThreadPoolTaskExecutor flowPprbExecutor;
private final ShutDownProperties shutDownProperties;
public void turnOffConsumers() throws InterruptedException {
log.info("Starting shutdown kafka components!");
kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {
c.stop();
log.info("Container {} has stopped", c.getListenerId());
});
log.info("Finish stopping kafka listeners");
var counter = shutDownProperties.getTpCheckMaxTimes();
while (flowPprbExecutor.isRunning() && flowPprbExecutor.getActiveCount() > 0 && counter-- > 0) {
log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",
shutDownProperties.getTpCheckDelayMs(), flowPprbExecutor.getActiveCount());
Thread.sleep(shutDownProperties.getTpCheckDelayMs());
}
if (counter == -1) {
flowPprbExecutor.shutdown();
throw new RuntimeException("Can't wait for end of processing consumed messages!");
}
log.info("No active executors!");
}
Мы реализовали сервис, чтобы наше приложение на Spring Boot могло плавно завершить свою работу, не оставляя за собой бардак в виде потерянных сообщений в Kafka. Осталось только настроить свою облачную среду на взаимодействие с разработанным сервисом и все, ты можешь спать спокойно, зная, что твои сервисы умеют грамотно завершать свои дела, даже если что-то пошло не так :-)