Как стать автором
Обновить

Динамическое создание слушателей в Kafka

Уровень сложностиСредний
Время на прочтение4 мин
Количество просмотров6.6K

Эта статья обьясняет, как создать слушатель в Kafka на лету в процессе работы приложения.

Plan:

  1. Создадим шаблонный класс через реализацию интерфейса MessageListener.

  2. Создадим KafkaListenerEndpoint с помощью шаблона.

  3. Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry.

  4. Создадим окружение для тестирования.

  5. Протестируем решение.

  6. Заключение.

1. Создадим шаблонный класс через реализацию интерфейса MessageListener

Создадим класс KafkaTemplateListener который реализует интерфейс MessageListener. Этот шаблон источник логики для будущих динамически созданных слушателей.

public class KafkaTemplateListener implements MessageListener<String, String> { 
    @Override    
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("RECORD PROCESSING: " + record);
    }
}

2. Создадим KafkaListenerEndpoint с помощью реализованного шаблона

В методе createDefaultMethodKafkaListenerEndpoint(String topic) нужно установить настройки, такие как Endpoint Id, Group Id, Topics и т.д.

В методе createKafkaListenerEndpoint(String topic) нужно установить шаблон слушателя и метод, который слушает сообщение из Kafka

@Service
public class KafkaListenerCreator {
    String kafkaGroupId = "kafkaGroupId";
    String kafkaListenerId = "kafkaListenerId-";
    static AtomicLong endpointIdIndex = new AtomicLong(1);

    private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
            createDefaultMethodKafkaListenerEndpoint(topic);
        kafkaListenerEndpoint.setBean(new KafkaTemplateListener());
        try {
            kafkaListenerEndpoint.setMethod(KafkaTemplateListener.class.getMethod("onMessage", ConsumerRecord.class));
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("Attempt to call a non-existent method " + e);
        }
        return kafkaListenerEndpoint;
    }

    private MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
        kafkaListenerEndpoint.setId(generateListenerId());
        kafkaListenerEndpoint.setGroupId(kafkaGroupId);
        kafkaListenerEndpoint.setAutoStartup(true);
        kafkaListenerEndpoint.setTopics(topic);
        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return kafkaListenerEndpoint;
    }

    private String generateListenerId() {
        return kafkaGeneralListenerEndpointId + endpointIdIndex.getAndIncrement();

    }
}

3. Зарегестрируем эндпоинт в KafkaListenerEndpointRegistry

Логика, которая регистрирует слушатель находиться в теле метода createAndRegisterListener(String topic). Логика находиться в том же классе KafkaListenerCreator.

@Service
public class KafkaListenerCreator {
  //... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  @Autowired
  private KafkaListenerContainerFactory kafkaListenerContainerFactory;

  public void createAndRegisterListener(String topic) {
    KafkaListenerEndpoint listener = createKafkaListenerEndpoint(topic);
    kafkaListenerEndpointRegistry.registerListenerContainer(listener, kafkaListenerContainerFactory, true);
  }

  //... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE

}

4. Создадим окружение для тестирования

Сначала создадим REST метод для для создания слушателя Kafka.

Я создал класс KafkaController и метод create(String topic). Этот метод может быть вызван с помощью POST HTTP запроса.

@RestController
public class KafkaController {
    @Autowired
    KafkaListenerCreator kafkaListenerCreator;

    @PostMapping(path = "/create")
    @ResponseStatus(HttpStatus.OK)
    public void create(@RequestParam String topic) {
        kafkaListenerCreator.createAndRegisterListener(topic);
    }
}

Далее создадим метод для отправки сообщения в ново созданный слушатель.

Метод send(@RequestParam String topic, @RequestParam String message) имеет два параметра. Где “topic” это имя топика у слушателя Kafka, а “message” это текст сообщения, которое отправляется через KafkaTemplate.

@RestController
public class KafkaController {
    //... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(path = "/send")
    @ResponseStatus(HttpStatus.OK)
    public void send(@RequestParam String topic, @RequestParam String message) {
        kafkaTemplate.send(topic, message);
    }
    //... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE
}

5. Протестируем решение

  1. вызвать http://localhost:8080/create?topic=myTopic1

  2. вызвать http://localhost:8080/send?topic=myTopic1&message=myTxt1

  3. ожидаемый лог: “ RECORD PROCESSING: myTxt1”

6. Заключение

Эта статья предоставляет быстрое решение для проблемы динамического создания слушателей в Kafka.

Дополнительные ресурсы

https://medium.com/bliblidotcom-techblog/dynamic-spring-boot-kafka-consumer-af8740f2c703?source=post_page-----4f8f359d715e--------------------------------

https://github.com/spring-projects/spring-kafka/issues/460?source=post_page-----4f8f359d715e--------------------------------

Теги:
Хабы:
Всего голосов 5: ↑4 и ↓1+7
Комментарии4

Публикации

Истории

Работа

Java разработчик
347 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань