Pull to refresh
102.55
Слёрм
Учебный центр для тех, кто работает в IT

Apache Kafka и Spring Boot: лёгкая интеграция

Level of difficultyEasy
Reading time10 min
Views23K
Original author: Avinash Hargun

Kafka — это универсальный и мощный инструмент для построения конвейеров данных в реальном времени и event-driven приложений. В этой статье мы разберемся, как интегрировать Kafka с экосистемой Spring Boot.

Ключевые понятия Kafka

  • Топики. Kafka собирает данные в топики, которые по сути являются темами, категориями или каналами, в которых публикуются записи.

  • Продюсеры. Продюсеры отвечают за передачу данных в топики Kafka. Их можно рассматривать как источники потока данных.

  • Консьюмеры. Консьюмеры подписываются на топики и обрабатывают записи, переданные продюсерами. Они являются получателями потоков данных.

  • Брокеры. Кластеры Kafka состоят из брокеров, которые хранят данные и управляют распределением записей по топикам.

  • Партиции. Каждая тема может быть разделена на партиции (partition), которые позволяют выполнять параллельную обработку и распределять данные по кластеру.

Ядро архитектуры Kafka

В архитектуре Kafka сообщения (или события) являются сердцем системы. Продюсеры создают и отправляют сообщения в определенные топики, которые выступают в роли категорий. Эти топики делятся на партиции для обеспечения эффективной параллельной обработки.

Консьюмеры подписываются на топики и получают сообщения из партиций. Каждая партиция одновременно закреплена только за одним консьюмером, чтобы распределять нагрузки. Консьюмеры обрабатывают сообщения в зависимости от своих задач, будь то аналитика, хранение данных или другие приложения.

Архитектура Kafka
Архитектура Kafka

Такая архитектура позволяет Kafka эффективно обрабатывать огромные потоки данных, обеспечивая отказоустойчивость и масштабируемость. Это надежная основа для конвейеров данных в реальном времени, и построения event-driven архитектуры приложений и т.д.

Теперь, когда мы разобрались с принципами работы Kafka, давайте погрузимся в код.

Настройка Kafka в Spring Boot: Реализация кода

Прежде чем приступить к работе, необходимо, чтобы сервер Kafka работал в вашей локальной среде. Если вы еще не настроили Kafka в своей системе, то смотрите руководство по быстрому запуску Kafka.

Нам необходимо добавить зависимость spring-kafka maven в pom.xml.

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Конфигурация продюсера

Чтобы начать создавать сообщения, мы сначала настраиваем production factory. Она служит руководством для формирования экземпляров Kafka продюсеров.

Далее мы используем шаблон KafkaTemplate, который оборачивается вокруг экземпляра продюсера и предлагает простые методы для отправки сообщений в определенные топики Kafka.

Экземпляры продюсера разработаны как потокобезопасные, поэтому использование одного экземпляра во всем контексте приложения может повысить производительность. Это также относится к экземплярам KafkaTemplate.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

В приведенном выше фрагменте кода мы настраиваем продюсера с помощью свойств ProducerConfig. Рассмотрим основные свойства:

  • BOOTSTRAP_SERVERS_CONFIGВ: в этом свойстве указываются адреса брокеров Kafka, которые представляют собой список пар хост-порт, разделенных запятыми.

  • KEY_SERIALIZER_CLASS_CONFIGandVALUE_SERIALIZER_CLASS_CONFIG: Эти свойства определяют, как будут сериализованы ключ и значение сообщения перед отправкой в Kafka. В данном примере мы используем StringSerializer для сериализации как ключа, так и значения.

Поэтому в данном случае в файле свойств должно присутствовать значение 'bootstrap-server'.

spring.kafka.bootstrap-servers=localhost:9092

Все службы, используемые в данной статье, предполагают работу на порту по умолчанию.

Создание топиков Kafka

Чтобы отправлять сообщение в топик необходимо его создать.

@Configuration
public class KafkaTopic {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic-1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic-2").partitions(3).build();
    }
}

БинKafkaAdmin отвечает за создание новых топиков в нашем брокере. В Spring Boot KafkaAdmin регистрируется автоматически.

Здесь мы создали topic-1 с одной партицией (по умолчанию) и topic-2 с 3 партициями. TopicBuilder предоставляет различные методы для создания топиков.

Отправка сообщений

В KafkaTemplate имеются различные методы для отправки сообщений в топик:

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplateString, String> kafkaTemplate;

        public void sendMessage(String message, String topicName) {
        log.info("Sending : {}", message);
        log.info("--------------------------------");

        kafkaTemplate.send(topicName, message);
    }
}

Для публикации сообщения нам достаточно вызвать метод send(), указав в качестве параметров сообщение и имя топика.

Настройка консьюмера

KafkaMessageListenerContainerFactory получает все сообщения от всех топиков в одном треде. Для этого нам также необходимо настроить consumerFacotry.

@Configuration
@EnableKafka
public class KafkaConsumer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactoryString, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Далее нам необходимо получить сообщения с помощью аннотации KafkaListener. Для этого мы используем аннотацию EnableKafka в конфигурации консьюмера. Она указывает Spring на необходимость поиска аннотации KafkaListener в бинах и конфигурирования необходимой инфраструктуры для обработки сообщений Kafka.

@Component
@Slf4j
public class KafkaListenerExample {

    @KafkaListener(topics = "topic-1", groupId = "group1")
    void listener(String data) {
        log.info("Received message [{}] in group1", data);
    }

GroupId — это строка, однозначно идентифицирующая консьюмер-группу процессов, к которой принадлежит данный консьюмер. Мы можем указать несколько топиков для прослушивания в рамках одной группы консьюмеров. Точно так же несколько методов могут прослушивать один и тот же топик.

@KafkaListener(topics = "topic-1,topic-2", groupId = "group1")
void listener(@Payload String data,
              @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
              @Header(KafkaHeaders.OFFSET) int offset) {
    log.info("Received message [{}] from group1, partition-{} with offset-{}",
            data,
            partition,
            offset);
}

Мы также можем получить некоторые полезные метаданные о получаемом сообщении с помощью аннотации Header().

Получение сообщений из определенной партиции с начальным смещением

В некоторых сценариях может потребоваться настройка получения сообщений из определенной партиции топика Kafka со смещением. Это может быть полезно, когда требуется повторная обработка сообщений или для того, чтобы выбрать с чего именно начать получение.

@KafkaListener(
  groupId = "group2",
  topicPartitions = @TopicPartition(topic = "topic-2",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      log.info("Received Message [{}] from partition-{}",
               message,
               partition);
}

Устанавливая значение initialOffset равным "0", мы даем Kafka указание начинать получать сообщения с начала раздела. Если вы хотите просто указать раздел без initialOffset, просто напишите следующее:

@KafkaListener(groupId = "group2", topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "3" }))

KafkaListner на уровне классов

Аннотация на уровне класса подходит для случаев, когда необходимо сгруппировать логику обработки связанных сообщений. Сообщения из этих топиков будут распределяться по методам внутри класса в зависимости от их параметров.

@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {

  @KafkaHandler
  void listenString(String message) {
    log.info("KafkaHandler [String] {}", message);
  }

  @KafkaHandler(isDefault = true)
  void listenDefault(Object object) {
    log.info("KafkaHandler [Default] {}", object);
  }
}

Таким образом, мы можем сгруппировать методы, которые будут получать данные из определенных топиков. Здесь мы можем перехватывать различные типы данных с помощью методов, аннотированных KafkaHandler. Параметры метода будут определять способ получения данных, и если ни один из типов данных не совпадает, то будет применен метод по умолчанию.

Теперь, когда мы рассмотрели основы работы продюсеров и консьюмеров со строковыми сообщениями, давайте изучим различные сценарии и варианты их использования.

Использование шаблона RoutingKafkaTemplate

Мы можем использовать RoutingKafkaTemplate, когда есть несколько продюсеров с различными конфигурациями и мы хотим выбрать продюсера на основе имени топика во время выполнения.

@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {

    // ProducerFactory with Bytes serializer
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
    context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

    // ProducerFactory with String serializer
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);

    Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
    map.put(Pattern.compile(".*-bytes"), bytesPF);
    map.put(Pattern.compile("strings-.*"), stringPF);
    return new RoutingKafkaTemplate(map);
}

RoutingKafkaTemplate направляет сообщения первому экземпляру фабрики, который соответствует заданному имени топика из карты шаблонов regex и экземпляров ProducerFactory. Шаблон strings-.* должен быть первым, если есть два шаблона, str-.* и strings-.*, так как в противном случае шаблон str-.* будет "перекрывать" его.

В приведенном примере мы создали два шаблона: .*-bytes и strings-.* Сериализация сообщений зависит от имени топика во время выполнения. Имена топиков, заканчивающиеся на '-bytes', будут использовать байтовый сериализатор, а начинающиеся на strings-.* - StringSerializer.

Фильтрация сообщений

Все сообщения, удовлетворяющие критериям фильтра, будут отброшены еще до того, как они попадут к слушателю (listener). Здесь отбрасываются сообщения, содержащие слово "ignored".

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
    return factory;
}

Слушатель инкапсулируется в FilteringMessageListenerAdapter. Этот адаптер опирается на реализацию RecordFilterStrategy, в которой мы определяем метод фильтрации. Для вызова фильтра можно просто добавить одну строку в фабрику текущего консьюмера.

Пользовательские сообщения

Теперь рассмотрим, как отправить или получить Java-объект. В нашем примере мы будем отправлять и получать объекты User.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    String msg;
}

Конфигурация продюсера и консьюмера

Для конфигурации значений продюсера мы будем использовать JSON Serializer:

@Bean
public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
}

А для консьюмеров это будет JSON Deserializer:

public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}

Сериализатор и десериализатор JSON в spring-kafka используют библиотеку Jackson, которая отвечает за преобразование Java-объектов в байты и наоборот.

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>

Это необязательная зависимость, и если вы хотите ее использовать, то используйте ту же версию, что и spring-kafka.

Отправка Java-объектов

Давайте отправим объект User с помощью созданного нами шаблона userKafkaTemplate().

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, User> userKafkaTemplate;


    void sendCustomMessage(User user, String topicName) {
        log.info("Sending Json Serializer : {}", user);
        log.info("--------------------------------");

        userKafkaTemplate.send(topicName, user);
    }

Получение Java-объектов

@Component
@Slf4j
public class KafkaListenerExample {


    @KafkaListener(topics = "topic-3", groupId = "user-group",
            containerFactory = "userKafkaListenerContainerFactory")
    void listenerWithMessageConverter(User user) {
        log.info("Received message through MessageConverterUserListener [{}]", user);
    }

Поскольку у нас несколько контейнеров-слушателей, мы указываем, какую фабрику контейнеров использовать.

Если мы не укажем атрибут containerFactory, то по умолчанию будет использоваться kafkaListenerContainerFactory, которая в нашем случае использует StringSerializer и StringDeserializer.

Заключение

В руководстве мы рассказали о настройке Kafka в приложении Spring Boot, и о том, как создавать и получать сообщения с помощью шаблонов и слушателей Kafka. Мы познакомились с примерами работы с различными типами сообщений, маршрутизацией сообщений, фильтрацией сообщений и преобразованием пользовательских форматов данных.


Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.

👉 Программа курса на нашем сайте

Курс поможет уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.

Tags:
Hubs:
Total votes 3: ↑2 and ↓1+1
Comments2

Articles

Information

Website
slurm.io
Registered
Founded
Employees
51–100 employees
Location
Россия
Representative
Антон Скобин