Так исторически сложилось, что Apache Kafka использует для своих сообщений смещения (или же offset). В зависимости от нужд для настроек консьюмера можно выставить в параметр auto.offset.reset три значения: earliest, latest, none. По умолчанию, если данный параметр не задан, используется значение latest.
В данным выпуске я хочу заострить внимание на параметре none.
Earliest
Данные параметр используется, если вам необходимо получать сообщения с самого начала партиций топика. Как правило, данная опция имеет смысл, если вам нужно наполнить данными БД, передать все значения из одного место в другое и т.п.
Latest
Параметр по умолчанию. Используется тогда, когда мы хотим получать актуальные данные, т.е. пришедшие в последний момент времени. Здесь тоже ничего необычного.
None
Параметр, ради которого я и решил написать данную статью. Этот параметр не задает правил оффсета для новых консьюмеров, он кидает исключения в таких случаях.
Передо мной стояла задача: написать микросервис, который перекладывает данные из одного сервера кафки в другой (опытный читатель быть может подметит, что есть же KafkaConnect, Apache Kafka Mirror Maker и т.п., но не я выбирал требования реализации сего мероприятия через микросервис).
Из особенных требований, может быть то, что при первоначальном запуске необходимо задать оффсет, равный "текущая дата - 24 часа" и с этого момента начать консьюмить сообщения (данных очень много приходит за сутки).
Как всегда, в голову приходит решение более легкое и более быстрое - добавить флаг первичной загрузки и передавать его при старте приложения, что и было сделано (представьте, что мы не используем никакую БД, т.к. бизнес не выделил денег). Но а вдруг приложение законсьюмит данные и упадет через час, а все данные уже были взяты? Произойдет дублирование в продюссируемом топике. Что тогда делать?
Да, можно заморочиться и после запуска сразу убрать флаг первичной загрузки, все равно оффсет для группы уже будет существовать. Но это костыльное решение, от которого нужно избавляться.
Недолго думая, я решил переписать код (на самом деле, я подумал его переписать, когда столкнулся с вышеописанной ситуацией падения консьюмера и дублирования данных). Я всегда слышал, что у параметра auto.offset.reset есть 3 параметра, с двумя параметрами из них я имел опыт построения приложений, но третий - none - оставался для меня загадкой.
Мне казалось плевым делом заиспользовать none. Но в процессе написания кода, я бомбил сильнее и сильнее. Все дело в том, что нигде нет примеров с none, а я обыскал весь гугл (если точнее всю первую страницу запроса). В официальной документации сказано:
You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.
Но ни��аких примеров правильного использования нет. И во всех остальных источниках примерно то же самое, т.к. "это специфичный параметр и подходит для редких случаев и рассматривать его мы не будем".
Спустя день я наконец-то пришел к решению, с которым я намерен поделиться. Признаю, можно лучше написать код, чем я. Я далеко не идеальный программист, и если вы подскажете, как его можно лучше структурировать (ну или вообще другое решение), то буду чрезмерно благодарен.
Код
Для начала выставим параметр в application.yml (application.properties) равный none.
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: lalala-group-id
auto-offset-reset: none
enable-auto-commit: false
client-id: lalala-client-idenable.auto.commit я тоже выключил
package org.example.service;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
@Service
@RequiredArgsConstructor
@Log4j2
public class SettingService {
private final ConsumerFactory<String, String> consumerFactory;
@Value("${spring.kafka.topic.test}")
String topic;
@PostConstruct
public void checkAndResetOffsetsIfNeeded() {
Properties consumerProps = new Properties();
// Передаем конфигурацию через ConsumerFactory
consumerProps.putAll(consumerFactory.getConfigurationProperties());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
// Создаем временного консьюмера и берем партиции для нужного топика
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
Set<TopicPartition> topicPartitions = new HashSet<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
}
// присваеваем консьюмеру партиции
consumer.assign(topicPartitions);
// смотрим нет ли уже для группы консьюмера закомиченных оффсетов в партициях
Map<TopicPartition, OffsetAndMetadata> commitedOffsets = consumer.committed(topicPartitions);
Instant resetTime = Instant.now().minus(Duration.ofHours(24));
Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
for (TopicPartition topicPartition : topicPartitions) {
// Берем закомиченные оффсеты из мапы
OffsetAndMetadata commitedOffset = commitedOffsets.get(topicPartition);
Long latestOffset = latestOffsets.get(topicPartition);
// В следующем блоке смотрим, есть ли закомиченные оффсеты, если нет, то ставим оффсет, равный resetTime
if (commitedOffset == null || commitedOffset.offset() == -1L) {
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer
.offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli()));
OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset;
log.info("Resetting offset for partition {}", topicPartition.partition());
// Переходим на новый оффсет и коммитим, т.к. у меня вырублен автокоммит
consumer.seek(topicPartition, newOffset);
consumer.commitSync();
// Если есть закомиченный оффсет уже, то ничего не делаем
} else {
log.info("Offset for partition {} is already commited at {}",
topicPartition.partition(), commitedOffset.offset());
}
}
log.info("closing consumer setting");
}
}KafkaListener настроен стандартно:
@KafkaListener(
groupId = "${spring.kafka.consumer.group-id}",
topics = "${spring.kafka.topic.test}"
)
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
try {
// какие-нибудь операции
acknowledgment.acknowledge();
} catch (ProducerException e) {
log.error(e.getMessage());
acknowledgment.nack(Duration.ofSeconds(3));
}
}Приведенный выше код создаст перед запуском KafkaListener временного консьюмера, через который мы уже проведем настройки оффсетов. Метод для работы с временным консьюмером вызывается через аннотацию @PostConstruct,который сначала выполнит манипуляции с временным консьюмером, закроет его и потом запустится наш KafkaListener. Конфликтов точка нет.
Через интерфейс CommandLineRunner вызов метода не сработает должным образом, т.к. KafkaListener будет запускаться в одно время с временным консьюмером и будет пробрасываться экспепшн, связанный с оффсетами (точно не помню какой, поэкспериментируйте). Я пробовал еще отключать автоматический запуск, но он требует задания KafkaListener параметра id, и я не стал с этим экспериментировать, так еще больше кода получилось бы (и не знаю, можно ли было бы легко масштабировать количество консьюмеров).
Также пробовал через обработку возникающих ошибок делать манипуляции с оффсетами, но видимо я там чего-то недоглядел и у меня не получилось это реализовать (не исключаю, что такой подход вполне возможен).
Вот таким образом можно работать с none (мне кажется я первый, кто привел пример как работать с этим параметром). Если у вас уже есть примеры работы с данным параметром, то буду рад глянуть, оставляйте в комментах :-)
Всего хорошего, всем хабравчанам!
