Pull to refresh

До свидания, Kafka, или graceful shutdown на Spring Boot для Kafka

Level of difficultyMedium
Reading time3 min
Views5.3K

Привет! В этой статье я немного объясню важность graceful shutdown и расскажу как сделать плавное завершение работы твоего Spring Boot приложения, которое взаимодействует с Kafka.

Представь, что ты сидишь в кафе, пьешь свой раф на кокосовом, ожидаешь свой ролл и круассан, и решаешь срочно уйти, потому что позвонили с работы. Но тут появляется официант и приносит заказанный тобой круассан. Ты же не встанешь и не уйдешь сразу? Конечно нет - сначала попросишь отменить ролл, съешь круассан, затем допьешь раф и только потом уйдешь. Примерно такая же история должна происходить с твоим приложением: оно должно перестать принимать новые дела, закончить старые, а только потом уже спокойно выключаться.

Правильная настройка graceful shutdown важна не только для того, чтобы ты всегда мог допить кофе, но и повышает целостность данных и стабильность всей системы. Перейдем к делу!

Перестаем принимать запросы

Для этого нам будет необходимо пробежаться по всем запущенным в контейнерах листенерам кафки и завершить их работу. Изменить количество листенеров нам поможет класс для динамического взаимодействия с ними - KafkaListenerEndpointRegistry. (Подробнее можно почитать здесь)

log.info("Starting shutdown kafka components!");

kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {
  c.stop();
  log.info("Container {} has stopped", c.getListenerId());
});
log.info("Finish stopping kafka listeners");

Дожидаемся конца обработки вычитанных запросов

Чтобы не завершить работу приложения раньше, чем оно закончит обрабатывать уже считанные из кафки запросы, необходимо вести учет количества запросов в обработке. Это можно сделать, например, через потокобезопасные коллекции, но у меня многопоточная обработка вычитанных сообщений реализована с помощью ThreadPoolTaskExecutor, который предоставляет методы учета недообработанных запросов.

Вычитанные запросы могут обрабатываться бесконечно долго, поэтому давайте определим две переменные, которые будут отвечать за наше терпение:

  • tpCheckMaxTimes - максимальное количество проверок на наличие недообработанных запросов, которые мы можем сделать, прежде, чем принудительно завершим программу;

  • tpCheckDelayMs- задержка между проверками в миллисекундах.

Инкапсулируем эти переменные и вынесем их в стендозависимые проперти:

public class ShutDownProperties {
    private final long tpCheckDelayMs; // Задержка между проверок тасок у ThreadPool
    private final long tpCheckMaxTimes; // Максимальное количество проверок тасок
}

Код проверки моего ThreadPoolTaskExecutor с учетом логики принудительной остановки будет выглядеть так:

var counter = shutDownProperties.getTpCheckMaxTimes();
while (flowExecutor.isRunning() && flowExecutor.getActiveCount() > 0 && counter-- > 0) {
  log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",
    shutDownProperties.getTpCheckDelayMs(), flowExecutor.getActiveCount());
  Thread.sleep(shutDownProperties.getTpCheckDelayMs());
  }
if (counter == -1) {
  throw new RuntimeException("Can't wait for end of processing consumed messages!");
}

Таким образом, сервис, который отвечает за функционал плавной остановки приложения будет выглядеть так:

@Slf4j
@Service
@AllArgsConstructor
public class ShutdownService {

    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    private final ThreadPoolTaskExecutor flowPprbExecutor;
    private final ShutDownProperties shutDownProperties;

    public void turnOffConsumers() throws InterruptedException {
        log.info("Starting shutdown kafka components!");

        kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {
            c.stop();
            log.info("Container {} has stopped", c.getListenerId());
        });
        log.info("Finish stopping kafka listeners");

        var counter = shutDownProperties.getTpCheckMaxTimes();
        while (flowPprbExecutor.isRunning() && flowPprbExecutor.getActiveCount() > 0 && counter-- > 0) {
            log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",
                    shutDownProperties.getTpCheckDelayMs(), flowPprbExecutor.getActiveCount());
            Thread.sleep(shutDownProperties.getTpCheckDelayMs());
        }
        if (counter == -1) {
            flowPprbExecutor.shutdown();
            throw new RuntimeException("Can't wait for end of processing consumed messages!");
        }

        log.info("No active executors!");
    }

Мы реализовали сервис, чтобы наше приложение на Spring Boot могло плавно завершить свою работу, не оставляя за собой бардак в виде потерянных сообщений в Kafka. Осталось только настроить свою облачную среду на взаимодействие с разработанным сервисом и все, ты можешь спать спокойно, зная, что твои сервисы умеют грамотно завершать свои дела, даже если что-то пошло не так :-)

Tags:
Hubs:
+6
Comments2

Articles