Перемещение указателя на смещение в Kafka
В данной статье описывается задача, в которой необходимо слушать сообщения с определенного смещения(offset) в Kafka. Для решения данной задачи потребуется интерфейс ConsumerSeekAware и найти позицию с которой нужно начать слушание. Так же необходимо будет создать механизм перезапуска слушателя сообщений Kafka. В примерах кода используются Java и Spring фреймворк.
План
Реализовать интерфейс ConsumerSeekAware.
Создать слушатель Kafka, который слушает сообщения.
Создать сервис, который перезапускает слушатель Kafka.
Описать порядок работы методов.
1. Реализовать ConsumerSeekAware
При реализации интерфейса ConsumerSeekAware, необходимо переопределить метод onPartitionsAssigned(…). Метод onPartitionsAssigned(…) определит позицию с которой необходимо слушать сообщения.
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
long offset = 777l; //your offset number
String topic = "TopicName"; //your topic name
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(partition ->
callback.seek(this.topic, partition, this.offset));
}
}
}
Метод onPartitionsAssigned(…) будет выполнен после того, как MessageListenerContainer даст ход процессу слушания сообщений.
2. Создать слушатель Kafka, который слушает сообщения
Для реализации слушателя используем аннотацию @KafkaListener:
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
long offset = 777l; //your offset number
String topic = "TopicName"; //your topic name
String listenerId = "listenerId"; //your listener id
//id of this listener has to be remembered for further
//use in method getListenerContainer of KafkaListenerEndpointRegistry
@KafkaListener(id = listenerId,
groupId = "groupName",
topics = topic)
public void listenServiceCall(@Payload String message,
@Header(KafkaHeaders.OFFSET) Long offset) {
System.out.println("offset: " + offset ", message: " + message);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(partition ->
callback.seek(this.topic, partition, this.offset));
}
}
}
В данном примере видно метод listenServiceCall(…), который отмечен аннотацией @KafkaListener , что делает метод искомым для контейнера приложения и класса MessageListenerContainer.
3. Создать сервис, который перезапускает слушатель Kafka
Возьмём методы stop() и start() класса MessageListenerContainer, далее сделаем для них обертки и затем запустим их по очереди.
@Service
public class KafkaListenerRestartService {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
String listenerId = "listenerId";
public void restartKafkaListener() {
stop();
start();
}
public void start() {
MessageListenerContainer container = kafkaListenerEndpointRegistry
.getListenerContainer(this.listenerId);
if (container != null && !container.isRunning()) {
container.start();
}
}
public void stop() {
MessageListenerContainer container = kafkaListenerEndpointRegistry
.getListenerContainer(this.listenerId);
if (container != null && container.isRunning()) {
container.stop();
}
}
}
В классе KafkaListenerEndpointRegistry, методы start() и stop() предоставляют контроль над запуском определенного слушателя через класс MessageListenerContainer и идентификатор(Id) необходимого слушателя.
4. Описать порядок работы методов
Далее представлен порядок запуска методов, начиная от момента, когда Kafka остановлена для слушания сообщения, далее перемещением указателя смещения(offset) и новым запуском слушания сообщений. В результате, слушатель Kafka будет читать сообщения с определенного смещения(offset).
Порядок запуска методов:
restartKafkaListener()
stop()
start()
onPartitionsAssigned(...)
listenServiceCall(...)
Итог
Разработано решение для перемещения указателя смещения в Kafka на определенную позицию.