Привет, Хабр! С вами вновь Александр Бобряков, техлид в команде МТС Аналитики. И я с очередной статьёй из цикла про фреймворк Apache Flink.
В предыдущей части я рассказал, как тестировать stateless- и stateful-операторы Flink с использованием вспомогательных TestHarness-абстракций, предоставляемых Flink.
В этой статье напишем тесты на всю джобу с использованием мини-кластера Flink и при помощи JUnit Extension. Ещё мы начнём выделять удобные вспомогательные абстракции для тестов, которые понадобятся позже.

Список моих статей про Flink:
Введение в Apache Flink: осваиваем фреймворк на реальных примерах
Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?
Как провести unit-тестирование Flink-операторов: Test Harness
Unit и E2E-тестирование оператора с таймерами в Apache Flink
Apache Flink: тестирование собственного сериализатора состояния
Apache Flink: использование и автоматическая проверка собственного сериализатора состояния
Весь разбираемый исходный код можно найти в репозитории AlexanderBobryakov/flink-spring. В master-ветке представлен итоговый проект по всей серии статей. Эта статья соответствует релизной ветке с названием release/5_flinkcluster_job_deduplicator_test.
Оглавление статьи:
Flink MiniCluster
В документации по написанию тестов предлагается использовать абстракцию Flink мини-кластера MiniClusterWithClientResource для локального тестирования полноценных заданий. Это обусловлено тем, что мы не сможем полностью воспроизвести работу Flink на обычных Unit-тестах (даже с помощью TestHarness), учитывая параллельность и другие внутренние процессы. Но мини-кластер такую возможность даёт.
После старта мини-кластера универсальный метод для определения окружения StreamExecutionEnvironment.getExecutionEnvironment() автоматически подключится к мини-кластеру, и все наши задания будут выполняться на нём. Документация предлагает такой сценарий использования:
public class ExampleIntegrationTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(1) .build()); @Test public void someТest() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // ... } }
Но в этом случае нужно создавать @ClassRule на каждый тестовый класс либо использовать наследование в тестах. Я предлагаю немного другой, более удобный способ.
Итак, какие у нас условия задачи?
Во-первых, хотелось бы не тянуть лишних зависимостей в тесты с Flink.
Во-вторых, у нас должна быть возможность включать мини-кластер в любом тесте максимально просто и, главное, единожды, чтобы кластер поднимался перед всеми тестовыми классами и убивался после всех тестов.
Какое решение? На ум приходит использование своей аннотации @WithFlinkCluster, которая будет предоставлять Flink мини-кластер для класса-теста, над которым она висит. Давайте посмотрим на саму реализацию:
@Retention(RUNTIME) @Inherited @ExtendWith({FlinkClusterExtension.class}) public @interface WithFlinkCluster { }
Ничего особенного в ней нет. Основная фишка внутри FlinkClusterExtension. Это JUnit Extension. Если кратко, то они нужны для изменения поведения тестов с помощью событий их жизненного цикла. Реализация моего FlinkClusterExtension выглядит вот так:
@Slf4j @SuppressWarnings({"PMD.AvoidUsingVolatile"}) public class FlinkClusterExtension implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { private static final MiniClusterWithClientResource FLINK_CLUSTER; private static final Lock LOCK = new ReentrantLock(); private static volatile boolean started; static { final var configuration = new Configuration(); configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2); FLINK_CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(configuration) .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(1) .build()); } @Override public void beforeAll(ExtensionContext context) throws Exception { LOCK.lock(); try { if (!started) { log.info("Start Flink MiniCluster"); started = true; FLINK_CLUSTER.before(); context.getRoot().getStore(GLOBAL).put("Flink Cluster", this); } } finally { LOCK.unlock(); } } @Override public void close() { log.info("Close Flink MiniCluster"); FLINK_CLUSTER.after(); started = false; } }
Обратите внимание на имплементацию двух интерфейсов: BeforeAllCallback, ExtensionContext.Store.CloseableResource. Первый предоставляет метод beforeAll перед стартом всех тестов внутри каждого тестового класса, у которых висит аннотация @ExtendWith({FlinkClusterExtension.class}). А второй — коллбэк на закрытие ресурсов уже после отработки всех тестовых классов. В статическом блоке инициализируем Flink мини-кластер, передав ему различные настройки:
количество слотов
TaskManager
стандартный параллелизм — его лучше по умолчанию выставить >1, чтобы отловить неочевидные баги в ваших сценариях
В методе beforeAll выполняется непосредственный запуск кластера через его метод before(). В реализации FlinkClusterExtension присутствуют синхронизации в виде блокировки через объект ReentrantLock, чтобы избежать повторного запуска в случае параллельно запускаемых тестов. Метод close завершает работу мини-кластера единожды после выполнения всех тестов, которые используют текущий JUnit Extension.
В итоге каждый тест, где нужен будет кластер Flink, можно писать так:
@WithFlinkCluster class DeduplicatorUnitTest_byFlinkCluster { @Test void test() { final var env = StreamExecutionEnvironment.getExecutionEnvironment(); // ... } }
Абстракция для Sink в тестировании через Flink MiniCluster
Прежде чем переходить непосредственно к написанию теста, нужно подумать: а как мы будем проверять наши задания? Ожидаемый результат выполнения задания или оператора — наличие событий на его выходе.
Проблема в том, что каждый оператор имеет свою параллельность. Благодаря ей операторы сериализуются на каждый TaskManager в количестве своей параллельности (если в каждом TM существует единственный слот). Каждый из таких параллельных операторов может писать в свой параллельный экземпляр Sink-оператора. А нам было бы удобно собрать их воедино.
Об этом также упоминается в конце документации в блоке замечаний, где авторы предлагают создать свой CollectSink. Они используют статическую коллекцию, нам такой вариант не очень подходит — тесты могут выполняться параллельно и независимо, а создавать отдельный класс со статической коллекцией неудобно, ведь доступ к единственной статической коллекции будет из всех классов одновременно. Это может привести к тому, что тесты будут влиять друг на друга.
В качестве решения в исходниках Flink можно обнаружить синглтон-класс org.apache.flink.test.streaming.runtime.util.TestListWrapper, который предлагает более подходящий вариант:
private List<List<? extends Comparable>> lists;
Далее в каждом отдельном тесте при инициализации TestListWrapper создаётся внутренний List в объекте lists выше. ID этого листа возвращается пользователю, а дальше можно написать свой Writer, который будет писать именно в этот List, запрашивая его по полученному id у самого TestListWrapper. Звучит непонятно, поэтому предоставлю код идеи:
@SuppressWarnings("PMD.TestClassWithoutTestCases") public class TestListSink<T> implements Sink<T> { private static final long serialVersionUID = 1L; private final ListWriter writer = new ListWriter(); private final int resultListId; public TestListSink() { this.resultListId = TestListWrapper.getInstance().createList(); } @Override public SinkWriter<T> createWriter(InitContext context) { return writer; } public List<T> getHandledEvents() { return new ArrayList<>(resultList()); } @SuppressWarnings("unchecked") private List<T> resultList() { synchronized (TestListWrapper.getInstance()) { return (List<T>) TestListWrapper.getInstance().getList(resultListId); } } private class ListWriter implements SinkWriter<T>, Serializable { private static final long serialVersionUID = 1L; @Override public void write(T element, Context context) { resultList().add(element); } @Override public void flush(boolean endOfInput) { // no op } @Override public void close() { // no op } } }
При создании моего TestListSink (в каждом отдельном тесте) инициализируется новый List и запоминается его id: TestListWrapper.getInstance().createList(). Также у нас есть своя реализация SinkWriter, которая при получении события в методе write записывает его в один и тот же List по id листа. Так в случае большой параллельности выходных операторов мы получим единый List, в который будет писать каждый параллельный экземпляр выходного оператора. Также нам полезно определить вспомогательный метод getHandledEvents, который вернёт все записанные события всех параллельных экземпляров sink-оператора после выполнения теста.
Тестирование дедупликатора с помощью Flink MiniCluster
В прошлой статье мы написали тест на дедупликатор с помощью абстракций TestHarness. В качестве дополнительного примера можно переписать тот же тест с использованием мини-кластера. Для этого мы:
На новый тестовый класс повесим нашу новую аннотацию.
Немного перепишем сам тест, определив непосредственно минимальный пайплайн обработки с использованием дедупликатора.
Сделаем это в новом тестовом классе:
@WithFlinkCluster public class DeduplicatorUnitTest_byFlinkCluster { private final Time TTL = Time.milliseconds(100); @SneakyThrows @ParameterizedTest @MethodSource("provideUniqueEvents") void shouldDeduplicateMessagesByTtl(List<String> events) { final var sourceEvents = new ArrayList<String>(); sourceEvents.addAll(events); sourceEvents.addAll(events); final var env = StreamExecutionEnvironment.getExecutionEnvironment(); final var sink = new TestListSink<String>(); env.fromCollection(sourceEvents) .keyBy(value -> value) .flatMap(new Deduplicator<>(TTL)) .sinkTo(sink); env.execute(); final var outputEvents = sink.getHandledEvents(); assertEquals(events.size(), outputEvents.size(), format("Unexpected number of events after deduplication. Output events: %s", outputEvents)); assertEquals(events, new ArrayList<>(outputEvents), "Unexpected events after deduplication"); } private static Stream<Arguments> provideUniqueEvents() { return Stream.of(arguments(List.of("key_1", "key_2"))); } }
По своей сути тест очень похож на предыдущие: на вход получаем несколько String-событий, определяем источник данных fromCollection(), передав в него дважды входные данные. Потом определяем сам дедупликатор и выходной Sink с использованием нашего универсального TestListSink. После запуска пайплайна проверяем, что данных в результирующем Sink столько же, сколько было уникальных сообщений.
Важным моментом является использование MiniCluster. Это происходит под капотом во время вызова метода StreamExecutionEnvironment.getExecutionEnvironment(). Так как наш FlinkClusterExtension отрабатывает до запуска теста, то на момент непосредственного выполнения теста уже будет создан мини-кластер на локальной машине, а метод getExecutionEnvironment() его увидит и подтянет.
Тестирование всей Job с помощью Flink MiniCluster
Теперь можно переходить к первому тесту всего Flink-задания. Напомню, что наша джоба фильтрует входной поток ClickMessage, пропуская события только с типом платформы WEB и APP. Далее события APP проходят дедупликацию, а затем события APP и WEB записываются в выходной топик Kafka, определяющийся динамически из поля ClickMessage.productTopic.
Пайплайн выглядит так:

В тесте будут участвовать многие Spring-компоненты, поэтому выделим новую аннотацию для тестирования Flink Job:
@Retention(RUNTIME) @SpringBootTest( webEnvironment = NONE, classes = { PropertiesConfig.class, FlinkConfig.class, }) @ActiveProfiles({"test"}) @WithFlinkCluster public @interface FlinkJobTest { }
Так как мы не хотим поднимать абсолютно весь контекст по умолчанию, то задаём в аннотации @SpringBootTest лишь две конфигурации:
PropertyConfig, который добавляет в контекстное все наши проперти-классы, которые биндятся с application.yml.
FlinkConfig, в котором регистрируется и настраивается бин StreamExecutionEnvironment.
Дополнительно мы применим созданную аннотацию @FlinkJobTest для поднятия мини-кластера.
Мы тестируем саму логику джобы, а какие именно будут source и sink, неважно. Реальные имплементации (Kafka) мы подставим в Е2Е-тестах.
Первый тест будет проверять, что события APP дедуплицируются:
@FlinkJobTest @SuppressWarnings("PMD.DataflowAnomalyAnalysis") class ClickToProductJobUnitTest { @Autowired private StreamExecutionEnvironment environment; @Autowired private ClickToProductJobProperties properties; @ParameterizedTest @EnumSource(value = Platform.Enum.class, names = {"APP"}) @SneakyThrows void shouldDeduplicateClickMessages(Platform platform) { final var message = aClickMessage().withPlatform(platform).build(); final var sink = new TestListSink<WrappedSinkMessage<ProductMessage>>(); final var job = new ClickToProductJob( properties, env -> env.fromElements(message, message, message).uid("test_source"), () -> sink ); job.registerJob(environment); environment.execute(); final var out = sink.getHandledEvents(); assertEquals(1, out.size(), format("Unexpected message count in sink: %s", out)); } // ... }
В начале кода мы используем нашу аннотацию @FlinkJobTest. В самом тесте можем заинжектить основные бины:
StreamExecutionEnvironment, нацеленный на поднятый в FlinkClusterExtension мини-кластер.
ClickToProductJobProperties, в котором имеем все настроенные в application-test.yml настройки.
В самом тесте создаём ClickMessage — на основе платформы APP, переданной в аргументы параметризованного теста. Далее определяем описанный в предыдущих главах Sink TestListSink и вручную создаём тестируемую джобу ClickToProductJob. В аргументы джобы передаём проперти, источник данных и приёмник.
! Источник данных содержит три одинаковых экземпляра входного сообщения, которые джоба должна дедуплицировать.
После этого происходит регистрация джобы в environment мини-кластера и синхронный запуск. Так как наш источник данных fromCollection имеет три события, то по мере их обработки джоба завершится автоматически. Это достаточно важный момент, потому что в случае бесконечных источников данных (например, Kafka) джоба будет выполняться вечно, пока мы её не завершим. Для вечного выполнения понадобится асинхронный запуск, который рассмотрим в следующих статьях. На выходе проверяем, что Sink содержит лишь одно событие, а остальные дедуплицировались.
Так можно написать тест, который проверяет, что события WEB не дедуплицируются, но обрабатываются, а события с произвольным типом платформы не обрабатываются вообще. Пример теста можно посмотреть в репозитории проекта, указанном в начале статьи.
В итоге структура тестов выглядит следующим образом:

Вывод
Мы рассмотрели создание Unit-теста на полноценную джобу Flink и отдельные stateful-операторы с использованием мини-кластера. Также мы научились запускать мини-кластер один раз перед всеми тестовыми классами, нуждающимися в нём. В дополнение мы создали вспомогательные абстракции и аннотации, существенно разделяя ответственность в тестах и упрощая логику написания новых тестов.
За кадром осталась Kafka, ведь наша джоба читает и пишет данные в её топики. Как написать E2E-тест на полную интеграцию Kafka + Flink Job, я расскажу в следующей части.