Kafka — это универсальный и мощный инструмент для построения конвейеров данных в реальном времени и event-driven приложений. В этой статье мы разберемся, как интегрировать Kafka с экосистемой Spring Boot.
Ключевые понятия Kafka
Топики. Kafka собирает данные в топики, которые по сути являются темами, категориями или каналами, в которых публикуются записи.
Продюсеры. Продюсеры отвечают за передачу данных в топики Kafka. Их можно рассматривать как источники потока данных.
Консьюмеры. Консьюмеры подписываются на топики и обрабатывают записи, переданные продюсерами. Они являются получателями потоков данных.
Брокеры. Кластеры Kafka состоят из брокеров, которые хранят данные и управляют распределением записей по топикам.
Партиции. Каждая тема может быть разделена на партиции (partition), которые позволяют выполнять параллельную обработку и распределять данные по кластеру.
Ядро архитектуры Kafka
В архитектуре Kafka сообщения (или события) являются сердцем системы. Продюсеры создают и отправляют сообщения в определенные топики, которые выступают в роли категорий. Эти топики делятся на партиции для обеспечения эффективной параллельной обработки.
Консьюмеры подписываются на топики и получают сообщения из партиций. Каждая партиция одновременно закреплена только за одним консьюмером, чтобы распределять нагрузки. Консьюмеры обрабатывают сообщения в зависимости от своих задач, будь то аналитика, хранение данных или другие приложения.
Такая архитектура позволяет Kafka эффективно обрабатывать огромные потоки данных, обеспечивая отказоустойчивость и масштабируемость. Это надежная основа для конвейеров данных в реальном времени, и построения event-driven архитектуры приложений и т.д.
Теперь, когда мы разобрались с принципами работы Kafka, давайте погрузимся в код.
Настройка Kafka в Spring Boot: Реализация кода
Прежде чем приступить к работе, необходимо, чтобы сервер Kafka работал в вашей локальной среде. Если вы еще не настроили Kafka в своей системе, то смотрите руководство по быстрому запуску Kafka.
Нам необходимо добавить зависимость spring-kafka maven в pom.xml
.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Конфигурация продюсера
Чтобы начать создавать сообщения, мы сначала настраиваем production factory. Она служит руководством для формирования экземпляров Kafka продюсеров.
Далее мы используем шаблон KafkaTemplate, который оборачивается вокруг экземпляра продюсера и предлагает простые методы для отправки сообщений в определенные топики Kafka.
Экземпляры продюсера разработаны как потокобезопасные, поэтому использование одного экземпляра во всем контексте приложения может повысить производительность. Это также относится к экземплярам KafkaTemplate.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
В приведенном выше фрагменте кода мы настраиваем продюсера с помощью свойств ProducerConfig. Рассмотрим основные свойства:
BOOTSTRAP_SERVERS_CONFIGВ
: в этом свойстве указываются адреса брокеров Kafka, которые представляют собой список пар хост-порт, разделенных запятыми.KEY_SERIALIZER_CLASS_CONFIGandVALUE_SERIALIZER_CLASS_CONFIG
: Эти свойства определяют, как будут сериализованы ключ и значение сообщения перед отправкой в Kafka. В данном примере мы используем StringSerializer для сериализации как ключа, так и значения.
Поэтому в данном случае в файле свойств должно присутствовать значение 'bootstrap-server'.
spring.kafka.bootstrap-servers=localhost:9092
Все службы, используемые в данной статье, предполагают работу на порту по умолчанию.
Создание топиков Kafka
Чтобы отправлять сообщение в топик необходимо его создать.
@Configuration
public class KafkaTopic {
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic-1").build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic-2").partitions(3).build();
}
}
БинKafkaAdmin
отвечает за создание новых топиков в нашем брокере. В Spring Boot KafkaAdmin
регистрируется автоматически.
Здесь мы создали topic-1 с одной партицией (по умолчанию) и topic-2 с 3 партициями. TopicBuilder предоставляет различные методы для создания топиков.
Отправка сообщений
В KafkaTemplate
имеются различные методы для отправки сообщений в топик:
@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplateString, String> kafkaTemplate;
public void sendMessage(String message, String topicName) {
log.info("Sending : {}", message);
log.info("--------------------------------");
kafkaTemplate.send(topicName, message);
}
}
Для публикации сообщения нам достаточно вызвать метод send(), указав в качестве параметров сообщение и имя топика.
Настройка консьюмера
KafkaMessageListenerContainerFactory
получает все сообщения от всех топиков в одном треде. Для этого нам также необходимо настроить consumerFacotry
.
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactoryString, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
Далее нам необходимо получить сообщения с помощью аннотации KafkaListener
. Для этого мы используем аннотацию EnableKafka
в конфигурации консьюмера. Она указывает Spring на необходимость поиска аннотации KafkaListener
в бинах и конфигурирования необходимой инфраструктуры для обработки сообщений Kafka.
@Component
@Slf4j
public class KafkaListenerExample {
@KafkaListener(topics = "topic-1", groupId = "group1")
void listener(String data) {
log.info("Received message [{}] in group1", data);
}
GroupId — это строка, однозначно идентифицирующая консьюмер-группу процессов, к которой принадлежит данный консьюмер. Мы можем указать несколько топиков для прослушивания в рамках одной группы консьюмеров. Точно так же несколько методов могут прослушивать один и тот же топик.
@KafkaListener(topics = "topic-1,topic-2", groupId = "group1")
void listener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) int offset) {
log.info("Received message [{}] from group1, partition-{} with offset-{}",
data,
partition,
offset);
}
Мы также можем получить некоторые полезные метаданные о получаемом сообщении с помощью аннотации Header()
.
Получение сообщений из определенной партиции с начальным смещением
В некоторых сценариях может потребоваться настройка получения сообщений из определенной партиции топика Kafka со смещением. Это может быть полезно, когда требуется повторная обработка сообщений или для того, чтобы выбрать с чего именно начать получение.
@KafkaListener(
groupId = "group2",
topicPartitions = @TopicPartition(topic = "topic-2",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}))
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
log.info("Received Message [{}] from partition-{}",
message,
partition);
}
Устанавливая значение initialOffset
равным "0", мы даем Kafka указание начинать получать сообщения с начала раздела. Если вы хотите просто указать раздел без initialOffset, просто напишите следующее:
@KafkaListener(groupId = "group2", topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "3" }))
KafkaListner на уровне классов
Аннотация на уровне класса подходит для случаев, когда необходимо сгруппировать логику обработки связанных сообщений. Сообщения из этих топиков будут распределяться по методам внутри класса в зависимости от их параметров.
@Component
@Slf4j
@KafkaListener(id = "class-level", topics = "multi-type")
class KafkaClassListener {
@KafkaHandler
void listenString(String message) {
log.info("KafkaHandler [String] {}", message);
}
@KafkaHandler(isDefault = true)
void listenDefault(Object object) {
log.info("KafkaHandler [Default] {}", object);
}
}
Таким образом, мы можем сгруппировать методы, которые будут получать данные из определенных топиков. Здесь мы можем перехватывать различные типы данных с помощью методов, аннотированных KafkaHandler
. Параметры метода будут определять способ получения данных, и если ни один из типов данных не совпадает, то будет применен метод по умолчанию.
Теперь, когда мы рассмотрели основы работы продюсеров и консьюмеров со строковыми сообщениями, давайте изучим различные сценарии и варианты их использования.
Использование шаблона RoutingKafkaTemplate
Мы можем использовать RoutingKafkaTemplate
, когда есть несколько продюсеров с различными конфигурациями и мы хотим выбрать продюсера на основе имени топика во время выполнения.
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
// ProducerFactory with Bytes serializer
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
// ProducerFactory with String serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile(".*-bytes"), bytesPF);
map.put(Pattern.compile("strings-.*"), stringPF);
return new RoutingKafkaTemplate(map);
}
RoutingKafkaTemplate
направляет сообщения первому экземпляру фабрики, который соответствует заданному имени топика из карты шаблонов regex и экземпляров ProducerFactory
. Шаблон strings-.*
должен быть первым, если есть два шаблона, str-.*
и strings-.*
, так как в противном случае шаблон str-.*
будет "перекрывать" его.
В приведенном примере мы создали два шаблона: .*-bytes
и strings-.*
Сериализация сообщений зависит от имени топика во время выполнения. Имена топиков, заканчивающиеся на '-bytes
', будут использовать байтовый сериализатор, а начинающиеся на strings-.*
- StringSerializer.
Фильтрация сообщений
Все сообщения, удовлетворяющие критериям фильтра, будут отброшены еще до того, как они попадут к слушателю (listener). Здесь отбрасываются сообщения, содержащие слово "ignored".
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().contains("ignored"));
return factory;
}
Слушатель инкапсулируется в FilteringMessageListenerAdapter
. Этот адаптер опирается на реализацию RecordFilterStrategy
, в которой мы определяем метод фильтрации. Для вызова фильтра можно просто добавить одну строку в фабрику текущего консьюмера.
Пользовательские сообщения
Теперь рассмотрим, как отправить или получить Java-объект. В нашем примере мы будем отправлять и получать объекты User
.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
String msg;
}
Конфигурация продюсера и консьюмера
Для конфигурации значений продюсера мы будем использовать JSON Serializer:
@Bean
public ProducerFactory<String, User> userProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
return new KafkaTemplate<>(userProducerFactory());
}
А для консьюмеров это будет JSON Deserializer:
public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
return factory;
}
Сериализатор и десериализатор JSON в spring-kafka используют библиотеку Jackson, которая отвечает за преобразование Java-объектов в байты и наоборот.
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.7.1</version>
</dependency>
Это необязательная зависимость, и если вы хотите ее использовать, то используйте ту же версию, что и spring-kafka.
Отправка Java-объектов
Давайте отправим объект User с помощью созданного нами шаблона userKafkaTemplate().
@Component
@Slf4j
public class KafkaSender {
@Autowired
private KafkaTemplate<String, User> userKafkaTemplate;
void sendCustomMessage(User user, String topicName) {
log.info("Sending Json Serializer : {}", user);
log.info("--------------------------------");
userKafkaTemplate.send(topicName, user);
}
Получение Java-объектов
@Component
@Slf4j
public class KafkaListenerExample {
@KafkaListener(topics = "topic-3", groupId = "user-group",
containerFactory = "userKafkaListenerContainerFactory")
void listenerWithMessageConverter(User user) {
log.info("Received message through MessageConverterUserListener [{}]", user);
}
Поскольку у нас несколько контейнеров-слушателей, мы указываем, какую фабрику контейнеров использовать.
Если мы не укажем атрибут containerFactory
, то по умолчанию будет использоваться kafkaListenerContainerFactory
, которая в нашем случае использует StringSerializer
и StringDeserializer
.
Заключение
В руководстве мы рассказали о настройке Kafka в приложении Spring Boot, и о том, как создавать и получать сообщения с помощью шаблонов и слушателей Kafka. Мы познакомились с примерами работы с различными типами сообщений, маршрутизацией сообщений, фильтрацией сообщений и преобразованием пользовательских форматов данных.
Если вы хотите глубже изучить Apache Kafka, приходите на интенсив Apache Kafka для разработчиков. В нём углублённая теория и практика на Java или Golang с платформой Spring+Docker+Postgres. Вы узнаете типовые шаблоны проектирования, сделаете своё приложение надёжнее, получите опыт разработки нескольких приложений, использующих Kafka.
👉 Программа курса на нашем сайте
Курс поможет уменьшить время на рабочие задачи с Кафкой, добавить красивую строчку в резюме и взобраться на следующую ступень карьерной лестницы.