Как стать автором
Обновить

Тестирование асинхронного контракта Spring Boot приложения (Kafka Consumer+Producer)

Уровень сложностиПростой
Время на прочтение12 мин
Количество просмотров2.7K

Приветствую тебя, Хабр! Сегодня мы попробуем разобраться, как написать интеграционный\сервисный\авто тест для Spring Boot приложения, которое взаимодействует с внешним миром исключительно с помощью Kafka. Сначала разберем проблему, а затем поэтапно будем развивать наши тесты, чтобы привести их к состоянию, когда мы не боимся включать их в defenition-of-done наших фич (non-flaky). На эту тему есть замечательные статьи: раз, два, три, однако, я хотел бы раскрыть ее более подробно, подробно описывая все этапы и грабли, на которые можно наступить. Ни в коем случае не претендую на истинность, при этом надеюсь, что кому-нибудь эта статья поможет в повседневной жизни.

Что мы тестируем?

Рассмотрим простое приложение с бизнес-логикой, интеграциями и БД (типовое prod-like решение):

Запрос принимает слой взаимодействия с клиентом (в терминах гексагональной архитектуры это называется primary\in адаптером, далее, для наглядности, будем придерживаться этого термина), приложение обогащается необходимыми данными, выполняет бизнес-логику, сохраняет артефакты в БД. В таком сценарии, если мы хотим упростить жизнь нашим QA-инженерам (путем избавления их от прогона одних и тех же тест-кейсов), наш тест должен иметь следующую структуру:

  1. Имитация клиентского запроса

  2. Проверка корректности интеграции в рамках обработки запроса

  3. Проверка артефактов в БД по результатам обработки запроса

  4. Проверка ответа клиенту

На всякий случай, сделаю оговорку, что все пункты, кроме первого, опциональны – все зависит от ваших потребностей и функционала вашего приложения.

В чем проблема?

Если ваш primary адаптер – это REST-controller, то никаких проблем у вас нет, ваш тест будет выглядеть примерно так:

    @Autowired
    private WebTestClient webTestClient;

    @Test
    void example() {

        var response = webTestClient
                .post()
                .uri(urlPath)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue(request))
                .exchange()
                .expectStatus().isEqualTo(expectedResponseStatus)
                .expectBody(responseEntity)
                .returnResult()
                .responseBody;

        // assertions
    }

Но что, если наше приложение общается с внешним миром с помощью Kafka? Тогда наш primary адаптер на самом деле – это связка consumer + producer:

В этом случае перед нами встает несколько вопросов:

  1. Как имитировать клиентский запрос?

  2. Как начинать проверки гарантированно после того, как приложение обработало запрос?

  3. Как сделать это эффективно? (non-flaky, анализ "настоящего" ответа клиенту и т.д.)

Начнем писать тесты

Итак, для примера я написал простое приложение, которое считает среднюю оценку студента по всем предметам (для простоты их два – химия и математика). Оценку по каждому предмету приложение получает в соответствующих мастер-системах, сохраняя артефакты в БД (для простоты – это название предмета и оценка по нему). Ознакомиться с кодом можно здесь.

Нам нужен тест с базовым сценарием, чтобы при ручном тестировании уделять внимание только corner-кейсам. Настроим инфраструктуру тестов, используя фичи Spring Boot 3.1:

@SpringBootTest
@Testcontainers
public class BaseServiceTest {

    @Container
    @ServiceConnection
    protected static final PostgreSQLContainer<?> POSTGRESQL_CONTAINER =
            new PostgreSQLContainer<>("postgres:15.2")
                    .withDatabaseName("some_name")
                    .withUsername("some_user")
                    .withPassword("some_pass");

    @Container
    @ServiceConnection
    protected static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.3.3")
    );
}

Для инициализации БД используется файл schema.sql (документация), при этом testcontainers не используется для "Testcontainers at development time", а только для тестов. Для локального запуска используется docker-compose.

Решаем первый вопрос - как имитировать клиентский запрос?

    // имитация клиентского продюсера
    private static KafkaTemplate<String, String> clientProducer;

    @BeforeAll
    static void setUp() {
        // создаем имитацию клиентского продюсера
        final var clientProducerProps = KafkaTestUtils.producerProps(KAFKA_CONTAINER.getBootstrapServers());
        clientProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        clientProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        clientProducer = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(clientProducerProps));
    }

    @Test
    void example() {
        // имитация клиентского запроса
        clientProducer.send("request-topic", "key", "123");
        
        // assertions
    }

С клиентским запросом мы разобрались, теперь нам необходимо проверить артефакты в БД, появившиеся в процессе обработки запроса:

    @Autowired
    private ScoreDetailsPort scoreDetailsPort;
    
    @Test
    void example() {
        // имитация клиентского запроса
        clientProducer.send("request-topic", "key", "123");

        // проверяем артефакты БД
        final var details = scoreDetailsPort.findAll();
        assertThat(details).as("проверка артефактов БД").hasSize(2);
    }

Запускаем тест – упс, тест регулярно (если не всегда) будет падать, т.к. мы начинаем проверки ДО того, как запрос действительно обработан:

Почему так происходит? Тут два важных нюанса – первый, это то, что тестовый поток после имитации клиентского запроса ничего не держит, и он переходит в блок проверок сразу же. А второй – приложение имеет интеграции с мастер-системами и БД, а значит, что время обработки запроса зависит не только от нашего приложения, но и от сети, и от того, насколько быстро отвечают на запросы внешние интеграции. На практике это значит, что время обработки клиентского запроса непредсказуемо. Чтобы имитировать это, в нашем приложении мы подменили сетевые вызовы на Thread.sleep():

@Service
public class MathSubjectAdapter implements SubjectPort {

    @Override
    public SubjectType getSubjectType() {
        return MATH;
    }

    @Override
    public Double getAverageScore(String studentId) {
        try {
            // имитируем интеграцию
            Thread.sleep(Duration.of(new Random().nextLong(1, 10), SECONDS));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return new Random().nextDouble(1, 5);
    }
}

Первое, что приходит на ум – остановить наш поток на некоторое время, чтобы «дать время» приложению на обработку запроса:

    @Test
    void example() throws InterruptedException {
        // имитация клиентского запроса
        clientProducer.send("request-topic", "key", "123");

        // усыпляем тестовый поток
        Thread.sleep(15_000);

        // проверяем артефакты БД
        final var details = scoreDetailsPort.findAll();
        assertThat(details).as("проверка артефактов БД").hasSize(2);
    }

Однако этот подход имеет существенные недостатки – что, если наша бизнес-логика отвечает на самом деле быстрее? Или, что, если время ответа выросло, за счет добавления новой, более медленной интеграции?

Следующая ступень эволюции нашей мысли – SpyBean для response producer + CountDownLatch:

....

    @SpyBean
    private ExampleProducer exampleProducer;

....

    @Test
    void example() throws InterruptedException {
        final var countDownLatch = new CountDownLatch(1);

        // как только вызывается response producer, вызываем countDown(), чтобы пустить тестовый поток к блоку проверок
        // тем самым гарантируем, что проверка начнется после того, как отправлен ответ клиенту
        doAnswer(invocation -> {
            countDownLatch.countDown();
            return invocation.callRealMethod();
        }).when(exampleProducer).sendMessage(anyString(), anyDouble());

        // имитация клиентского запроса
        final var studentId = "123";
        clientProducer.send("request-topic", "key", studentId);

        // блочим тестовый поток до тех пор, пока не вызван countDown()
        countDownLatch.await(15, TimeUnit.SECONDS);

        final var details = scoreDetailsPort.findAllByStudentId(studentId);
        final var stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
        final var doubleArgumentCaptor = ArgumentCaptor.forClass(Double.class);
        Mockito.verify(exampleProducer).sendMessage(stringArgumentCaptor.capture(), doubleArgumentCaptor.capture());
        final var expectedScore = doubleArgumentCaptor.getValue();
        final var expectedStudentId = stringArgumentCaptor.getValue();

        assertThat(details).as("проверка артефактов БД").hasSize(2);
        assertThat(expectedStudentId).as("проверка ID студента").isEqualTo(studentId);
        assertThat(expectedScore).as("проверка наличия оценки").isNotNull();
    }

Применяем CountDownLatch для синхронизации тестового потока и потока, обслуживающего клиентский запрос. Метод await(15, TimeUnit.SECONDS) с таймаутом необходим, чтобы в исключительной ситуации не заблочить тест на "бесконечном" ожидании. При этом countDown() вызывается при вызове exampleProducer.sendMessage(), а это значит, что тестовый поток будет разлочен сразу по готовности, вместо ожидания конкретного времени на Thread.sleep().

Этот подход более эффективен и работает, однако, у него есть существенные недостатки. Первый - мы проверяем "ненастоящий" ответ, т.е. не то сообщение, которое получит клиент, а то, что отправил наш адаптер (Mockito capturing). И второй, более существенный - каждый Spring Boot тест, содержащий Spy\Mock бины, пересоздает Spring контекст, из-за чего при каждом новом тесте ваш тестовый прогон будет все более и более медленным, пока вы не упретесь в потолок по памяти и не получите необработанную JVM ошибку OutOfMemoryError. Чтобы в этом убедиться, необходимо добавить в application.yml:

logging:
  level:
    org.springframework.test.context.cache: DEBUG

После прогона тестов можем посмотреть сколько раз был создан заново Spring context: в логах будет запись:

DEBUG [ForkJoinPool-1-worker-1] [] o.s.test.context.cache : Spring test ApplicationContext cache statistics: […., hitCount = X, missCount = Y]

Где missCount – сколько раз промахнулись мимо кеша и пришлось заново создавать контекст. Пример результата использования такого подхода (да, тут был OutOfMemory): hitCount = 893, missCount = 4 (всего 4 привело к сильной деградации). Пруфы: раз, два. И есть попытки это исправить, однако мы поменяем подход в целом.

Имитируем клиентский консюмер и дождемся, когда он получит от нас ответ, преждем чем начнем проверки. Для этого будем использовать блокирующую очередь BlockingQueue, она может блокировать наш поток как на вставку, так и на чтение. Блокировка на вставку гарантирует нам обработку единственного сообщения, а блокировка на чтение гарантирует начало проверок после обработки запроса:

....
    // имитация клиентского консюмера
    private static KafkaMessageListenerContainer<String, ScoreResult> clientConsumer;
    // блокирующая очередь с размером 1, чтобы гарантировать последовательную обработку сообщений во всех тестах
    private static final BlockingQueue<ConsumerRecord<String, ScoreResult>> CLIENT_CONSUMER_RECORDS =
            new LinkedBlockingQueue<>(1);
....

    @BeforeAll
    static void setUp() {
        ....

        // создаем имитацию клиентского консюмера
        final var clientConsumerProps = new HashMap<String, Object>();
        clientConsumerProps.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        clientConsumerProps.put(GROUP_ID_CONFIG, "consumer");
        clientConsumerProps.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        clientConsumerProps.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        clientConsumerProps.put(ENABLE_AUTO_COMMIT_CONFIG, "true");

        final var deserializer = new JsonDeserializer<ScoreResult>();
        deserializer.addTrustedPackages("*");
        final var clientConsumerFactory = new DefaultKafkaConsumerFactory<>(
                clientConsumerProps,
                new StringDeserializer(),
                deserializer
        );
        final var clientConsumerContainerProperties = new ContainerProperties("response-topic");

        clientConsumer = new KafkaMessageListenerContainer<>(clientConsumerFactory, clientConsumerContainerProperties);
        // задаем поведение клиентского консюмера
        clientConsumer.setupMessageListener(
                (MessageListener<String, ScoreResult>) data -> {
                    try {
                        // поток, обслуживающий клиентский консюмер будет заблокирован, пока в блокирующей очереди
                        // есть хотя бы один необработанный ивент
                        CLIENT_CONSUMER_RECORDS.put(data);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
        );
        clientConsumer.start();
    }

    @AfterAll
    static void stop() {
        clientConsumer.stop();
    }

    @Test
    void example() throws InterruptedException {
        // имитация клиентского запроса
        final var studentId = "123";
        clientProducer.send("request-topic", "key", studentId);

        // блочим тестовый поток до тех пор, пока клиентский консюмер не получит сообщение
        final var response = CLIENT_CONSUMER_RECORDS.poll(15, TimeUnit.SECONDS);
        final var details = scoreDetailsPort.findAllByStudentId(studentId);

        assertThat(response.value())
                .as("проверка ответа клиенту")
                .satisfies(value -> {
                    assertThat(value.studentId()).isEqualTo(studentId);
                    assertThat(value.avgScore()).isNotNull();
                });
        assertThat(details).as("проверка артефактов БД").hasSize(2);
    }
Важное замечание

Мы используем один и тот же экземпляр KafkaMessageListenerContainer, это значит, что параллельность на уровне тестовых методов использовать нельзя - нет никакой гарантии, что блок проверок будет использовать "нужный" ответ, который обработал клиент. При этом, если вам не нужны проверки ответа, то параллельное выполнение вполне допустимо. Для нашего проекта файл junit-platform.properties будет выглядеть так:

junit.jupiter.execution.parallel.enabled=true
junit.jupiter.execution.parallel.config.strategy=dynamic
junit.jupiter.execution.parallel.mode.classes.default=concurrent
junit.jupiter.execution.parallel.mode.default=same_thread

Отлично! Наши тесты работают, мы получили необходимые гарантии, анализируем то, что действительно получит от нас клиент. Однако..... это джокушка ловушкера...

У консюмера есть два очень важных свойства:

  • max.poll.interval.ms - как часто консюмер будет ходить за новой пачкой сообщений

  • auto.commit.interval.ms - как часто будет коммитить полученные сообщения

И проблема в том, что при неудачном стечении обстоятельств или при неудачно подобранных параметрах для этих свойств можем начать наблюдать периодическое падение наших тестов из за того, что пытаемся обработать уже обработанное сообщение (получили flaky тесты):

Пример такого поведения (поменяем настройки, чтобы гарантировать падение):

....
        clientConsumerProps.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // осознанно неудачное значение
        clientConsumerProps.put(MAX_POLL_INTERVAL_MS_CONFIG, "100"); // осознанно неудачное значение
....

@Test
    void example1() throws InterruptedException {
        // имитация клиентского запроса
        final var studentId = "123";

        ....
    }

    @Test
    void example2() throws InterruptedException {
        // имитация клиентского запроса
        final var studentId = "1234";

        ....
    }

    @Test
    void example3() throws InterruptedException {
        // имитация клиентского запроса
        final var studentId = "1236";

        ....
    }

Почему так происходит? Ведь у нас блокирующая очередь, все попадает и исчезает из очередь строго последовательно от теста к тесту...

Дело в том, что наше приложение отвечает от 1 до 10 секунд (рандомно), это значит, что следующую пачку сообщений консюмер запросит тоже через 1-10 секунд, а наш max.poll.interval.ms гораздо меньше - 100 мс (по умолчанию 5 секунд, т.к. используем Spring ContainerProperties), при этом закоммитит сообщения не раньше, чем через 2 секунды (по умолчанию 5). Если консюмер опрашивает брокер реже, чем в max.poll.interval.ms, он посчитается мертвым и будет выведен из консюмер-группы. Это значит, что консюмер получит сообщение, сохранит его в блокирующую очередь, однако будет выведен из группы до того, как подтвердит успешную обработку. Суммируя всю эту информацию, можно сделать вывод, что наше сообщение не будет закоммичено никогда и будет прочитано в следующем тесте. Самое страшное - при определенных обстоятельствах такие тесты будут падать редко и рандомно, что усложнит отладку. Давайте это исправим.

Заберем у нашего консюмера функцию по коммиту сообщений а также ограничим количество прочитанных за раз:

....
        clientConsumerProps.put(MAX_POLL_RECORDS_CONFIG, "1"); // ограничиваем размер пачки сообщений
        clientConsumerProps.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); // отключаем автокоммит
....
        clientConsumerContainerProperties.setAckMode(MANUAL); // устанавливаем политику подтверждения MANUAL
....
        // задаем поведение клиентского консюмера
        clientConsumer.setupMessageListener(
                // используем AcknowledgingMessageListener
                (AcknowledgingMessageListener<String, ScoreResult>) (data, acknowledgment) -> {
                    try {
                        // поток, обслуживающий клиентский консюмер будет заблокирован, пока в блокирующей очереди
                        // есть хотя бы один необработанный ивент
                        CLIENT_CONSUMER_RECORDS.put(data);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    // коммитим offset сразу же после того, как смогли положить сообщение в блокирующую очередь
                    requireNonNull(acknowledgment).acknowledge();
                }
        );
        clientConsumer.start();
....

Для этого устанавливаем AckMode = MANUAL, MAX_POLL_RECORDS_CONFIG = 1 и ENABLE_AUTO_COMMIT_CONFIG = false. Теперь даже при специально неудачно выбранных значениях наши тесты будут гарантированно зеленые:

....
clientConsumerProps.put(MAX_POLL_INTERVAL_MS_CONFIG, "100");
....

Важное замечание - хоть мы и защитились от падений из за выхода консюмера из группы, я все равно рекомендую выставить такое значение MAX_POLL_INTERVAL_MS_CONFIG, при котором этого не будет происходить.

Почти итог

Мы решили задачу, написали настоящие работающие тесты, которые тестируют наш асинхронный сервис как "черную коробочку", однако на ум приходит вопрос - "неужели мы первые кто столкнулся с такой необходимостью?". Ответ - нет. Есть библиотека awaitility, она частично решает нашу задачу, давайте попробуем ее использовать.

    @Test
    void example() {
        // имитация клиентского запроса
        final var studentId = "123";
        clientProducer.send("request-topic", "key", studentId);

        await()
                .pollInterval(Duration.ofSeconds(3))
                .atMost(15, SECONDS)
                .untilAsserted(() -> {
                    final var response = Optional.ofNullable(CLIENT_CONSUMER_RECORDS.poll());
                    final var details = scoreDetailsPort.findAllByStudentId(studentId);

                    assertThat(response).as("проверка наличия ответа").isPresent();
                    assertThat(details).as("проверка артефактов БД").hasSize(2);
                    assertThat(response.get().value())
                            .as("проверка ответа клиенту")
                            .satisfies(value -> {
                                assertThat(value.studentId()).isEqualTo(studentId);
                                assertThat(value.avgScore()).isNotNull();
                            });
                });
    }

Тест выглядит более лаконичным, теперь нет необходимости вызывать блокирующую реализацию метода poll() у нашей очереди. И в целом, можно отметить, что используется этот инструмент довольно часто, он довольно гибкий и даже в наших тестах его можно было использовать совершенно иначе.

Итог

Мы разобрались, как написать non-flaky тесты для нашего приложения, которое общается с внешним миром исключительно через Kafka. Каждый этап эволюции нашей мысли до некоторой степени применим на реальных проектах, все зависит от потребностей и готовности к компромиссам (как и всегда). Надеюсь, мои наработки кому-нибудь принесут пользу - буду считать, что моя миссия выполнена. С полным кодом проекта и тестов можно ознакомиться на GitHub. До встречи, Хабр!

Теги:
Хабы:
+2
Комментарии0

Публикации

Истории

Работа

Java разработчик
347 вакансий

Ближайшие события

Summer Merge
Дата28 – 30 июня
Время11:00
Место
Ульяновская область