Привет, Хабр! С вами Александр Бобряков, техлид в команде МТС Аналитики. Я к вам с новой статьёй из цикла про фреймворк Apache Flink.
В предыдущей части я рассказал, как создать Unit-тест на полноценную джобу Flink и отдельные stateful-операторы с использованием Flink MiniCluster. Ещё мы научились запускать мини-кластер один раз перед всеми тестовыми классами, которые нуждаются в нём. В дополнение создали вспомогательные абстракции и аннотации, значительно разделяя ответственность в тестах и упрощая логику написания новых тестов.
В предыдущих тестах на джобу мы не затрагивали интеграцию с Kafka, ведь нам были не важны реальные source и sink. В этой статье продолжим разбираться в тестировании и напишем полноценный E2E-тест, который охватит Kafka и Flink вместе с использованием Testcontainers. Также рассмотрим неочевидные проблемы в тестировании и новые универсальные абстракции.

Список моих статей про Flink:
Введение в Apache Flink: осваиваем фреймворк на реальных примерах
Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?
Как провести unit-тестирование Flink-операторов: Test Harness
Unit и E2E-тестирование оператора с таймерами в Apache Flink
Apache Flink: тестирование собственного сериализатора состояния
Apache Flink: использование и автоматическая проверка собственного сериализатора состояния
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке — итоговый проект по всей серии статей. Эта статья соответствует релизной ветке под названием release/6_e2e_deduplicator_test.
Оглавление статьи:
E2E-тестирование
E2E-тестирование охватывает поведение всей системы от начала до конца. По специфике нашей джобы, которую мы рассматривали в предыдущих частях, в начале есть Kafka-топик с данными ClickMessage, а на выходе — много разных product-топиков. Значит, в тесте всё это должно учитываться.
Цель такого теста — проверить все используемые интеграции между собой. Иначе блоки системы могут работать по отдельности, а вместе всё сломается. В нашем случае должен подниматься весь Spring-контекст, стартовать Flink и Kafka, как будто мы запускаем приложение на проде.
Поднимаем Kafka с помощью Testcontainers
Testcontainers — это библиотека Java, которая поддерживает тесты JUnit. Она даёт возможность запускать в них всё, что может запускаться в Docker. Значит, вы можете проверить любую интеграцию вашего приложения: с БД, брокерами сообщений, другими сервисами и так далее.
Сценарий написания теста в итоге выглядит так:
Определить в тесте Testcontainers контейнер — например, для Kafka.
Запустить Kafka-контейнер.
Пробросить свойства для подключения к Kafka-контейнеру в конфиг приложения.
Запустить тест, в котором приложение подключается согласно конфигу к Kafka-контейнеру.
В предыдущей статье мы затронули создание кастомных аннотаций под тесты. Это было достаточно удобно, поэтому предлагаю придерживаться аналогичного подхода и в этот раз. Для начала нам нужно подключение к Kafka. TestContainers предоставляет Kafka-контейнер «из коробки», который не нуждается в отдельной инициализации Zookeeper. Ещё TestContainers можно использовать, чтобы создать все необходимые контейнеры на основе любых докер-образов.
Для использования готового Kafka-контейнера можем воспользоваться зависимостью:
testImplementation "org.testcontainers:kafka"
Аналогично тому, как мы создали JUnit Extension для старта Flink MiniCluster, создадим новый Extension для старта Kafka-контейнера:
@Slf4j @SuppressWarnings({"PMD.AvoidUsingVolatile"}) public class KafkaContainerExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.2")) .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"); private static final Lock LOCK = new ReentrantLock(); private static volatile boolean started; @Override public void beforeAll(ExtensionContext context) { LOCK.lock(); try { if (!started) { log.info("Start Kafka Container"); started = true; Startables.deepStart(KAFKA).join(); System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers()); System.setProperty("kafka.bootstrap-servers", KAFKA.getBootstrapServers()); System.setProperty("spring.kafka.consumer.group-id", "group-id-spring"); context.getRoot().getStore(GLOBAL).put("Kafka Container", this); } } finally { LOCK.unlock(); } } @Override public void close() { log.info("Close Kafka Container"); KAFKA.close(); started = false; } }
Код очень похож на наш существующий FlinkClusterExtension, который я описал в прошлой статье: мы инициализируем Kafka-контейнер по указанному докер-образу, потом в beforeAll() синхронно запускаем его через вызов Startables.deepStart(KAFKA).join(), обвязывая блокировками. В конце выполнения всех зависимых тестов закрываем контейнер в колбэк-методе жизненного цикла JUnit тестов close().
Возникает вопрос: как наше Spring-приложение при старте будет подключаться к Kafka? Ведь контейнер запускается на случайном свободном порту. Для этого мы ��ередаём настройки в переменные окружения через System.setProperty() в статическом контексте непосредственно перед стартом приложения. Настройки передаём согласно структуре application.yml, потому что они будут «перезатираться» из указанных переменных окружения — документация:
kafka: group-id: group_id bootstrap-servers: localhost:29092
Для Spring передаём свойства, чтобы автоматически создались основные Spring-бины для интеграции с Kafka. Например, KafkaTemplate — абстракция над Kafka Producer, которая умеет отправлять сообщения в топик. Он понадобится нам в рамках тестов, поэтому добавим зависимость в тестах:
testImplementation "org.springframework.kafka:spring-kafka"
Этого можно было бы добиться и альтернативным способом: через Spring-инициализаторы. Например, можно было бы создать свою реализацию интерфейса ApplicationContextInitializer. Но мы не будем разбирать этот способ в статье, потому что реализация через Extension выглядит красивее.
В итоге, чтобы применять текущий Extension в тестах, нам была бы полезна своя аннотация по аналогии с аннотацией @WithFlinkCluster:
@Retention(RUNTIME) @ExtendWith({KafkaContainerExtension.class}) @Inherited public @interface WithKafkaContainer { }
Эту аннотацию можно вешать на любой тестовый класс, которому нужна интеграция с Kafka.
Абстракции тестирования для Kafka
Во всех тестах было бы удобно пользоваться своими абстракциями, фасадами или dto для взаимодействия с Kafka.
KafkaTestConsumer
Во-первых, нужно определить тестовый Consumer, который будем создавать в каждом тесте отдельно, чтобы он подключался к топикам в рамках новой консьюмерной группы. Ещё он должен использовать де-/сериализацию Jackson, ведь мы определили для сообщений формат JSON. Важно помнить: Kafka-контейнер поднимается в единственном экземпляре. Поэтому, если написать тесты неправильно, они могут косвенно влиять друг на друга.
Класс для создания тестового Kafka consumer можно представить в таком виде:
public class KafkaTestConsumer implements AutoCloseable { private final Consumer<String, String> consumer; private final List<KafkaMessage> receivedMessages = new CopyOnWriteArrayList<>(); private final ObjectMapper objectMapper = createObjectMapper(); public KafkaTestConsumer(String bootstrapServers, Set<String> topics) { this.consumer = new KafkaConsumer<>( Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(), ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ) ); consumer.subscribe(topics); } public <T> List<T> receiveAndGetAll(String topic, Class<T> clazz) { return receiveAndGetAll() .stream() .filter(kafkaMessage -> topic.equals(kafkaMessage.getTopic())) .map(kafkaMessage -> readValue(kafkaMessage, clazz)) .collect(toList()); } private List<KafkaMessage> receiveAndGetAll() { final var records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { receivedMessages.add(new KafkaMessage(record.key(), record.topic(), record.value())); } consumer.commitSync(); return receivedMessages; } @SneakyThrows private <T> T readValue(KafkaMessage kafkaMessage, Class<T> clazz) { return objectMapper.readValue(kafkaMessage.getValue(), clazz); } @Override public void close() { receivedMessages.clear(); consumer.close(); } }
Код достаточно прост. В конструкторе определяем основные параметры подключения к Kafka, создаётся базовый consumer, который подписывается на переданные топики. Потом предоставляем основной метод receiveAndGetAll(), который отдаёт все десериализованные сообщения в конкретный тип в разрезе какого-то топика. Внутри этого метода происходит вызов базового consumer.poll() для обращения к Kafka-контейнеру. Внутри используем свою dto KafkaMessage, чтобы не работать с ConsumerRecord напрямую, ведь большинство информации в нём нам не требуется:
@Value public class KafkaMessage { String key; String topic; String value; }
TestKafkaFacade
Нам понадобится абстракция-фасад, которая позволит отправлять сообщения в Kafka, создавать новые топики в каждом тесте, а ещё создавать KafkaTestConsumer. Это будет происходить в контексте Spring-приложения, поэтому удобно создать общий Spring-компонент TestKafkaFacade:
@TestComponent @SuppressWarnings("PMD.TestClassWithoutTestCases") public class TestKafkaFacade { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private KafkaAdmin kafkaAdmin; private final ObjectMapper objectMapper = createObjectMapper(); public void createTopicsIfNeeded(String... names) { final var topics = kafkaAdmin.describeTopics(); if (!topics.keySet().containsAll(Stream.of(names).collect(toSet()))) { kafkaAdmin.createOrModifyTopics( Stream.of(names) .map(n -> new NewTopic(n, 1, (short) 1)) .toArray(NewTopic[]::new) ); } } @SneakyThrows public void sendMessage(String topic, Object message) { kafkaTemplate .send(topic, objectMapper.writeValueAsString(message)) .get(5, TimeUnit.SECONDS); } public KafkaTestConsumer createKafkaConsumer(Set<String> topics) { return new KafkaTestConsumer(System.getProperty("kafka.bootstrap-servers"), topics); } }
Во-первых, этот фасад инжектит Spring-реализации для KafkaTemplate (абстракция над базовым Kafka Producer), а также KafkaAdmin (абстракция над базовым AdminClient). Эти бины будут существовать в контексте благодаря установленному свойству spring.kafka.bootstrap-servers в нашем KafkaContainerExtension.
У нас есть методы для создания новых топиков, отправки в топик сообщения с учётом сериализации и создания нового KafkaTestConsumer. Bootstrap-servers получаем прямо из env, в который мы записываем bootstrap-servers в рамках KafkaContainerExtension.
KafkaTopicCreatorConfig
Прежде чем переходить к тесту, удобно воспользоваться ещё одним тестовым компонентом для создания всех топиков, указанных в application.yml:
@TestConfiguration public class KafkaTopicCreatorConfig { @Autowired private KafkaProperties kafkaProperties; @Bean public KafkaAdmin.NewTopics newTopics() { return new KafkaAdmin.NewTopics( new NewTopic(kafkaProperties.getTopics().getClickTopic(), 1, (short) 1) ); } }
Этот компонент создаёт бин KafkaAdmin.NewTopics, а Spring автоматически создаёт все указанные в нём топики — то есть топики из KafkaProperties (application.yml). В нашем случае сейчас это только один входной топик click-topic. Выходные мы будем создавать в каждом тесте отдельно, потому что они могут определяться динамически в нашем бизнес-кейсе.
E2E-тест на Flink Job
Теперь всё готово для написания первого E2E-теста на нашу джобу. Для этого создадим общую для E2E-тестов аннотацию:
@Retention(RUNTIME) @SpringBootTest(webEnvironment = NONE) @ActiveProfiles({"test"}) @Import({ KafkaTopicCreatorConfig.class, TestKafkaFacade.class }) @WithFlinkCluster @WithKafkaContainer public @interface E2ETest { }
Она совмещает в себе все интеграции проекта @WithFlinkCluster и @WithKafkaContainer. А ещё поднимает весь Spring-контекст, захватывая тестовые абстракции, которые мы описали выше: KafkaTopicCreatorConfig и TestKafkaFacade.
Напомню, что в нашем приложении точка входа — это AppListener:
@Component @RequiredArgsConstructor @ConditionalOnProperty("flink.submit-jobs-on-app-start") public class AppListener { private final JobStarter jobStarter; @EventListener(ApplicationStartedEvent.class) @SneakyThrows public void onApplicationStart() { jobStarter.startJobs(); } }
Он запускается по условию flink.submit-jobs-on-app-start = true. Но в тестах мы выставим его в false (в application-test.yml), чтобы создавать необходимые предусловия перед непосредственным запуском теста.
Напомню, что наша джоба занимается фильтрацией входного потока ClickMessage, пропуская события только с типом платформы WEB и APP. Дальше события APP проходят дедупликацию. Потом события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.
Пайплайн выглядит так:

В итоге E2E-тест на обработку ClickMessage-сообщения может выглядеть таким образом:
@E2ETest @SuppressWarnings("PMD.DataflowAnomalyAnalysis") public class JobE2ETest { @Autowired private JobStarter jobStarter; @Autowired private TestKafkaFacade kafka; @Autowired private KafkaProperties kafkaProperties; @Test @SneakyThrows void shouldProcessClickMessageSourceToProductSink() { final var productTopic = "product_topic_1"; kafka.createTopicsIfNeeded(productTopic); final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build(); kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage); kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage); final var jobClient = jobStarter.startJobs(); @Cleanup final var kafkaConsumer = kafka.createKafkaConsumer(Set.of(productTopic)); await().atMost(ofSeconds(5)) .until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class), productMessages -> productMessages.size() == 1 && productMessages.get(0).getUserId().equals(clickMessage.getUserId()) ); jobClient.cancel().get(5, TimeUnit.SECONDS); } }
Мы повесили единств��нную аннотацию @E2ETest, благодаря которой произойдёт запуск Flink-мини-кластера, Kafka-контейнера и всего Spring-контекста нашего приложения. Точку входа JobStarter будем запускать сами в обход AppListener уже после настройки теста, благодаря настройке в application-test.yml: flink.submit-jobs-on-app-start = false.
А вот что происходит в самом тесте:
Создаём новый productTopic. Мы ожидаем, что именно в него попадёт входное ClickMessage-сообщение после обработки.
Создаём само ClickMessage-сообщение. Обязательно передаём в него productTopic.
Отправляем сообщение во входной топик click-topic дважды (ожидаем дедупликацию).
На этом подготовка теста завершена. У нас есть:
все топики
сообщение во входном топике
поднятое Spring-приложение
Пора стартовать задачу! Наша реализация jobStarter.startJobs(); запускает все найденные в контексте Spring задачи (наследующиеся от нашей абстракции FlinkJob) асинхронно через environment.executeAsync(). Дальше мы можем дожидаться сообщения в выходном топике.
Важный момент — именно асинхронный запуск задания, который возвращает объект управления JobClient. Например, раньше в тестах мы использовали только метод execute(), который был синхронным. В случае асинхронного запуска через JobClient мы можем получать статус задачи, завершать её руками и так далее. Это важно, ведь наш Kafka-источник потенциально бесконечен.
Поэтому дальше мы создаём наш AutoCloseable Kafka Consumer и подписываем его на выходной productTopic. Потом периодически проверяем, появилось ли там сообщение ProductMessage с userId, идентичным входному ClickMessage, в течение пяти секунд. Если за пять секунд не дождались сообщения, тест завершится с ошибкой. Для программной реализации такой проверки в тесте используются библиотеки awaitility. После проверки завершаем асинхронную джобу синхронным вызовом: jobClient.cancel().
В этом тесте есть большая проблема: а что, если assert не выполнится или джоба во время запуска упадёт? Будут ли какие-то проблемы с самими тестами?
На самом деле проблемы будут. Например, метод jobClient.cancel() вообще не выполнится, а джоба ещё долго может висеть в мини-кластере. В это время мини-кластер может начать использоваться для следующей джобы следующего теста, а его ресурсов для этого не хватит. Плюс возможны разные сайд-эффекты между выполнением таких тестов. Как решить эту проблему? Об этом — дальше.
Безопасное завершение E2E-тестов
Чтобы решить проблему, которую я описал выше, перед неудачным завершением теста лучше всегда подчищать ресурсы. Для этого можно в лоб оборачивать всё в блок try и finally. Вот как это сделать:
try { final var jobClient = jobStarter.startJobs(); // ... await().atMost(ofSeconds(5)).until(...); } finally { jobClient.cancel().get(5, TimeUnit.SECONDS); }
Выглядит не очень красиво. А ещё такой блок придётся писать в каждом тесте, который использует асинхронный запуск задач. Напрашивается декоратор над абстракцией FlinkClient, который умеет завершать задания:
@RequiredArgsConstructor public class AutoCloseableJobClient implements JobClient, AutoCloseable { private final JobClient original; @Override public JobID getJobID() { return original.getJobID(); } @Override public CompletableFuture<JobStatus> getJobStatus() { return original.getJobStatus(); } @Override public CompletableFuture<Void> cancel() { return original.cancel(); } @Override public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) { return original.stopWithSavepoint(advanceToEndOfEventTime, savepointDirectory, formatType); } @Override public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory, SavepointFormatType formatType) { return original.triggerSavepoint(savepointDirectory, formatType); } @Override public CompletableFuture<Map<String, Object>> getAccumulators() { return original.getAccumulators(); } @Override public CompletableFuture<JobExecutionResult> getJobExecutionResult() { return original.getJobExecutionResult(); } @Override public void close() throws Exception { original.cancel().get(5, TimeUnit.SECONDS); } }
Декоратор делегирует выполнение внутреннему оригинальному JobClient. Но дополнительно мы имплементируем интерфейс AutoCloseable, чтобы перенести логику завершения джобы в метод close(). Теперь осталось вернуть этот декоратор в нашем JobStarter:
@SneakyThrows public AutoCloseableJobClient startJobs() { if (jobs.isEmpty()) { log.info("No Jobs found for start"); return null; } for (FlinkJob job : jobs) { log.info("Register job '{}'", job.getClass().getSimpleName()); job.registerJob(environment); } return new AutoCloseableJobClient(environment.executeAsync()); }
Тесты при этом значительно упрощаются:
@Test @SneakyThrows void shouldProcessClickMessageSourceToProductSink() { final var productTopic = "product_topic_1"; kafka.createTopicsIfNeeded(productTopic); final var clickMessage = aClickMessage().withProductTopic(productTopic).withPlatform(APP).build(); kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage); kafka.sendMessage(kafkaProperties.getTopics().getClickTopic(), clickMessage); @Cleanup final var jobClient = jobStarter.startJobs(); @Cleanup final var kafkaConsumer = kafka.createKafkaConsumer(Set.of(productTopic)); await().atMost(ofSeconds(5)) .until(() -> kafkaConsumer.receiveAndGetAll(productTopic, ProductMessage.class), productMessages -> productMessages.size() == 1 && productMessages.get(0).getUserId().equals(clickMessage.getUserId()) ); }
Получается, мы добавили аннотацию @Cleanup для объекта JobClient и убрали лишнюю завершающую строку jobClient.cancel().get(5, TimeUnit.SECONDS).
RocksDB в E2E-тестах
Чтобы окончательно убедиться, что наши E2E-тесты покрывают необходимую функциональность, нужно воссоздать окружение, аналогичное production. В нём для больших состояний рекомендуется использовать RocksDB в качестве StateBackend, который применяется напрямую в дедупликаторе. RocksDB в качестве бэкенда состояний я описывал в первой статье этого цикла.
Итак, можно заставить E2E-тест использовать локально поднятый RocksDB. Сделать это достаточно просто, так как Flink предоставляет RocksDB «из коробки». В тестах достаточно указать его в качестве состояния:
@TestConfiguration public class FlinkProductionConfig { @Autowired public void changeFlinkEnvironment(StreamExecutionEnvironment environment) { final var backend = new EmbeddedRocksDBStateBackend(false); environment.setStateBackend(backend); } }
То есть наш компонент перехватывает настройку StreamExecutionEnvironment и указывает напрямую состояние EmbeddedRocksDBStateBackend. Чтобы его использовать, нужно добавить зависимость:
testImplementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"
Теперь осталось только использовать этот конфиг в E2E-тестах, добавив его в аннотацию @E2ETest. Вот что в итоге вы сможете наблюдать в логах:
INFO 35116 --- [ger-io-thread-1] o.a.flink.runtime.jobmaster.JobMaster : Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=FALSE, numberOfTransferThreads=-1, writeBatchSize=-1}
Важное замечание. В процессе использования RocksDb он создаёт свои временные директории с достаточно длинными путями. Например, для Mac:
INFO 35116 --- [essages (1/2)#0] .f.c.s.s.RocksDBKeyedStateBackendBuilder : Finished building RocksDB keyed state-backend at /var/folders/_y/gd8sxnq91z9glrxjlkrj98tnbsxn37/T/junit7312928894650170068/junit1795779582692257025/minicluster_aabeebf82ae121d1fa503365b6aa7eb7/tm_0/tmp/job_9a5c9344174adde1453cf361f9a0f43f_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_270162db-c22d-49ff-bb62-9ed6d9ef2ac5.
Поэтому при запуске кода на Windows вы можете получить неожиданную ошибку:
Caused by: java.io.IOException: The directory path length (275) is longer than the directory path length limit for Windows (247): C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.throwExceptionIfPathLengthExceededOnWindows(RocksDBOperationUtils.java:285) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:85) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] ... 18 common frames omitted Caused by: org.rocksdb.RocksDBException: Failed to create a directory: C:\Users\User\AppData\Local\Temp\junit15245919089769733580\junit7040907885113472950\minicluster_7d9295c691c345c6209f2bb0db16b593\tm_0\tmp\job_7e441850886077eb74921aa5bb0e41f4_op_StreamFlatMap_371a51a50a977e59af86fcb074c97b9f__1_2__uuid_bcc33f69-17b2-4424-9620-25922af3be34\db: The directory path length (275) is longer than the directory path length limit for Windows (247). at org.rocksdb.RocksDB.open(Native Method) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na] at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[frocksdbjni-6.20.3-ververica-2.0.jar:na] at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:75) ~[flink-statebackend-rocksdb-1.17.0.jar:1.17.0] ... 22 common frames omitted
Есть статьи о том, как решить эту проблему. Но у некоторых разработчиков описанные решения не сработали. Поэтому при тестировании Flink-задач с использованием embedded RocksDB рекомендуется не использовать Windows.
Вывод
Мы разобрали, как написать E2E-тест на Flink и Spring-джобу с использованием Kafka и Testcontainers. Мы создали удобные абстракции и поговорили об основных проблемах, которые нужно учитывать при таком виде тестирования.
На этом серию статей про тестирование я закончу. Дальше перейду к разбору таймеров в Flink с более сложным хранением состояния. Мы посмотрим, как отложить какое-то действие и отправить событие по установленному таймеру с помощью состояния. Разберём, какие проблемы могут возникнуть при более сложном взаимодействии с состояниями. Конечно, продолжим покрывать код тестами — а значит, встретимся с новыми механизмами и практиками при тестировании.
