Всем привет. Решил поделиться опытом тестирования логики консьюмеров и продюсеров в стандартном Spring Boot приложении. Я несколько раз подходил к этой задаче с различными вариантами и находил разные подводные камни, которые стимулировали меня искать дальше - более лучшее решение. И вот, в очередной раз прошерстив накопленный опыт человечества (stackoverflow), я реализовал очередной вариант, в котором пока не нашел минусов предыдущих реализаций. В любом случае, поделюсь с вами как я к этому пришел и почему для моих кейсов не сработали другие варианты (или показались хуже последнего).

Дисклеймер. Я сторонник интеграционных тестов при запуске сборки проекта с тестами. То есть это когда для unit-тестов поднимается контекст Spring со всеми плюсами (окружение, максимально похожее на боевое) и минусами (это ДОЛГО запускается и ДОЛГО описывается).

Часть 1. А разве это проблема вообще?

Коротко: да.

Началось все с того, чтобы признать, что логику внутри консьюмера и продюсера нужно тестировать. Как выглядит самый стандартный консьюмер? Ну, предположу, что как-то так:

@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {

    private final UserService userService;
    private final NotUserService notUserService;
    private final UserHandler userHandler;
    private final UserMapper userMapper;
    private final List<String> userCodes;

    @KafkaListener(groupId = "userConsumerGroupId",
            clientIdPrefix = "UserConsumer",
            topics = {"user-topic"},
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(UserInfoRecord userInfoRecord) {
        log.info("Тут разнообразная логика обработки");
        log.info("Cохранение в бд?");
        log.info("Работа с внешними сервисами?");
        log.info("Вызов мапперов?");
        log.info("Отправка информации в Kafka?");
    }
}

В методе consume() как раз может скрываться нетривиальная логика, а может быть даже тривиальная, но которую хочется проверить до того, как отдавать заказчику.

А как выглядит стандартный продюсер? Ну, наверное, как-то так:

@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducer {

    private final UserService userService;
    /** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
    private final KafkaMessageService kafkaMessageService;
    /** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
    private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
    /** Но есть места, где используется JSON вместо AVRO */
    private final KafkaProducer<String, String> kafkaJsonProducer;
    private final ApplicationProperties applicationProperties;

    public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
      log.info("Тут логика отправки");
      log.info("Обычно сначала идет сборка модели для отправки.")
      log.info("Но может быть и какая-то более сложная логика.");
      var record = UserInfoRecord.newBuilder().build();
      log.info("Тут может быть как непосредственно отправка в Kafka");
      log.info("Так и реализация outbox-паттерна.");
      kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
    }
}

Может быть используется outbox-pattern, и тогда мы перед отправкой просто кладем в бд сообщение, которое надо отправить.Или кладем в БД только те сообщения, которые не удалось по каким-то причинам отправить. Или не в БД кладем, а еще куда отправляем. Сути это не меняет - мы собираем сообщение и отправляем его либо в бд, либо сразу в Kafka с помощью KafkaProducer. Мы не хотим проверять в каждом интеграционном тесте логику непосредственной отправки сообщения, логику персиста сообщений - это мы делаем в стартере работы с Kafka. А в конечных сервисах считаем, что все протестировано и все работает корректно. Но очень хочется проверить логику ДО, то есть как собирается модель, корректно ли мы кладем поля, не забыли ли мы докинуть в модель недостающие данные, правильно ли мы вызываем калькулятор роста пользователя на основании количества букв в его ФИО.

Итого: логика в консьюмерах и продюсерах тоже просится в unit-тесты, как и остальной код.

Часть 1.5. Промежуточные варианты

Например, использовать тестовый стенд Kafka, то есть подключаться к нему прямо из тестов. Мне кажется, это плохой вариант, когда у тебя появляется зависимость на внешний сервис для выполнения тестов.

Сюда же можно отнести вариант поднятия Kafka (хоть в контейнере, например) прямо на машине-сборщике (Gitlab Runner в нашем случае). Уже лучше, чем абзацем выше, но выглядит такая конструкция слабоподдерживаемой.

Часть 2. "Просто проверь логику, забей на логи"

Да, первый этап - просто проверить логику, несмотря на то, что в консоль при поднятии контекста Spring постоянно пишутся ошибки подключения к Kafka:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: INFO  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Node -1 disconnected.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Connection to node -1 (localhost/127.0.0.1:3333) could not be established. Broker may not be available.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Bootstrap broker localhost:3333 (id: -1 rack: null) disconnected

Возможно, это вполне рабочий вариант и эти ошибки коннекта никак не помешают работе всех тестов при сборке. Но если консьюмеров в одном приложении 5-10, то таких логов становится ощутимо много (например, может быть больше половины общего объема логов при тестах), и в целом такое решение может влиять на работоспособность тестов.

Часть 3. "Да замокай ты да и всё.."

Второй совет, который я услышал или увидел - просто замокать консьюмеры и/или продюсеры. То есть вот так (допустим, для интеграционных тестов мы используем некий BaseTest для конфигурирования контекста):

@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(ApplicationProperties.class)
public abstract class BaseTest {

    @MockBean(name = "kafkaProducer")
    protected KafkaProducer kafkaProducer;
    @MockBean
    protected UserConsumer userConsumer;
  }

Это определенно поможет не ловить ошибки коннекта к Kafka, потому что ни консьюмеры, ни продюсеры в таком случае не будут конфигурироваться для контекста. А просто будут замоканы.

И кстати, для продюсера это вполне оптимальный вариант! Мокая KafkaProducer - то есть бин отправки сообщений в Kafka, мы оставляем бин класса-продюсера в контексте и можем легко написать на него тест:

class UserProducerTest extends BaseTest {

    @Autowired
    private UserProducer userProducer;

    @Test
    void testUserProducer() {
      log.info("тестируем логику");
      userProducer.produce();
    }
  }

Как в таком случае написать тесты на консюмер? Ну, например, так:

class UserConsumerTest extends BaseTest {

    private final UserConsumer userConsumer = new UserConsumer(<все зависимости>);

    @Test
    void testUserProducer() {
      log.info("тестируем логику");
      userConsumer.consume();
    }
  }

Помните, что у нас в UserConsumer может быть много зависимостей на другие сервисы/классы? Их все нужно либо подготовить тут, либо взять из контекста и подготовить только консьюмер. В любом случае это получается ручная подготовка класса.

Итого, замокать продюсер - ок, замокать консьюмер? Сомнительно, но ОКЭ - придется инициализировать консьюмеры вручную (а при запуске приложения это делает Spring, получается тесты происходят в контексте, но условия запуска разные).

Часть 4. "Да просто возьми EmbeddedKafka, она же для этого и сделана!"

Да, есть такой вариант - поднять EmbeddedKafka для тестов. Настраивается такой вариант несложно - вешаем аннотацию на тест:

@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })

И добавляем зависимость в maven (в моем случае):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
</dependency>

Какие плюсы? Буквально одной строчкой настройки у нас поднимается простенькая embedded Kafka в тестах, которая позволяет проверить полный флоу. Например проверить, что мы примем в консьюмере именно то, что отправили в продюсере.

Более подробно об этом и следующем варианте написано, например, тут: https://www.baeldung.com/spring-boot-kafka-testing

Я лишь скажу минус, который нашел: зависимость spring-kafka-test тянет за собой транзитивно зависимости на scala:

  • jackson-module-scala_2.13

  • scala-collection-compat_2.13

  • scala-java8-compat_2.13

  • scala-library

  • scala-logging_2.13

  • scala-reflect

И в целом-то можно было бы сказать что "ну и ладно, ну и пускай, все равно это только в тестах". Но эти либы влияют на исполнение кода! Я, например, словил такую ошибку:

ClassCast class scala.collection.immutable.$colon$colon cannot be cast to class java.util.List (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap')

В коде мы конечно же не использовали данный класс и в принципе пакет scala. Но мы используем querydsl. И при кодогенерации происходит "подстава". Возможно, это легко чинится более точной настройкой maven-плагина, но копаться там не захотелось.

Часть 5. "Ну тогда тебе поможет testcontainers, запускай на нем Kafka!"

И правда, поможет. У последних версий Spring Boot хорошая интеграция с testcontainers, что позволяет очень просто настраивать поднятие контейнеров в тестах.

Ах да, в начале я вскользь упомянул, что мы используем AVRO для описания структуры сообщений, передаваемых по Kafka. То есть мы используем Schema Registry (SR). А значит в тестах нужно решить проблему взаимодействия с SR. Тут тоже есть несколько вариантов - замокать через WireMock, использовать SR тестового стенда, поднимать отдельную на Gitlab Runner или еще где-то. Я попробовал два варианта - эмулировал настоящую SR через wiremock - громоздко, приходится докидывать эмуляции новых схем, обновлять текущие. В какой-то момент мы просто настроили взаимодействие с централизованной тестовой SR компании. Но тут сложность в том, что тебе обязательно нужно зарегистрировать схему в SR до запуска тестов. А тесты вообще говоря прогоняются ДО релиза проектов на тестовый стенд.

Чтобы было удобнее подключать kafka-testcontainers в разных сервисах, я создал отдельную либу для теста, с такими зависимостями:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <!-- Тут у нас просто либа с модельками и зависимостью на AVRO -->
    <dependency>
        <groupId>ru.alfastrah</groupId>
        <artifactId>avro-models</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.wiremock</groupId>
        <artifactId>wiremock-standalone</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-testcontainers</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Ну а основной класс в либе такой:

@ImportTestcontainers
public interface KafkaBaseTest {

    Logger log = org.slf4j.LoggerFactory.getLogger(KafkaTestingClass.class);
    /** Kafka-контейнер */
    KafkaContainer KAFKA_CONTAINER = new KafkaContainer(KAFKA_IMAGE_NAME);

    /** Динамическая конфигурация свойств подключения к БД */
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        if (!KAFKA_CONTAINER.isRunning()) {
            KAFKA_CONTAINER.start();
            var servers = KAFKA_CONTAINER.getBootstrapServers();
            log.info("Kafka-контейнер запущен по адресу {} (и адрес положен в свойство bootstrap-servers)", servers);
            registry.add("bootstrap-servers", () -> servers);
        }
    }
}

Все это позволяет включить в конечном сервисе kafka на testcontainers так (implements KafkaBaseTest):

@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(value = ApplicationProperties.class)
public abstract class BaseTest implements KafkaBaseTest {
}

Минусы:

  • сборка стала дольше, так как теперь запускается контейнер с kafka;

  • мы проверяем не совсем то, что хотим изначально - не логику консьюмера или продюсера, а конфигурацию взаимодействия с kafka. Это, конечно, тоже важно, но;

  • код в тестах стал довольно громоздким, например:

class UserProducerTest extends BaseTest {

    @Autowired
    private KafkaSettings kafkaSettings;
    @Autowired
    private ApplicationProperties appProperties;
    @Autowired
    private UserProducer userProducer;
    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Test
    void KafkaProducerTest() {
        var counter = new AtomicInteger(0);
        new KafkaListenerCreator<SomeEvent>(kafkaSettings.groupId(), kafkaListenerContainerFactory, kafkaListenerEndpointRegistry)
                .createAndRegisterListener(appProperties.topicName(), "KafkaProducerServiceTestListener", (record) -> {
                    log.info("Получено сообщение: {}", record);
                    if ("UserProducerTest".equals(record.getTestData())) {
                        counter.getAndIncrement();
                    }
                });
        AtomicInteger iteration = new AtomicInteger(0);
        Arrays.stream(EventType.values()).forEach(eventType -> {
            var record = createNewRecord(iteration.getAndIncrement());
            userProducer.send(record);
        });
        await().atMost(Duration.ofSeconds(6L)).untilAsserted(() -> {
            assertEquals(EventType.values().length, iteration.get());
            assertEquals(2, counter.get());
        });
    }
  }

Часть 6. Если не всё, что выше, то что же?

Получая некоторые сложности от варианта выше, вопрос с тестированием логики был все еще не закрыт. Я окунулся в очередной раз в накопленный опыт человечества и нашел там занимательную штуку, а именно: параметр autoStartup в аннотации @KafkaListener (сразу сюда я почему-то не посмотрел).

И тогда мы забываем почти все, что написано выше и пишем следующую конструкцию:

// В тестах мокаем бины продюсеров сообщений в Kafka
public abstract class BaseTest {

    @MockBean(name = "kafkaProducer")
    protected KafkaProducer kafkaProducer;
    @MockBean(name = "kafkaJsonProducer")
    protected KafkaProducer kafkaJsonProducer;
}
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {

    private final UserService userService;
    private final NotUserService notUserService;
    private final UserHandler userHandler;
    private final UserMapper userMapper;
    private final List<String> userCodes;

    @KafkaListener(
      // прописываем по умолчанию и возможность повлиять на него через свойство
      autoStartup = "${auto-startup:true}",
      groupId = "userConsumerGroupId",
      clientIdPrefix = "UserConsumer",
      topics = {"user-topic"},
      containerFactory = "kafkaListenerContainerFactory")
    public void consume(UserInfoRecord userInfoRecord) {
        log.info("Тут разнообразная логика обработки");
        log.info("Cохранение в бд?");
        log.info("Работа с внешними сервисами?");
        log.info("Вызов мапперов?");
        log.info("Отправка информации в Kafka?");
    }
}
# в тестовом application.yml выключаем автозапуск:
auto-startup: false

Что получаем в итоге: бин консьюмера в контексте, при инициализации консьюмер не пытается подключиться к kafka. При этом у нас работает валидация на уровне "неправильно задал containerFactory" - свалится ошибка при запуске или при тестах.

И тогда тест на консьюмер будет выглядеть так:

class UserConsumerTest extends BaseTest {

    // Просто берем бин из контекста и проверяем метод consume
    @Autowired
    private UserConsumer userConsumer;

    @Test
    @DataSet(value = "userConsumer_initial.yml")
    @ExpectedDataSet(value = "userConsumer_expected.yml")
    void testConsume() {
        userConsumer.consume(buildMessage());
    }
}

А вот пример проверки продюсера:

class UserProducerTest extends BaseTest {

    @Autowired
    private UserProducer userProducer;
    @Autowired
    private ApplicationProperties appProperties;

    @Test
    @DataSet(value = {"UserProducerTest.yml"})
    void testPolicyPaymentProducer() {
        getUsers().forEach(user -> userProducer.sendInfo(user));

        verify(kafkaMessageService, times(4)).persistMessageToSend(eq(appProperties.topicName()), argThat((arg) -> {
            var record = (UserRecord) arg;
            assertEquals(expectedField1, record.getId());
            assertEquals(expectedField2, record.getStatus());
            assertEquals(expectedField3, record.getUid());
            return record.getExpectedSomething() == null;
        }));
    }
}

Пока я остановился на этом варианте как на наиболее простом в описании и использовании и при этом он позволяет проверить то, что мы изначально и хотим - логику внутри продюсеров и консьюмеров.

Спасибо, что дочитали. Расскажете в комментариях, какой вариант используете вы?