Search
Write a publication
Pull to refresh
16
0
Бобряков Александр @appp_master

User

Send message

Немного задержался с ответом... Именно с коннектором mongo у меня опыта не было, но общий принцип в том, что у sink должны быть параметры на размера батча и интервала для записи текущего батча (для mongo они есть - https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/mongodb/#configurations-1). В этом случае нагрузка будет высокая, но уже периодичная. Это конечно не решает вышу проблему.

Если говорить конкретно про операцию инкремента, то я бы вставил window-оператор, в котором суммировал все инкременты в окне например 10минут. Когда окно "закрывается", то в последующий sink-оператор передавать уже инкремент на аггрегированное значение, чтобы выполнился один запрос в бд. В этом случае мы добавляем искусственный батчинг из-за ожидания "закрытия" окна (пока оно не закроется - данные не запишутся), но не добавляем батчинг с точки зрения "задержки обработки сообщений".

Да, k8s-оператор, но каких-то тонкостей я рассказать сразу не смогу, так как этим занимается специальный devops, который все настраивал в соответствии с требованиями компании

Из преимуществ использования Spring скорее только то, что легче и быстрее становится работать с кодом: автоконфигурации, стартеры, DI при построении пайплайнов, разные интеграции, которые упростят тестирование, более привычная работа с application.yml и тд. Везде понемногу, а в сумме новому разработчику уже легче будет ориентироваться и добавить что-то новое из примеров стандартных проектов.

Например, в тестах где-то заиспользовали аннотацию DynamicPropertySource, для легкости работы с TestContainers, далее в тестовых зависимостях добавили Spring-Kafka, чтобы одной аннотацией создать KafkaListener для прослушивания результирующего топика в E2E-тесте и тд.

Про формирование jar_ника мы используем плагин com.github.johnrengelman.shadow. По поводу запуска 1 раз джобы расписал более подробно нашу стратегию в комментарии выше - пока что такой подход удобен, но возможно будем его менять.

Чтобы не было путаницы в моем ответе ниже, сразу оговорюсь, что буду понимать под Job именно флинковый запуск артефакта, а под пайплайном - конечный набор операторов для выполнения какой-то бизнес-задачи. То есть как вы правильно заметили - в одной Job сейчас могут быть несколько пайплайнов.

Мы выбрали следующую схему:

  1. Храним разные пайплайны в одном артефакте, если используем "общий API". Под общим API имею в виду набор библиотек одинаковых версий, разных абстракций и тд которые требуются каждому пайплайну.

    Например, пайплайны Kafka-to-Kafka будут требовать одни зависимости, а File-to-Hive другие (например, потому что у нас старый Hive с Flink 1.15). Тогда у нас имеются два разных артефакта, чтобы ненужные зависимости одного не были внутри другого.

  2. Под каждый пайплайн внутри артефакта делаем feature-toggle, который этот пайплайн запускает

Такой подход никак не противоречит проблемам, которые вы описываете. По сути то, что вы описали - выбор между Session/Application mode развертыванием:

  • На проде мы используем Application Mode - берем собранный артефакт и запускаем отдельную Job в которой включен только один соответствующий пайплайн. Для второго пайплайна берем этот же артефакт и запускаем следующую Job в своем кластере (Application Mode) с включенным feature-toggle второго пайплайна и тд

  • На тестовом стенде вполне можем запустить все пайплайны сразу в одном кластере с минимальными ресурсами (Session Mode) просто включив все feature-toggle для каждого пайплайна

С такой проблемой на практике не сталкивался.

Воспроизвел проблему в тестах на Flink 1.15 (для него у меня уже есть настроенные E2E-тесты с Hive 1.2.1 в проекте) и вот что получил:

  1. Java-класс, в который буду считываться данные:

@AllArgsConstructor
@NoArgsConstructor
@Data
public static class HiveRow {
    @DataTypeHint("INT")
    private Integer id;
    @DataTypeHint("STRING")
    private String name;
}
  1. Hive-таблица имеет поле useless, которого нет в Java-классе:

CREATE TABLE hiveTable (id INT, name STRING, useless STRING)
  1. Пайплайн

// ...
final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable);
tableEnv.toDataStream(inputTable, HiveRow.class).print();
env.execute();
//...

В этом случае падает такая ошибка:

org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
Cause: Different number of columns.

Query schema: [id: INT, name: STRING, useless: STRING]
Sink schema:  [id: INT, name: STRING]

Если вы имели в виду именно ее и хотели бы из коробки иметь возможность пропускать неуказанные в Java-классе поля, то:

  1. С помощью автоматических Flink-средств проблему решить не удалось. Продебажил основные Flink-абстракции и везде, где можно было бы зацепиться кастомным кодом, висят Internal-аннотации. По сути регламент такой, что при чтении таблицы указанный класс должен полностью соотноситься со схемой данных, то есть класс типизирует api до непосредственного получения схемы из самой таблицы в рантайме.

  2. Написать подобный конвертер самому пока возможно через явно заданный далее map-оператор ( Row -> HiveRow ), но тогда вы будете вычитывать "лишние" поля, чего не хотелось бы

  3. Из других решений - использовать не передачу класса для преобразования таблицы в DataStream, а напрямую схему.

  4. Как плюс-минус нормальное решение можно было бы взять следующее (рабочий вариант):

final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                           .select(col("id"), col("name"));
tableEnv.toDataStream(inputTable, HiveRow.class)
    .print();

То есть вы напрямую указываете колонки в селекте, соответствующие java-классу. Это работает. Далее все таки не хочется хардкодить тут имена колонок, а управлять только java-классом. Это натолкнуло на мысль, что можно создать метод, который будет анализировать java-класс и генерировать все аргументы для метода select(...) при построении джобы (а значит без ухудшения перфоманса в рантайме) например так:

// org.apache.flink.table.expressions.Expression
public static Expression[] extractColumns(Class<?> clazz) {
    final var result = new ArrayList<Expression>();
    for (Field field : clazz.getDeclaredFields()) {
        result.add(col(field.getName()));
    }
    return result.toArray(Expression[]::new);
}

В итоге пайплайн будет выглядеть следующим образом:

final var inputTable = tableEnv.from("hive_catalog_name." + DEFAULT_DB + "." + hiveTable)
                           .select(extractColumns(HiveRow.class));
tableEnv.toDataStream(inputTable, HiveRow.class)
    .print();

Это тоже работает, хотя дважды дублируем HiveRow.class. В целом дальше в Java-классе можно вешать и анализировать уже свои аннотации, чтобы в них хранить имена соответствующих полей в таблице, если не хотим их повторять в классе. Но это уже рефакторинг.

Возможно вашу проблему неправильно понял

К сожалению бенчмарки наша команда не делала, но мы полагались на опыт соседних команд, которые проводили сравнение под базовые задачи дедупликации на трафике в ~5-10млн rps. Flink показывал при этом лучшие результаты на одних и тех же ресурсах по latency. При этом нужно учитывать, что в наших задачах важен именно latency. Если сообщение "отдадим" слишком поздно, то оно уже никому не будет нужно.

несколько секунд разницы между Spark и Flink не стоят того, чтобы добавлять новый стек

поддерживаю, если у вас нет подобных требований. Spark может на подобных задачах выдавать latency более секунды, а Flink отработает шустрее благодаря внутренней архитектуре, ориентированной на true-стримминг.

В сравнении с Flink я бы добавил необходимость для Spark очень тонкой настройки, для которой нужны опытные люди. В Flink эта настройка минимальна, а следовательно и порог вхождения ниже. С другой стороны в Spark появился экспериментальный режим Continuous Processing, который обещает минимальную задержку под at-least-once, но у нас нет опыта его использования (если вы использовали - интересно узнать реальные цифры/проблемы/кейсы).

В дополнение могу порекомендовать доклад на SmartData 2023 "Spark Streaming: брать или не брать?", который как раз затрагивает задачи использования Spark, а когда все же лучше глянуть на другие инструменты.

Да, именно так. Для stateful-заданий пользовательское указание идентификаторов для каждого оператора является скорее обязательным условием, так как рано или поздно столкнемся с эволюцией приложения.

Более того такое назначение идентификаторов является одним из пунктов чеклиста "Production Readiness Checklist".

В нашем проекте мы построили декораторы, заставляющие разработчиков указывать id при каждой трансформации потока. Если есть интерес, то могу рассказать отдельной статьей

Information

Rating
Does not participate
Location
Россия
Registered
Activity