Добрый день. Меня зовут Иван Клименко, и я архитектор департамента поддержки продаж компании Arenadata. В основном занимаюсь продуктом Arenadata Streaming (ADS) — это масштабируемая отказоустойчивая система для потоковой обработки данных в режиме реального времени, адаптированная для корпоративного использования и построенная на базе Apache Kafka и Apache NiFi. Далее я буду говорить о NiFi, подразумевая под этим Apache NiFi, или cборку Arenadata Streaming NiFi.

Продолжаем цикл статей практического применения, так называемого "How To…". Первая статья была посвящена взаимодействию с базами данных. В связи с уходом с рынка западных вендоров п��вышается интерес к открытому программному обеспечению. В сообществе NiFi часто приходят с вопросами: «Где почитать о NiFi», «Дайте хорошие курсы» и т. д. Поэтому эта статья больше теоретическая. Я хотел бы поговорить о концепции потокового программирования, о FlowFile и вообще о подходе к разработке потоков обработки данных в NiFi. Эта статья будет интересна в первую очередь новичкам в NiFi, ну а от опытных разработчиков всегда жду комментариев с дополнениями или с конструктивной критикой.

Когда первый раз открываешь интерфейс сервиса NiFi и видишь перед собой канву либо уже готовый визуальный pipeline обработки данных, возникает вопрос: как именно это работает?

Чтобы процессоры начали обрабатывать данные, они должны появиться в очереди. Очередь, или соединение, — место хранения данных в NiFi, участвующих в обработке. А сами данные называются FlowFile. Эти объекты движутся сквозь систему и представляют собой набор пар строк «ключ — значение», а также связанный с ними контент из нуля или набора байт. FlowFile — основной субъект в парадигме программирования, основанного на потоках. В общем смысле Flow Based Programming (FSB) — это парадигма программирования, которая определяет приложения как сеть чёрных ящиков (процессов). Эти чёрные ящики обмениваются данными (IP — Information Packets) по предопределённым соединениям посредством передачи сообщений, где соединения (Bounded Buffer) указываются извне для процессов. Эти процессы чёрного ящика можно бесконечно связывать повторно для формиро��ания различных приложений без необходимости внутреннего изменения. Как раз информационным пакетом для NiFi и является FlowFile, который содержит всю необходимую информацию для своей же обработки.

В общем смысле FSB — это ориентированный граф, в вершине которого выполняется вычисление данных либо их обработка, а по рёбрам графа эти данные перемещаются. В этой парадигме применяются такие термины, как Ports, Bounded Buffer, и в NiFi им сопоставляются входные и выходные порты, а также связи или очереди.

В парадигме FSB вся информация для обработки заключается в информационном пакете. Процессы не знают ничего ни о предыдущем, ни о следующем шаге. Есть только данные во входной очереди и алгоритм их обработки. Это в полной мере справедливо для и NiFi. Обычно сравнивают потоки данных в NiFi с конвейерной лентой, по которой едут ящики либо детали.

Очень хорошо это описано в статье Бронислава Житникова. Самое важное, что надо запомнить при проектировании потоков обработки: вы не сможете из одного процессора запросить данные у другого процессора напрямую либо изменить какую-то переменную, которую можно применять в других функциях. То, что можно сделать на пространстве ваших функций и классов в классическом программировании, не будет работать в NiFi. Информация, достаточная и необходимая для обработки данных процессором, должна содержаться в атрибутах либо контенте FlowFile.

Атрибуты

Рассмотрим более подробно атрибуты и то, как с ними можно работать. Всё описано в документации, поэтому подробно по каждому пункту проходить не буду, только самое основное.

Атрибуты FlowFile представляют собой коллекцию пар «ключ — значение», который отражают метаинформацию или метаданные про этот FlowFile. Мы можем узнать идентификатор FlowFile, получить ссылку на его контент, узнать количество записей, содержащихся в контенте, и так далее. Большинство операций по маршрутизации завязано именно на атрибуты. Также многие процессоры читают атрибуты в ходе своей работы либо пишут в атрибуты результаты обработки. Важно понимать, что атрибуты — это не сами данные, это именно метаинформация, позволяющая обрабатывать данные. Отмечу, что атрибуты хранятся в оперативной памяти, и чем больше у вас атрибутов, тем больше памяти понадобится для обработки ваших данных.

У каждого FlowFile есть обязательные атрибуты (Core Attributes), которые генерируются системой при создании FlowFile, прохождении его через различные отношения (очереди).

  • filename — имя файла, которое может быть применено для хранения данных локально либо удалённо.

  • path — директория, которая может быть применена для хранения данных.

  • uuid — уникальный идентификатор, однозначно указывающий на этот FlowFile.

  • entryDate — дата и время создания FlowFile в миллисекундах с 1 января 1970, в формате UTC.

  • lineageStartDate — время появления в системе самого старого предка текущего файла, в миллисекундах, в формате UTC.

  • fileSize — количество байт, составляющих контент. This attribute represents the number of bytes taken up by the FlowFile’s Content.

Такие атрибуты, как uuidentryDatelineageStartDate и fileSize, генерируются системой и не могут быть изменены в ходе обработки пользовательской функцией.

В NiFi есть ряд процессоров, предназначенных для работы с атрибутами, перечисленные в разделе Attribute Extraction:

  • EvaluateJsonPath. С помощью выражений JSONPath можно выполнить обмен данными между атрибутами и контентом.

  • EvaluateXPath. Аналогично предыдущему, только применяется XPath.

  • EvaluateXQuery. Аналогично предыдущему, только применяется запрос XQuery.

  • ExtractText. Позволяет извлечь в атрибут текст по регулярному выражению.

  • HashAttribute. Позволяет применять функции хеширования над списком атрибутов и помещать результат в определённый атрибут.

  • HashConten. Применяет функцию хеширования над контентом и помещает результат в атрибут.

  • IdentifyMimeType. Определяет тип контента и помещает значение в атрибут.

  • UpdateAttribute. Процессор позволяет изменять либо создавать атрибуты, при этом возможно применять специальный язык Expression Language 

Про Expression Language (EL) в NiFi нужно поговорить чуть подробнее. Это очень мощный инструмент работы с метаданными. Каждое выражение в EL начинается со знака $ и обрамляется фигурными скобками — ${выражение}. В самом выражении прописываем атрибут и функцию над ним. Например, проверим, что в атрибуте, содержащем имя файла, такое же значение, как и в идентификаторе FlowFile. Для этого прочитаем атрибут filename и сравним его с атрибутом uuid — ${filename:equals(${uuid})}. Функция отделена от аргумента знаком «:», и таких функций может быть много. Например, выражение ${filename:startWith(“some_value”):not()} возвращает true, если имя файла не начинается со строки «some_value».

В выражениях можно применять атрибуты, переменные, системные переменные. Во время выполнения происходит поиск этих данных в следующем порядке:

  1. Атрибуты FlowFile.

  2. Переменные в процессной группе.

  3. Переменные из файла, определённые в свойстве nifi.variable.registry.properties.

  4. NiFi JVM свойства.

  5. Системные переменные.

EL поддерживает работу с несколькими типами данных: String, Number, Decimal, Date, Boolean; после выполнения выражения значение приводится к строке. Поддерживаются логические операции, проверки на существование, работа с JSON, преобразование типов, математические операции и так далее.

Например, при работе потоков обработки возникает ситуация, что запись не может быть помещена в базу данных (БД) из-за превышения размера одного из полей. При этом такая запись может быть одна в наборе, и чтобы её отыскать, потребуется разбить весь набор записей. Чтобы минимизировать затраты на поиск, можно разделять набор записей пополам за один заход. Если запись одна, то она будет в одном из двух получившихся фрагментов, а корректный фрагмент будет внесён в БД. Таким образом, повторяя операцию деления, можно получить проблемную запись, а все корректные будут внесены в БД. Остаётся определить, как отделить отдельную запись и как составить условие для деления набора записей. Для этого можно воспользоваться атрибутом record.count:

  1.  ${record.count:equals(1)} — условие для определения, что запись в наборе одна, и если она попала в ошибочную ветку, то её нужно проанализировать в ручном режиме.

  2. ${record.count:divide(2)} — определяем количество записей, на которое будем делить набор.

Пайплайн, выполняющий такую работу можно представить как:

И конфигурации процессоров, соответственно:

Таким образом, с помощью метаданных автоматизировано выявления проблемной записи в наборе, при этом оптимизирован процесс внесения в БД.

Контент

При решении задачи определения некорректной записи весь набор записей был поделён пополам. Этот набор содержался в контенте — последовательности байт, в котором находятся сами данные FlowFile. Они хранятся на диске и подгружаются только при необходимости операций над ними. За счёт того, что контент никуда не перемещается, а хранится в виде ссылки на фрагмент репозитория в атрибутах, достигается высокая производительность NiFi. Также в NiFi реализован механизм однократной записи: при модификации содержимого контент полностью переписывается на новое место в репозитории. Останавливаться подробно на механизме хранения я не буду, он хорошо описан в статье Бронислава и в документации.

В NiFi большое количество процессоров предназначено для обработки данных. Это группы процессоров трансформации данных, анализа контента и маршрутизации, получения данных из внешних систем и отправки данных во внешнюю систему. Подробно с полным списком можно ознакомиться в документации.

Различают два подхода работы с контентом: обработка всего контента целиком и абстракция контента в виде набора записей. Запись — это экземпляр интерфейса Record, и вся работа с таким контентом выполняется через специальные абстракции — классы чтения записей и классы записи записей.

Например, вы можете загрузить текстовый файл с диска и выполнить в нём операцию поиска с помощью регулярного выражения. В этом случае контент будет обрабатываться как единый элемент. Но если в тестовом виде вы загрузите CSV, то с помощью CVSReader вы можете читать каждую строку как запись.

В ходе модификации контента может возникнуть ситуация, когда от обработки записей нужно перейти к обработке контента целиком. В этом случае следует учесть, что при такой обработке весь объём обрабатываемых данных будет загружен в оперативную память. Если же контент будет разбит на мелкие фрагменты, например по одной записи, то это может породить большое количество FlowFile в очереди. А так как каждый файл будет содержать набор атрибутов оригинального файла, то эта операция также вызовет увеличение задействования оперативной памяти. Таким образом, рекомендуется переходить в процессах обработки на работу с записями.

Различают следующие основные группы процессоров, обрабатывающие контент целиком:

  • Dara Ingestion — загрузка данных в NiFi из внешних систем. Тут можно указать такие процессоры, как GetFile, GetFPT, GetHTTP, GetHDFS, GetKafka, GetMongo и так далее. То есть это группа процессоров, позволяющая данные из источника загрузить в NiFi в том виде, как они п��едставлены в источнике.

  • Data Egress — выгрузка данных во внешнюю систему. К этой категории можно отнести процессоры PutFile, PutFTP, PutSQL, PutKafka, PutMongo. Процессор выгружает контент с минимальными модификациями. Например, PutEmail помещает контент либо во вложение, либо представляет его как тело письма.

  • Data Transformation — процессоры, преобразующие контент по заданным алгоритмам. Например, CompresssContent, TransformXML, JoltTransformJson.

  • Database Access — процессоры, предназначенные для доступа к базам данных, например ConvertJSONToSQL, PutSQL, SelectHiveSQL, PutHiveQL.

  • System Interaction — взаимодействие с системой, например ExecuteProcess и ExecuteStreamCommand. Процессоры позволяют передать контент во внешние процессы, запускаемые в операционной системе.

  • Spliting and Aggregation — разделение и соединение. Такие процессоры, как SplitText, SlitJson, SplitXml, SplitContent, выполняют операцию разделения контента, а MergeContent выполняет операцию соединения контента из нескольких источников.

Все эти процессоры работают с контентом как с одним целым, и при этом контент полностью помещается в оперативную память.

Чтобы работать с контентом эффективно, введён подход представления контента в виде записей. Структура записи/Record в NiFi описывается с помощью спецификации Apache Avro. Для чтения применяется специальная абстракция Record Reader. Такие абстракции используются для анализа и преобразования необработанных данных в структурированный формат, который может обрабатываться процессорами NiFi. Они ключевой компонент возможностей NiFi для работы с записями, что позволяет эффективно обрабатывать структурированные форматы данных. Для записи применяется RecordSetWriters — которые преобразовывают структурированные записи (обработанных NiFi) обратно в формат необработанных данных для хранения или передачи в другие системы.

Для погружения в Record-OrientedNiFi рекомендую статью Mark Payne

В чём основное преимущество работы с записями? Когда NiFi выполняет чтение данных, он не читает весь контент сразу, а "бежит" по нему и полученные данные преобразует в записи, с которыми уже работает процессор. И процессор не держит весь контент в памяти, а работает со структурированным представлением. Аналогично происходит процесс записи контента, кода Record пишутся одна за другой, требуя оперативную память только для буфера данных.

Как работают Record Readers в NiFi:

1. Record Readers принимают необработанные данные (например, файлы, потоки или сообщения) в качестве входных данных. Эти данные могут быть в различных форматах, таких как CSV, JSON, Avro, XML и т. д.

2. Record Readers требуется схема (формат Avro) для интерпретации данных. Схема определяет структуру данных, включая имена полей, типы данных и отношения. Схемы могут быть явно определены, например, заданы как свойство, заданы в атрибуте, определены в SchemaRegistry. Или схемы могут быть автоматически сгенерированы на основе данных. В этом случае применяется первая запись для анализа, и на её основе создается схема.

3. Record Reader анализирует необработанные данные в соответствии с указанным форматом и схемой. Например, CSV Reader разделит данные на поля на основе разделителей и сопоставит их со схемой, JSON Reader проанализирует JSON-объекты и извлечёт поля в соответствии со схемой, а Avro Reader десериализует двоичные данные Avro с использованием предоставленной схемы Avro.

4. Проанализированные данные преобразуются в объекты Record (записи). Каждая запись представляет собой одну строку или запись в данных. Записи представляют собой пары «ключ — значение», где ключи — это имена полей, определённые в схеме, а значения — соответствующие данные.

5. Record Reader передаёт записи процессору NiFi для обработки, например QueryRecord, UpdateRecord или PutDatabaseRecord.

NiFi предоставляет несколько встроенных Record Readers для различных форматов данных:

  • CSVReader. Анализирует CSV-файлы.

  • JSONReader. Анализирует JSON-данные.

  • AvroReader. Читает файлы Avro.

  • ParquetReader. Читает файлы Parquet.

  • XMLReader. Анализирует XML-данные.

  • SyslogReader. Анализирует сообщения syslog.

Как RecordSetWriters работают в NiFi:

  1. RecordSetWriters принимают структурированные объекты Record в качестве входных данных — результата работы процессоров NiFi, например QueryRecord, UpdateRecord и т. д.

  2. Как и Record Reader, RecordSetWriters применяют схему для определения, как сериализовать записи в желаемый формат вывода. Схемы определяются таким же образом, как и для Record Reader.

  3. RecordSetWriter сериализует записи в указанный формат вывода (например, CSV, JSON, Avro, Parquet и т. д.). Так, CSV Writer преобразует записи в разделённые запятыми значения, при этом каждое поле сопоставляется со столбцом, JSON Writer сериализует записи в объекты или массивы JSON, а Avro Writer сериализует записи в двоичный формат Avro, используя предоставленную схему Avro.

  4. Сериализованные данные записываются в выходное место назначения, например файл, сообщение или поток. Затем эти выходные данные могут использоваться нижестоящими системами или сохраняться для дальнейшей обработки.

NiFi предоставляет несколько встроенных RecordSetWriters для различных форматов данных:

  • CSVRecordSetWriter. Преобразует записи в формат CSV.

  • JSONRecordSetWriter. Преобразует записи в формат JSON.

  • AvroRecordSetWriter. Преобразует записи в формат Avro.

  • ParquetRecordSetWriter. Преобразует записи в формат Parquet.

  • XMLRecordSetWriter. Преобразует записи в формат XML в Apache.

Преимущества, которые даёт подход работы с записями:

  1. Эффективность. Record Readers и RecordSetWriters позволяют NiFi обрабатывать большие наборы данных структурированно и эффективно.

  2. Гибкость. Поддерживают множество форматов данных и схем, что упрощает работу с различными источниками данных.

  3. Повторное использование. После настройки Record Readers и RecordSetWiters можно повторно использовать в различных потоках данных.

Узнать, работает ли процессор с записями, легко — в его названии будет слово «Record», например SplitRecord, UpdateRecord, PublishKafkaRecord.

В заключение скажу: хотя NiFi кажется очень простым за счёт понятного интерфейса, разработчики не всегда понимают саму концепцию обработки данных в пайплайнах NiFi. Разобравшись в FSB и поняв, как работает пайплайн, вы сможете легко создавать масштабируемые и гибкие потоки обработки либо интеграции.

Всё это и многое другое вы можете узнать на учебных курсах Arenadata.

Полезные ссылки: