Всем привет. Решил поделиться опытом тестирования логики консьюмеров и продюсеров в стандартном Spring Boot приложении. Я несколько раз подходил к этой задаче с различными вариантами и находил разные подводные камни, которые стимулировали меня искать дальше - более лучшее решение. И вот, в очередной раз прошерстив накопленный опыт человечества (stackoverflow), я реализовал очередной вариант, в котором пока не нашел минусов предыдущих реализаций. В любом случае, поделюсь с вами как я к этому пришел и почему для моих кейсов не сработали другие варианты (или показались хуже последнего).
Дисклеймер. Я сторонник интеграционных тестов при запуске сборки проекта с тестами. То есть это когда для unit-тестов поднимается контекст Spring со всеми плюсами (окружение, максимально похожее на боевое) и минусами (это ДОЛГО запускается и ДОЛГО описывается).
Часть 1. А разве это проблема вообще?
Коротко: да.
Началось все с того, чтобы признать, что логику внутри консьюмера и продюсера нужно тестировать. Как выглядит самый стандартный консьюмер? Ну, предположу, что ��ак-то так:
@Slf4j @Service @RequiredArgsConstructor public class UserConsumer { private final UserService userService; private final NotUserService notUserService; private final UserHandler userHandler; private final UserMapper userMapper; private final List<String> userCodes; @KafkaListener(groupId = "userConsumerGroupId", clientIdPrefix = "UserConsumer", topics = {"user-topic"}, containerFactory = "kafkaListenerContainerFactory") public void consume(UserInfoRecord userInfoRecord) { log.info("Тут разнообразная логика обработки"); log.info("Cохранение в бд?"); log.info("Работа с внешними сервисами?"); log.info("Вызов мапперов?"); log.info("Отправка информации в Kafka?"); } }
В методе consume() как раз может скрываться нетривиальная логика, а может быть даже тривиальная, но которую хочется проверить до того, как отдавать заказчику.
А как выглядит стандартный продюсер? Ну, наверное, как-то так:
@Slf4j @Service @RequiredArgsConstructor public class UserProducer { private final UserService userService; /** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */ private final KafkaMessageService kafkaMessageService; /** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */ private final KafkaProducer<String, SpecificRecordBase> kafkaProducer; /** Но есть места, где используется JSON вместо AVRO */ private final KafkaProducer<String, String> kafkaJsonProducer; private final ApplicationProperties applicationProperties; public void sendUserInfoMessage(User user, Set<Integer> managerIds) { log.info("Тут логика отправки"); log.info("Обычно сначала идет сборка модели для отправки.") log.info("Но может быть и какая-то более сложная логика."); var record = UserInfoRecord.newBuilder().build(); log.info("Тут может быть как непосредственно отправка в Kafka"); log.info("Так и реализация outbox-паттерна."); kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record); } }
Может быть используется outbox-pattern, и тогда мы перед отправкой просто кладем в бд сообщение, которое надо отправить.Или кладем в БД только те сообщения, которые не удалось по каким-то причинам отправить. Или не в БД кладем, а еще куда отправляем. Сути это не меняет - мы собираем сообщение и отправляем его либо в бд, либо сразу в Kafka с помощью KafkaProducer. Мы не хотим проверять в каждом интеграционном тесте логику непосредственной отправки сообщения, логику персиста сообщений - это мы делаем в стартере работы с Kafka. А в конечных сервисах считаем, что все протестировано и все работает корректно. Но очень хочется проверить логику ДО, то есть как собирается модель, корректно ли мы кладем поля, не забыли ли мы докинуть в модель недостающие данные, правильно ли мы вызываем калькулятор роста пользователя на основании количества букв в его ФИО.
Итого: логика в консьюмерах и продюсерах тоже просится в unit-тесты, как и остальной код.
Часть 1.5. Промежуточные варианты
Например, использовать тестовый стенд Kafka, то есть подключаться к нему прямо из тестов. Мне кажется, это плохой вариант, когда у тебя появляется зависимость на внешний сервис для выполнения тестов.
Сюда же можно отнести вариант поднятия Kafka (хоть в контейнере, например) прямо на машине-сборщике (Gitlab Runner в нашем случае). Уже лучше, чем абзацем выше, но выглядит такая конструкция слабоподдерживаемой.
Часть 2. "Просто проверь логику, забей на логи"
Да, первый этап - просто проверить логику, несмотря на то, что в консоль при поднятии контекста Spring постоянно пишутся ошибки подключения к Kafka:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: INFO o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Node -1 disconnected. [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Connection to node -1 (localhost/127.0.0.1:3333) could not be established. Broker may not be available. [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Bootstrap broker localhost:3333 (id: -1 rack: null) disconnected
Возможно, это вполне рабочий вариант и эти ошибки коннекта никак не помешают работе всех тестов при сборке. Но если консьюмеров в одном приложении 5-10, то таких логов становится ощутимо много (например, может быть больше половины общего объема логов при тестах), и в целом такое решение может влиять на работоспособность тестов.
Часть 3. "Да замокай ты да и всё.."
Второй совет, который я услышал или увидел - просто замокать консьюмеры и/или продюсеры. То есть вот так (допустим, для интеграционных тестов мы используем некий BaseTest для конфигурирования контекста):
@SpringJUnitConfig @SpringBootTest @EnableConfigurationProperties(ApplicationProperties.class) public abstract class BaseTest { @MockBean(name = "kafkaProducer") protected KafkaProducer kafkaProducer; @MockBean protected UserConsumer userConsumer; }
Это определенно поможет не ловить ошибки коннекта к Kafka, потому что ни консьюмеры, ни продюсеры в таком случае не будут конфигурироваться для контекста. А просто будут замоканы.
И кстати, для продюсера это вполне оптимальный вариант! Мокая KafkaProducer - то есть бин отправки сообщений в Kafka, мы оставляем бин класса-продюсера в контексте и можем легко написать на него тест:
class UserProducerTest extends BaseTest { @Autowired private UserProducer userProducer; @Test void testUserProducer() { log.info("тестируем логику"); userProducer.produce(); } }
Как в таком случае написать тесты на консюмер? Ну, например, так:
class UserConsumerTest extends BaseTest { private final UserConsumer userConsumer = new UserConsumer(<все зависимости>); @Test void testUserProducer() { log.info("тестируем логику"); userConsumer.consume(); } }
Помните, что у нас в UserConsumer может быть много зависимостей на другие сервисы/классы? Их все нужно либо подготовить тут, либо взять из контекста и подготовить только консьюмер. В любом случае это получается ручная подготовка класса.
Итого, замокать продюсер - ок, замокать консьюмер? Сомнительно, но ОКЭ - придется инициализировать консьюмеры вручную (а при запуске приложения это делает Spring, получается тесты происходят в контексте, но условия запуска разные).
Часть 4. "Да просто возьми EmbeddedKafka, она же для этого и сделана!"
Да, есть такой вариант - поднять EmbeddedKafka для тестов. Настраивается такой вариант несложно - вешаем аннотацию на тест:
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
И добавляем зависимость в maven (в моем случае):
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> </dependency>
Какие плюсы? Буквально одной строчкой настройки у нас поднимается простенькая embedded Kafka в тестах, которая позволяет проверить полный флоу. Например проверить, что мы примем в консьюмере именно то, что отправили в продюсере.
Более подробно об этом и следующем варианте написано, например, тут: https://www.baeldung.com/spring-boot-kafka-testing
Я лишь скажу минус, который нашел: зависимость spring-kafka-test тянет за собой транзитивно зависимости на scala:
jackson-module-scala_2.13
scala-collection-compat_2.13
scala-java8-compat_2.13
scala-library
scala-logging_2.13
scala-reflect
И в целом-то можно было бы сказать что "ну и ладно, ну и пускай, все равно это только в тестах". Но эти либы влияют на исполнение кода! Я, например, словил такую ошибку:
ClassCast class scala.collection.immutable.$colon$colon cannot be cast to class java.util.List (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap')
В коде мы конечно же не использовали данный класс и в принципе пакет scala. Но мы используем querydsl. И при кодогенерации происходит "подстава". Возможно, это легко чинится более точной настройкой maven-плагина, но копаться там не захотелось.
Часть 5. "Ну тогда тебе поможет testcontainers, запускай на нем Kafka!"
И правда, поможет. У последних версий Spring Boot хорошая интеграция с testcontainers, что позволяет очень просто настраивать поднятие контейнеров в тестах.
Ах да, в начале я вскользь упомянул, что мы используем AVRO для описания структуры сообщений, передаваемых по Kafka. То есть мы используем Schema Registry (SR). А значит в тестах нужно решить проблему взаимодействия с SR. Тут тоже есть несколько вариантов - замокать через WireMock, использовать SR тестового стенда, поднимать отдельную на Gitlab Runner или еще где-то. Я попробовал два варианта - эмулировал настоящую SR через wiremock - громоздко, приходится докидывать эмуляции новых схем, обновлять текущие. В какой-то момент мы просто настроили взаимодействие с централизованной тестовой SR компании. Но тут сложность в том, что тебе обязательно нужно зарегистрировать схему в SR до запуска тестов. А тесты вообще говоря прогоняются ДО релиза проектов на тестовый стенд.
Чтобы было удобнее подключать kafka-testcontainers в разных сервисах, я создал отдельную либу для теста, с такими зависимостями:
<dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <scope>compile</scope> </dependency> <!-- Тут у нас просто либа с модельками и зависимостью на AVRO --> <dependency> <groupId>ru.alfastrah</groupId> <artifactId>avro-models</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.wiremock</groupId> <artifactId>wiremock-standalone</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-testcontainers</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <scope>test</scope> </dependency> </dependencies>
Ну а основной класс в либе такой:
@ImportTestcontainers public interface KafkaBaseTest { Logger log = org.slf4j.LoggerFactory.getLogger(KafkaTestingClass.class); /** Kafka-контейнер */ KafkaContainer KAFKA_CONTAINER = new KafkaContainer(KAFKA_IMAGE_NAME); /** Динамическая конфигурация свойств подключения к БД */ @DynamicPropertySource static void kafkaProperties(DynamicPropertyRegistry registry) { if (!KAFKA_CONTAINER.isRunning()) { KAFKA_CONTAINER.start(); var servers = KAFKA_CONTAINER.getBootstrapServers(); log.info("Kafka-контейнер запущен по адресу {} (и адрес положен в свойство bootstrap-servers)", servers); registry.add("bootstrap-servers", () -> servers); } } }
Все это позволяет включить в конечном сервисе kafka на testcontainers так (implements KafkaBaseTest):
@SpringJUnitConfig @SpringBootTest @EnableConfigurationProperties(value = ApplicationProperties.class) public abstract class BaseTest implements KafkaBaseTest { }
Минусы:
сборка стала дольше, так как теперь запускается контейнер с kafka;
мы проверяем не совсем то, что хотим изначально - не логику консьюмера или продюсера, а конфигурацию взаимодействия с kafka. Это, конечно, тоже важно, но;
код в тестах стал довольно громоздким, например:
class UserProducerTest extends BaseTest { @Autowired private KafkaSettings kafkaSettings; @Autowired private ApplicationProperties appProperties; @Autowired private UserProducer userProducer; @Autowired private KafkaListenerContainerFactory kafkaListenerContainerFactory; @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Test void KafkaProducerTest() { var counter = new AtomicInteger(0); new KafkaListenerCreator<SomeEvent>(kafkaSettings.groupId(), kafkaListenerContainerFactory, kafkaListenerEndpointRegistry) .createAndRegisterListener(appProperties.topicName(), "KafkaProducerServiceTestListener", (record) -> { log.info("Получено сообщение: {}", record); if ("UserProducerTest".equals(record.getTestData())) { counter.getAndIncrement(); } }); AtomicInteger iteration = new AtomicInteger(0); Arrays.stream(EventType.values()).forEach(eventType -> { var record = createNewRecord(iteration.getAndIncrement()); userProducer.send(record); }); await().atMost(Duration.ofSeconds(6L)).untilAsserted(() -> { assertEquals(EventType.values().length, iteration.get()); assertEquals(2, counter.get()); }); } }
Часть 6. Если не всё, что выше, то что же?
Получая некоторые сложности от варианта выше, вопрос с тестированием логики был все еще не закрыт. Я окунулся в очередной раз в накопленный опыт человечества и нашел там занимательную штуку, а именно: параметр autoStartup в аннотации @KafkaListener (сразу сюда я почему-то не посмотрел).
И тогда мы забываем почти все, что написано выше и пишем следующую конструкцию:
// В тестах мокаем бины продюсеров сообщений в Kafka public abstract class BaseTest { @MockBean(name = "kafkaProducer") protected KafkaProducer kafkaProducer; @MockBean(name = "kafkaJsonProducer") protected KafkaProducer kafkaJsonProducer; }
@Slf4j @Service @RequiredArgsConstructor public class UserConsumer { private final UserService userService; private final NotUserService notUserService; private final UserHandler userHandler; private final UserMapper userMapper; private final List<String> userCodes; @KafkaListener( // прописываем по умолчанию и возможность повлиять на него через свойство autoStartup = "${auto-startup:true}", groupId = "userConsumerGroupId", clientIdPrefix = "UserConsumer", topics = {"user-topic"}, containerFactory = "kafkaListenerContainerFactory") public void consume(UserInfoRecord userInfoRecord) { log.info("Тут разнообразная логика обработки"); log.info("Cохранение в бд?"); log.info("Работа с внешними сервисами?"); log.info("Вызов мапперов?"); log.info("Отправка информации в Kafka?"); } }
# в тестовом application.yml выключаем автозапуск: auto-startup: false
Что получаем в итоге: бин консьюмера в контексте, при инициализации консьюмер не пытается подключиться к kafka. При этом у нас работает валидация на уровне "неправильно задал containerFactory" - свалится ошибка при запуске или при тестах.
И тогда тест на консьюмер будет выглядеть так:
class UserConsumerTest extends BaseTest { // Просто берем бин из контекста и проверяем метод consume @Autowired private UserConsumer userConsumer; @Test @DataSet(value = "userConsumer_initial.yml") @ExpectedDataSet(value = "userConsumer_expected.yml") void testConsume() { userConsumer.consume(buildMessage()); } }
А вот пример проверки продюсера:
class UserProducerTest extends BaseTest { @Autowired private UserProducer userProducer; @Autowired private ApplicationProperties appProperties; @Test @DataSet(value = {"UserProducerTest.yml"}) void testPolicyPaymentProducer() { getUsers().forEach(user -> userProducer.sendInfo(user)); verify(kafkaMessageService, times(4)).persistMessageToSend(eq(appProperties.topicName()), argThat((arg) -> { var record = (UserRecord) arg; assertEquals(expectedField1, record.getId()); assertEquals(expectedField2, record.getStatus()); assertEquals(expectedField3, record.getUid()); return record.getExpectedSomething() == null; })); } }
Пока я остановился на этом варианте как на наиболее простом в описании и использовании и при этом он позволяет проверить то, что мы изначально и хотим - логику внутри продюсеров и консьюмеров.
Спасибо, что дочитали. Расскажете в комментариях, какой вариант используете вы?
