Привет! Меня зовут Александр Булатов, я старший инженер данных в Блоке Данных билайна. В этой серии статей я расскажу, как выглядит создание Source и Sink для Table API & SQL и как Table API взаимодействует с DataStream API.
Я работаю на проекте Radcom, в котором мы получаем данные о детализации звонков. И есть источник потоковых данных, которые мы получаем с Kafka. Таких потоков у нас внутри Radcom одиннадцать штук, и данные от них идут в формате csv. Формат не самый удобный для обработки, потому что он не имеет в себе схему — нам присылают просто голые строки csv, без какой-либо схемы, и нам нужно парсить эти строки относительно ее.
В одном подобном потоке вполне может находиться сто миллиардов записей в сутки, а это со всех потоков почти семь терабайт в день. У нас в билайне это считается одним из самых больших потоков, которому требуется очень много ресурсов, в год с учетом репликации мы занимаем почти семь петабайт данных.
Так вот, мы принимаем данные в csv и должны их сохранять в Hive в колоночных форматах, чтобы впоследствии аналитики и Data Scientists могли пользоваться этими данными. У нас принято использовать либо ORC, либо Parquet. Мы попробовали оба формата, пришли к Parquet.
Ко всему прочему, поток растет, его необходимо обогащать, потому что некоторые данные к нам еще не приходят, нам нужно их обогащать со своих таблиц. Также наши схемы разрастаются — в поток может приходить еще больше полей. Сейчас, например, в одной таблице 42 поля, а могут сказать, что добавляется еще четыре поля. Такие изменения происходят не раз в полгода-год, а могут произойти раз в месяц.
Вот и получается, что поток все время растет не только в понятии количества записей, но и в самих полях.
И нам нужно как-то позволять людям с ним взаимодействовать.
Начало пути
Сперва мы пытались разработать этот поток на голом NiFi — брали строки, обрабатывали NiFi-процессорами, отправляли в HDFS Parquet директориями и навешивали на них партиции. Но у NiFi в то время была какая-то проблема с сериализацией timestamps в Parquet, её мы решали долго, перешли в итоге на ORC, чтобы хоть как-то поставлять правильные данные, и перешли к тому, чтобы использовать NiFi+Hive Streaming.
Hive Streaming работал только с ORC, но и у него были свои проблемы: там транзакционная модель, и каждая транзакция является отдельным файлом. Hive Streaming, таким образом, генерирует на каждую транзакцию свой файл, в результате чего получается очень много мелких файлов. Это очень загибало наш Hadoop-кластер, поэтому мы решили уйти с NiFi. Ко всему прочему, такие объемы NiFi не выдерживал даже отдельными instance’ами.
Решили слезть на такую структуру — Flink+Spark.
Как всё работает?

Перейду сразу к конечной структуре и покажу, как она разрасталась.
Вот у нас есть источник, с которого мы должны принимать данные, наш Kafka Source. Данные с него мы обычным Flink принимаем в первозданном виде и в этом же виде сохраняем их в Landing, то есть в некоторый слой приземления. Мы преобразуем его в Avro, добавляем некоторых метаданных для того, чтобы при каких-нибудь сбоях успеть восстановить данные.
Дальше из этого Landing при помощи приложения Spark все попадает в Detail. Приложение Spark Detail запускается раз в час и парсит строчку csv в подобный вид, в структуру, затем делает очистку, фильтрацию, преобразует это все в Parquet и кладет это все в HDFS, навешивая сверху партицию в Hive-таблицу, и все это остается в Detail.
Однако поток у нас должен быть в real time, ведь данные льются постоянно, потому что мы обрабатываем детализацию звонков. Клиенты общаются не раз в час – они общаются постоянно, поэтому аналитикам в таком случае все равно приходят запоздалые данные, то есть у нас получается потоково-батчевый подход.
Кажется чем-то не совсем тем, что должно быть в оригинале.
Тут возникают еще проблемы, потому что у нас есть еще два приложения. Они возникли, потому что мы используем Spark 2.3, так исторически сложилось, что мы использовали его без возможности перейти на Spark 3. Spark 2.3 не умеет компактить файлы. Из-за Spark Detail у нас возникает очень много мелких файлов, там может быть 14 килобайт, поэтому нам приходилось после процедуры Spark Detail уплотнять все данные во всех партициях, чтобы они были под размер мока — по 128 Мб, по 256 Мб — чтобы было все стандартного размера. Для этого работало еще одно приложение Spark Compaction.
Далее появилось еще требование, что нужно обогащать данные. Для этого появилось приложение Spark Enrichment, которое берет данные из другой таблицы и обогащает их. Возникает такая проблема, что работает четыре приложения, три места, где мы все должны наблюдать, получается, что очень много всего надо мониторить.
Если какой-то из этих узлов сломается, придется долго выяснять, что и где произошло. Получается некоторый конвейерный подход, но все равно хотелось, чтобы все работало в одном месте, при это работало потоково. Поэтому даже если мы перейдем на Spark 3, где Spark Enrichment и Spark Compaction в одном приложении, то все равно полностью потоковой загрузки мы не добьемся.
Поэтому мы решили, что нужно перейти на другую архитектуру только на Flink.

Последний представляет из себя ПО для обработки потоковых данных. Мы будем сразу принимать с Kafka поток, обогащать его и отправлять в нужные потоки. Также к требованиям добавилось, что нужно распределять данные относительно регионов — то есть мы парсим, откуда пришел звонок, с какой пробы, и отправляем в нужный Kafka Topic. Их у нас несколько, например, тринадцать, нам нужно в эти тринадцать топиков отфильтровать и отправить помимо нашего HDFS.
Это у нас получается такое приложение.
Вот как выглядит у нас Flink Landing. Как вообще выглядит на Datastream API приложение Flink?

Здесь мы создаем окружение для нашего Flink, то есть мы цепляемся к нашему Flink кластеру и должны создать источник, то есть откуда мы должны брать приложение. Для начала еще мы должны создать POJO, с которым будет происходить работа, куда мы будем складывать все данные. Здесь мы берем value сообщения, ключ сообщения и ID.

ID у нас композитный, берется партиция, берется офсет и timestamp сообщения. Они все складываются, и получается ID.
Далее у нас есть POJO, для него мы должны создать десериализатор, как мы будем десериализовывать при принятии из Kafka. Здесь создаем DeserializationSchema, берем запись и десериализуем ее.

Создаем BinaryKafkaRecord, здесь мы просто берем бинарные значения без преобразования ключа и значения и создаем id. Так мы десериализуем наши сообщения в POJO.
Далее мы должны создать источник.

Мы создаем KafkaSource в этом месте. Для начала мы должны принять все настройки для нашей Kafka. Это название топика, куда мы должны подсоединяться, какой формат у value и другие вещи.
Создаем наш FlinkKafkaConsumer.

На этом этапе нам необходимо создать POJO, для этого POJO есть десериализатор и на основе всего этого есть источник.

Также у нас создается Sink, то есть приемник в HDFS, задаются все конфиги и создается сам Sink HDFS.
Затем создаем наш поток.

Создаем DataStream, добавляем источник, именуем этот источник, добавляем к нему Sink, добавляем к нему имя и исполняем его. Самое что ни на есть простое приложение для скачивания с Flink данных Kafka. Но тут надо создать POJO, создать десериализатор для него, все настроить — поэтому выглядит немного сложновато.
Вот так работает Flink DataStream приложение.
Как выглядит приложение детального слоя
Вот наше приложение для отдельного потока.

Здесь мы отдаем приложению некоторый трансформер — набор трансформаций датафрейма, здесь мы работаем только с ним и делаем несколько трансформаций.
Главной трансформацией у нас является преобразование строчек csv в структуры. Для этого мы преобразуем строчку в датасет с определенной структурой, убираем все ненужное и получаем структурированный датафрейм.
Далее уже, например, очищаем ip от пробелов, преобразуем строчки из строки в массив и другие. Чаще всего нам хватает того, чтобы просто преобразовать строчку csv в структуру.
Такое преобразование тоже является не настолько хорошим, насколько бы это хотелось видеть.
Все эти преобразования нужно уместить в Flink. При этом нам не нужно работать с POJO, потому что при добавлении колонок не нужно перекомпилировать все приложение. Мы просто отдаем новую Avro-схему в приложение, и оно начинает работать по-новому, с новой структурой, если для этого не требуется никаких дополнительных преобразований.
Вот такая более сложная структура, и нам нужно все это уместить в Flink-приложении. Чтобы не пользоваться POJO и существует Table API.
Именно об этом и поговорим в следующей статье.