Так исторически сложилось, что Apache Kafka использует для своих сообщений смещения (или же offset). В зависимости от нужд для настроек консьюмера можно выставить в параметр auto.offset.reset три значения: earliest, latest, none. По умолчанию, если данный параметр не задан, используется значение latest.
В данным выпуске я хочу заострить внимание на параметре none.
Earliest
Данные параметр используется, если вам необходимо получать сообщения с самого начала партиций топика. Как правило, данная опция имеет смысл, если вам нужно наполнить данными БД, передать все значения из одного место в другое и т.п.
Latest
Параметр по умолчанию. Используется тогда, когда мы хотим получать актуальные данные, т.е. пришедшие в последний момент времени. Здесь тоже ничего необычного.
None
Параметр, ради которого я и решил написать данную статью. Этот параметр не задает правил оффсета для новых консьюмеров, он кидает исключения в таких случаях.
Передо мной стояла задача: написать микросервис, который перекладывает данные из одного сервера кафки в другой (опытный читатель быть может подметит, что есть же KafkaConnect, Apache Kafka Mirror Maker и т.п., но не я выбирал требования реализации сего мероприятия через микросервис).
Из особенных требований, может быть то, что при первоначальном запуске необходимо задать оффсет, равный "текущая дата - 24 часа" и с этого момента начать консьюмить сообщения (данных очень много приходит за сутки).
Как всегда, в голову приходит решение более легкое и более быстрое - добавить флаг первичной загрузки и передавать его при старте приложения, что и было сделано (представьте, что мы не используем никакую БД, т.к. бизнес не выделил денег). Но а вдруг приложение законсьюмит данные и упадет через час, а все данные уже были взяты? Произойдет дублирование в продюссируемом топике. Что тогда делать?
Да, можно заморочиться и после запуска сразу убрать флаг первичной загрузки, все равно оффсет для группы уже будет существовать. Но это костыльное решение, от которого нужно избавляться.
Недолго думая, я решил переписать код (на самом деле, я подумал его переписать, когда столкнулся с вышеописанной ситуацией падения консьюмера и дублирования данных). Я всегда слышал, что у параметра auto.offset.reset есть 3 параметра, с двумя параметрами из них я имел опыт построения приложений, но третий - none - оставался для меня загадкой.
Мне казалось плевым делом заиспользовать none. Но в процессе написания кода, я бомбил сильнее и сильнее. Все дело в том, что нигде нет примеров с none, а я обыскал весь гугл (если точнее всю первую страницу запроса). В официальной документации сказано:
You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.
Но ни��аких примеров правильного использования нет. И во всех остальных источниках примерно то же самое, т.к. "это специфичный параметр и подходит для редких случаев и рассматривать его мы не будем".
Спустя день я наконец-то пришел к решению, с которым я намерен поделиться. Признаю, можно лучше написать код, чем я. Я далеко не идеальный программист, и если вы подскажете, как его можно лучше структурировать (ну или вообще другое решение), то буду чрезмерно благодарен.
Код
Для начала выставим параметр в application.yml (application.properties) равный none.
spring: kafka: consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: lalala-group-id auto-offset-reset: none enable-auto-commit: false client-id: lalala-client-id
enable.auto.commit я тоже выключил
package org.example.service; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Service; import java.time.Duration; import java.time.Instant; import java.util.*; @Service @RequiredArgsConstructor @Log4j2 public class SettingService { private final ConsumerFactory<String, String> consumerFactory; @Value("${spring.kafka.topic.test}") String topic; @PostConstruct public void checkAndResetOffsetsIfNeeded() { Properties consumerProps = new Properties(); // Передаем конфигурацию через ConsumerFactory consumerProps.putAll(consumerFactory.getConfigurationProperties()); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) { // Создаем временного консьюмера и берем партиции для нужного топика List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); Set<TopicPartition> topicPartitions = new HashSet<>(); for (PartitionInfo partitionInfo : partitionInfos) { topicPartitions.add(new TopicPartition(topic, partitionInfo.partition())); } // присваеваем консьюмеру партиции consumer.assign(topicPartitions); // смотрим нет ли уже для группы консьюмера закомиченных оффсетов в партициях Map<TopicPartition, OffsetAndMetadata> commitedOffsets = consumer.committed(topicPartitions); Instant resetTime = Instant.now().minus(Duration.ofHours(24)); Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions); for (TopicPartition topicPartition : topicPartitions) { // Берем закомиченные оффсеты из мапы OffsetAndMetadata commitedOffset = commitedOffsets.get(topicPartition); Long latestOffset = latestOffsets.get(topicPartition); // В следующем блоке смотрим, есть ли закомиченные оффсеты, если нет, то ставим оффсет, равный resetTime if (commitedOffset == null || commitedOffset.offset() == -1L) { Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer .offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli())); OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition); long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset; log.info("Resetting offset for partition {}", topicPartition.partition()); // Переходим на новый оффсет и коммитим, т.к. у меня вырублен автокоммит consumer.seek(topicPartition, newOffset); consumer.commitSync(); // Если есть закомиченный оффсет уже, то ничего не делаем } else { log.info("Offset for partition {} is already commited at {}", topicPartition.partition(), commitedOffset.offset()); } } log.info("closing consumer setting"); } }
KafkaListener настроен стандартно:
@KafkaListener( groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.kafka.topic.test}" ) public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) { try { // какие-нибудь операции acknowledgment.acknowledge(); } catch (ProducerException e) { log.error(e.getMessage()); acknowledgment.nack(Duration.ofSeconds(3)); } }
Приведенный выше код создаст перед запуском KafkaListener временного консьюмера, через который мы уже проведем настройки оффсетов. Метод для работы с временным консьюмером вызывается через аннотацию @PostConstruct,который сначала выполнит манипуляции с временным консьюмером, закроет его и потом запустится наш KafkaListener. Конфликтов точка нет.
Через интерфейс CommandLineRunner вызов метода не сработает должным образом, т.к. KafkaListener будет запускаться в одно время с временным консьюмером и будет пробрасываться экспепшн, связанный с оффсетами (точно не помню какой, поэкспериментируйте). Я пробовал еще отключать автоматический запуск, но он требует задания KafkaListener параметра id, и я не стал с этим экспериментировать, так еще больше кода получилось бы (и не знаю, можно ли было бы легко масштабировать количество консьюмеров).
Также пробовал через обработку возникающих ошибок делать манипуляции с оффсетами, но видимо я там чего-то недоглядел и у меня не получилось это реализовать (не исключаю, что такой подход вполне возможен).
Вот таким образом можно работать с none (мне кажется я первый, кто привел пример как работать с этим параметром). Если у вас уже есть примеры работы с данным параметром, то буду рад глянуть, оставляйте в комментах :-)
Всего хорошего, всем хабравчанам!
