Как стать автором
Обновить

Комментарии 10

Не знаете они пофиксили, когда делаешь Table to DataStream с помощью модели (class) если выбраны не все поля модели вылетал Exception ? и изза этого нужно было указывать фейковые данные по всем полям модели , даже если эти поля были не нужны.

С такой проблемой на практике не сталкивался.

Воспроизвел проблему в тестах на Flink 1.15 (для него у меня уже есть настроенные E2E-тесты с Hive 1.2.1 в проекте) и вот что получил:

  1. Java-класс, в который буду считываться данные:

@AllArgsConstructor
@NoArgsConstructor
@Data
public static class HiveRow {
    @DataTypeHint("INT")
    private Integer id;
    @DataTypeHint("STRING")
    private String name;
}
  1. Hive-таблица имеет поле useless, которого нет в Java-классе:

CREATE TABLE hiveTable (id INT, name STRING, useless STRING)
  1. Пайплайн

// ...
final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable);
tableEnv.toDataStream(inputTable, HiveRow.class).print();
env.execute();
//...

В этом случае падает такая ошибка:

org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
Cause: Different number of columns.

Query schema: [id: INT, name: STRING, useless: STRING]
Sink schema:  [id: INT, name: STRING]

Если вы имели в виду именно ее и хотели бы из коробки иметь возможность пропускать неуказанные в Java-классе поля, то:

  1. С помощью автоматических Flink-средств проблему решить не удалось. Продебажил основные Flink-абстракции и везде, где можно было бы зацепиться кастомным кодом, висят Internal-аннотации. По сути регламент такой, что при чтении таблицы указанный класс должен полностью соотноситься со схемой данных, то есть класс типизирует api до непосредственного получения схемы из самой таблицы в рантайме.

  2. Написать подобный конвертер самому пока возможно через явно заданный далее map-оператор ( Row -> HiveRow ), но тогда вы будете вычитывать "лишние" поля, чего не хотелось бы

  3. Из других решений - использовать не передачу класса для преобразования таблицы в DataStream, а напрямую схему.

  4. Как плюс-минус нормальное решение можно было бы взять следующее (рабочий вариант):

final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                           .select(col("id"), col("name"));
tableEnv.toDataStream(inputTable, HiveRow.class)
    .print();

То есть вы напрямую указываете колонки в селекте, соответствующие java-классу. Это работает. Далее все таки не хочется хардкодить тут имена колонок, а управлять только java-классом. Это натолкнуло на мысль, что можно создать метод, который будет анализировать java-класс и генерировать все аргументы для метода select(...) при построении джобы (а значит без ухудшения перфоманса в рантайме) например так:

// org.apache.flink.table.expressions.Expression
public static Expression[] extractColumns(Class<?> clazz) {
    final var result = new ArrayList<Expression>();
    for (Field field : clazz.getDeclaredFields()) {
        result.add(col(field.getName()));
    }
    return result.toArray(Expression[]::new);
}

В итоге пайплайн будет выглядеть следующим образом:

final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                           .select(extractColumns(HiveRow.class));
tableEnv.toDataStream(inputTable, HiveRow.class)
    .print();

Это тоже работает, хотя дважды дублируем HiveRow.class. В целом дальше в Java-классе можно вешать и анализировать уже свои аннотации, чтобы в них хранить имена соответствующих полей в таблице, если не хотим их повторять в классе. Но это уже рефакторинг.

Возможно вашу проблему неправильно понял

Работаю с Flink какое-то время. Предположу, что будет не очень удобно иметь несколько разных пайплайнов в одной задаче:
- при обновления останавливаться будут все пайплайны сразу;
- больше рисков, что проблема с одним пайплайном вызовет сбой в работе всей задачи (память, исключения и т.д.);
- ну и если даже разные задачи в одном кластере, то уже проблемы с одной задачей могут затронуть весь кластер.

Чаще разворачивают 1 задачу на кластер. Вижу в этом следующие преимущества:
- можно более точно настроить ресурсы под конкретный тип нагрузки и особенности задачи (количество TM, разные настройки Flink, память, RocksDB и т.д);
-меньше рисков сбоя и легче обновлять.

Могу еще посоветовать идею упаковки вашего кода в образ на основе Flink (можно экстернализировать и настройки для кластера). И тогда каждый пайплайн будет отдельным образом. Берешь и разворачиваешь в отдельный namespace k8s, например.

вы предлагаете на каждую JOB создавать огромный целый проект JAVA? И каждый JOB-проект хранить в отдельном репозитории?

Это же сумма можно сойти переключатся (запускать IDE) под каждую JOB.

Можно в одном проекте код для нескольких разных задач (Flink jobs) хранить, просто собирать свой образ для каждой. Плюс если у них общие зависимости, это может даже один модуль быть. Если разные версии Flink - другой модуль или отдельный репозиторий. Тут уже кому как удобнее. Но понятно, что над сборкой контейнеров надо поработать.

Чтобы не было путаницы в моем ответе ниже, сразу оговорюсь, что буду понимать под Job именно флинковый запуск артефакта, а под пайплайном - конечный набор операторов для выполнения какой-то бизнес-задачи. То есть как вы правильно заметили - в одной Job сейчас могут быть несколько пайплайнов.

Мы выбрали следующую схему:

  1. Храним разные пайплайны в одном артефакте, если используем "общий API". Под общим API имею в виду набор библиотек одинаковых версий, разных абстракций и тд которые требуются каждому пайплайну.

    Например, пайплайны Kafka-to-Kafka будут требовать одни зависимости, а File-to-Hive другие (например, потому что у нас старый Hive с Flink 1.15). Тогда у нас имеются два разных артефакта, чтобы ненужные зависимости одного не были внутри другого.

  2. Под каждый пайплайн внутри артефакта делаем feature-toggle, который этот пайплайн запускает

Такой подход никак не противоречит проблемам, которые вы описываете. По сути то, что вы описали - выбор между Session/Application mode развертыванием:

  • На проде мы используем Application Mode - берем собранный артефакт и запускаем отдельную Job в которой включен только один соответствующий пайплайн. Для второго пайплайна берем этот же артефакт и запускаем следующую Job в своем кластере (Application Mode) с включенным feature-toggle второго пайплайна и тд

  • На тестовом стенде вполне можем запустить все пайплайны сразу в одном кластере с минимальными ресурсами (Session Mode) просто включив все feature-toggle для каждого пайплайна

Да, всё верно поняли. Спасибо за пояснения. Интересно ещё, каким образом разворачивает Flink кластер. k8s оператор?

Да, k8s-оператор, но каких-то тонкостей я рассказать сразу не смогу, так как этим занимается специальный devops, который все настраивал в соответствии с требованиями компании

Спасибо за статью.
Можно у вас поинтересоваться следующим:
Я правильно понимаю, что вы используете Spring Boot исключительно для объединения всех Flink Jobs в один jar-архив, чтобы потом за 1 раз запустить сразу все джобы на кластере или же есть еще какие-то причины использования Spring?

Из преимуществ использования Spring скорее только то, что легче и быстрее становится работать с кодом: автоконфигурации, стартеры, DI при построении пайплайнов, разные интеграции, которые упростят тестирование, более привычная работа с application.yml и тд. Везде понемногу, а в сумме новому разработчику уже легче будет ориентироваться и добавить что-то новое из примеров стандартных проектов.

Например, в тестах где-то заиспользовали аннотацию DynamicPropertySource, для легкости работы с TestContainers, далее в тестовых зависимостях добавили Spring-Kafka, чтобы одной аннотацией создать KafkaListener для прослушивания результирующего топика в E2E-тесте и тд.

Про формирование jar_ника мы используем плагин com.github.johnrengelman.shadow. По поводу запуска 1 раз джобы расписал более подробно нашу стратегию в комментарии выше - пока что такой подход удобен, но возможно будем его менять.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий