Как стать автором
Обновить

Apache NiFi как доступный ETL инструмент: кейс применения + тестовый стенд Docker

Уровень сложностиПростой
Время на прочтение8 мин
Количество просмотров5.3K

Привет! Меня зовут Владимир Ходак, я работаю инженером данных в компании "ДЮК Технологии". В статье расскажу о практических аспектах использования Apache NiFi, опишу преимущества и проблемы, с которыми я столкнулся.

На Хабре есть статьи, которые подробно описывают работу этого инструмента. Я хотел бы выделить две из них:

Для наглядности я собрал "песочницу" в Docker, в которой представлены упрощенные примеры пайплайнов, аналогичные тем, которые были использованы в реальном проекте. Вы можете найти ее по ссылке: GitHub репозиторий.

Подсказка: Чтобы развернуть песочницу, необходимо установить Docker и скачать весь репозиторий целиком. Затем в командной строке, находясь в папке с скопированным репозиторием, выполните команду "docker-compose up". В папке "GLOBAL_SHARE/NiFi/conf/" содержится конфигурация Apache NiFi со всеми настройками, которая будет применена к образу в Docker.

Описание задачи

Изначально заказчик использовал BI-платформу Qlik Sense, однако для обеспечения независимости от внешних факторов принял решение перейти на Apache Superset. У заказчика имелись специалисты, способные разрабатывать отчеты в Apache Superset. 

Однако отсутствовала собственная команда разработчиков для интеграции данных. Было необходимо найти гибкое решение с открытым исходным кодом, обеспечивающее возможность масштабирования. После анализа всех факторов для реализации проекта был выбран Apache NiFi.

Помимо вышеперечисленного, проект имел ряд особенностей:

  • Большое количество источников: Существовало множество источников данных, требующих интеграции.

  • Ограниченный доступ к источникам: Некоторые источники располагались на стороне партнеров и имели ограниченный доступ.

  • Низкая отказоустойчивость источников: Некоторые источники данных имели низкую отказоустойчивость, что могло привести к проблемам при загрузке.

  • Необходимость потоковой загрузки данных: Требовалась возможность загрузки данных в реальном времени для оперативного анализа.

  • Наличие источников большого объема: Некоторые источники данных генерировали большие объемы информации, что требовало эффективной обработки.

Подготовка данных для отчетов осуществлялась по следующей логике:

  1. Apache NiFi загружал данные из множества различных источников в хранилище GreenPlum.

  2. Обработка данных выполнялась в GreenPlum и оркестрировалась через Apache Airflow.

  3. Apache Superset взаимодействовал с GreenPlum и визуализировал подготовленные витрины данных.

Может возникнуть вопрос, почему на проекте использовались два ETL инструмента и почему нельзя было ограничиться только Apache Airflow. Это связано с уникальными особенностями Apache NiFi.

Особенности Apache NiFi

Apache NiFi - это мощный конструктор для создания пайплайнов загрузки данных, обладающий следующими особенностями:

  • Повторное использование пайплайнов: Инструмент позволяет легко подготовить типовые пайплайны загрузки данных и многократно переиспользовать их с минимальными изменениями, что существенно ускоряет процесс разработки.

  • Группировка пайплайнов в процессные группы: Пайплайны могут быть логически сгруппированы для удобства управления и масштабирования. Этот подход позволяет легко копировать пайплайны с незначительными корректировками через удобный пользовательский интерфейс.

  • Простота использования интерфейса: Для пользователей это означает более дружелюбный опыт работы по сравнению с написанием кода в Apache Airflow. Это позволяет упростить передачу разработок на сопровождение заказчику.

  • Легкая локализация и решение проблем: Даже пользователь с минимальными навыками может быстро определить и решить проблему в пайплайне самостоятельно или передать разработчику, благодаря интуитивно понятному интерфейсу и возможностям быстрой диагностики.

  • Защита от случайных действий пользователя: Инструмент обладает высокой степенью защиты от ошибок пользователей, что снижает риск непреднамеренных сбоев и повреждения данных.

  • Отслеживание изменений и их последствий: С помощью Apache NiFi можно быстро отследить внесенные изменения и их последствия, что обеспечивает прозрачность процесса разработки и управления данными.

Хотя Apache NiFi имеет множество преимуществ, он не идеально подходит для оркестрации запуска функций расчета витрин. Поэтому на проекте было принято решение использовать оба инструмента в соответствующих областях, что позволило получить оптимальные типовые пайплайны в Apache NiFi и Apache Airflow, подробно описанные и легко управляемые как конструкторы.

Пример решения задачи клиента

В собранной мной “песочнице” представлен упрощенный пример процесса ETL из реального проекта. Вот как он работает:

Источником данных является база данных source_db. В схеме source_schema содержатся две таблицы: flights и seats. Необходимо загрузить данные в базу данных dwh_postgres в схему dwh.

На рисунке два пайплайна:

  • Слева находится пайплайн для загрузки всего содержимого таблицы источника.

  • Справа представлен пайплайн для инкрементальной загрузки данных.

Каждый блок представляет собой процессор, выполняющий определенные функции.

Где без штриха отмечена полная загрузка, с штрихом - инкрементальная загрузка.

Для работы с JOLT рекомендую очень удобный сайт JOLT Demo

Архитектурные решения

Одним из основных критериев было быстрое и качественное подключение новых источников с минимальной экспертизой. Для этого были применены следующие решения.

Один источник — одна процессная группа

Чтобы не путаться в источниках, было решено выстроить следующую логику:

  • В головной процессной группе создается группа проекта.

  • В проектной группе для каждого источника создается отдельная дочерняя группа.

  • В группах источников для каждой таблицы создается группа с пайплайном загрузки данных.

Это решение позволяет при подключении нового источника копировать ранее созданные группы целиком с минимальными изменениями. Это упрощает отслеживание показателей процессоров и очередей как с помощью системы мониторинга, так и через пользовательский интерфейс NiFi.

Такой подход представлен в песочнице:

NiFi Flow → головная процессная группа

For_habr → проектная группа

source_db → группа источника

Flights_table и seats_table → группы таблиц

Parameter Contexts

Параметры контекста позволяют задать переменные, которые используются как переменные в процессной группе.

Редактирование параметров ограничено специальными правами. На проекте для всех источников параметры контекста были унифицированы. При необходимости подключить новый источник применялся уже созданный ранее параметр или создавался новый с согласованными именами переменных.

Это позволило копировать процессные группы целиком и значительно снизило ошибки пользователей.

В песочнице приведен пример, когда для источника source_db создается свой параметр контекста, где хранится информация о настройках баз данных и авторизации.

Variables

Для обеспечения универсальности пайплайнов кроме переменных из параметров контекста также используются переменные для групп таблиц.

В зависимости от особенностей могут быть добавлены дополнительные переменные, что позволяет расширить возможности для модификации пайплайнов. Хороший пример использования переменных - процессор "PutFile". Путь для сохранения CSV файла указывается только с использованием переменных.

#{NiFi_share_path}/#{db_source_name}/${source_table_name}

Для каждого источника и таблицы будет создана отдельная папка для хранения временных файлов, что улучшит отслеживаемость потенциальных проблем и объемов загрузки.

Controller services

Контроллеры создаются на уровне группы источника. Это позволяет переиспользовать ранее созданные контроллеры при копировании во всех пайплайнах загрузки таблиц.

Проблемы и решения

Кроме тривиальных советов об установке максимального времени ожидания при запросах к базам данных хочу поделиться решениями для других, менее очевидных проблем.

Зависания процессоров

Источники на проекте работали крайне нестабильно. Одной из проблем было подвисание процессоров без появления ошибок. Эта проблема возникала не регулярно и казалась случайной. Однако выяснилось, что проблема заключалась в SQL сессиях на стороне источника. Решением стало установка проверочного запроса в контроллерах.

Такая настройка потенциально может снизить производительность, однако при тестировании не было обнаружено каких-либо замедлений.

Потерянные CSV файлы с данными

В песочнице данные загружаются через промежуточную выгрузку в CSV файл. При этом имя файла соответствует атрибуту filename. Для подобных операций лучше использовать атрибут UUID, так как он уникален для каждого объекта в NiFi. Например, если будет использован процессор SplitText для разделения содержимого FlowFile на фрагменты, то у всех файлов после деления будет одинаковый атрибут filename. Это может привести к проблемам при сохранении, загрузке и удалении файлов.

Промежуточные запросы к БД

В процессоре 1 с типом ExecuteSQLRecord есть возможность настроить предварительный запрос, основной запрос и последующий запрос. Однако эта настройка не всегда оказывается оптимальной, так как запросы выполняются последовательно, и при этом ошибки в процессоре иногда не фиксируются. В связи с этим было принято решение использовать только поле с основным запросом и, при необходимости, указывать запросы последовательно через точку с запятой.

Ошибочные очереди

Рекомендую всегда создавать очереди для ошибок. Это поможет избежать потери данных при возникновении ошибок и обнаружить проблемы через систему мониторинга или пользовательский интерфейс NiFi.

Можно комбинировать ошибочные очереди с процессом добавления задержки. Например, после возникновения ошибки в процессоре PutFile FlowFile может быть направлен в процессор UpdateAttribute, где устанавливается расписание запуска каждые 1-5 секунд и добавляется атрибут с датой ошибки. Таким образом, при возникновении ошибок FlowFile будет отправляться на переразбор с задержкой, что снизит нагрузку.

Однако у этого метода есть серьезный недостаток. Если очередь ошибок из процессора PutFile переполнится, даже после решения проблемы процессор не будет обрабатывать новые FlowFile, поскольку при возникновении ошибок он не сможет переместить их в ошибочную очередь. Можно увеличить лимит очереди, чтобы избежать переполнения, но это может потребовать ручной настройки и могут возникнуть проблемы с масштабированием.

Параметры запуска процессоров после восстановления

Было принято решение изменить настройку по умолчанию, чтобы в случае падения NiFi всем процессорам устанавливался статус Stop, вместо сохранения последнего статуса до падения. Это решение было принято после нескольких ситуаций, когда пользователи не указывали условия в SQL запросе, что приводило к нехватке памяти и падению сервиса. После восстановления процессоры с некорректной настройкой снова запускались, что приводило к повторному падению сервиса.

Защищенные поля в переменных при создании из шаблона

В NiFi есть возможность сделать поля в параметрах контекста и переменных защищенными. Поля с паролями в контроллерах всегда защищены по умолчанию.

Значение защищенных полей не сохраняется в шаблон для безопасности.

Поэтому при создании процессной группы из шаблона в защищенных поля будут очищены. Например, если потребуется загрузить из шаблона группу источника, то пароль в контроллерах в виде параметра контекста #{db_source_password} будет очищен. Пользователю придется заново ввести переменную в поле с паролем.

Цветовая дифференциация процессоров

Для удобства пользователей процессоры могут быть выделены цветом

В нашем проекте мы решили выделять цветом процессоры для загрузки и выгрузки данных, что позволяло пользователям быстрее ориентироваться в процессе работы.

Результаты внедрения

Этот случай демонстрирует, как NiFi может стать надежным и долгосрочным инструментом в процессе ETL на стороне заказчика. За более чем год использования не было необходимости в значительных изменениях или модификациях, что свидетельствует о стабильной и эффективной работе платформы.

Благодаря значительному запасу по отказоустойчивости и доступности, NiFi стал ключевым инструментом для решения всех задач по загрузке данных заказчика. Заказчик успешно добавлял новые источники данных в систему, несмотря на возможные сбои в инфраструктуре или ошибки со стороны пользователей. Это свидетельствует о гибкости и надежности NiFi, который успешно справлялся с задачами загрузки и обработки данных, обеспечивая стабильную работу всей системы.

Если заказчику требовалась разработка новых пайплайнов для решения новых задач, то он передавал конкретные требования на разработку. Новые типовые пайплайны интегрировались в уже существующую систему и позволяли значительно расширить уже имеющуюся функциональность

Важным преимуществом является масштабируемость NiFi, которая позволяет уверенно добавлять новые источники данных, не оказывая негативного влияния на скорость загрузки и обработки информации. Это обеспечивает заказчику возможность дальнейшего развития и роста без необходимости в поиске и внедрении альтернативных решений.

P.S. Бонусом добавил контейнер с Airflow, — теперь у вас есть полноценный тестовый стенд для практики и реализации своих идей.

Теги:
Хабы:
Всего голосов 11: ↑11 и ↓0+12
Комментарии10

Публикации

Истории

Ближайшие события

2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
11 – 13 декабря
Международная конференция по AI/ML «AI Journey»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань