Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей машинного обучения в Альфа-Банке. Хочу поделиться опытом в работе с Apache Airflow. Расскажу, как установить интерпретатор Python и сам Airflow, а также как отладить его работу. Всё это — на примере запуска простейшего DAGа.
Моя статья будет полезна дата-инженерам и разработчикам, которым необходимо автоматизировать запуск скриптов и установить возможность автоматического выполнения по расписанию.
В этой статье:
вы познакомитесь с основными понятиями Airflow;
научитесь собирать интерпретатор Python из исходного кода;
установите базу данных PostgreSQL;
создадите сервисы для Airflow;
запустите первый DAG.
Что такое Apache Airflow
Apache Airflow — это решение с открытым исходным кодом. Это оркестратор, который позволяет наладить разработку, планирование и мониторинг сложных рабочих процессов. Также он работает как планировщик ETL/ELT-процессов и использует язык программирования Python.
В основе концепции Airflow лежит DAG — направленный ациклический граф. Он описывает процессы обработки данных и позволяет объединять задачи, определяя правила их совместной работы.
Вот базовый пример DAG:
Он описывает четыре задачи — A, B, C и D и определяет порядок, в котором они должны выполняться, а также настраивает взаимосвязи между задачами. Можно указать, как часто требуется запускать DAG: например, «каждые 5 минут, начиная с завтрашнего дня» или «каждый день с 1 января 2023 года». Почитать об этом подробнее можно в документации.
Airflow состоит из нескольких компонентов, но главные из них — scheduler и webserver. Без них ничего не запустится.
Scheduler (планировщик) отслеживает все задачи и DAGs, а затем запускает экземпляры задач после установки их зависимостей. В фоновом режиме планировщик запускает подпроцесс, который отслеживает и синхронизирует все DAGs в указанном каталоге. По умолчанию он раз в минуту собирает результаты синтаксического анализа DAGs и проверяет, необходимо ли запустить какие-либо активные задачи. Почитать об этом подробнее можно в документации.
WebServer (веб-сервер) отвечает за отображение веб-интерфейса и аутентификацию пользователей, а также решает, к какой группе должен относиться тот или иной пользователь в соответствии с конфигурационным файлом. И снова ссылка на документацию.
Установка и запуск
Рассмотрим процесс установки и отладки этого чудесного инструмента, существенно облегчающего жизнь разработчика.
Допустим, вам необходимо выполнять определённые шаги с каким-то интервалом. Для автоматизации этой рутинной работы прекрасно подходит Airflow (но на самом деле его функциональность намного шире). Работа проводится на CentOS Stream 8 с правами суперпользователя. Также есть настроенный внешний firewall, поэтому использовать тот, который предлагает операционная система, нет смысла. Его можно отключить вместе с SELinux:
sudo su
systemctl disable firewalld && systemctl stop firewalld
setenforce 0 && sed -i 's/^SELINUX=.*/SELINUX=disabled/g' /etc/selinux/config
Для дальнейшей работы понадобится установленный интерпретатор python последней версии. В репозиториях его нет, но исходный код наиболее свежей версии доступен на официальном сайте языка. На момент написания статьи наиболее свежая стабильная версия — 3.10.10.
Перед началом нужно установить пакеты, необходимые для сборки python из исходного кода:
dnf -y install gcc \
make \
openssl-devel \
bzip2-devel \
libffi-devel \
wget \
sqlite-devel \
zlib-devel \
ncurses-devel \
gdbm-devel \
readline-devel\
tar
Скачиваем исходный код python и распаковываем архив:
PYTHON_VERSION="3.10.10"
wget https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz
tar xf Python-$PYTHON_VERSION.tgz
Переходим в директорию с исходным кодом и проводим конфигурирование файлов:
cd Python-$PYTHON_VERSION
./configure --enable-optimizations
Проведём компиляцию:
make -j $(nproc)
Устанавливаем python:
make altinstall
Правилом хорошего тона будет оставить исходный код собранный из исходных файлов утилит: в случае, если будет необходимо удалить утилиты, понадобится директория, откуда всё собиралось.
Запретим изменение и удаление файлов. Для этого выполним команду:
chattr +i ../Python-$PYTHON_VERSION
Проверяем версию python:
python3.10 --version
Теперь у вас на сервере установлен интерпретатор Python актуальной на данный момент версии.
Установка базы данных PostgreSQL-15
По умолчанию Airflow работает с базой данных SQLite. Её функциональности вполне достаточно для пробного запуска и демонстрации процесса работы, но не хватит для использования в Production-среде, поскольку с этой базой данных невозможно распараллеливание задач. Один из способов решения данной проблемы — использование базы данных PostgreSQL.
В стандартных репозиториях уже содержится встроенный модуль PostgreSQL, но более старой версии. Отключим его для избежания конфликтов версий:
dnf -qy module disable postgresql
Установим последнюю версию репозитория PostgreSQL:
dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm
Установим сами пакеты PostgreSQL:
dnf install -y postgresql15-server postgresql15-contrib
После установки пакетов для экономии места на сервере необходимо очистить кэш пакетов DNF:
dnf clean all
Инициализируем базу данных PostgreSQL и добавим её в автозагрузку:
/usr/pgsql-15/bin/postgresql-15-setup initdb
systemctl enable --now postgresql-15
Создадим базу данных Airflow и пользователя к ней. Предоставим пользователю права на БД:
sudo -iu postgres psql -c "CREATE DATABASE airflow;"
sudo -iu postgres psql -c "CREATE USER airflow WITH PASSWORD 'alfa-digital';"
sudo -iu postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;"
Для обеспечения безопасности создадим непривилегированного системного пользователя airflow, под которым будут работать непосредственно сами сервисы:
useradd airflow
Получим права пользователя airflow:
su - airflow
В Python есть правило: для каждого приложения используется своё виртуальное окружение. Это позволит избежать проблем с зависимостями в дальнейшей работе.
Создадим виртуальное окружение и активируем его:
python3.10 -m venv .venv
source .venv/bin/activate
Установим необходимые библиотеки. Для экономии места на сервере используем ключ --no-cache-dir
:
pip3.10 install --no-cache-dir apache-airflow psycopg2-binary virtualenv kubernetes
Чтобы не прописывать переменные каждый раз, добавим их в файл .bashrc
. Это позволит не прописывать переменные при каждом входе в систему.
cat >> /home/airflow/.bashrc << EOF
export AIRFLOW_HOME=/home/airflow
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:alfa-digital@localhost/airflow
export AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Europe/Moscow
EOF
source ~/.bashrc
Проведём миграции базы данных Airflow. Также будут созданы все необходимые конфигурационные файлы для работы:
airflow db init
Создадим пользователя для работы с UI Apache Airflow:
airflow users create --username admin \
--firstname Aleksey\
--lastname Karpov \
--role Admin \
--email akarpov5@alfabank.ru \
--password alfa-digital
Поправим конфигурационный файл airflow.cfg
:
sed -i '/executor/s/SequentialExecutor/LocalExecutor/' /home/airflow/airflow.cfg
Список пользователей Apache Airflow можно посмотреть командой:
airflow users list
Выйдем из пользователя Airflow:
exit
Теперь создадим хук, который позволяет сохранять права на pid-файл Airflow, а также сервисы для scheduler и webserver. Воспользуемся официальным репозиторием:
curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow.conf -o /etc/tmpfiles.d/airflow.conf
curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow-webserver.service -o /etc/systemd/system/airflow-webserver.service
curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow-scheduler.service -o /etc/systemd/system/airflow-scheduler.service
Учитывая то, что Airflow работает в виртуальном окружении, поправим файлы сервисов:
sed -i 's/\/bin/\/home\/airflow\/.venv\/bin/g' /etc/systemd/system/airflow-scheduler.service /etc/systemd/system/airflow-webserver.service
Создадим файл для переменных среды, чтобы они автоматически подключались во время работы сервисов:
cat > /etc/sysconfig/airflow << EOF
AIRFLOW_HOME=/home/airflow
AIRFLOW_CONFIG=/home/airflow/airflow.cfg
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:alfa-digital@localhost/airflow
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Europe/Moscow
PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/bin:/home/airflow/.venv/bin
EOF
Дадим права на данный файл пользователю airflow, иначе сервисы не смогут его прочитать:
chown airflow:airflow /etc/sysconfig/airflow
Перезагрузим демон systemctl
, чтобы изменения подтянулись системой:
systemctl daemon-reload
Запустим сервисы Apache Airflow и добавим их в автозагрузку:
systemctl enable --now airflow-scheduler
systemctl enable --now airflow-webserver
Теперь у вас на сервере есть полноценный Airflow, который будет автоматически включаться при старте системы.
Дебаг
Что-то не работает? Занимаемся дебагом!
Посмотрим состояние WebServer:
systemctl status airflow-webserver -l
Вывод:
airflow-webserver.service - Airflow webserver daemon
Loaded: loaded (/etc/systemd/system/airflow-webserver.service; enabled; vendor preset: disabled)
Active: active (running) since Sat 2022-07-09 17:01:02 MSK; 2h 38min ago
Main PID: 6542 (airflow)
Tasks: 6 (limit: 23630)
Memory: 592.1M
CGroup: /system.slice/airflow-webserver.service
├─ 6542 /home/airflow/.venv/bin/python3.10 /home/airflow/.venv/bin/airflow webserver --pid /run/airflow/webserver.pid
├─ 6594 gunicorn: master [airflow-webserver]
├─ 6603 [ready] gunicorn: worker [airflow-webserver]
├─ 6604 [ready] gunicorn: worker [airflow-webserver]
├─ 6605 [ready] gunicorn: worker [airflow-webserver]
└─47985 [ready] gunicorn: worker [airflow-webserver]
Посмотрим состояние Scheduler:
systemctl status airflow-scheduler -l
Вывод:
airflow-scheduler.service - Airflow scheduler daemon
Loaded: loaded (/etc/systemd/system/airflow-scheduler.service; enabled; vendor preset: disabled)
Active: active (running) since Sat 2022-07-09 17:00:56 MSK; 2h 39min ago
Main PID: 6286 (/home/airflow/.)
Tasks: 72 (limit: 23630)
Memory: 1.7G
CGroup: /system.slice/airflow-scheduler.service
├─6286 /home/airflow/.venv/bin/python3.10 /home/airflow/.venv/bin/airflow scheduler
├─6288 gunicorn: master [gunicorn]
├─6289 gunicorn: worker [gunicorn]
├─6290 airflow executor -- LocalExecutor
├─6293 gunicorn: worker [gunicorn]
├─6299 airflow worker -- LocalExecutor
├─6301 airflow worker -- LocalExecutor
├─6303 airflow worker -- LocalExecutor
├─6305 airflow worker -- LocalExecutor
├─6308 airflow worker -- LocalExecutor
├─6310 airflow worker -- LocalExecutor
├─6313 airflow worker -- LocalExecutor
├─6314 airflow worker -- LocalExecutor
├─6318 airflow worker -- LocalExecutor
├─6319 airflow worker -- LocalExecutor
├─6322 airflow worker -- LocalExecutor
├─6325 airflow worker -- LocalExecutor
├─6326 airflow worker -- LocalExecutor
├─6329 airflow worker -- LocalExecutor
├─6331 airflow worker -- LocalExecutor
├─6334 airflow worker -- LocalExecutor
├─6338 airflow worker -- LocalExecutor
├─6339 airflow worker -- LocalExecutor
├─6342 airflow worker -- LocalExecutor
├─6343 airflow worker -- LocalExecutor
├─6346 airflow worker -- LocalExecutor
├─6347 airflow worker -- LocalExecutor
├─6349 airflow worker -- LocalExecutor
├─6350 airflow worker -- LocalExecutor
├─6353 airflow worker -- LocalExecutor
├─6354 airflow worker -- LocalExecutor
├─6355 airflow worker -- LocalExecutor
├─6359 airflow worker -- LocalExecutor
├─6361 airflow worker -- LocalExecutor
├─6366 airflow worker -- LocalExecutor
├─6372 airflow worker -- LocalExecutor
├─6373 airflow worker -- LocalExecutor
└─6375 airflow scheduler -- DagFileProcessorManager
Посмотрим последние логи:
journalctl -eu airflow-webserver
journalctl -eu airflow-scheduler
Запуск DAG
Веб-сервер можно открыть напрямую в браузере. Он будет доступен по порту 8080, например:
http://192.168.232.32:8080
Введём данные пользователя, который был создан выше, и нажмём кнопку Sign In.
После входа вам будет представлен набор DAGs, предназначенных для демонстрации функциональности Airflow. Зайдём в DAG example_bash_operator
:
На рисунке представлен непосредственно код самого DAG. Теперь запустим его:
Посмотрим на граф DAGа. Жмём на Graph:
Посмотрим, как отработал DAG. Для этого нажмём Grid:
Увидим, что DAG выполнился успешно. Зелёным цветом обозначено то, что должно было быть исполнено, а розовым — то, что не должно:
10Итак, вы узнали, что такое Apache Airflow: познакомились с его основными понятиями, научились собирать интерпретатор Python из исходного кода, установили базу данных PostgreSQL, создали сервисы для Airflow, а также запустили первый DAG.
В следующей статье я расскажу о видах Airflow Operators, чем они отличаются, для чего они нужны и как ими пользоваться.