Динамическая генерация DAG в Airflow

    Всем привет! Меня зовут Антон, в Ростелекоме я занимаюсь разработкой центрального хранилища данных. Наше хранилище состоит из модулей, в качестве оркестратора которых используются несколько инстансов Informatica, часть из которых мы хотим перевести на Airflow в рамках перехода на open-source решения. Поскольку Informatica и Airflow принципиально разные инструменты, взять и повторить существующую реализацию не так уж и просто. Нам хотелось получить workflow, с одной стороны, максимально похожий на текущую реализацию и, с другой стороны, использующий самый интересный первый принцип Airflow — динамичность, которая даёт гибкость.


    В этой небольшой статье я хочу рассказать о по-настоящему динамической генерации ДАГов в Airflow. По этой теме в интернете в основном находится много статей от разработчиков из Индии, представляющих собой материалы вида "в Airflow можно генерировать даги динамически, вот пример: <пример по генерации 10 HelloWorld-тасков/дагов>". Нам же была интересна именно генерация дагов, которые будут изменяться во времени с переменным количеством и названиями тасков.



    На текущий момент Airflow внедрён для запуска модуля, формирующего пакеты данных на удалённых серверах источников для дальнейшей загрузки в хранилище. Он запускается по простому расписанию, рассматривать его детально не очень интересно. Также в скором времени будет внедрена оркестрация через Airflow модуля, доставляющего пакеты данных для дальнейшей загрузки по слоям в промежуточный стейджинг. Здесь нас поджидает ряд граблей, описания которых я нигде не нашел и хочу поделится опытом.


    По Airflow на Хабре есть пара статей от разработчиков из Mail.ru, в которых неплохо описаны базовые вещи:


    Общее описание Airflow
    Ветвления, параметризация через jinja и коммуникации в рамках ДАГа через Xcom


    Небольшой глоссарий:


    DAG/ДАГ — направленный ациклический граф. В данном случае имеется в виду последовательность действий, которые зависят друг от друга и не образуют циклов.
    SubDAG/Сабдаг — то же, что и ДАГ, но находящийся внутри другого ДАГа, запускающийся в рамках родительского ДАГа (т.е. являющийся таском) и не имеющий отдельного расписания.
    Operator/Оператор — конкретный шаг в даге, исполняющий определённое действие. Например, PythonOperator.
    Task/Таск — конкретный инстанс оператора при запуске ДАГа, визуализируется в виде квадратика в веб-интерфейсе. Например, PythonOperator, который называется run_task и запускается в ДАГе check_dag.


    Идея динамической генерации тасков в даге, проблемы и недостатки


    Входные данные:


    В репозитории оркестратора есть таблица, назовём её PKG_TABLE.
    Есть механизм, который добавляет в таблицу PKG_TABLE записи о том, что пакет данных готов к загрузке.


    Что мы хотели:


    ДАГ, который будет генерироваться для готовых к загрузке пакетов и запускать их загрузку (спойлер: в итоге всё получилось).


    С помощью кода, приведенного ниже, мы генерируем даг, состоящий из таска LatestOnlyOperator и зависящего от него сабдага-таска, который создаётся при запуске функции pkg_subdag_factory, получающей список пакетов из таблицы PKG_TABLE и генерирующей несколько PythonOperator'ов. Если пакетов на загрузку нет — генерируется DummyOperator.


    Первую версию решили сделать одним PythonOperator'ом, в дальнейшем переделав на детальный workflow средствами Airflow.


    # -*- coding: utf-8 -*-
    """
    Основной DAG для запуска доставки
    """
    
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.subdag_operator import SubDagOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.latest_only_operator import LatestOnlyOperator
    from airflow.hooks.oracle_hook import OracleHook
    
    from datetime import datetime, timedelta
    import logging
    
    from scripts.lib import run_load, select_pkg_data
    
    def pkg_subdag_factory(
            oracle_hook, parent_dag_name, child_dag_name, start_date,
            schedule_interval, param_dict):
    
        """
        функция, возвращающая DAG с переменным количеством PythonOperator\`ов
            (1 пакет - 1 PythonOperator)
    
        входные параметры:
        oracle_hook - airflow.hooks.oracle_hook.OracleHook
        parent_dag_name - имя "родительского" дага
        child_dag_name - имя создаваемого дага
        start_date - дата начала запуска расписания созданного дага
        schedule_interval - интервал запуска дага для расписания
        param_dict - словарь со входными параметрами
        """
    
        dag = DAG(
            '%s.%s' % (parent_dag_name, child_dag_name),
            schedule_interval=schedule_interval,
            start_date=start_date,
            catchup=False
            )
    
        logging.info('selecting pkg data...')
        pkg_set = select_pkg_data(oracle_hook)
    
        if len(pkg_set):
            logging.info('pkg_set:')
            logging.info(pkg_set)
    
            for pkg in pkg_set:
                pkg_id = pkg[1]
                pkg_dict = {'pkg_data_' + str(pkg_id): pkg}
                param_dict.update(pkg_dict)
                task_name = 'pkg_' + str(pkg_id)
                PythonOperator(
                    task_id=task_name,
                    python_callable=run_load,
                    op_kwargs={
                        'oracle_hook': oracle_hook,
                        'param_dict': param_dict,
                        'pkg_id': pkg_id
                        },
                    retries=0,
                    dag=dag
                )
    
        else:
            logging.info('Undelivered packages not found')
            DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag)
    
        return dag
    
    interval = '*/10 * * * *'
    
    args = {
        'owner': 'airflow',
        'start_date': datetime(2018, 11, 12)
        }
    
    oracle_hook = OracleHook('ora_meta')
    
    main_dag_name = 'load'
    load_dag_name = 'load_packages'
    
    param_dict = {
        # здесь находится длинный словарь с параметрами
        }
    
    main_dag = DAG(
        dag_id=main_dag_name,
        default_args=args,
        schedule_interval=interval,
        catchup=False
        )
    
    subdag = SubDagOperator(
        subdag=pkg_subdag_factory(
            oracle_hook, main_dag_name, load_dag_name,
            args['start_date'], interval, param_dict
            ),
        task_id=load_dag_name,
        dag=main_dag
        )
    
    # создаёт оператор, отраюатывающий только для последнего интервала расписания
    latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag)
    
    subdag.set_upstream(latest_only)

    На следующих скриншотах видно, как это выглядит в результате.
    Внешний вид ДАГа:



    Внешний вид cабдага при отсутствии пакетов для доставки:



    Внешний вид cабдага при наличии пакетов для доставки:



    Проблемы и нюансы


    • Catchup не работал так, как мы ожидали: после включения выключенного дага происходили множественные запуски (не за весь период расписания, но 2-3 одновременно были). Из-за этого пришлось добавить LatestOnlyOperator, чтобы все запуски, кроме последнего, происходили вхолостую.
    • Если создать сабдаг — его нужно явно включить через командную строку командой "airflow unpause <имя_сабдага>", иначе он не запускается, причём делать это нужно при создании каждого нового сабдага (сабдаг с новым именем), из-за чего динамически генерировать будет очень неудобно. Если в конфигурации airflow ($airflow_home/airflow.cfg) установить параметр "dags_are_paused_at_creation"=false, это будет не нужно, но это может привести к неприятным последствиям со случайным автоматическим запуском нового дага — мне кажется, что запускать новые даги надо явно вручную.

    Как написано в документации, «A key capability of Airflow is that these DAG Runs are atomic, idempotent items, <...>», что значит: "Подразумевается, что даг генерируется в неизменном виде". Из-за того, что мы нарушили это "key capability", мы узнали некоторые вещи:


    • Пустой даг (без тасков) запускается и не может закончиться, забивая все возможные параллели. Происходило это, если не было пакетов на загрузку в момент запуска дага. Для обхода этого создаётся DummyOperator.
    • Если во время работы таска даг перегенерируется и в обновившемся даге этого таска уже не будет — он остановится с прерыванием запущенного процесса. А происходит это при каждом такте шедулера, но не чаще, чем указано в параметре min_file_process_interval в конфигурации airflow ($airflow_home/airflow.cfg). Для обхода этого мы сделали генерацию тасков по пакетам не только по статусу «готов к загрузке», но и по статусу «загрузка в процессе», чтобы он продолжал генерироваться, пока загрузка идет.
    • Если в текущей версии дага нет какого-то таска, который был раньше — например, был таск с именем "pkg_123", который был прогружен раньше и он не создается в текущей версии дага, в веб-интерфейсе нельзя увидеть статистику по этому таску. Хотя в базе airflow вся информация сохраняетcя и на её основе можно построить внешними средствами красивый дашборд по старым запускам. Когда возникнет вопрос про частоту обновления ДАГов и возможность это отключить, про это можно почитать здесь.
    • Из-за динамической генерации task_id приходится в каждый такой таск прокидывать словарь с данными по всем текущим пакетам, а также id текущего пакета, чтобы при работе самой функции выбирать из этого же словаря нужные данные по id пакета. Иначе все таски запускались за один и тот же пакет.

    Execution_date в логах и фактическое время запуска


    Закончу еще одним нюансом Airflow, который поначалу путает и не описан простыми словами в других статьях — execution_date (которое отображается во всех логах, в интерфейсе и т.д.) и фактическое время запуска. В принципе описание есть в документации airflow и FAQ, но результат неочевиден, поэтому мне кажется, что требуется пояснение.


    Документация: "Шедулер запускает ваш джоб в конце периода"
    Результат: Если создать даг с расписанием, например, @daily, то запуск с execution_date "2018-01-01 00:00:00" фактически будет запущен "2018-02-01 00:00:00".


    Полезные ссылки:


    Документация по catchup
    Документация по LatestOnlyOperator
    Ещё документация по LatestOnlyOperator
    Пример использования LatestOnlyOperator
    Некоторые нюансы
    Вопрос про зависимости от предыдущего запуска
    Небольшой пример про динамическую генерацию
    Вопрос про динамическую генерацию с небольшим описанием

    Ростелеком
    110,00
    Компания
    Поделиться публикацией

    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое