
На текущем проекте у нас начинает активно использоваться Apache NiFi в качестве основного ETL/ELT-инструмента. NiFi используется для получения данных из различных источников (Kafka, REST, HDFS) и подготовки данных для их последующей загрузки в основное хранилище на базе Greenplum. Загрузка подготовленных данных в Greenplum реализована средствами последнего (PXF), поэтому NiFi только подготавливает данные в формате Avro и записывает их в HDFS.
Немного о задаче. Пусть мы имеем информацию о подписках пользователя на уведомления для различных разделов/сервисов портала. Для каждого раздела пользователи могут указать от нуля до нескольких видов транспорта, которым эти уведомления будут доставляться, например, PUSH, EMAIL, SMS. Требуется обеспечить загрузку этих данных в наше аналитическое хранилище.
Исходные данные
Данные приходят в Apache Kafka, формат данных примерно такой:
{ "uID": 1000358546, "events": [{ "eventTypeCode": "FEEDBACK", "transports": ["PUSH", "SMS"] }, { "eventTypeCode": "MARKETING", "transports": ["PUSH", "EMAIL"] }, { "eventTypeCode": "ORDER_STATUS", "transports": ["SOC_VK"] } ] }
Вроде всё просто, но у PXF есть некоторые сложности с загрузкой иерархических структур (массив простых значений оно загрузит, но массив объектов - нет), поэтому нам нужно сделать наши данные максимально плоскими. Да, и цель статьи - показать, что можно сделать с JSON используя встроенные процессоры.
Для начинающих
Для экспериментов в NiFi можно создать процессорную группу, выбрав в качестве первого процессора GenerateFlowFile поместив в параметр Custom Text текст нашего Json.


FlattenJson
Одним из способов сделать из произвольного json плоский является процессор FlattenJson. Процессор предоставляет пользователю возможность взять вложенный документ JSON и представить его в простой документ содержащий пары ключ/значение. Ключи объединяются на каждом уровне с помощью определяемого пользователем разделителя, который по умолчанию имеет значение ".". Процессор поддерживает три режима преобразования (Flatten Mode): normal, keep-arrays и dot notation (применяется для запросов в MongoDB). Режим преобразования по умолчанию - "keep-arrays". В режиме keep-arrays мы получим, для нашего примера, практически исходный Json. Поэтому этот режим нам не подходит, и мы переключим процессор на режим normal. В результате мы получим такой json:
{ "uID" : 1000358546, "events[0].eventTypeCode" : "FEEDBACK", "events[0].transports[0]" : "PUSH", "events[0].transports[1]" : "SMS", "events[1].eventTypeCode" : "MARKETING", "events[1].transports[0]" : "PUSH", "events[1].transports[1]" : "EMAIL", "events[2].eventTypeCode" : "ORDER_STATUS", "events[2].transports[0]" : "SOC_VK" }
Что-ж, оно действительно плоское, но... Если такое попытаться завернуть в avro, то оно сломается. Если не на этапе конвертации, то на этапе загрузки в Greenplum точно. Всё по причине того, что тут буквально для каждого пользователя будет свой набор полей и схема каждого avro-файла будет различаться.
Этот процессор, сам по себе весьма полезный в иных случаях, в чистом виде нам не подходит. Смотрим, что же мы можем использовать ещё.
JoltTransformJSON

JoltTransformJSON, пожалуй, самый мощный в арсенале NiFi, процессор для трансформации Json. Он позволяет применять список Jolt-спецификаций к нашему Json. На хабре уже была статья, посвященная этому процессору. Но, позвольте мне рассказать об этом процессоре применительно к нашей задаче.
Вариантов решения нашей задачи как минимум два - это либо обработать Json, полученный с помощью FlattenJson-процессора с помощью Jolt, либо попробовать от FlattenJson избавиться и решить всё с помощью JoltTransformJson.
Но, для начала опишем, какие же возможности предоставляет нам этот процессор. Самое главное - это возможность работать с Jolt-спецификацией в расширенном редакторе, где можно не только писать спецификацию, проверить её корректность, но и выполнить трансформацию произвольного json не покидая окна редактора.

Это всё круто и очень помогает в работе, но, если честно, я предпочитаю использовать Jolt Transform Demo (jolt-demo.appspot.com) . Субъективно он более удобен и там есть примеры с комментариями для начала работы с Jolt.
Итак, как вы видите, на картинке выше, я начал с простой спецификации:
[{ "operation": "shift", "spec": { "*": "&" } }]
Эта спецификация, по сути ничего не трансформирует, поскольку говорит "возьми любое поле и выведи его как оно есть". Будем исправлять. Для начала определить нашу цель. А уже потом напишем для неё Jolt-спецификацию.
Итак, у нас на входе:
{ "uID": 1000358546, "events": [{ "eventTypeCode": "FEEDBACK", "transports": ["PUSH", "SMS"] }, { "eventTypeCode": "MARKETING", "transports": ["PUSH", "EMAIL"] }, { "eventTypeCode": "ORDER_STATUS", "transports": ["SOC_VK"] } ] }
А хотим мы получить такой json на выходе:
{ "uID": 1000358546, "FEEDBACK": ["PUSH", "SMS"], "MARKETING": ["PUSH", "EMAIL"], "ORDER_STATUS": ["SOC_VK"] }
Давайте напишем для него спецификацию. Нам нужно для каждого значения в events->transports взять ключ из events->eventTypeCode и в результате записать с этим ключем массив значений. Поле uID оставляем без изменений.
[{ "operation": "shift", "spec": { "events": { "*": { "transports": { "*": { "@": "@(3,eventTypeCode)[]" } } } }, "*": "&" } }]
Пояснение спецификации
"events":{"*":{"transports":{"*":{,думаю, не вызывает особых сложностей. Здесь мы для каждого events берём каждый элемент массива (это первая "*") для которого из transports берём каждый элемент массива (вторая "*").
"@": "@(3,eventTypeCode)[]" вот тут самое интересное. Левая, от двоеточия, часть ("@") говорит о том, что мы берём текущее значение элемента массива, а это у нас PUSH для самого первого совпадения. А вот правая часть говорит о том, с каким ключём мы запишем это значение в результирующий json. И запись @(3,eventTypeCode) означает, что для того, чтобы получить имя ключа, нам нужно подняться на 3 уровня выше (на уровень первой "*") и взять там значение поля eventTypeCode. Если, опять же, рассматривать самое первое совпадение, то это значение будет FEEDBACK - это и будет ключём, в который будет записано значение PUSH.
Думаю, вы заметили квадратные скобки на конце правой части выражения? Они говорят о том, что ключ, который мы определяем, должен быть массивом. Если их не поставить, то тогда, например для ORDER_STATUS, мы получим не массив, а просто строку. Для других типов событий у нас определено несколько значений транспорта, поэтому они будут объединены в массив автоматически. Т.е. результат мог бы выглядеть так:
{ "uID": 1000358546, "FEEDBACK": ["PUSH", "SMS"], "MARKETING": ["PUSH", "EMAIL"], "ORDER_STATUS": "SOC_VK" }
И так, мы добились желаемого результата. Но, давайте подумаем, какие сложности с данным вариантом. А мы опять имеем проблемы с потенциально различными схемами для разных пользователей. Я бы даже сказал с гарантированными. Это можно обойти, если при конвертации в avro будем использовать определенную схему, а не выводить её из данных json. Но, это значит, что мы должны в этой схеме заранее прописать все типы событий всех наших сервисов, и менять схему при каждом изменении их состава. Было бы легче, если бы мы взяли в качестве ключа транспорт, а в качестве значения массив типов событий, для которых уведомления используют данный транспорт. Я не буду приводить Jolt-спецификацию для данной трансформации, оставлю это в качестве задания читателю. Да, видов транспорта сильно меньше, чем событий, но это не гарантирует нам постоянство их списка.
И так, подумаем, как мы можем обеспечить постоянство схемы выходных данных, если у нас на входе могут меняться как типы событий так и виды транспортов. Вариантов не так много:
сделать два массива, один для событий, другой для транспортов, а соответствующие значения записывать с одним и тем-же индексом
сделать один массив, в котором пары событие/транспорт будут строками с разделителем
Первый вариант был отклонён из-за более сложной реализации разбора на стороне Greenplum. Второй вариант выглядит, для нашего примера, так:
{ "uID" : 1000358546, "events" : [ "FEEDBACK|PUSH", "FEEDBACK|SMS", "MARKETING|PUSH", "MARKETING|EMAIL", "ORDER_STATUS|SOC_VK" ] }
Чтобы выполнить такую Jolt-трансформацию понадобится цепочка преобразований. Для начала, нужно развернуть массив транспортов и желательно вывести тип событий на один уровень с транспортом. Затем склеить строки и убрать лишние поля.
Первый этап
[{ "operation": "shift", "spec": { "nET": { "*": { "transports": { "*": { "*": { "@1": "outer[&4].inner[&2].t", "@(3,eventTypeCode)": "outer[&4].inner[&2].etc" } } } } }, "*": "&" } }]
Здесь мы для каждого транспорта создаём объект, который вложен в два массива - внешний и внутренний (относительно "transports"). Значение транспорта записываем в поле t этого объекта, потом добавляем в этот объект поле etc, которое будет иметь значение из eventTypeCode.
Результат выполнения:
{ "uID" : 1000358546, "outer" : [ { "inner" : [ { "t" : "PUSH", "etc" : "FEEDBACK" }, { "t" : "SMS", "etc" : "FEEDBACK" } ] }, { "inner" : [ { "t" : "PUSH", "etc" : "MARKETING" }, { "t" : "EMAIL", "etc" : "MARKETING" } ] }, { "inner" : [ { "t" : "SOC_VK", "etc" : "ORDER_STATUS" } ] } ] }
Второй этап
Итак, у нас есть пары, но, прежде чем склеивать, давайте упростим массив:
{ "operation": "shift", "spec": { "outer": { "*": { "inner": { "*": "events[]" } } }, "*": "&" } }
Добавив эту спецификацию в список Jolt-спецификаций получим результат:
{ "uID" : 1000358546, "events" : [ { "t" : "PUSH", "etc" : "FEEDBACK" }, { "t" : "SMS", "etc" : "FEEDBACK" }, { "t" : "PUSH", "etc" : "MARKETING" }, { "t" : "EMAIL", "etc" : "MARKETING" }, { "t" : "SOC_VK", "etc" : "ORDER_STATUS" } ] }
Третий этап
Теперь наш массив не выглядит так страшно, как после первого этапа. Теперь мы можем легко склеить пары значений в одну строку.
{ "operation": "modify-default-beta", "spec": { "events": { "*": { "transport": "=concat(@(1,etc),'|',@(1,t))" } } } }
Добавив этот этап к нашему списку спецификаций получим:
{ "uID" : 1000358546, "events" : [ { "t" : "PUSH", "etc" : "FEEDBACK", "transport" : "FEEDBACK|PUSH" }, { "t" : "SMS", "etc" : "FEEDBACK", "transport" : "FEEDBACK|SMS" }, { "t" : "PUSH", "etc" : "MARKETING", "transport" : "MARKETING|PUSH" }, { "t" : "EMAIL", "etc" : "MARKETING", "transport" : "MARKETING|EMAIL" }, { "t" : "SOC_VK", "etc" : "ORDER_STATUS", "transport" : "ORDER_STATUS|SOC_VK" } ] }
Последний этап
Теперь нам остаётся только преобразовать массив объектов в массив строк, взяв только значения поля transport.
{ "operation": "shift", "spec": { "events": { "*": { "@transport": "events[]" } }, "*": "&" } }
Добавление этой операции приведёт нас к желаемому результату:
{ "uID" : 1000358546, "events" : [ "FEEDBACK|PUSH", "FEEDBACK|SMS", "MARKETING|PUSH", "MARKETING|EMAIL", "ORDER_STATUS|SOC_VK" ] }
Итак, цепочка спецификаций выглядит так:
[ { "operation": "shift", "spec": { "events": { "*": { "transports": { "*": { "*": { "@1": "outer[&4].inner[&2].t", "@(3,eventTypeCode)": "outer[&4].inner[&2].etc" } } } } }, "*": "&" } }, { "operation": "shift", "spec": { "outer": { "*": { "inner": { "*": "events[]" } } }, "*": "&" } }, { "operation": "modify-default-beta", "spec": { "events": { "*": { "transport": "=concat(@(1,etc),'|',@(1,t))" } } } }, { "operation": "shift", "spec": { "events": { "*": { "@transport": "events[]" } }, "*": "&" } } ]
Результат наших преобразований будет иметь простую схему при конвертации в avro и не будет изменяться от пользователя к пользователю. Строка легко разбивается в запросе к внешней pxf-таблице в Greenplum.
Коллеги, вопросы, предложения, комментарии...
