Pull to refresh
618.02
Альфа-Банк
Лучший мобильный банк по версии Markswebb

Знакомство с Apache Airflow: установка и запуск первого DAGа

Reading time10 min
Views31K

Привет! Меня зовут Алексей Карпов, я прикладной администратор (MLOps) отдела сопровождения моделей машинного обучения в Альфа-Банке. Хочу поделиться опытом в работе с Apache Airflow. Расскажу, как установить интерпретатор Python и сам Airflow, а также как отладить его работу. Всё это — на примере запуска простейшего DAGа. 

Моя статья будет полезна дата-инженерам и разработчикам, которым необходимо автоматизировать запуск скриптов и установить возможность автоматического выполнения по расписанию.

В этой статье:

вы познакомитесь с основными понятиями Airflow;

научитесь собирать интерпретатор Python из исходного кода;

установите базу данных PostgreSQL;

создадите сервисы для Airflow;

запустите первый DAG.

Что такое Apache Airflow

Apache Airflow — это решение с открытым исходным кодом. Это оркестратор, который позволяет наладить разработку, планирование и мониторинг сложных рабочих процессов. Также он работает как планировщик ETL/ELT-процессов и использует язык программирования Python.

В основе концепции Airflow лежит DAG — направленный ациклический граф. Он описывает процессы обработки данных и позволяет объединять задачи, определяя правила их совместной работы.

Вот базовый пример DAG:

Он описывает четыре задачи — A, B, C и D и определяет порядок, в котором они должны выполняться, а также настраивает взаимосвязи между задачами. Можно указать, как часто требуется запускать DAG: например, «каждые 5 минут, начиная с завтрашнего дня» или «каждый день с 1 января 2023 года». Почитать об этом подробнее можно в документации

Airflow состоит из нескольких компонентов, но главные из них — scheduler и webserver. Без них ничего не запустится.

Scheduler (планировщик) отслеживает все задачи и DAGs, а затем запускает экземпляры задач после установки их зависимостей. В фоновом режиме планировщик запускает подпроцесс, который отслеживает и синхронизирует все DAGs в указанном каталоге. По умолчанию он раз в минуту собирает результаты синтаксического анализа DAGs и проверяет, необходимо ли запустить какие-либо активные задачи. Почитать об этом подробнее можно в документации

WebServer (веб-сервер) отвечает за отображение веб-интерфейса и аутентификацию пользователей, а также решает, к какой группе должен относиться тот или иной пользователь в соответствии с конфигурационным файлом. И снова ссылка на документацию

Установка и запуск

Рассмотрим процесс установки и отладки этого чудесного инструмента, существенно облегчающего жизнь разработчика. 

Допустим, вам необходимо выполнять определённые шаги с каким-то интервалом. Для автоматизации этой рутинной работы прекрасно подходит Airflow (но на самом деле его функциональность намного шире). Работа проводится на CentOS Stream 8 с правами суперпользователя. Также есть настроенный внешний firewall, поэтому использовать тот, который предлагает операционная система, нет смысла. Его можно отключить вместе с SELinux:

sudo su
systemctl disable firewalld && systemctl stop firewalld 
setenforce 0 && sed -i 's/^SELINUX=.*/SELINUX=disabled/g' /etc/selinux/config

Для дальнейшей работы понадобится установленный интерпретатор python последней версии. В репозиториях его нет, но исходный код наиболее свежей версии доступен на официальном сайте языка. На момент написания статьи наиболее свежая стабильная версия — 3.10.10.

Перед началом нужно установить пакеты, необходимые для сборки python из исходного кода:

dnf -y install gcc \
               make \
               openssl-devel \
               bzip2-devel \
               libffi-devel \
               wget \
               sqlite-devel \
               zlib-devel \
               ncurses-devel \
               gdbm-devel \
               readline-devel\
               tar

Скачиваем исходный код python и распаковываем архив:

PYTHON_VERSION="3.10.10"
wget https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz
tar xf Python-$PYTHON_VERSION.tgz

Переходим в директорию с исходным кодом и проводим конфигурирование файлов:

cd Python-$PYTHON_VERSION
./configure --enable-optimizations

Проведём компиляцию: 

make -j $(nproc)

Устанавливаем python:

make altinstall

Правилом хорошего тона будет оставить исходный код собранный из исходных файлов утилит: в случае, если будет необходимо удалить утилиты, понадобится директория, откуда всё собиралось. 

Запретим изменение и удаление файлов. Для этого выполним команду:

chattr +i ../Python-$PYTHON_VERSION

Проверяем версию python:

python3.10 --version

Теперь у вас на сервере установлен интерпретатор Python актуальной на данный момент версии.

Установка базы данных PostgreSQL-15

По умолчанию Airflow работает с базой данных SQLite. Её функциональности вполне достаточно для пробного запуска и демонстрации процесса работы, но не хватит для использования в Production-среде, поскольку с этой базой данных невозможно распараллеливание задач. Один из способов решения данной проблемы — использование базы данных PostgreSQL.

В стандартных репозиториях уже содержится встроенный модуль PostgreSQL, но более старой версии. Отключим его для избежания конфликтов версий:

dnf -qy module disable postgresql

Установим последнюю версию репозитория PostgreSQL:

dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm

Установим сами пакеты PostgreSQL:

dnf install -y postgresql15-server postgresql15-contrib

После установки пакетов для экономии места на сервере необходимо очистить кэш пакетов DNF:

dnf clean all

Инициализируем базу данных PostgreSQL и добавим её в автозагрузку:

/usr/pgsql-15/bin/postgresql-15-setup initdb
systemctl enable --now postgresql-15

Создадим базу данных Airflow и пользователя к ней. Предоставим пользователю права на БД:

sudo -iu postgres psql -c "CREATE DATABASE airflow;"
sudo -iu postgres psql -c "CREATE USER airflow WITH PASSWORD 'alfa-digital';"
sudo -iu postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;"

Для обеспечения безопасности создадим непривилегированного системного пользователя airflow, под которым будут работать непосредственно сами сервисы:

useradd airflow

Получим права пользователя airflow:

su - airflow

В Python есть правило: для каждого приложения используется своё виртуальное окружение. Это позволит избежать проблем с зависимостями в дальнейшей работе.

Создадим виртуальное окружение и активируем его:

python3.10 -m venv .venv
source .venv/bin/activate

Установим необходимые библиотеки. Для экономии места на сервере используем ключ --no-cache-dir:

pip3.10 install --no-cache-dir apache-airflow psycopg2-binary virtualenv kubernetes

Чтобы не прописывать переменные каждый раз, добавим их в файл .bashrc. Это позволит не прописывать переменные при каждом входе в систему.

cat >> /home/airflow/.bashrc << EOF
export AIRFLOW_HOME=/home/airflow
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:alfa-digital@localhost/airflow
export AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Europe/Moscow
EOF
source ~/.bashrc

Проведём миграции базы данных Airflow. Также будут созданы все необходимые конфигурационные файлы для работы:

airflow db init

Создадим пользователя для работы с UI Apache Airflow:

airflow users create --username admin \
                     --firstname Aleksey\
                     --lastname Karpov \
                     --role Admin \
                     --email akarpov5@alfabank.ru \
                     --password alfa-digital

Поправим конфигурационный файл airflow.cfg:

sed -i '/executor/s/SequentialExecutor/LocalExecutor/' /home/airflow/airflow.cfg 

Список пользователей Apache Airflow можно посмотреть командой:

airflow users list

Выйдем из пользователя Airflow:

exit

Теперь создадим хук, который позволяет сохранять права на pid-файл Airflow, а также сервисы для scheduler и webserver. Воспользуемся официальным репозиторием:

curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow.conf -o /etc/tmpfiles.d/airflow.conf
curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow-webserver.service -o /etc/systemd/system/airflow-webserver.service
curl https://raw.githubusercontent.com/apache/airflow/main/scripts/systemd/airflow-scheduler.service -o /etc/systemd/system/airflow-scheduler.service

Учитывая то, что Airflow работает в виртуальном окружении, поправим файлы сервисов:

sed -i 's/\/bin/\/home\/airflow\/.venv\/bin/g' /etc/systemd/system/airflow-scheduler.service /etc/systemd/system/airflow-webserver.service

Создадим файл для переменных среды, чтобы они автоматически подключались во время работы сервисов:

cat > /etc/sysconfig/airflow << EOF
AIRFLOW_HOME=/home/airflow
AIRFLOW_CONFIG=/home/airflow/airflow.cfg
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:alfa-digital@localhost/airflow
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Europe/Moscow
PATH=/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/bin:/home/airflow/.venv/bin
EOF

Дадим права на данный файл пользователю airflow, иначе сервисы не смогут его прочитать:

chown airflow:airflow /etc/sysconfig/airflow

Перезагрузим демон systemctl, чтобы изменения подтянулись системой:

systemctl daemon-reload

Запустим сервисы Apache Airflow и добавим их в автозагрузку:

systemctl enable --now airflow-scheduler
systemctl enable --now airflow-webserver

Теперь у вас на сервере есть полноценный Airflow, который будет автоматически включаться при старте системы.

Дебаг

Что-то не работает? Занимаемся дебагом!

Посмотрим состояние WebServer:

systemctl status airflow-webserver -l

Вывод:

airflow-webserver.service - Airflow webserver daemon
   Loaded: loaded (/etc/systemd/system/airflow-webserver.service; enabled; vendor preset: disabled)
   Active: active (running) since Sat 2022-07-09 17:01:02 MSK; 2h 38min ago
 Main PID: 6542 (airflow)
    Tasks: 6 (limit: 23630)
   Memory: 592.1M
   CGroup: /system.slice/airflow-webserver.service
           ├─ 6542 /home/airflow/.venv/bin/python3.10 /home/airflow/.venv/bin/airflow webserver --pid /run/airflow/webserver.pid
           ├─ 6594 gunicorn: master [airflow-webserver]
           ├─ 6603 [ready] gunicorn: worker [airflow-webserver]
           ├─ 6604 [ready] gunicorn: worker [airflow-webserver]
           ├─ 6605 [ready] gunicorn: worker [airflow-webserver]
           └─47985 [ready] gunicorn: worker [airflow-webserver]

Посмотрим состояние Scheduler:

systemctl status airflow-scheduler -l

Вывод:

airflow-scheduler.service - Airflow scheduler daemon
   Loaded: loaded (/etc/systemd/system/airflow-scheduler.service; enabled; vendor preset: disabled)
   Active: active (running) since Sat 2022-07-09 17:00:56 MSK; 2h 39min ago
 Main PID: 6286 (/home/airflow/.)
    Tasks: 72 (limit: 23630)
   Memory: 1.7G
   CGroup: /system.slice/airflow-scheduler.service
           ├─6286 /home/airflow/.venv/bin/python3.10 /home/airflow/.venv/bin/airflow scheduler
           ├─6288 gunicorn: master [gunicorn]
           ├─6289 gunicorn: worker [gunicorn]
           ├─6290 airflow executor -- LocalExecutor
           ├─6293 gunicorn: worker [gunicorn]
           ├─6299 airflow worker -- LocalExecutor
           ├─6301 airflow worker -- LocalExecutor
           ├─6303 airflow worker -- LocalExecutor
           ├─6305 airflow worker -- LocalExecutor
           ├─6308 airflow worker -- LocalExecutor
           ├─6310 airflow worker -- LocalExecutor
           ├─6313 airflow worker -- LocalExecutor
           ├─6314 airflow worker -- LocalExecutor
           ├─6318 airflow worker -- LocalExecutor
           ├─6319 airflow worker -- LocalExecutor
           ├─6322 airflow worker -- LocalExecutor
           ├─6325 airflow worker -- LocalExecutor
           ├─6326 airflow worker -- LocalExecutor
           ├─6329 airflow worker -- LocalExecutor
           ├─6331 airflow worker -- LocalExecutor
           ├─6334 airflow worker -- LocalExecutor
           ├─6338 airflow worker -- LocalExecutor
           ├─6339 airflow worker -- LocalExecutor
           ├─6342 airflow worker -- LocalExecutor
           ├─6343 airflow worker -- LocalExecutor
           ├─6346 airflow worker -- LocalExecutor
           ├─6347 airflow worker -- LocalExecutor
           ├─6349 airflow worker -- LocalExecutor
           ├─6350 airflow worker -- LocalExecutor
           ├─6353 airflow worker -- LocalExecutor
           ├─6354 airflow worker -- LocalExecutor
           ├─6355 airflow worker -- LocalExecutor
           ├─6359 airflow worker -- LocalExecutor
           ├─6361 airflow worker -- LocalExecutor
           ├─6366 airflow worker -- LocalExecutor
           ├─6372 airflow worker -- LocalExecutor
           ├─6373 airflow worker -- LocalExecutor
           └─6375 airflow scheduler -- DagFileProcessorManager

Посмотрим последние логи:

journalctl -eu airflow-webserver
journalctl -eu airflow-scheduler

Запуск DAG

Веб-сервер можно открыть напрямую в браузере. Он будет доступен по порту 8080, например:

http://192.168.232.32:8080

Введём данные пользователя, который был создан выше, и нажмём кнопку Sign In.

После входа вам будет представлен набор DAGs, предназначенных для демонстрации функциональности Airflow. Зайдём в DAG example_bash_operator:

На рисунке представлен непосредственно код самого DAG. Теперь запустим его:

Посмотрим на граф DAGа. Жмём на Graph:

Посмотрим, как отработал DAG. Для этого нажмём Grid:

Увидим, что DAG выполнился успешно. Зелёным цветом обозначено то, что должно было быть исполнено, а розовым — то, что не должно:


10Итак, вы узнали, что такое Apache Airflow: познакомились с его основными понятиями, научились собирать интерпретатор Python из исходного кода, установили базу данных PostgreSQL, создали сервисы для Airflow, а также запустили первый DAG.

В следующей статье я расскажу о видах Airflow Operators, чем они отличаются, для чего они нужны и как ими пользоваться.

Tags:
Hubs:
+6
Comments6

Articles

Information

Website
digital.alfabank.ru
Registered
Founded
1990
Employees
over 10,000 employees
Location
Россия
Representative
София Никитина