Комментарии 10
Не знаете они пофиксили, когда делаешь Table to DataStream с помощью модели (class) если выбраны не все поля модели вылетал Exception ? и изза этого нужно было указывать фейковые данные по всем полям модели , даже если эти поля были не нужны.
С такой проблемой на практике не сталкивался.
Воспроизвел проблему в тестах на Flink 1.15 (для него у меня уже есть настроенные E2E-тесты с Hive 1.2.1 в проекте) и вот что получил:
Java-класс, в который буду считываться данные:
@AllArgsConstructor
@NoArgsConstructor
@Data
public static class HiveRow {
@DataTypeHint("INT")
private Integer id;
@DataTypeHint("STRING")
private String name;
}
Hive-таблица имеет поле useless, которого нет в Java-классе:
CREATE TABLE hiveTable (id INT, name STRING, useless STRING)
Пайплайн
// ...
final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable);
tableEnv.toDataStream(inputTable, HiveRow.class).print();
env.execute();
//...
В этом случае падает такая ошибка:
org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
Cause: Different number of columns.
Query schema: [id: INT, name: STRING, useless: STRING]
Sink schema: [id: INT, name: STRING]
Если вы имели в виду именно ее и хотели бы из коробки иметь возможность пропускать неуказанные в Java-классе поля, то:
С помощью автоматических Flink-средств проблему решить не удалось. Продебажил основные Flink-абстракции и везде, где можно было бы зацепиться кастомным кодом, висят
Internal
-аннотации. По сути регламент такой, что при чтении таблицы указанный класс должен полностью соотноситься со схемой данных, то есть класс типизирует api до непосредственного получения схемы из самой таблицы в рантайме.Написать подобный конвертер самому пока возможно через явно заданный далее map-оператор (
Row
->HiveRow
), но тогда вы будете вычитывать "лишние" поля, чего не хотелось быИз других решений - использовать не передачу класса для преобразования таблицы в DataStream, а напрямую схему.
Как плюс-минус нормальное решение можно было бы взять следующее (рабочий вариант):
final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
.select(col("id"), col("name"));
tableEnv.toDataStream(inputTable, HiveRow.class)
.print();
То есть вы напрямую указываете колонки в селекте, соответствующие java-классу. Это работает. Далее все таки не хочется хардкодить тут имена колонок, а управлять только java-классом. Это натолкнуло на мысль, что можно создать метод, который будет анализировать java-класс и генерировать все аргументы для метода select(...)
при построении джобы (а значит без ухудшения перфоманса в рантайме) например так:
// org.apache.flink.table.expressions.Expression
public static Expression[] extractColumns(Class<?> clazz) {
final var result = new ArrayList<Expression>();
for (Field field : clazz.getDeclaredFields()) {
result.add(col(field.getName()));
}
return result.toArray(Expression[]::new);
}
В итоге пайплайн будет выглядеть следующим образом:
final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
.select(extractColumns(HiveRow.class));
tableEnv.toDataStream(inputTable, HiveRow.class)
.print();
Это тоже работает, хотя дважды дублируем HiveRow.class
. В целом дальше в Java-классе можно вешать и анализировать уже свои аннотации, чтобы в них хранить имена соответствующих полей в таблице, если не хотим их повторять в классе. Но это уже рефакторинг.
Возможно вашу проблему неправильно понял
Работаю с Flink какое-то время. Предположу, что будет не очень удобно иметь несколько разных пайплайнов в одной задаче:
- при обновления останавливаться будут все пайплайны сразу;
- больше рисков, что проблема с одним пайплайном вызовет сбой в работе всей задачи (память, исключения и т.д.);
- ну и если даже разные задачи в одном кластере, то уже проблемы с одной задачей могут затронуть весь кластер.
Чаще разворачивают 1 задачу на кластер. Вижу в этом следующие преимущества:
- можно более точно настроить ресурсы под конкретный тип нагрузки и особенности задачи (количество TM, разные настройки Flink, память, RocksDB и т.д);
-меньше рисков сбоя и легче обновлять.
Могу еще посоветовать идею упаковки вашего кода в образ на основе Flink (можно экстернализировать и настройки для кластера). И тогда каждый пайплайн будет отдельным образом. Берешь и разворачиваешь в отдельный namespace k8s, например.
вы предлагаете на каждую JOB создавать огромный целый проект JAVA? И каждый JOB-проект хранить в отдельном репозитории?
Это же сумма можно сойти переключатся (запускать IDE) под каждую JOB.
Можно в одном проекте код для нескольких разных задач (Flink jobs) хранить, просто собирать свой образ для каждой. Плюс если у них общие зависимости, это может даже один модуль быть. Если разные версии Flink - другой модуль или отдельный репозиторий. Тут уже кому как удобнее. Но понятно, что над сборкой контейнеров надо поработать.
Чтобы не было путаницы в моем ответе ниже, сразу оговорюсь, что буду понимать под Job именно флинковый запуск артефакта, а под пайплайном - конечный набор операторов для выполнения какой-то бизнес-задачи. То есть как вы правильно заметили - в одной Job сейчас могут быть несколько пайплайнов.
Мы выбрали следующую схему:
Храним разные пайплайны в одном артефакте, если используем "общий API". Под общим API имею в виду набор библиотек одинаковых версий, разных абстракций и тд которые требуются каждому пайплайну.
Например, пайплайны Kafka-to-Kafka будут требовать одни зависимости, а File-to-Hive другие (например, потому что у нас старый Hive с Flink 1.15). Тогда у нас имеются два разных артефакта, чтобы ненужные зависимости одного не были внутри другого.
Под каждый пайплайн внутри артефакта делаем feature-toggle, который этот пайплайн запускает
Такой подход никак не противоречит проблемам, которые вы описываете. По сути то, что вы описали - выбор между Session/Application mode развертыванием:
На проде мы используем Application Mode - берем собранный артефакт и запускаем отдельную Job в которой включен только один соответствующий пайплайн. Для второго пайплайна берем этот же артефакт и запускаем следующую Job в своем кластере (Application Mode) с включенным feature-toggle второго пайплайна и тд
На тестовом стенде вполне можем запустить все пайплайны сразу в одном кластере с минимальными ресурсами (Session Mode) просто включив все feature-toggle для каждого пайплайна
Спасибо за статью.
Можно у вас поинтересоваться следующим:
Я правильно понимаю, что вы используете Spring Boot исключительно для объединения всех Flink Jobs в один jar-архив, чтобы потом за 1 раз запустить сразу все джобы на кластере или же есть еще какие-то причины использования Spring?
Из преимуществ использования Spring скорее только то, что легче и быстрее становится работать с кодом: автоконфигурации, стартеры, DI при построении пайплайнов, разные интеграции, которые упростят тестирование, более привычная работа с application.yml и тд. Везде понемногу, а в сумме новому разработчику уже легче будет ориентироваться и добавить что-то новое из примеров стандартных проектов.
Например, в тестах где-то заиспользовали аннотацию DynamicPropertySource, для легкости работы с TestContainers, далее в тестовых зависимостях добавили Spring-Kafka, чтобы одной аннотацией создать KafkaListener для прослушивания результирующего топика в E2E-тесте и тд.
Про формирование jar_ника мы используем плагин com.github.johnrengelman.shadow
. По поводу запуска 1 раз джобы расписал более подробно нашу стратегию в комментарии выше - пока что такой подход удобен, но возможно будем его менять.
Как использовать Spring в качестве фреймворка для Flink-приложений