В этой статье описывается реализация задачи ручного подтверждения(acknowledgment) обработки сообщения в Kafka через ручную отправку смещения(commit offset) сообщения. Логика реализована с использованием Java, Spring и Kafka.

План:
Реализовать слушатель Kafka с ручным подтверждением.
Добавить файл application.properties с инструкциями для ручной отправки смещения.
Описание механизма подтверждения в Kafka и определения.
1. Реализовать слушатель Kafka с ручным подтверждением
Создадим слушатель Kafka через использование аннотации @KafkaListener. Далее добавим переменную Acknowledgment для дальнейшей настройки подтверждения:
@Service public class ServiceCallKafkaListener { @KafkaListener(id = "listenerId", groupId = "groupListenerId", topics = "topicName") public void listenServiceCall(@Payload String message, Acknowledgment acknowledgment) { //here is your logic for message processing boolean logicForMessageProcessingCompleted = true; if (logicForMessageProcessingCompleted) { //manual commit acknowledgment.acknowledge(); } } }
Нужно вызвать метод acknowledge() объекта Acknowledgment в теле метода слушателя. Метод слушателя это: listenServiceCall(). Метод acknowledge() отправляет сигнал в Kafka о том, что он подтверждает обработку сообщения.
2. Добавим файл application.properties и настройки
Добавим настройки Kafka в файл application.properties для того, что бы механизм ручной отправки смещения начал работать.
#for detail description see "Resources" section in this article #type of acknowledgment mode spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE #property that turns off auto commit spring.kafka.consumer.enable-auto-commit=false
3. Описание механизма подтверждения Kafka и определения
Подтверждение(Acknowledgment) - это сигнал в Kafka, который означает, что сообщение обработано слушателем и слушатель готов слушать следующее сообщение. Слушателем в этой статье является метод listenServiceCall. Kafka двигает указатель смещения до следующего, после отправки сигнала о подтверждении(acknowledge()).
Cлушатель(KafkaListener) будет получать одно и то же сообщение с одним и тем же смещением(offset), до тех пор пока слушатель не отправит сигнал о подтверждении.
Смещение(offset) в Kafka - это идентификатор сообщения в очереди сообщений.
