Оркестратор задач — инструмент или система, предназначенные для управления и контроля выполнением задач в сложных вычислительных и информационных средах. Он облегчают процесс развертывания, автоматизации и управления выполнением задач, что позволяет повысить эффективность работы и оптимизировать ресурсы.
Одним из популярных оркестратором задач является Apache Airflow. Он, как и все инструменты, имеет свои преимущества и недостатки, о которых пойдет речь в данной статье
Apache Airflow
Apache Airflow - это платформа для программирования, планирования и мониторинга рабочих процессов (Workflows). Он позволяет создавать и управлять сложными зависимостями между задачами, автоматизировать их выполнение и мониторить процесс обработки данных.
Airflow предоставляет графический интерфейс для создания, выполнения и мониторинга рабочих процессов, также он позволяет создавать рабочие процессы в форме направленных ацикличных графов (DAG), где каждый узел представляет собой отдельную задачу. Это обеспечивает удобство и гибкость в управлении задачами и зависимостями между ними.
Airflow широко используется для автоматизации распределенных вычислений, обработки данных, управления ETL (Extract, Transform, Load) процессами, планирования задач и других сценариев работы с данными.
По умолчанию Apache Airflow использует SQLite, как базу данных.
Один из способов использования Apache Airflow - docker файл.
docker-compose
version: '3.8' x-airflow-common: &airflow-common image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'false' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} volumes: - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on postgres: condition: service_healthy services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s retries: 5 start_period: 5s restart: always airflow-webserver: <<: *airflow-common command: webserver ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully airflow-scheduler: <<: *airflow-common command: scheduler healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always depends_on: <<: *airflow-common-depends-on airflow-init: condition: service_completed_successfully airflow-init: <<: *airflow-common entrypoint: /bin/bash # yamllint disable rule:line-length command: - -c - | function ver() { printf "%04d%04d%04d%04d" $${1//./ } } airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) airflow_version_comparable=$$(ver $${airflow_version}) min_airflow_version=2.2.0 min_airflow_version_comparable=$$(ver $${min_airflow_version}) if (( airflow_version_comparable < min_airflow_version_comparable )); then echo echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" echo exit 1 fi if [[ -z "${AIRFLOW_UID}" ]]; then echo echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" echo "If you are on Linux, you SHOULD follow the instructions below to set " echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." echo "For other operating systems you can get rid of the warning with manually created .env file:" echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" echo fi one_meg=1048576 mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) disk_available=$$(df / | tail -1 | awk '{print $$4}') warning_resources="false" if (( mem_available < 4000 )) ; then echo echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" echo warning_resources="true" fi if (( cpus_available < 2 )); then echo echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" echo "At least 2 CPUs recommended. You have $${cpus_available}" echo warning_resources="true" fi if (( disk_available < one_meg * 10 )); then echo echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" echo warning_resources="true" fi if [[ $${warning_resources} == "true" ]]; then echo echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" echo "Please follow the instructions to increase amount of resources available:" echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" echo fi mkdir -p /sources/logs /sources/dags /sources/plugins chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} exec /entrypoint airflow version environment: <<: *airflow-common-env _AIRFLOW_DB_MIGRATE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-qwerty12345!} _PIP_ADDITIONAL_REQUIREMENTS: '' user: "0:0" volumes: - ${AIRFLOW_PROJ_DIR:-.}:/sources airflow-cli: <<: *airflow-common profiles: - debug environment: <<: *airflow-common-env CONNECTION_CHECK_MAX_COUNT: "0" command: - bash - -c - airflow volumes: postgres-db-volume:
Несколько популярных операторов в Apache Airflow:
EmptyOperator
Этот оператор представляет собой очень простой оператор в Apache Airflow, который не выполняет никаких реальных действий.
Ожидание или задержка: можно использовать EmptyOperator для создания задержки между выполнением задач в вашем DAG.
Создание структуры DAG: может быть использован для создания структуры вашего DAG, когда у вас еще нет конкретных операций для некоторых частей рабочего процесса.
Использование в качестве заполнителя: может понадобиться создать «место действия» для определенных зависимостей
Тестирование: при разработке и тестировании DAG, вы можете использовать EmptyOperator, чтобы вставить временную заглушку вместо реальных операций.
Пример кода с EmptyOperator:
from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.empty import EmptyOperator default_my_args = { 'owner': 'PB_Academy', 'retries': 5 } with DAG ( dag_id = 'dag_empty', description='ДАГ с использованием bash оператора', schedule_interval='@once', default_args= default_my_args, start_date=days_ago(1) ) as dag: start_task = EmptyOperator( task_id = 'start_task' ) task_1_1 = EmptyOperator( task_id = 'task_1_1' ) task_1_2 = EmptyOperator( task_id = 'task_1_2' ) union_task = EmptyOperator( task_id = 'union' ) task_2_1 = EmptyOperator( task_id = 'task_2_1' ) task_2_2 = EmptyOperator( task_id = 'task_2_2' ) finish_task = EmptyOperator( task_id = 'finish_task' ) start_task >> [task_1_1, task_1_2] >> union_task >>[task_2_1, task_2_2] >> finish_task

Начиная с версии 2.3.0 Apache Airflow появился данный оператор. До этого существовал DummyOperator. В версии 2.3.4 разработчики призывают переходить на EmptyOperator.
BashOperator
Этот оператор используется для выполнения произвольных bash‑команд или скриптов. Он является универсальным инструментом для выполнения различных командных операций в рамках ваших рабочих процессов.
Пример кода с BashOperator:
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime, timedelta default_my_args = { 'owner': 'PB_Academy', 'retries': 5, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='dag_bush', default_args= default_my_args, description='ДАГ с использованием bash оператора', start_date= datetime(2024, 5, 7, 16, 50, 0), schedule_interval='@daily' ) as dag: task1 = BashOperator( task_id = 'first_task', bash_command= 'mkdir PB_Academy' ) task2 = BashOperator( task_id = 'second_task', bash_command= 'touch tmp.txt' ) task3 = BashOperator( task_id = 'thrird_task', bash_command= 'echo Hello World' ) task1 >> task2 >> task3
PostgresOperator
Данный оператор позволяет выполнять SQL-запросы в базе данных PostgreSQL. Он предоставляет удобный способ выполнения различных операций с базой данных PostgreSQL в рамках ваших DAG.
Для того, чтобы установить PostgresOperator рекомендуем выполнить следующую команду:
pip install apache-airflow-providers-postgres
Пример кода с PostgresOperator:
from airflow import DAG from datetime import datetime, timedelta from airflow.providers.postgres.operators.postgres import PostgresOperator default_my_args = { 'owner': 'PB_Academy', 'retries': 5 } with DAG( dag_id = 'dag_postgres', schedule_interval='@once', default_args= default_my_args, start_date= datetime(2023, 12, 8) ) as dag: task_pg_create = PostgresOperator( task_id='task_pg_create', postgres_conn_id='postgres_default', sql='CREATE TABLE test_1(id SERIAL, date TIMESTAMP);' ) task_pg_insert = PostgresOperator( task_id='task_pg_insert', postgres_conn_id='postgres_default', sql='INSERT INTO test_1 (date) VALUES (localtimestamp);' ) task_pg_create >> task_pg_insert
PythonOperator
Этот оператор позволяет запускать произвольные функции Python в вашем DAG. Это может быть полезно, если вам нужно выполнить какой‑то кастомный код или манипуляции с данными в Python.
Для общения между задачами в рамках одного dag существует механизм — XCom. Это словарь, из которого можно брать данные. Как конфиги, так и результаты других задач. Данные в XCom могут быть как public, так и private. Push — отправить данные в словарь, pull — получить. Стоит добавить, что XCom не подчищает за собой данные, так что нужно побеспокоиться, чтобы после выполнения сессии dag удалял историю.
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_my_args = { 'owner': 'PB_Academy', 'retries': 5, 'retry_delay': timedelta(minutes=5) } def print_param(ti): sql = ti.xcom_pull(task_ids = 'get_param', key = 'SQL') nosql = ti.xcom_pull(task_ids = 'get_param', key = 'NoSQL') print(f"SQL database: {sql} \n NoSQL database: {nosql}") def get_param(ti): ti.xcom_push(key = 'SQL', value = ['MS SQL','PostgreSQL','Oracle']) ti.xcom_push(key = 'NoSQL', value = ['MongoDB','Redis','Elasticsearch']) with DAG( dag_id='dag_python', default_args= default_my_args, description='-', start_date= datetime(2023, 12, 8, 8,47), schedule_interval='@once' ) as dag: task1 = PythonOperator( task_id='task_print_param', python_callable = print_param, ) task2 = PythonOperator( task_id='task_get_param', python_callable = get_param ) task2 >> task1
Task Flow API
Task Flow API — это часть Apache Airflow, которая предоставляет возможность программно создавать и управлять задачами и их зависимостями в DAG. Он позволяет создавать и изменять DAG и его структуру изнутри самого Python‑кода, а не только из конфигурационных файлов.
Пример кода с иcпользованием Task Flow API:
from datetime import datetime from airflow.decorators import dag, task default_my_args = { 'owner': 'PB_Academy', 'retries': 5 } @dag( dag_id = 'taskflow_api_dag', default_args = default_my_args, start_date = datetime(2023,12,10), schedule_interval='@daily' ) def calculate_etl(): @task(multiple_outputs = True) def get_numbers(): return { 'number_1' : 21, 'number_2' : 99 } @task def calculate(number_1, number_2): print(number_1 + number_2) dicts = get_numbers() calculate(dicts['number_1'], dicts['number_2']) my_dag = calculate_etl()
Дополнительно
catchup — параметр в Apache Airflow, который определяет, должны ли старые даты выполнения задач быть выполнены при первом запуске DAG. Когда catchup установлен в True, Apache Airflow автоматически запускает пропущенные даты выполнения задач после того, как DAG был включен в работу.
Пример кода с catch up:
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime default_my_args = { 'owner': 'PB_Academy', 'retries': 5 } with DAG( dag_id='dag_catchup', default_args= default_my_args, start_date= datetime(2023, 12, 7, 16, 50, 0), schedule_interval='@daily', catchup=True ) as dag: task1 = BashOperator( task_id = 'first_task', bash_command= 'echo Hello World' )
В Apache Airflow можно написать задачу, которая получает статус других задач. Данная задача будет полезна для:
Отчетность и мони��оринг: Задача получения статуса задач может использоваться для создания мониторинговых или отчетных процедур, которые сообщают о статусе выполнения других задач. Например, вы можете хотеть получить уведомление или отправить отчет, когда определенная задача завершается успешно или неудачно.
Автоматизация бизнес‑процессов: Получение статуса задач также может быть использовано для автоматизации бизнес‑процессов. Например, если определенная задача завершилась успешно, вы можете автоматически запускать следующие действия или процессы.
Управление зависимостями: Получение статуса задач может помочь в управлении зависимостями и выполнением последующих действий в зависимости от статуса предыдущих задач. Например, если одна задача завершилась неудачно, вы можете хотеть принять решение о перезапуске другой задачи или выполнении альтернативных действий.
Пример кода с получением статуса задачи:
from airflow.models.dagrun import DagRun from airflow.decorators import dag, task from datetime import datetime default_my_args = { 'owner': 'PB_Academy', 'retries': 5 } @dag( dag_id = 'dag_with_status', default_args = default_my_args, start_date = datetime(2023,12,10), schedule_interval='@daily' ) def calculate_etl(): @task() def get_status_dag(): dag_id = 'dag_with_status' dag_runs = DagRun.find(dag_id=dag_id) print(f'Length: {len(dag_runs)}') for dag_run in dag_runs: if dag_run.external_trigger == False : print(f'Dag Name {dag_run}\n Dag Status {dag_run.state}') @task() def success_task(): 4 / 2 @task() def fail_task(): 4 / 0 success_task() fail_task() get_status_dag() my_dag = calculate_etl()
Параметр schedule_interval используется для определения расписания выполнения задач в рамках DAG. Этот параметр указывает, с какой периодичностью DAG и его задачи должны выполняться.schedule_interval может быть определен различными способами, включая крон-выражения, объекты datetime.timedelta или ключевые слова, такие как @daily, @hourly и т. д.
Вывод
Таким образом, Apache Airflow часто выбирают для сложных рабочих процессов с большим количеством задач. Подводя итог, рассмотрим основные плюсы и минусы данной системы:
Apache Airflow | |||
Плюсы | Минусы | ||
Гибкость Airflow написан на Python, это делает его гибким и легко интегрируемым с другими инструментами. | Сложность настройки Наличие неявных зависимостей может усложнить начальную настройку и конфигурацию. | ||
Мощный пользовательский интерфейс Графический веб-интерфейс позволяет легко создавать и мониторить рабочие процессы. | Требования к ресурсам Для управления большими рабочими процессами требуются значительные вычислительные ресурсы. | ||
Расширяемость Поддерживает множество плагинов и интеграций с различными сервисами и базами данных. | |||
Масштабируемость Модульная архитектура и очереди сообщений позволяют обрабатывать большое количество задач. | |||
Для изучения данной темы рекомендуем прочитать книгу Баса Харенслака и Джулиана де Руйтера «Apache Airflow и конвейеры обработки данных»
