
Apache Kafka — мощный инструмент для обработки потоков данных в реальном времени, но его интеграция в проекты на Spring Boot может быть непростой задачей
В этой статье вы узнаете о лучших практиках разработки стартеров Spring Boot Starter для Kafka с поддержкой Avro, а также получите примеры использования с различными настройками. Эта статья будет полезна разработчикам, желающим упростить работу с Kafka, и менеджерам проектов, ищущим способы оптимизации процессов. Дочитавшим статью до конца будет приятный бонус.
Я хотел поделиться опытом создания библиотеки, которая упрощает интеграцию Kafka в Spring Boot, предоставляя гибкую конфигурацию и поддержку сериализации Avro.
Уникальность. Многие стартеры для Kafka существуют, но мой фокусируется на enterprise‑функциях (идемпотентность, ретраи) и передаче схемы Avro через параметры.
Почему мне верят. Я подробно описываю процесс, включая ошибки и их решения, а также публикую рабочий код.
Проблема. Настройка Kafka в проектах часто требует много boilerplate-кода.
Цель. Дать читателю готовое решение и вдохновить на создание собственных стартеров.
Что такое Spring Boot Starter?
Spring Boot Starter — это модуль, который предоставляет готовую конфигурацию для определенной технологии. Например, spring-boot-starter-web настраивает веб-сервер, а spring-boot-starter-data-jpa — доступ к базе данных. Наш стартер будет:
автоматически создавать
KafkaProducer
иKafkaConsumer
;использовать Avro для сериализации и десериализации сообщений;
поддерживать настройку через application.yml.
Этапы разработки
Процесс разработки включал создание автоконфигурации для продюсера и консюмера Kafka. Вот ключевые шаги:
Определение зависимостей:
Spring Boot для автоконфигурации;
Spring Kafka для работы с Kafka;
Apache Avro для сериализации (опционально).
Создание KafkaProperties:
класс для чтения конфигурации из application.yml с префиксом
apppetr.kafka
.
Автоконфигурация:
настройка
KafkaTemplate
для продюсера иConcurrentKafkaListenerContainerFactory
для консюмера.
Поддержка Avro:
мы будем использовать Avro для строгой типизации сообщений. Определим схему при помощи настройки в конфигурационном файле стартера.
Создание класса свойств
Для настройки стартера через application.yml создадим класс KafkaProperties
в kafka-starter/src/main/java/com/app/petr/KafkaProperties.java:
Код
package com.app.petr;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.HashMap;
import java.util.Map;
@ConfigurationProperties(prefix = "apppetr.kafka")
public class KafkaProperties {
private String bootstrapServers = "localhost:9092";
private Producer producer = new Producer();
private Consumer consumer = new Consumer();
private String avroSchemaPath;
public static class Producer {
private String topic = "default-topic";
private boolean idempotenceEnabled = true; // Идемпотентность для надежности
private int acks = 1; // Подтверждения: 0, 1, all
private int retries = 3; // Количество попыток при сбоях
private int retryBackoffMs = 1000; // Задержка между ретраями
private int deliveryTimeoutMs = 120000; // Таймаут доставки
private int requestTimeoutMs = 30000; // Таймаут запроса
private int maxInFlightRequests = 5; // Макс. количество неподтвержденных запросов
private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public boolean isIdempotenceEnabled() { return idempotenceEnabled; }
public void setIdempotenceEnabled(boolean idempotenceEnabled) { this.idempotenceEnabled = idempotenceEnabled; }
public int getAcks() { return acks; }
public void setAcks(int acks) { this.acks = acks; }
public int getRetries() { return retries; }
public void setRetries(int retries) { this.retries = retries; }
public int getRetryBackoffMs() { return retryBackoffMs; }
public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }
public int getDeliveryTimeoutMs() { return deliveryTimeoutMs; }
public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }
public int getRequestTimeoutMs() { return requestTimeoutMs; }
public void setRequestTimeoutMs(int requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }
public int getMaxInFlightRequests() { return maxInFlightRequests; }
public void setMaxInFlightRequests(int maxInFlightRequests) { this.maxInFlightRequests = maxInFlightRequests; }
public Map<String, Object> getConfig() { return config; }
public void setConfig(Map<String, Object> config) { this.config = config; }
}
public static class Consumer {
private String topic = "default-topic";
private String groupId = "default-group";
private int maxPollRecords = 500; // Макс. записей за один poll
private int maxPollIntervalMs = 300000; // Макс. интервал между poll
private int sessionTimeoutMs = 10000; // Таймаут сессии
private int heartbeatIntervalMs = 3000; // Интервал heartbeat
private int fetchMaxBytes = 52428800; // Макс. размер выборки (50MB)
private boolean autoCommitEnabled = true; // Автокоммит оффсетов
private int autoCommitIntervalMs = 5000; // Интервал автокоммита
private String autoOffsetReset = "earliest"; // Сброс оффсета
private int retryBackoffMs = 1000; // Задержка между ретраями
private int maxRetries = 3; // Количество ретраев
private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getGroupId() { return groupId; }
public void setGroupId(String groupId) { this.groupId = groupId; }
public int getMaxPollRecords() { return maxPollRecords; }
public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }
public int getMaxPollIntervalMs() { return maxPollIntervalMs; }
public void setMaxPollIntervalMs(int maxPollIntervalMs) { this.maxPollIntervalMs = maxPollIntervalMs; }
public int getSessionTimeoutMs() { return sessionTimeoutMs; }
public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }
public int getHeartbeatIntervalMs() { return heartbeatIntervalMs; }
public void setHeartbeatIntervalMs(int heartbeatIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; }
public int getFetchMaxBytes() { return fetchMaxBytes; }
public void setFetchMaxBytes(int fetchMaxBytes) { this.fetchMaxBytes = fetchMaxBytes; }
public boolean isAutoCommitEnabled() { return autoCommitEnabled; }
public void setAutoCommitEnabled(boolean autoCommitEnabled) { this.autoCommitEnabled = autoCommitEnabled; }
public int getAutoCommitIntervalMs() { return autoCommitIntervalMs; }
public void setAutoCommitIntervalMs(int autoCommitIntervalMs) { this.autoCommitIntervalMs = autoCommitIntervalMs; }
public String getAutoOffsetReset() { return autoOffsetReset; }
public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; }
public int getRetryBackoffMs() { return retryBackoffMs; }
public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public Map<String, Object> getConfig() { return config; }
public void setConfig(Map<String, Object> config) { this.config = config; }
}
public String getBootstrapServers() { return bootstrapServers; }
public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }
public Producer getProducer() { return producer; }
public void setProducer(Producer producer) { this.producer = producer; }
public Consumer getConsumer() { return consumer; }
public void setConsumer(Consumer consumer) { this.consumer = consumer; }
public String getAvroSchemaPath() {
return avroSchemaPath;
}
public void setAvroSchemaPath(String avroSchemaPath) {
this.avroSchemaPath = avroSchemaPath;
}
}
Этот класс позволяет задавать параметры Kafka через конфигурацию, например:
apppetr:
kafka:
bootstrap-servers: localhost:9092
producer:
topic: test-topic
idempotence-enabled: true
acks: all
retries: 5
retry-backoff-ms: 2000
delivery-timeout-ms: 120000
request-timeout-ms: 30000
max-in-flight-requests: 5
consumer:
topic: test-topic
group-id: test-group
max-poll-records: 1000
max-poll-interval-ms: 600000
session-timeout-ms: 15000
heartbeat-interval-ms: 5000
fetch-max-bytes: 52428800
auto-commit-enabled: true
auto-commit-interval-ms: 5000
auto-offset-reset: earliest
retry-backoff-ms: 2000
max-retries: 5
avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
Реализация автоконфигурации
Ключевой элемент стартера — класс автоконфигурации KafkaAutoConfiguration
в kafka-starter/src/main/java/com/app/petr/KafkaAutoConfiguration.java:
Код
package com.app.petr;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConditionalOnClass({ KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class })
@EnableKafka
@EnableConfigurationProperties(KafkaProperties.class)
@ConditionalOnProperty(prefix = "apppetr.kafka", name = "bootstrap-servers")
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
// Producer Configuration
@Bean
@ConditionalOnMissingBean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, properties.getProducer().isIdempotenceEnabled());
config.put(ProducerConfig.ACKS_CONFIG, String.valueOf(properties.getProducer().getAcks()));
config.put(ProducerConfig.RETRIES_CONFIG, properties.getProducer().getRetries());
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, properties.getProducer().getRetryBackoffMs());
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, properties.getProducer().getDeliveryTimeoutMs());
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProducer().getRequestTimeoutMs());
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, properties.getProducer().getMaxInFlightRequests());
config.putAll(properties.getProducer().getConfig());
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
@ConditionalOnMissingBean
public KafkaTemplate<String, byte[]> kafkaTemplate(ProducerFactory<String, byte[]> producerFactory) {
KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(properties.getProducer().getTopic());
return template;
}
// Consumer Configuration
@Bean
@ConditionalOnMissingBean
public ConsumerFactory<String, byte[]> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumer().getGroupId());
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getConsumer().getMaxPollRecords());
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getConsumer().getMaxPollIntervalMs());
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getConsumer().getSessionTimeoutMs());
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getConsumer().getHeartbeatIntervalMs());
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, properties.getConsumer().getFetchMaxBytes());
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getConsumer().isAutoCommitEnabled());
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getConsumer().getAutoCommitIntervalMs());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset());
config.putAll(properties.getConsumer().getConfig());
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
ConsumerFactory<String, byte[]> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1); // Параллелизм
factory.getContainerProperties().setPollTimeout(3000); // Таймаут опроса
factory.setCommonErrorHandler(errorHandler()); // Настраиваем обработку ошибок с ретраями
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff backOff = new FixedBackOff(
properties.getConsumer().getRetryBackoffMs(),
properties.getConsumer().getMaxRetries()
);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, exception) -> {
System.err.println("Failed to process record: " + record + ", exception: " + exception.getMessage());
}, // Логирование ошибок
backOff
);
return errorHandler;
}
}
Что делает этот код:
создаёт
KafkaProducer
иKafkaConsumer
с настройками изKafkaProperties
;использует
@ConditionalOnMissingBean
, чтобы пользователь мог переопределить бины;добавляет
DisposableBean
для корректного закрытия ресурсов.
Добавление файла импортов автоконфигурации
В процессе разработки стартера я добавил файл src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports, который заменил устаревший подход с spring.factories для указания автоконфигурационных классов в Spring Boot 3.x. Этот файл содержит список классов автоконфигурации, которые Spring Boot должен загрузить.
Пример содержимого:
io.github.bigbox89.KafkaAutoConfiguration
Назначение. Сообщает Spring Boot, что
KafkaAutoConfiguration
является точкой входа для автоконфигурации стартера. Это упрощает обнаружение и регистрацию конфигурации без необходимости полного сканирования classpath.
Что будет, если его не добавить
Если файл org.springframework.boot.autoconfigure.AutoConfiguration.imports отсутствует:
Автоконфигурация не загрузится. Spring Boot не найдёт класс KafkaAutoConfiguration, и стартер не будет применён, даже если все зависимости и свойства указаны корректно.
Тихий сбой. Приложение запустится, но продюсер и консюмер Kafka не будут настроены, что может привести к неочевидным ошибкам (например,
@KafkaListener
не сработает).Совместимость. В версиях Spring Boot до 3.0 можно было использовать spring.factories, но в 3.x без AutoConfiguration.imports стартер становится неработоспособным.
Примеры использования
Продюсер
Пример ProducerApplication в producer-example/src/main/java/com/app/petr/ProducerApplication.java
package com.app.petr;
import app.petr.Message;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import java.nio.ByteBuffer;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, byte[]> kafkaTemplate, KafkaProperties properties) {
return args -> {
String topic = properties.getProducer().getTopic();
if (topic == null) {
throw new IllegalStateException("Producer topic is not configured");
}
for (int i = 0; i < 100; i++) {
Message message = Message.newBuilder()
.setId("id-" + i)
.setContent("Message " + i)
.setTimestamp(System.currentTimeMillis())
.build();
ByteBuffer buffer = message.toByteBuffer();
byte[] messageBytes = new byte[buffer.remaining()];
buffer.get(messageBytes);
kafkaTemplate.send(topic, message.getId().toString(), messageBytes);
System.out.println("Sent message: " + message.getId());
Thread.sleep(1000);
}
};
}
}
Консюмер
Пример ConsumerApplication в consumer-example/src/main/java/com/app/petr/ConsumerApplication.java
package com.app.petr;
import app.petr.Message;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import java.nio.ByteBuffer;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@KafkaListener(topics = "#{kafkaProperties.consumer.topic}", groupId = "#{kafkaProperties.consumer.groupId}")
public void listen(byte[] message) throws Exception {
Message avroMessage = Message.fromByteBuffer(ByteBuffer.wrap(message));
System.out.printf("Received message: id=%s, content=%s, timestamp=%d%n",
avroMessage.getId(), avroMessage.getContent(), avroMessage.getTimestamp());
}
}
Тестирование с Testcontainers
Чтобы убедиться, что стартер работает, добавим интеграционные тесты:
Код
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class IntegrationProducerAndConsumerTest {
private static final String TOPIC = "rest_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
private KafkaProducer<String, byte[]> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private KafkaConsumer<String, byte[]> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
@Test
public void testProducerAndConsumer() throws Exception {
KafkaProducer<String, byte[]> producer = createProducer();
KafkaConsumer<String, byte[]> consumer = createConsumer();
consumer.subscribe(Collections.singleton(TOPIC));
Message message = Message.newBuilder()
.setId("test-id")
.setContent("Test content")
.setTimestamp(System.currentTimeMillis())
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
byte[] serializedMessage = out.toByteArray();
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
producer.send(record).get();
producer.flush();
SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.isEmpty()).isFalse();
records.forEach(consumerRecord -> {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(consumerRecord.value(), null);
Message receivedMessage = reader.read(null, decoder);
assertThat(receivedMessage.getId().toString()).isEqualTo(message.getId());
assertThat(receivedMessage.getContent().toString()).isEqualTo(message.getContent());
assertThat(receivedMessage.getTimestamp()).isEqualTo(message.getTimestamp());
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize message", e);
}
});
producer.close();
consumer.close();
}
}
Также добавим отдельно тесты для продюсера:
Код
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Testcontainers
public class ProducerApplicationTest {
private static final String TOPIC = "test_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@BeforeAll
static void setup() {
kafkaContainer.start();
}
private KafkaProducer<String, byte[]> createProducer(Properties props) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private byte[] serializeMessage(Message message) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
return out.toByteArray();
}
@Test
public void testProducerWithDefaultConfig() throws Exception {
Properties props = new Properties();
KafkaProducer<String, byte[]> producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-1")
.setContent("Default config test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future<?> future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Проверяем успешную отправку
producer.close();
}
@Test
public void testProducerWithIdempotenceEnabled() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // Включаем идемпотентность
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Требуется для идемпотентности
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Ограничение для идемпотентности
KafkaProducer<String, byte[]> producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-2")
.setContent("Idempotent test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future<?> future = producer.send(record);
producer.send(record); // Отправляем тот же ключ ещё раз
producer.flush();
assertThat(future.get()).isNotNull(); // Успешная отправка с идемпотентностью
producer.close();
}
@Test
public void testProducerWithAcksZero() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "0"); // Без подтверждений
KafkaProducer<String, byte[]> producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-3")
.setContent("Acks=0 test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future<?> future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Отправка без ожидания подтверждения
producer.close();
}
@Test
public void testProducerWithRetries() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, "3"); // 3 попытки
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); // Задержка между попытками 100 мс
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500"); // Уменьшаем таймаут запроса до 500 мс
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1000"); // Таймаут доставки 1000 мс (должен быть >= linger.ms + request.timeout.ms)
props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // Явно задаём linger.ms для ясности
KafkaProducer<String, byte[]> producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-4")
.setContent("Retries test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future<?> future = producer.send(record);
producer.flush();
assertThat(future.get()).isNotNull(); // Успешная отправка с ретраями
producer.close();
}
@Test
public void testProducerWithShortTimeout() {
Properties props = new Properties();
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут доставки
KafkaProducer<String, byte[]> producer = createProducer(props);
Message message = Message.newBuilder()
.setId("test-id-5")
.setContent("Short timeout test")
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage;
try {
serializedMessage = serializeMessage(message);
} catch (Exception e) {
throw new RuntimeException("Serialization failed", e);
}
ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);
Future<?> future = producer.send(record);
// Ожидаем исключение из-за короткого таймаута
assertThrows(ExecutionException.class, future::get, "Expected timeout exception due to short timeout");
assertThat(future.isDone()).isTrue();
producer.close();
}
}
И конcюмера:
Код
package com.app.petr;
import app.petr.Message;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class ConsumerApplicationTest {
private static final String TOPIC = "consumer_test_data";
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@BeforeAll
static void setup() {
kafkaContainer.start();
}
@BeforeEach
void clearTopic() {
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
try (AdminClient adminClient = AdminClient.create(adminProps)) {
adminClient.deleteTopics(Collections.singleton(TOPIC)).all().get();
adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC, 1, (short) 1))).all().get();
} catch (Exception e) {
// Игнорируем ошибки, если топик не существовал
}
}
private KafkaProducer<String, byte[]> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(props);
}
private KafkaConsumer<String, byte[]> createConsumer(Properties props) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return new KafkaConsumer<>(props);
}
private byte[] serializeMessage(Message message) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);
writer.write(message, encoder);
encoder.flush();
return out.toByteArray();
}
private Message deserializeMessage(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class);
return reader.read(null, decoder);
}
private void sendMessages(int count) throws Exception {
KafkaProducer<String, byte[]> producer = createProducer();
for (int i = 0; i < count; i++) {
Message message = Message.newBuilder()
.setId("id-" + i)
.setContent("Message " + i)
.setTimestamp(System.currentTimeMillis())
.build();
byte[] serializedMessage = serializeMessage(message);
producer.send(new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage)).get();
}
producer.flush();
producer.close();
}
@Test
public void testConsumerWithDefaultConfig() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, byte[]> consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(5);
consumer.close();
}
@Test
public void testConsumerWithMaxPollRecords() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
KafkaConsumer<String, byte[]> consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isLessThanOrEqualTo(2);
consumer.close();
}
@Test
public void testConsumerWithAutoOffsetResetLatest() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "latest-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer<String, byte[]> consumer = createConsumer(props);
// Подписываемся и вызываем poll для завершения регистрации
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ofSeconds(20)); // Даём Kafka время зарегистрировать консюмера
// Убеждаемся, что до отправки сообщений ничего не читается
ConsumerRecords<String, byte[]> recordsBefore = consumer.poll(Duration.ofSeconds(1));
assertThat(recordsBefore.isEmpty()).isTrue();
// Отправляем 3 сообщения после полной регистрации
sendMessages(3);
// Читаем новые сообщения
ConsumerRecords<String, byte[]> recordsAfter = consumer.poll(Duration.ofSeconds(5));
assertThat(recordsAfter.count()).isEqualTo(3);
consumer.close();
}
@Test
public void testConsumerWithDisableAutoCommit() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "no-auto-commit-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, byte[]> consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(5);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(5);
consumer.close();
consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
ConsumerRecords<String, byte[]> recordsAgain = consumer.poll(Duration.ofSeconds(5));
assertThat(recordsAgain.count()).isEqualTo(5);
consumer.close();
}
@Test
public void testConsumerWithShortSessionTimeout() throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "short-session-group-" + System.nanoTime());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");
KafkaConsumer<String, byte[]> consumer = createConsumer(props);
consumer.subscribe(Collections.singleton(TOPIC));
sendMessages(3);
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(3);
consumer.close();
}
}
Как работает тест:
Testcontainers запускает Kafka-контейнер;
сообщение отправляется через продюсер и читается консюмером;
проверяется соответствие отправленного и полученного сообщения.
Преимущества стартера
Простота подключения. Достаточно добавить зависимость и настроить application.yml.
Гибкость. Возможность переопределить бины или добавить кастомные настройки.
Типобезопасность. Avro обеспечивает строгую структуру данных.
Возможные ошибки и их решение
Проблема 1: генерация Avro-классов внутри стартера
Ситуация: изначально я использовал avro-maven-plugin для генерации классов из message.avdl в стартере. Это ограничивало гибкость, так как схема была фиксированной.
Решение:
убрал генерацию из стартера;
добавил параметр avro-schema-path в KafkaProperties, чтобы пользователь указывал путь к схеме;
переложил ответственность за генерацию на проект пользователя.
Пример конфигурации:
apppetr:
kafka:
avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
Проблема 2: ошибки в тестах с auto.offset.reset=latest
Ситуация: тест testConsumerWithAutoOffsetResetLatest
ожидал 3 сообщения, но получал 1 из-за асинхронности подписки.
Решение:
добавил
consumer.poll(Duration.ofSeconds(2))
послеsubscribe
, чтобы дождаться регистрации консюмера;убрал
Thread.sleep
, сделав тест более надёжным.
consumer.subscribe(Collections.singleton(TOPIC));
consumer.poll(Duration.ofSeconds(20));
sendMessages(3);
assertThat(consumer.poll(Duration.ofSeconds(5)).count()).isEqualTo(3);
Примеры применения стартера
1. Надёжная доставка транзакций
Задача: гарантировать доставку без дубликатов.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092
producer:
topic: transactions
idempotence-enabled: true
acks: all
retries: 5
retry-backoff-ms: 2000
avro-schema-path: src/main/resources/avro/transaction.avdl
Код:
@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;
public void sendTransaction(Transaction tx) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter<Transaction> writer = new SpecificDatumWriter<>(Transaction.class);
writer.write(tx, encoder);
encoder.flush();
kafkaTemplate.send("transactions", tx.getId().toString(), out.toByteArray());
}
2. Высокоскоростной сбор логов
Задача: быстрая отправка логов с допустимой потерей.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: localhost:9092
producer:
topic: logs
acks: 0
max-in-flight-requests: 10
consumer:
topic: logs
group-id: log-collector
max-poll-records: 1000
auto-offset-reset: latest
avro-schema-path: src/main/resources/avro/log.avdl
Код:
@KafkaListener(topics = "logs", groupId = "log-collector")
public void processLog(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader<Log> reader = new SpecificDatumReader<>(Log.class);
Log log = reader.read(null, decoder);
System.out.println("Log: " + log.getMessage());
}
3. Обработка событий с ретраями
Задача: повторная обработка при сбоях.
Конфигурация:
apppetr:
kafka:
bootstrap-servers: kafka:9092
consumer:
topic: events
group-id: event-processor
retry-backoff-ms: 2000
max-retries: 5
avro-schema-path: src/main/resources/avro/event.avdl
Код:
@KafkaListener(topics = "events", groupId = "event-processor")
public void handleEvent(byte[] data) throws Exception {
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
SpecificDatumReader<Event> reader = new SpecificDatumReader<>(Event.class);
Event event = reader.read(null, decoder);
processEvent(event); // Логика с возможными исключениями
}
Бонус дочитавшим
Памятка-шпаргалка по основным параметрам конфигурации Kafka.
Общие параметры
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
bootstrap.servers | Список адресов брокеров Kafka | String | localhost:9092 |
Параметры продюсера
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
key.serializer | Сериализатор ключа | Class | - |
value.serializer | Сериализатор значения | Class | - |
acks | Уровень подтверждений (0, 1, all) | String | 1 |
retries | Количество ретраев при сбоях | int | 0 |
Задержка между ретраями (мс) | int | 100 | |
enable.idempotence | Включение идемпотентности | boolean | false |
max.in.flight.requests.per.connection | Макс. неподтверждённых запросов | int | 5 |
Общий таймаут доставки (мс) | int | 120000 (2 мин) | |
Таймаут запроса к брокеру (мс) | int | 30000 (30 сек) | |
buffer.memory | Размер буфера для отправки (байт) | long | 33554432 (32MB) |
batch.size | Размер батча для отправки (байт) | int | 16384 (16KB) |
Задержка перед отправкой батча (мс) | int | 0 | |
compression.type | Тип сжатия (none, gzip, snappy, lz4, zstd) | String | none |
Параметры консюмера
Параметр | Описание | Тип | По умолчанию |
---|---|---|---|
key.deserializer | Десериализатор ключа | Class | - |
value.deserializer | Десериализатор значения | Class | - |
Идентификатор группы потребителей | String | null | |
auto.offset.reset | Политика сброса оффсета (earliest, latest, none) | String | latest |
enable.auto.commit | Включение автокоммита оффсетов | boolean | true |
Интервал автокоммита (мс) | int | 5000 (5 сек) | |
max.poll.records | Макс. записей за один poll | int | 500 |
Макс. интервал между poll (мс) | int | 300000 (5 мин) | |
Таймаут сессии группы (мс) | int | 10000 (10 сек) | |
Интервал heartbeat (мс) | int | 3000 (3 сек) | |
fetch.max.bytes | Макс. размер выборки (байт) | int | 52428800 (50MB) |
fetch.min.bytes | Мин. размер выборки (байт) | int | 1 |
Макс. ожидание данных (мс) | int | 500 |
Полезные заметки
Идемпотентность. Для
enable.idempotence=true
требуетсяacks=all
иretries > 0
.Производительность. Увеличьте
batch.size
иlinger.ms
для продюсера илиmax.poll.records
для консюмера, чтобы повысить пропускную способность.Надёжность. Используйте
acks=all
и высокийretries
для продюсера, отключитеenable.auto
.commit
для точного контроля оффсетов в консюмере.Отладка. Логируйте
group.id
и проверяйтеauto.offset.reset
, если данные не читаются.
Заключение
Создание kafka-spring-boot-starter позволило мне упростить интеграцию Kafka в проекты на Spring Boot. Проблемы с Avro и тестами научили меня гибкости и важности синхронизации в асинхронных системах. Надеюсь, этот опыт вдохновит вас на создание собственных библиотек!
Вопрос к читателям: какую функциональность вы бы добавили в такой стартер? Делитесь идеями в комментариях!
Попробуйте внедрить стартер в свой проект! Какие задачи вы решаете с Kafka? Делитесь опытом в комментариях — обсудим, как улучшить этот подход!
Исходный код доступен на GitHub https://github.com/bigbox89/kafka-spring-boot-starter . Если у вас возникнут вопросы по настройке или тестированию, пишите — разберёмся вместе!