Как протестировать логику консьюмеров и продюсеров и не сгореть? Spring Boot 3, Spring Kafka
Всем привет. Решил поделиться опытом тестирования логики консьюмеров и продюсеров в стандартном Spring Boot приложении. Я несколько раз подходил к этой задаче с различными вариантами и находил разные подводные камни, которые стимулировали меня искать дальше - более лучшее решение. И вот, в очередной раз прошерстив накопленный опыт человечества (stackoverflow), я реализовал очередной вариант, в котором пока не нашел минусов предыдущих реализаций. В любом случае, поделюсь с вами как я к этому пришел и почему для моих кейсов не сработали другие варианты (или показались хуже последнего).
Дисклеймер. Я сторонник интеграционных тестов при запуске сборки проекта с тестами. То есть это когда для unit-тестов поднимается контекст Spring со всеми плюсами (окружение, максимально похожее на боевое) и минусами (это ДОЛГО запускается и ДОЛГО описывается).
Часть 1. А разве это проблема вообще?
Коротко: да.
Началось все с того, чтобы признать, что логику внутри консьюмера и продюсера нужно тестировать. Как выглядит самый стандартный консьюмер? Ну, предположу, что как-то так:
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {
private final UserService userService;
private final NotUserService notUserService;
private final UserHandler userHandler;
private final UserMapper userMapper;
private final List<String> userCodes;
@KafkaListener(groupId = "userConsumerGroupId",
clientIdPrefix = "UserConsumer",
topics = {"user-topic"},
containerFactory = "kafkaListenerContainerFactory")
public void consume(UserInfoRecord userInfoRecord) {
log.info("Тут разнообразная логика обработки");
log.info("Cохранение в бд?");
log.info("Работа с внешними сервисами?");
log.info("Вызов мапперов?");
log.info("Отправка информации в Kafka?");
}
}
В методе consume() как раз может скрываться нетривиальная логика, а может быть даже тривиальная, но которую хочется проверить до того, как отдавать заказчику.
А как выглядит стандартный продюсер? Ну, наверное, как-то так:
@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducer {
private final UserService userService;
/** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
private final KafkaMessageService kafkaMessageService;
/** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
/** Но есть места, где используется JSON вместо AVRO */
private final KafkaProducer<String, String> kafkaJsonProducer;
private final ApplicationProperties applicationProperties;
public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
log.info("Тут логика отправки");
log.info("Обычно сначала идет сборка модели для отправки.")
log.info("Но может быть и какая-то более сложная логика.");
var record = UserInfoRecord.newBuilder().build();
log.info("Тут может быть как непосредственно отправка в Kafka");
log.info("Так и реализация outbox-паттерна.");
kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
}
}
Может быть используется outbox-pattern, и тогда мы перед отправкой просто кладем в бд сообщение, которое надо отправить.Или кладем в БД только те сообщения, которые не удалось по каким-то причинам отправить. Или не в БД кладем, а еще куда отправляем. Сути это не меняет - мы собираем сообщение и отправляем его либо в бд, либо сразу в Kafka с помощью KafkaProducer. Мы не хотим проверять в каждом интеграционном тесте логику непосредственной отправки сообщения, логику персиста сообщений - это мы делаем в стартере работы с Kafka. А в конечных сервисах считаем, что все протестировано и все работает корректно. Но очень хочется проверить логику ДО, то есть как собирается модель, корректно ли мы кладем поля, не забыли ли мы докинуть в модель недостающие данные, правильно ли мы вызываем калькулятор роста пользователя на основании количества букв в его ФИО.
Итого: логика в консьюмерах и продюсерах тоже просится в unit-тесты, как и остальной код.
Часть 1.5. Промежуточные варианты
Например, использовать тестовый стенд Kafka, то есть подключаться к нему прямо из тестов. Мне кажется, это плохой вариант, когда у тебя появляется зависимость на внешний сервис для выполнения тестов.
Сюда же можно отнести вариант поднятия Kafka (хоть в контейнере, например) прямо на машине-сборщике (Gitlab Runner в нашем случае). Уже лучше, чем абзацем выше, но выглядит такая конструкция слабоподдерживаемой.
Часть 2. "Просто проверь логику, забей на логи"
Да, первый этап - просто проверить логику, несмотря на то, что в консоль при поднятии контекста Spring постоянно пишутся ошибки подключения к Kafka:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: INFO o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Node -1 disconnected.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Connection to node -1 (localhost/127.0.0.1:3333) could not be established. Broker may not be available.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Bootstrap broker localhost:3333 (id: -1 rack: null) disconnected
Возможно, это вполне рабочий вариант и эти ошибки коннекта никак не помешают работе всех тестов при сборке. Но если консьюмеров в одном приложении 5-10, то таких логов становится ощутимо много (например, может быть больше половины общего объема логов при тестах), и в целом такое решение может влиять на работоспособность тестов.
Часть 3. "Да замокай ты да и всё.."
Второй совет, который я услышал или увидел - просто замокать консьюмеры и/или продюсеры. То есть вот так (допустим, для интеграционных тестов мы используем некий BaseTest для конфигурирования контекста):
@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(ApplicationProperties.class)
public abstract class BaseTest {
@MockBean(name = "kafkaProducer")
protected KafkaProducer kafkaProducer;
@MockBean
protected UserConsumer userConsumer;
}
Это определенно поможет не ловить ошибки коннекта к Kafka, потому что ни консьюмеры, ни продюсеры в таком случае не будут конфигурироваться для контекста. А просто будут замоканы.
И кстати, для продюсера это вполне оптимальный вариант! Мокая KafkaProducer - то есть бин отправки сообщений в Kafka, мы оставляем бин класса-продюсера в контексте и можем легко написать на него тест:
class UserProducerTest extends BaseTest {
@Autowired
private UserProducer userProducer;
@Test
void testUserProducer() {
log.info("тестируем логику");
userProducer.produce();
}
}
Как в таком случае написать тесты на консюмер? Ну, например, так:
class UserConsumerTest extends BaseTest {
private final UserConsumer userConsumer = new UserConsumer(<все зависимости>);
@Test
void testUserProducer() {
log.info("тестируем логику");
userConsumer.consume();
}
}
Помните, что у нас в UserConsumer может быть много зависимостей на другие сервисы/классы? Их все нужно либо подготовить тут, либо взять из контекста и подготовить только консьюмер. В любом случае это получается ручная подготовка класса.
Итого, замокать продюсер - ок, замокать консьюмер? Сомнительно, но ОКЭ - придется инициализировать консьюмеры вручную (а при запуске приложения это делает Spring, получается тесты происходят в контексте, но условия запуска разные).
Часть 4. "Да просто возьми EmbeddedKafka, она же для этого и сделана!"
Да, есть такой вариант - поднять EmbeddedKafka для тестов. Настраивается такой вариант несложно - вешаем аннотацию на тест:
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
И добавляем зависимость в maven (в моем случае):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
Какие плюсы? Буквально одной строчкой настройки у нас поднимается простенькая embedded Kafka в тестах, которая позволяет проверить полный флоу. Например проверить, что мы примем в консьюмере именно то, что отправили в продюсере.
Более подробно об этом и следующем варианте написано, например, тут: https://www.baeldung.com/spring-boot-kafka-testing
Я лишь скажу минус, который нашел: зависимость spring-kafka-test тянет за собой транзитивно зависимости на scala:
jackson-module-scala_2.13
scala-collection-compat_2.13
scala-java8-compat_2.13
scala-library
scala-logging_2.13
scala-reflect
И в целом-то можно было бы сказать что "ну и ладно, ну и пускай, все равно это только в тестах". Но эти либы влияют на исполнение кода! Я, например, словил такую ошибку:
ClassCast class scala.collection.immutable.$colon$colon cannot be cast to class java.util.List (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap')
В коде мы конечно же не использовали данный класс и в принципе пакет scala. Но мы используем querydsl. И при кодогенерации происходит "подстава". Возможно, это легко чинится более точной настройкой maven-плагина, но копаться там не захотелось.
Часть 5. "Ну тогда тебе поможет testcontainers, запускай на нем Kafka!"
И правда, поможет. У последних версий Spring Boot хорошая интеграция с testcontainers, что позволяет очень просто настраивать поднятие контейнеров в тестах.
Ах да, в начале я вскользь упомянул, что мы используем AVRO для описания структуры сообщений, передаваемых по Kafka. То есть мы используем Schema Registry (SR). А значит в тестах нужно решить проблему взаимодействия с SR. Тут тоже есть несколько вариантов - замокать через WireMock, использовать SR тестового стенда, поднимать отдельную на Gitlab Runner или еще где-то. Я попробовал два варианта - эмулировал настоящую SR через wiremock - громоздко, приходится докидывать эмуляции новых схем, обновлять текущие. В какой-то момент мы просто настроили взаимодействие с централизованной тестовой SR компании. Но тут сложность в том, что тебе обязательно нужно зарегистрировать схему в SR до запуска тестов. А тесты вообще говоря прогоняются ДО релиза проектов на тестовый стенд.
Чтобы было удобнее подключать kafka-testcontainers в разных сервисах, я создал отдельную либу для теста, с такими зависимостями:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>compile</scope>
</dependency>
<!-- Тут у нас просто либа с модельками и зависимостью на AVRO -->
<dependency>
<groupId>ru.alfastrah</groupId>
<artifactId>avro-models</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Ну а основной класс в либе такой:
@ImportTestcontainers
public interface KafkaBaseTest {
Logger log = org.slf4j.LoggerFactory.getLogger(KafkaTestingClass.class);
/** Kafka-контейнер */
KafkaContainer KAFKA_CONTAINER = new KafkaContainer(KAFKA_IMAGE_NAME);
/** Динамическая конфигурация свойств подключения к БД */
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
if (!KAFKA_CONTAINER.isRunning()) {
KAFKA_CONTAINER.start();
var servers = KAFKA_CONTAINER.getBootstrapServers();
log.info("Kafka-контейнер запущен по адресу {} (и адрес положен в свойство bootstrap-servers)", servers);
registry.add("bootstrap-servers", () -> servers);
}
}
}
Все это позволяет включить в конечном сервисе kafka на testcontainers так (implements KafkaBaseTest):
@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(value = ApplicationProperties.class)
public abstract class BaseTest implements KafkaBaseTest {
}
Минусы:
сборка стала дольше, так как теперь запускается контейнер с kafka;
мы проверяем не совсем то, что хотим изначально - не логику консьюмера или продюсера, а конфигурацию взаимодействия с kafka. Это, конечно, тоже важно, но;
код в тестах стал довольно громоздким, например:
class UserProducerTest extends BaseTest {
@Autowired
private KafkaSettings kafkaSettings;
@Autowired
private ApplicationProperties appProperties;
@Autowired
private UserProducer userProducer;
@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
void KafkaProducerTest() {
var counter = new AtomicInteger(0);
new KafkaListenerCreator<SomeEvent>(kafkaSettings.groupId(), kafkaListenerContainerFactory, kafkaListenerEndpointRegistry)
.createAndRegisterListener(appProperties.topicName(), "KafkaProducerServiceTestListener", (record) -> {
log.info("Получено сообщение: {}", record);
if ("UserProducerTest".equals(record.getTestData())) {
counter.getAndIncrement();
}
});
AtomicInteger iteration = new AtomicInteger(0);
Arrays.stream(EventType.values()).forEach(eventType -> {
var record = createNewRecord(iteration.getAndIncrement());
userProducer.send(record);
});
await().atMost(Duration.ofSeconds(6L)).untilAsserted(() -> {
assertEquals(EventType.values().length, iteration.get());
assertEquals(2, counter.get());
});
}
}
Часть 6. Если не всё, что выше, то что же?
Получая некоторые сложности от варианта выше, вопрос с тестированием логики был все еще не закрыт. Я окунулся в очередной раз в накопленный опыт человечества и нашел там занимательную штуку, а именно: параметр autoStartup в аннотации @KafkaListener (сразу сюда я почему-то не посмотрел).
И тогда мы забываем почти все, что написано выше и пишем следующую конструкцию:
// В тестах мокаем бины продюсеров сообщений в Kafka
public abstract class BaseTest {
@MockBean(name = "kafkaProducer")
protected KafkaProducer kafkaProducer;
@MockBean(name = "kafkaJsonProducer")
protected KafkaProducer kafkaJsonProducer;
}
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {
private final UserService userService;
private final NotUserService notUserService;
private final UserHandler userHandler;
private final UserMapper userMapper;
private final List<String> userCodes;
@KafkaListener(
// прописываем по умолчанию и возможность повлиять на него через свойство
autoStartup = "${auto-startup:true}",
groupId = "userConsumerGroupId",
clientIdPrefix = "UserConsumer",
topics = {"user-topic"},
containerFactory = "kafkaListenerContainerFactory")
public void consume(UserInfoRecord userInfoRecord) {
log.info("Тут разнообразная логика обработки");
log.info("Cохранение в бд?");
log.info("Работа с внешними сервисами?");
log.info("Вызов мапперов?");
log.info("Отправка информации в Kafka?");
}
}
# в тестовом application.yml выключаем автозапуск:
auto-startup: false
Что получаем в итоге: бин консьюмера в контексте, при инициализации консьюмер не пытается подключиться к kafka. При этом у нас работает валидация на уровне "неправильно задал containerFactory" - свалится ошибка при запуске или при тестах.
И тогда тест на консьюмер будет выглядеть так:
class UserConsumerTest extends BaseTest {
// Просто берем бин из контекста и проверяем метод consume
@Autowired
private UserConsumer userConsumer;
@Test
@DataSet(value = "userConsumer_initial.yml")
@ExpectedDataSet(value = "userConsumer_expected.yml")
void testConsume() {
userConsumer.consume(buildMessage());
}
}
А вот пример проверки продюсера:
class UserProducerTest extends BaseTest {
@Autowired
private UserProducer userProducer;
@Autowired
private ApplicationProperties appProperties;
@Test
@DataSet(value = {"UserProducerTest.yml"})
void testPolicyPaymentProducer() {
getUsers().forEach(user -> userProducer.sendInfo(user));
verify(kafkaMessageService, times(4)).persistMessageToSend(eq(appProperties.topicName()), argThat((arg) -> {
var record = (UserRecord) arg;
assertEquals(expectedField1, record.getId());
assertEquals(expectedField2, record.getStatus());
assertEquals(expectedField3, record.getUid());
return record.getExpectedSomething() == null;
}));
}
}
Пока я остановился на этом варианте как на наиболее простом в описании и использовании и при этом он позволяет проверить то, что мы изначально и хотим - логику внутри продюсеров и консьюмеров.
Спасибо, что дочитали. Расскажете в комментариях, какой вариант используете вы?