Так исторически сложилось, что Apache Kafka использует для своих сообщений смещения (или же offset). В зависимости от нужд для настроек консьюмера можно выставить в параметр auto.offset.reset три значения: earliest, latest, none. По умолчанию, если данный параметр не задан, используется значение latest.

В данным выпуске я хочу заострить внимание на параметре none.

Earliest

Данные параметр используется, если вам необходимо получать сообщения с самого начала партиций топика. Как правило, данная опция имеет смысл, если вам нужно наполнить данными БД, передать все значения из одного место в другое и т.п.

Latest

Параметр по умолчанию. Используется тогда, когда мы хотим получать актуальные данные, т.е. пришедшие в последний момент времени. Здесь тоже ничего необычного.

None

Параметр, ради которого я и решил написать данную статью. Этот параметр не задает правил оффсета для новых консьюмеров, он кидает исключения в таких случаях.

Передо мной стояла задача: написать микросервис, который перекладывает данные из одного сервера кафки в другой (опытный читатель быть может подметит, что есть же KafkaConnect, Apache Kafka Mirror Maker и т.п., но не я выбирал требования реализации сего мероприятия через микросервис).

Из особенных требований, может быть то, что при первоначальном запуске необходимо задать оффсет, равный "текущая дата - 24 часа" и с этого момента начать консьюмить сообщения (данных очень много приходит за сутки).

Как всегда, в голову приходит решение более легкое и более быстрое - добавить флаг первичной загрузки и передавать его при старте приложения, что и было сделано (представьте, что мы не используем никакую БД, т.к. бизнес не выделил денег). Но а вдруг приложение законсьюмит данные и упадет через час, а все данные уже были взяты? Произойдет дублирование в продюссируемом топике. Что тогда делать?

Да, можно заморочиться и после запуска сразу убрать флаг первичной загрузки, все равно оффсет для группы уже будет существовать. Но это костыльное решение, от которого нужно избавляться.

Недолго думая, я решил переписать код (на самом деле, я подумал его переписать, когда столкнулся с вышеописанной ситуацией падения консьюмера и дублирования данных). Я всегда слышал, что у параметра auto.offset.reset есть 3 параметра, с двумя параметрами из них я имел опыт построения приложений, но третий - none - оставался для меня загадкой.

Мне казалось плевым делом заиспользовать none. Но в процессе написания кода, я бомбил сильнее и сильнее. Все дело в том, что нигде нет примеров с none, а я обыскал весь гугл (если точнее всю первую страницу запроса). В официальной документации сказано:

You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.

Но ни��аких примеров правильного использования нет. И во всех остальных источниках примерно то же самое, т.к. "это специфичный параметр и подходит для редких случаев и рассматривать его мы не будем".

Спустя день я наконец-то пришел к решению, с которым я намерен поделиться. Признаю, можно лучше написать код, чем я. Я далеко не идеальный программист, и если вы подскажете, как его можно лучше структурировать (ну или вообще другое решение), то буду чрезмерно благодарен.

Код

Для начала выставим параметр в application.yml (application.properties) равный none.

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: lalala-group-id
      auto-offset-reset:  none
      enable-auto-commit: false
      client-id: lalala-client-id

enable.auto.commit я тоже выключил

package org.example.service;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.*;

@Service
@RequiredArgsConstructor
@Log4j2
public class SettingService {
  private final ConsumerFactory<String, String> consumerFactory;
  @Value("${spring.kafka.topic.test}")
  String topic;
  
  @PostConstruct
  public void checkAndResetOffsetsIfNeeded() {
      Properties consumerProps = new Properties();
      // Передаем конфигурацию через ConsumerFactory
      consumerProps.putAll(consumerFactory.getConfigurationProperties());
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          // Создаем временного консьюмера и берем партиции для нужного топика
          List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
          Set<TopicPartition> topicPartitions = new HashSet<>();
  
          for (PartitionInfo partitionInfo : partitionInfos) {
              topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
          }

          // присваеваем консьюмеру партиции
          consumer.assign(topicPartitions);

          // смотрим нет ли уже для группы консьюмера закомиченных оффсетов в партициях
          Map<TopicPartition, OffsetAndMetadata> commitedOffsets = consumer.committed(topicPartitions);
          Instant resetTime = Instant.now().minus(Duration.ofHours(24));
  
          Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
  
          for (TopicPartition topicPartition : topicPartitions) {
              // Берем закомиченные оффсеты из мапы
              OffsetAndMetadata commitedOffset = commitedOffsets.get(topicPartition);
              Long latestOffset = latestOffsets.get(topicPartition);

              // В следующем блоке смотрим, есть ли закомиченные оффсеты, если нет, то ставим оффсет, равный resetTime
              if (commitedOffset == null || commitedOffset.offset() == -1L) {
                  Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer
                          .offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli()));
                  OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
  
                  long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset;
                  log.info("Resetting offset for partition {}", topicPartition.partition());

                  // Переходим на новый оффсет и коммитим, т.к. у меня вырублен автокоммит
                  consumer.seek(topicPartition, newOffset);
                  consumer.commitSync();
                // Если есть закомиченный оффсет уже, то ничего не делаем
              } else {
                  log.info("Offset for partition {} is already commited at {}",
                          topicPartition.partition(), commitedOffset.offset());
              }
          }
  
          log.info("closing consumer setting");
      }
}

KafkaListener настроен стандартно:

@KafkaListener(
            groupId = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.topic.test}"
    )
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
  try {
    // какие-нибудь операции
    acknowledgment.acknowledge();
  } catch (ProducerException e) {
    log.error(e.getMessage());
    acknowledgment.nack(Duration.ofSeconds(3));
  }
}

Приведенный выше код создаст перед запуском KafkaListener временного консьюмера, через который мы уже проведем настройки оффсетов. Метод для работы с временным консьюмером вызывается через аннотацию @PostConstruct,который сначала выполнит манипуляции с временным консьюмером, закроет его и потом запустится наш KafkaListener. Конфликтов точка нет.

Через интерфейс CommandLineRunner вызов метода не сработает должным образом, т.к. KafkaListener будет запускаться в одно время с временным консьюмером и будет пробрасываться экспепшн, связанный с оффсетами (точно не помню какой, поэкспериментируйте). Я пробовал еще отключать автоматический запуск, но он требует задания KafkaListener параметра id, и я не стал с этим экспериментировать, так еще больше кода получилось бы (и не знаю, можно ли было бы легко масштабировать количество консьюмеров).

Также пробовал через обработку возникающих ошибок делать манипуляции с оффсетами, но видимо я там чего-то недоглядел и у меня не получилось это реализовать (не исключаю, что такой подход вполне возможен).

Вот таким образом можно работать с none (мне кажется я первый, кто привел пример как работать с этим параметром). Если у вас уже есть примеры работы с данным параметром, то буду рад глянуть, оставляйте в комментах :-)

Всего хорошего, всем хабравчанам!