![](https://habrastorage.org/getpro/habr/upload_files/2fe/2d2/ff0/2fe2d2ff03cd0e2729f9eebf95c41acc.png)
Переносимость процессоров и паттерны
Вот и обещанная третья часть саги о том, что в NiFi можно делать и как это делать правильно, без претензий на истину в последней инстанции, конечно. Сегодня расскажу о переносимости процессоров и дам несколько паттернов для самых популярных задач на платформе ZIIoT. Если вдруг вам интересно почитать про оптимизацию схем и производительности в NiFi — велком в первую часть. Если мечтаете узнать больше о мониторинге, то вторая часть — must read. Только потом сюда не забудьте вернуться.
Переносимость процессоров
Хорошая группа процессоров – переносимая группа процессоров. Бывают, конечно, проектные кастомы, никому кроме разработчика (да и ему после сдачи проекта) не интересные, но даже их бывает необходимо переносить между dev-test-prod средами, и хорошо бы с минимальными накладными расходами. Вот несколько очевидных и не очень лайфхаков, как эту переносимость обеспечить.
Первое, что нам поможет, – версионирование групп процессоров с помощью NiFi registry. Собственно, об этом инструменте уже достаточно написано. Используйте его – рекомендую.
Следующий важный момент – это окружение скрипта. Прежде всего, все конфигурируемые параметры имеет смысл выносить в переменные. Это нужно хотя бы для того, чтобы в случае необходимости изменения (например, при переносе между полигонами) вносить все правки в одном месте с автоматическим перезапуском задействованных элементов. Дополнительно стоит явным образом обозначить перечень конфигурируемых таким образом сущностей на Label, например, как на Рис. 1.
![Рисунок 1. Обозначение перечня конфигурируемых сущностей на Label Рисунок 17](https://habrastorage.org/getpro/habr/upload_files/fef/8b2/9e1/fef8b29e1473614cd6d2f63f0c04f478.png)
И вот тут есть два принципиально различных подхода к обеспечению переносимости.
Первый подход — все необходимые параметры задаются на уровне группы процессоров. Переносимость получается 100% – бери и неси, только не забудь поправить по месту! Но управляемость страдает. Представьте десяток подобным образом организованных групп процессоров, и вдруг у вас возникает необходимость изменить один параметр во всех. А для пущего счастья – каждый разработчик может обозвать переменную так, как ему в тот момент захотелось. Ужас, да?
Второй подход (за который я и выступаю) – максимально стандартизировать окружение NiFi на всех полигонах: определить стандартный набор переменных, задавать их с помощью ENV\Variable registry, поставлять в стандартном наборе стандартный же механизм получения токена для доступа к API и пр., а в группе процессоров задавать только тот минимум, который относится непосредственно к самой группе. В этом случае типовые переносы могут происходить ВООБЩЕ без необходимости какого-либо конфигурирования, ну а у кого «стандартное окружение» не вполне стандартно, тот сам виноват.
Тот же подход ограниченно применим и для работы с database\redis connection pools. Ограниченно — потому что, например, в случае с платформой ZIIoT непосредственный доступ к БД сервисов платформы — штука явно нештатная, которая бывает, когда речь идет о системах заказчика или лютом кастоме, необходимость переносов которого сомнительна. С другой стороны, таскать «нечто» между своими полигонами тоже надо… Дополнительное ограничение: DBCPConnectionPool – таки разделяемый ресурс. Если вы оставили “Max Total Connections”=8 по дефолту и завязали на этот пул 1050 групп процессоров – возможны варианты. Впрочем, при прочих равных дефолтов обычно вполне достаточно.
Context maps – (HTTP, HTTPS), reader\writer сервисы – вопрос другой. Тот же http context map по дефолту предоставляет 5000 одновременных соединений, чего с одной стороны как бы и много, а с другой — может и не хватить на весь инстанс. В общем, в угоду переносимости я предпочитаю создавать эти сущности на уровне группы процессоров. Единственная рекомендация – переименовывать их в человекочитаемый вид, чтобы не было 100500 одинаковых HTTPContextMaps. С reader\writer сервисами примерно то же самое – плюсы от переиспользования не слишком очевидны, трудности от реализации определенно будут.
Теперь о паттернах.
Паттерны
Как я сказал выше, тут речь пойдет о самых популярных задачах, с которыми разработчики сталкиваются, имея дело с ZIIoT-платформой. Возможно, тем, кто еще не имеет дело с платформой, это тоже будет полезным, так как задачи эти не новы.
Задача 1. Чтение только новых записей и перенос их в платформу работы с данными
Это типовая задача, которую приходится решать, забирая данные из какой-нибудь системы заказчика. Часто это делается путем сохранения последней прочитанной записи где-нибудь в кэше и\или в специально созданной таблице БД, а также путем чтения данных за определенный период (now() минус N). В большинстве случаев так делать не стоит, так как есть аж три штатных средства – GenerateTableFetch, QueryDatabaseTable и QueryDatabaseTableRecord. См. Рис. 2
GenerateTableFetch создает SQL-запросы для последующего исполнения (ExecuteSQL(Record). По определению, чтение с дозагрузкой реализуется только на одной ноде кластера, чтобы не было накладок с синхронизацией последних прочитанных данных. В этом случае на нагруженных инстансах TableFetch позволяет распараллелить получение данных из БД: «легкую» часть (с генерацией запросов) выполняет одна нода, а «тяжелая» (с извлечением-обработкой) распараллеливается на все ноды кластера. Данный процессор поддерживает входные данные и может использовать атрибуты входящих flow-файлов для определения, например, таблицы, из которой надо произвести выборку. QueryDatabaseTable позволяет собственно извлечь данные из таблицы, выдавая данные в AVRO-формате. QueryDatabaseTableRecord дает на выходе набор записей, определенный RecordSetWriter.
Для таблицы такого вида (запись с id 527 добавлена позже),
![](https://habrastorage.org/getpro/habr/upload_files/3be/1dd/92d/3be1dd92da88a154a9fcecf7b6dfe0cd.png)
GenerateTableFetch (как более наглядный) генерирует два запроса:
SELECT * FROM test WHERE id <= 8 ORDER BY id LIMIT 10000
SELECT * FROM test WHERE id > 8 AND id <= 527 ORDER BY id LIMIT 10000
![Рисунок 2. Чтение с дозагрузкой Рисунок 23](https://habrastorage.org/getpro/habr/upload_files/16b/dab/14b/16bdab14bf61a90f3f781a5509d302ce.png)
Эти процессоры позволяют задать размер единичной выборки, определить список столбцов, которые должны в нее попасть, и даже задать начальное значение, если речь идет о первичной выборке из уже существующей базы не всех, а только актуальных для нас данных.
![Рисунок 3. Настройка выборки Рисунок 3. Настройка выборки](https://habrastorage.org/getpro/habr/upload_files/2db/c8b/b29/2dbc8bb29b607911e33091d285ece7d6.png)
Немаловажный вопрос – где хранится это «последнее прочитанное» значение и насколько это надежно? Ответ: в «STATE процессора». В случае кластера — в zookeeper’е кластера, в случае standalone-инсталляции – в хранилище состояний инстанса в формате WAL-лога или физически (по дефолту) – в папке /opt/nifi/nifi-current/state таким вот образом, как на Рис. 4.
![Рисунок 4. Последние прочитанные значения в opt/nifi/nifi-current/state Рисунок 4. Последние прочитанные значения в opt/nifi/nifi-current/state](https://habrastorage.org/getpro/habr/upload_files/e40/a3a/7bf/e40a3a7bf2ff32c131b2851324934bf2.png)
Соответственно, если нам требуется сохранить это состояние в случае перезапуска всего NiFi, эту папку необходимо вынести в отдельный volume. Для очистки состояния можно воспользоваться контролом Clear state (View state на соответствующем процессоре) – см. Рис. 5.
![Рисунок 5. Clear state для очистки состояния Рисунок 26](https://habrastorage.org/getpro/habr/upload_files/f5e/45b/307/f5e45b307a0a44fd4e3fd177bb05d0c0.png)
Задача 2. Организация шаблона UPdateorinSERT – UPSERT
Еще одна типовая задача – организация шаблона UPdateorinSERT – UPSERT. Создание записи в БД, в случае её наличия – обновление данных. Шаблон очень распространенный, но на уровне спецификации SQL не слишком стандартизованный. В postgresql он реализован с помощью INSERT INTO <…> ON CONFLICT DO <…>. В NiFi этот шаблон реализуется только в PutDatabaseRecord - Рис. 6.
![](https://habrastorage.org/getpro/habr/upload_files/ad1/8a0/341/ad18a0341d05e7ef558d79888a514c6d.png)
![Рисунок 6. PutDatabaseRecord - тестовая запись в БД + настройки самого процессора Рисунок 33](https://habrastorage.org/getpro/habr/upload_files/87a/9d3/df8/87a9d3df8d122d26c9ece2d42b63fcb9.png)
Основная проблема здесь в том, что, например, ConvertJsonToSQL производит вывод типов исходя из параметров БД и корректно обрабатывает варианты {“id”:10} и {“id”:”10”}, а PutDatabaseRecord использует определение типов из avro-схемы recordsetreader (по дефолту – infer-schema, т.е. попытка «угадать» и создать схему исходя из самой записи). В этом случае, если в БД поле id с типом bigint, входной параметр {“id”:”10”} работать-таки не будет без переопределения avro-схемы.ля входного JSON’а:
[
{
"key": "7",
"name": "jsontest1"
},
{
"key": "8",
"name": "jsontest2"
},
{
"key": "9",
"name": "jsontest3"
},
{
"key": "1",
"name": "jsontest4"
}
]
И таблицы вида:
CREATE TABLE IF NOT EXISTS test (
id INTEGER NOT NULL DEFAULT nextval('test_id_seq' :: REGCLASS),
key BIGINT,
name CHARACTER VARYING,
CONSTRAINT test_key_key UNIQUE (key)
) WITH (OIDS = FALSE);
Схема будет выглядеть так, как на Рис. 7.
![Рисунок 7. Настройка json record reader’а Рисунок 7. Настройка json record reader’а](https://habrastorage.org/getpro/habr/upload_files/9f3/fe3/d7c/9f3fe3d7c68176def68813d371c9b8a7.png)
{
"type": "record",
"name": "test",
"fields": [
{
"name": "key",
"type": "long"
},
{
"name": "name",
"type": "string"
}
]
}
Как правило, никакого rocket-science тут нет, но без подобного «восхода солнца вручную» хочется обойтись. В качестве альтернативы можно поиграться с convertjsontosql в INSERT, а в случае ошибки – повторный convertjsontosql уже в UPDATE, но тут схема выйдет совсем уж монструозной – Рис. 8.
![Рисунок 8. Ручная правка данных под вставку\обновление Рисунок 35](https://habrastorage.org/getpro/habr/upload_files/242/6d8/fd5/2426d8fd57650ba384b6df38ee61049e.png)
Конвертируем JSON в SQL INSERT, отправляем пачку транзакций на исполнение, в случае сбоя – идем по ветке fail, разбираем пачку транзакций поштучно (у второго PutSQL batch size=1). Те записи, что не вставились, конвертируем в UPDATE и отправляем обратно, чтобы избежать зацикливания, ограничиваем время жизни очереди. Тот еще костыльно-грабельный привод, очень бы хотелось как-нибудь без него.
Задача 3. Фильтрация записей
Традиционно делается через RouteOn(Attribute|Text): нужное направляем дальше, не подпавшее под условия — дропаем по fail. Альтернативой является более продвинутый QueryRecord, ориентированный на обработку именно набора записей. Он позволяет выполнять SQL(-like)-запросы, в том числе и по самому flow-файлу – Рис. 9.
![Рисунок 9. Фильтрация с помощью QueryRecord Рисунок 27](https://habrastorage.org/getpro/habr/upload_files/891/831/262/891831262c4a2c165617bff118b29313.png)
С помощью этого же инструмента можно организовать, например, сортировку входного потока записей (ORDER BY), задавать достаточно сложные условия и пр.
Задача 4. Направление потоков данных в один putsql
Иногда, при выносе всех выходов на верхний уровень, мы видим, что, например, раньше запись в БД проводилась в нескольких местах на схеме. У нас появляется желание завернуть все потоки данных с convertjsontosql в один putsql. Хорошее желание, правильное, накладных расходов меньше, править проще. Но тут выясняется, что мы пишем (ну, вдруг?) в разные БД, а ConnectionPool ведет в одну конкретную. Проблема? Проблема. Но решаемая. Для этого, помимо собственно требуемых сервисов ConnectionPool, необходимо создать виртуальный DBCPConnectionPoolLookup, в котором динамическими свойствами задать псевдонимы для ConnectionPools – Рис. 10.
![Рисунок 10. Настройка DBCPConnectionPoolLookup Рисунок 40](https://habrastorage.org/getpro/habr/upload_files/241/58b/440/24158b44058418e18d0b406e598e2c20.png)
После этого ссылаемся на этот Lookup-service (выделено желтым) в *SQL-процессоре – Рис. 11.
![Рисунок 11. Настройка ExecuteSQL Рисунок 41](https://habrastorage.org/getpro/habr/upload_files/79a/52a/c78/79a52ac78211efec6ccd79d2452d5e20.png)
Так как таблицы для записи скорее всего будут разными, зададим их с помощью ExpressionLanguage. Все? Почти. Помимо имени таблицы необходимо в атрибуте database.name flow-файла указать еще и псевдоним ConnectionPool, с которым мы будем работать – Рис. 12.
![Рисунок 44](https://habrastorage.org/getpro/habr/upload_files/7ab/69b/85b/7ab69b85b77c152d81c7df20a3350636.png)
![Рисунок 12 Рисунок 12](https://habrastorage.org/getpro/habr/upload_files/a14/4ef/c2b/a144efc2b3a3cb6b56d665f8ecfbc5e6.png)
Ну а готовые SQL-запросы отправляем на PutSQL с тем же DPCPConnectionPoolLookup и наслаждаемся результатом – Рис. 13.
![Рисунок 13. Запись в две БД в одном потоке Рисунок 45](https://habrastorage.org/getpro/habr/upload_files/9ec/712/fab/9ec712fab6114b90deb997212fa0d78e.png)
Это позволит, например, определить единую точку записи в БД для всего инстанса NiFi, обвязать её нормально обработкой ошибок, логгингом, пробами, задать ей адреса-пароли-явки, завернуть не нее все потоки записи в БД и не беспокоиться, что кто-то кое-где у нас порой на-ко-ся-чит.
Тот же подход применим и к кэшу. Не важно, какой бэкенд – мы всегда стучимся в lookup service.
Задача 5. Вынос данных в кэш
Вынос данных в кэш – достаточно типовой паттерн использования относительно редко меняющихся и относительно часто использующихся в нескольких местах данных. Думаю, все знакомы с ним по получению токена, но использовать его можно много где. Например, в случае интенсивной работы со справочником имеет смысл не делать по запросу на каждый проходящий flow-файл, а по расписанию забирать справочник и дергать нужные значения из кэша – см. Рис. 14.
![Рисунок 14. Кэширование справочников Рисунок 14. Кэширование справочников](https://habrastorage.org/getpro/habr/upload_files/4ed/a27/f18/4eda27f1844e96f4dfb8c82529ec61ad.png)
На входе получаем список справочников и имена интересующих полей, на выходе – записи ключ-значение (имя поля – значение) в кэше. Обратите внимание, что в целом этот паттерн ухудшает читаемость схемы — визуальный «разрыв» в потоке управления, особенно если создание кэша и его использование происходят на разных уровнях вложенности, затрудняет понимание происходящего. Но этот шаблон слишком полезен, чтобы от него отказываться.
Wait-notify создает те же проблемы в читаемости схемы (Визуальный разрыв, когда “Notify” логически никак не связан с “Wait”), но может решать больше проблем, чем создает. Помимо очевидного «отлова» триггерных событий, он достаточно широко используется для реализации паттерна split-merge. Последний (со стратегией объединения Defragment) хорошо работает в случае, если количество объектов не меняется в процессе выполнения. В противном случае (вложенный split, фильтрация результатов, обработка в цикле с условием) – «Хьюстон, у нас проблемы!». Не то, чтобы неразрешимые, нет – я уже видел решение с ручной правкой атрибута fragment count, но все же – проблемы. Wait-notify их решает так, как на Рис. 15.
![Рисунок 15. Split-merge с ожиданием Рисунок 15. Split-merge с ожиданием](https://habrastorage.org/getpro/habr/upload_files/eeb/92c/0ce/eeb92c0cedd7bde7c65fa823c5894548.png)
В этом случае merge производится не по количеству данных, полученных в результате сплита, а по сигналу выхода из цикла. До получения сигнала все данные ожидают в очереди wait процессора Wait, и только после получения сигнала попадают в процессор EnforceOrder, а затем — в merge с типом bin-packing algorithm – см. Рис. 16.
![Рисунок 16. Настройки MergeContent Рисунок 39](https://habrastorage.org/getpro/habr/upload_files/fd2/f3c/92f/fd2f3c92f3a8452eab02456dba0ce814.png)
Обратите внимание, что wait не гарантирует сохранение порядка обработки, скорее наоборот! В некоторых кейсах это важно.
Сам по себе wait-notify обменивается сигналами через общий распределенный кэш и позволяет достаточно гибко управлять потоком: учитывать количество поступивших сигналов, выпускать заданное количество flow-файлов по сигналу и прочее. Но нужно это не везде и не всегда.
На этом у меня все, пока. Спасибо, что дочитали аж до сюда. Надеюсь, мой писательский труд был немного (или даже много) полезен. Если в комментариях всплывут какие-то еще интересные темы, стоящие внимания, то еще увидимся.