Как стать автором
Обновить

Ручное подтверждение сообщения в Kafka

Время на прочтение2 мин
Количество просмотров5.8K

В этой статье описывается реализация задачи ручного подтверждения(acknowledgment) обработки сообщения в Kafka через ручную отправку смещения(commit offset) сообщения. Логика реализована с использованием Java, Spring и Kafka.

План:

  1. Реализовать слушатель Kafka с ручным подтверждением.

  2. Добавить файл application.properties с инструкциями для ручной отправки смещения.

  3. Описание механизма подтверждения в Kafka и определения.

1. Реализовать слушатель Kafka с ручным подтверждением

Создадим слушатель Kafka через использование аннотации @KafkaListener. Далее добавим переменную Acknowledgment для дальнейшей настройки подтверждения:

@Service
public class ServiceCallKafkaListener {

    @KafkaListener(id = "listenerId",
            groupId = "groupListenerId",
            topics = "topicName")
    public void listenServiceCall(@Payload String message,
                                  Acknowledgment acknowledgment) {
        //here is your logic for message processing
        boolean logicForMessageProcessingCompleted = true;
        if (logicForMessageProcessingCompleted) {
            //manual commit
            acknowledgment.acknowledge();
        }
    }
}

Нужно вызвать метод acknowledge() объекта Acknowledgment в теле метода слушателя. Метод слушателя это: listenServiceCall(). Метод acknowledge() отправляет сигнал в Kafka о том, что он подтверждает обработку сообщения.

2. Добавим файл application.properties и настройки

Добавим настройки Kafka в файл application.properties для того, что бы механизм ручной отправки смещения начал работать.

#for detail description see "Resources" section in this article
#type of acknowledgment mode
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

#property that turns off auto commit
spring.kafka.consumer.enable-auto-commit=false

3. Описание механизма подтверждения Kafka и определения

Подтверждение(Acknowledgment) - это сигнал в Kafka, который означает, что сообщение обработано слушателем и слушатель готов слушать следующее сообщение. Слушателем в этой статье является метод listenServiceCall. Kafka двигает указатель смещения до следующего, после отправки сигнала о подтверждении(acknowledge()).

Cлушатель(KafkaListener) будет получать одно и то же сообщение с одним и тем же смещением(offset), до тех пор пока слушатель не отправит сигнал о подтверждении.

Смещение(offset) в Kafka - это идентификатор сообщения в очереди сообщений.

Источники для дополнительного чтения

  1. Acknowledge mode description

  2. Acknowledge method documentation

Теги:
Хабы:
Всего голосов 12: ↑6 и ↓6+4
Комментарии3

Публикации

Истории

Работа

Java разработчик
395 вакансий

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн
10 – 11 октября
HR IT & Team Lead конференция «Битва за IT-таланты»
МоскваОнлайн
25 октября
Конференция по росту продуктов EGC’24
МоскваОнлайн
7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн