Полное руководство по созданию DAG в Apache Airflow DAG, позволяющих создать конвейер данных из разных источников, запускаемый в определенные периоды времени с заданной логикой. Первая часть. Источник: DAGs: The Definitive Guide от astronomer.io
Добро пожаловать в полное руководство по Apache Airflow DAG, представленное командой Astronomer. Эта электронная книга охватывает все, что вам нужно знать для работы с DAG, от строительных блоков, из которых они состоят, до рекомендаций по их написанию, динамической генерации, тестированию, отладке и многому другому. Это руководство, написанное практикующими для практикующих.
1. DAGs: С чего начать?
Что такое DAG?
DAG — это ориентированный ациклический граф, концептуальное представление серии действий или, другими словами, математическая абстракция конвейера данных (data pipeline).
Хотя оба термина, DAG и data pipeline, используются в разных сферах, они представляют собой почти идентичный механизм. В двух словах, DAG (или конвейер) определяет последовательность этапов выполнения в любом неповторяющемся алгоритме.
Аббревиатура DAG расшифровывается так:
DIRECTED — направленный. В общем, если существует несколько задач (тасков), каждая из них должна иметь по крайней мере одну материнскую (предыдущую) или дочернюю (последующую) задачу. Возможно, предыдущих или последующих задач будет больше одной. (Однако важно отметить, что существуют также DAG, которые имеют несколько параллельных задач, что означает отсутствие зависимостей между параллельным задачами.)
ACYCLIC — ациклический. Ни одна задача не может создавать данные, которые будут ссылаться на самих себя. Это может создать проблему бесконечного цикла. В DAG нет циклов.
GRAPH — граф. В математике граф представляет собой конечный набор узлов с вершинами, соединяющими узлы. В контексте разработки данных каждый узел в графе представляет собой задачу. Все задачи изложены в четкой структуре, с дискретными процессами, происходящими в заданных точках, и прозрачными взаимосвязями с другими задачами.
Когда нужны DAG?
Создание кода рабочего процесса вручную снижает производительность инженеров, и это одна из причин, по которой существует множество полезных инструментов для автоматизации процесса, таких как Apache Airflow. Первым шагом к эффективной автоматизации является осознание того, что DAG может быть оптимальным решением для перемещения данных практически в любой области, связанной с вычислениями.
“В Astronomer мы считаем, что использование инструмента конвейера данных на основе кода, такого как Airflow, должно являться стандартом”, - говорит Кентен Данас, ведущий разработчик в Astronomer.
Конвейеры, основанные на коде, чрезвычайно динамичны. Если вы можете написать это в коде, то вы можете сделать это в своем конвейере данных.
Конвейеры на основе кода обладают большими возможностями расширения. Вы можете интегрироваться практически с любой существующей системой, если у нее есть API.
Конвейеры на основе кода более управляемы: поскольку все находится в коде, он может легко интегрироваться в ваш CI / CD управления версиями и общие рабочие процессы разработчика.
(Примечание переводчика: Airflow написан на python, соответственнно, и язык, используемый в DAG-ах - это python).
Пример DAG
Это ключевое качество ориентированного графа: данные могут следовать только в одном направлении. Например, данные могут передаваться из A в B, но никогда из B в A. Точно так же, как вода течет по трубам в одном направлении, данные должны следовать в направлении, определенном графиком. Узлы, от которых исходят данные, считаются материнскими (верхними), в то время как узлы принимающие данные считаются дочерними (нижними).
Вдобавок к тому, что данные перемещаются в одном направлении, есть еще одно свойство: узлы никогда не становятся самореферентными. То есть они никогда не смогут проинформировать самих себя (передать данные сами себе), так как это может создать бесконечный цикл. Таким образом, данные могут переходить от A к B, далее к C / D / E, но, оказавшись там, ни один последующий процесс никогда не сможет привести обратно к A / B / C / D / E, поскольку данные перемещаются вниз по графику. Данные, поступающие из нового источника, такого как узел G, все равно могут привести к узлам, которые уже подключены, но никакие последующие данные не могут быть переданы обратно в G. Это определяющее качество ациклического графа.
Почему эти условия должны соблюдаться для конвейеров данных? Если бы у F был обратный переток в узел D, мы бы увидели график, где D информирует E, который информирует F, который информирует D и так далее. Это создает сценарий, в котором конвейер может работать бесконечно, никогда не останавливаясь. Подобно воде, которая никогда не попадет в кран, такой цикл стал бы пустой перегонкой потока данных.
Чтобы представить этот пример в реальности, представьте, что приведенный выше DAG представляет собой такой процесс обработки данных:
Узел A может быть кодом для извлечения данных из API.
Узел B может быть кодом для анонимизации данных и удаления любого IP-адреса.
Узел D может быть кодом для проверки отсутствия повторяющейся по ID записи.
Узел E может помещать эти данные в базу данных.
Узел F может выполнять SQL-запрос к новым таблицам для обновления информационной панели (дашборда).
DAG в Airflow
В Airflow DAG - это ваш конвейер данных. Он представляет собой набор инструкций, которые должны быть выполнены в определенном порядке. Это выгодно для управления данными (data orchestration) по нескольким причинам:
Зависимости в DAG гарантируют, что ваши задачи обработки данных каждый раз выполняются в одном и том же порядке, это создает надежный процесс для вашей повседневной инфраструктуры обработки данных.
Графический компонент DAG позволяет визуализировать зависимости в пользовательском интерфейсе Airflow.
Поскольку каждый путь в DAG является линейным, легко разрабатывать и тестировать конвейеры данных на соответствие ожидаемым результатами.
DAG Airflow начинается с задачи (task), написанной на Python. Вы можете думать о задачах как об узлах вашего DAG: каждый из них представляет собой одно действие, и оно может зависеть как от вышестоящих, так и от нижестоящих задач.
Задачи упаковываются в операторы, которые являются строительными блоками Airflow, определяющими поведение входящих в них задач. Например, задача оператора Python Operator выполнит функцию Python, в то время как задача, заключенная в Sensor Operator, будет ждать сигнала перед завершением действия.
На следующей диаграмме показано, как эти концепции работают на практике. Как видите, написав один DAG-файл на Python, вы можете начать определять сложные взаимосвязи между данными и действиями.
Оцените гибкость DAG на следующем реальном примере:
Используя один DAG (например, для операций с клиентами, показанных желтым цветом), вы можете:
Извлечь данные из хранилища данных и загрузить их в корзину AWS S3.
Обучить модель данных, завершить преобразование данных, в зависимости от используемых вами данных.
Сохранить результаты предыдущего действия в базе данных.
Отправить информацию обо всем процессе в различные системы сбора метрик и отчетности.
Организации используют DAG и конвейеры, которые интегрируются с отдельными интерфейсами для извлечения, загрузки и преобразования данных. Но без платформ оркестровки, подобных Astro от Astronomer, эти инструменты не взаимодействуют друг с другом. Если во время загрузки произойдет ошибка, другие инструменты об этом не узнают. Преобразование будет выполнено на основе неверных данных или вчерашних данных и выдаст неточный отчет. Однако, этого легко избежать — платформа оркестровки данных может находиться поверх всего, связывая DAG вместе, организуя поток данных и оповещая в случае сбоев. Контроль за сквозным жизненным циклом данных позволяет предприятиям поддерживать взаимозависимость во всех системах, что жизненно важно для эффективного управления данными.
От операторов к DagRuns: Реализация DAG в Airflow
Хотя DAG — это простые структуры, для их определения в коде требуется более сложная инфраструктура, чем просто набор узлов и связей между ними. Это важно понять, если мы хотим запускать DAG часто и безотказно.
Airflow включает в себя ряд структур, которые позволяют нам определять DAG в коде. Хотя у них есть уникальные названия, они соответствуют тем концепциям, которые мы обсуждали.
Как выполняется работа в Airflow
Операторы являются строительными блоками Airflow.
Операторы содержат логику того, как данные обрабатываются в конвейере. Существуют разные Операторы для разных типов работы: некоторые Операторы выполняют общие типы кода, в то время как другие предназначены для выполнения очень специфических типов работ. Мы рассмотрим различные типы операторов в главе Операторы.
Задача (таск, task) — это экземпляр Оператора.
Для того чтобы Оператор мог совершить работу в контексте DAG, он должен быть создан с помощью Задачи. Другими словами, вы используете Задачи для конфигурации важного контекста для вашей работы, в том числе при ее выполнении в вашем DAG.
Задачи — это узлы в DAG.
В Airflow DAG — это группа Задач, которые были скомпонованы для последующего выполнения направленным, ациклическим образом. Планировщик Airflow анализирует DAG, чтобы найти Задачи, готовые к выполнению, на основе их зависимостей. Если Задача готова к выполнению, Планировщик отправляет ее Исполнителю.
Выполнение Задачи в реальном времени называется task instance - Экземпляром Задачи (это также принято называть task run - Выполнением Задачи). Airflow регистрирует информацию об Экземплярах Задач, включая время их выполнения и статус, в базе метаданных (metadata database).
DAG run — это однократное конкретное выполнение DAG.
Если Экземпляр Задачи - это выполнение Задачи, то DAG run, Запуск DAG — это просто экземпляр полного DAG, который был запущен или выполняется в данный момент. На уровне кода DAG становится Запуском DAG, как только у него появляется Дата исполнения, execution_date
. Как и в случае с экземплярами задач, информация о каждом Запуске DAG регистрируется в базе метаданных Airflow.
2. Строительные блоки DAG
Планирование и Расписания (Timetables) в Airflow
Одной из фундаментальных особенностей Apache Airflow является возможность планировать задания. Исторически пользователи Airflow могли планировать свои дни, задав schedule_interval
с выражением cron, timedelta или предустановкой Расписания.
Расписания (Timetables), выпущенные в версии DAG 2.2, привнесли в планирование еще больше гибкости. Расписания позволяют пользователям создавать свои собственные пользовательские расписания с помощью Python, эффективно устраняя ограничения cron. С помощью расписаний теперь вы можете запланировать запуск DAG в любое время для любого условия выбора.
В этом руководстве мы рассмотрим концепции планирования DAG и различные способы планирования Airflow с акцентом на расписания. Для получения дополнительных инструкций по этим концепциям ознакомьтесь с нашим расписанием на вебинаре Airflow.
Примечание: В этом руководстве мы не рассматриваем концепцию execution_date
, которая устарела с версии 2.2. Если вы используете более старые версии Airflow, ознакомьтесь с этим документом для получения дополнительной информации об execution_date
.
Концепции планирования
В Airflow есть несколько терминов и параметров, связанных с планированием, которые важно понимать.
Интервал данных — Data interval
Интервал данных - это свойство каждого запуска DAG, представляющее период данных, с которым должна работать каждая задача. Например, для DAG, запланированного запускаться ежечасно, каждый интервал данных будет начинаться в начале часа (минута 0) и заканчивается в конце часа (минута 59). Запуск DAG обычно выполняется в конце интервала данных, в зависимости от того, есть ли в расписании вашего DAG “пробелы”.
Логическая Дата — Logical Date
Логическая дата запуска DAG совпадает с датой начала интервала данных. Она не определяет, когда на самом деле будет выполнен DAG. До версии 2.2 это называлось датой выполнения (execution date).
Backfilling (обратная загрузка) и Catchup (улавливание )
Мы не будем подробно рассматривать здесь эти концепции, но они могут быть связаны с планированием. Мы рекомендуем ознакомиться с документацией Apache Airflow, чтобы понять, как они работают.
(Информация по Catchup ниже добавлена из документации)
DAG со start_date, при необходимости с end_date и schedule_interval, определяет серию интервалов, которые планировщик превращает в отдельные запуски Dag и выполняет. Ключевой особенностью Airflow является то, что эти запуски DAG являются атомарными, идемпотентными (то есть не влияющими друг на друга) элементами, и планировщик по умолчанию проверяет время жизни DAG (от начала до конца / текущего момента, по одному интервалу за раз) и запускает DAG для любого интервала, который не был запущен (или был очищен). Эта концепция называется Catch up («наверстать упущенное»).
Если ваш DAG написан для обработки собственного catchup (т.е. не ограничен интервалом, а установлено “Now”, “Сейчас”), тогда вам нужно отключить catchup (либо в самом DAG с помощью dag.catchup = False), либо по умолчанию на уровне файла конфигурации с catchup_by_default = False. Это даст указание планировщику создать запуск DAG только для текущего экземпляра интервальной серии DAG.
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
В приведенном выше примере, если DAG будет получен планировщиком 2016-01-02 в 6 утра (или из командной строки), будет создан один запуск DAG с датой выполнения 2016-01-01, а следующий будет создан сразу после полуночи утром 2016-01-03 с датой выполнения 2016-01-03.
Если бы вместо этого значение dag.catchup было True, планировщик создал бы запуск DAG для каждого завершенного интервала между 2015-12-01 и 2016-01-02 (но не для 2016-01-02, поскольку этот интервал не завершен), и стал бы выполнять их последовательно. Такое поведение отлично подходит для атомарных наборов данных, которые можно легко разделить на периоды. Выключение catchup потребуется, если ваш запуск DAG выполняют внутреннюю загрузку.
Параметры
Следующие параметры являются производными от концепций, описанных выше, и важны для обеспечения выполнения вашего DAG в нужное время:
data_interval_start
: объект datetime, определяющий начальную дату и время интервала данных. Расписание DAG будет возвращать этот параметр для каждого запуска DAG. Этот параметр либо создается автоматически Airflow, либо может быть задан пользователем при реализации пользовательского расписания.data_interval_end
: объект datetime, определяющий дату и время окончания интервала данных. Расписание DAG вернет этот параметр для каждого запуска DAG. Этот параметр либо создается автоматически Airflow, либо может быть задан пользователем при реализации пользовательского расписания.schedule_interval
: параметр, который можно задать на уровне DAG, чтобы определить, когда этот DAG будет запущен. Этот аргумент принимает выражения cron или объекты timedelta (подробнее об этом см. следующий раздел). Schedule_interval все еще может быть определен в Airflow 2.2+, и это расписание будет автоматически преобразовано в расписание с помощью Airflow.timetable
: параметр, который может быть установлен на уровне DAG для определения его расписания (пользовательское или встроенное). Расписания могут быть определены явно в DAG (подробнее об этом ниже) или будут определены автоматически Airflow в тех случаях, когда предоставляется schedule_interval. Для каждого DAG должно быть определено либоtimetable
, либоschedule_interval
, но не оба вместе.start_date
: первая дата, когда будет выполнен ваш DAG. Этот параметр обязателен для того, чтобы ваш DAG был поставлен в расписание Airflow.end_date
: последняя дата, когда будет выполнен ваш DAG. Этот параметр является необязательным.
Пример
Вот простой пример того, как эти концепции работают вместе. Предположим, у нас есть DAG, который планируется запускать каждые 5 минут. Принимая самый последний DAG Run, логическая дата - 2021-10-08 19:12:36
, что совпадает с data_interval_start
, показанным на скриншоте ниже. Значение data_interval_end
равно 5:
Если мы посмотрим на следующий запуск в пользовательском интерфейсе, логическая дата - 2021-10-08 19:17:36
.
Это происходит через 5 минут после предыдущих логических данных и совпадает с интервалом data_interval_end
последнего DAG Run, поскольку в расписании нет пробелов.
Также отображается интервал данных для запуска следующего DAG Run. Run After, который является датой и временем, на которые запланирован запуск на следующий DAG Run, совпадает с data_interval_end
текущего запуска DAG: 2021-10-08 19:22:36.
Ниже мы рассмотрим, как использовать schedule_interval
или расписания для планирования вашего DAG.
Интервал расписания
Для конвейеров с базовыми расписаниями вы можете определить schedule_interval
в вашем DAG. Для версий Airflow до версии 2.2 это единственный механизм для определения расписания DAG.
Установка интервала расписания
Выражение Cron.
Вы можете передать любое выражение cron в виде строки параметру schedule_interval
в вашем DAG. Например, если вы хотите запланировать свой DAG в 4:05 утра каждый день вы бы использовали schedule_interval=’5 4 * * * ’
. Если вам нужна помощь в создании правильного выражения cron, crontab guru — отличный ресурс.
Представление Cron.
Airflow может использовать представления cron для общих базовых расписаний. Например, schedule_interval=’@hourly’
запланирует запуск DAG в начале каждого часа. Для получения полного списка предустановок ознакомьтесь с документацией Airflow. Если ваш DAG не должен выполняться по расписанию и будет запускаться только вручную или внешне другим процессом, вы можете установить schedule_interval=None
.
Timedelta.
Если вы хотите запланировать свой DAG на определенную частоту (ежечасно, каждые 5 минут и т.д.), а не в определенное время, вы можете передать объект timedelta
в расписание. Например, schedule_interval=timedelta(minutes=30)
будет запускать DAG каждые тридцать минут, а schedule_interval=timedelta(days=1)
будет запускать DAG каждый день.
Замечание: Не делайте расписание вашего DAG динамическим (например, datetime.now()
)! Это приведет к ошибке в планировщике.
Установка интервала расписания (Schedule Interval) и Логической даты (Logical Date)/ Даты исполнения (Execution Date)
Airflow был разработан для ETL в расчете на то, что данные постоянно поступают из какого-либо источника, а затем будут суммироваться, объединяться через регулярные промежутки времени. Если вы хотите обобщить данные за понедельник, вы можете сделать это только после окончания понедельника (например, во вторник в 12:01AM). Однако это предположение оказалось неподходящим для многих других целей, для которых используется Airflow. Это несоответствие и привело к появлению расписаний (Timetables), которые были введены в Airflow 2.2.
В результате каждый Запуск Dag имеет logical_date
, которая отделена от времени ожидаемого начала запуска Dag (logical_date
назывался execution_date
до Airflow 2.2). Фактически, Dag не разрешается запускать до тех пор, пока не настанет logical_date
для следующего Dag. Таким образом, если вы запускаете ежедневный DAG, Запуск Dag для данных за понедельник фактически не будет выполняться до вторника. В этом примере логической датой будет понедельник 12:01 AM, хотя на самом деле Запуск Dag начнется только во вторник 12:01 AM.
Если вы хотите передать запуску Dag временную метку, которая представляет “самое раннее время, в которое этот Dag мог быть запущен”, используйте {{ next_ds }}
из jinja templating macros. (Лучше всего сделать Запуск Dag идемпотентным (т.е. способным к повторному запуску без последствий), что предотвращает использование datetime.now().
)
Ограничения интервала планирования
Взаимосвязь между schedule_interval
DAG и его logical_date
приводит к особенно неинтуитивным результатам, когда интервал между запусками Dag нерегулярен. Наиболее распространенный пример неравномерного интервала - это когда DAG выполняются только в рабочие дни (Пн-Пт). В этом случае запуск Run с execution_date
пятницы не будет выполняться до понедельника, даже несмотря на то, что все данные пятницы будут доступны в субботу. Это означает, что DAG, желаемое поведение которого заключается в подведении итогов в конце каждого рабочего дня, на самом деле не может быть задан с использованием только schedule_interval
. В версиях Airflow до версии 2.2 вместо этого необходимо запланировать запуск DAG каждый день (включая выходные) и включить логику в сам DAG, чтобы пропускать все задачи в течение дней, в которые DAG на самом деле не нужно запускать.
Кроме того, трудно или невозможно реализовать ситуации, подобные приведенным ниже, с использованием интервала расписания (schedule_interval):
Запланировать DAG в разное время в разные дни, например, в 2 часа дня по четвергам и в 4 часа дня по субботам.
Запланировать DAG ежедневно, за исключением праздничных дней.
Запланировать DAG несколько раз в день с неравномерными интервалами (например, в 1 час дня и 4:30 вечера).
В следующем разделе мы опишем, как эти ограничения были устранены в Airflow 2.2 с введением расписаний (timetables).
Расписания (timetables)
Расписания (timetables), введенные в Airflow 2.2, устраняют ограничения cron-выражений и timedelta, позволяя пользователям определять свои собственные расписания в коде Python. Все расписания DAG определяются их внутренними timetables.
В будущем timetables станут основным методом планирования в Airflow. Вы все еще можете определить schedule_interval
, но Airflow преобразует это к timetables за кулисами. И если выражения cron или timedelta недостаточно для вашего варианта использования, вы можете определить свою собственную timetable.
Пользовательские timetables могут быть зарегистрированы как часть плагина Airflow. Они должны быть подклассом Timetable
, и содержать следующие методы, оба из которых возвращают DataInterval
, имеющий начало и конец:
next_dagrun_info
: Возвращает интервал данных для регулярного расписания DAG.infer_manual_data_interval
: Возвращает интервал данных при ручном запуске DAG.
Ниже мы покажем пример реализации этих методов в пользовательском timetable.
Пример пользовательского Расписания
Для его реализации давайте запустим наш DAG в 6:00 и 16:30. Поскольку это расписание имеет время выполнения с разными часами и минутами, оно не может быть представлено одним выражением cron. Но мы можем легко реализовать это расписание с помощью пользовательского timetable!
Для начала нам нужно определить методы next_dagrun_info
и infer_manual_data_interval
. Прежде чем углубляться в код, полезно продумать, какими будут интервалы данных для нужного нам расписания. Помните, что время выполнения DAG должно быть концом интервала данных, поскольку в нашем интервале нет пробелов. Итак, в этом случае для DAG, который мы хотим запустить в 6:00 и 16:30, у нас есть два разных чередующихся интервала:
Запуск в 6:00: Интервал передачи данных составляет от 16:30 предыдущего дня до 6:00 текущего дня.
Запуск в 16:30: Интервал передачи данных составляет с 6:00 до 16:30 текущего дня.
Имея это в виду, сначала определим next_dagrun_info
. Этот метод обеспечивает Airflow логикой для вычисления интервала данных для запланированных запусков.
def next_dagrun_info(self, *, last_automated_data_interval: Optional[DataInterval],
restriction: TimeRestriction,) -> Optional[DagRunInfo]:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
delta = timedelta(days=1)
if last_start.hour == 6: # If previous period started at 6:00, next period will start at 16:30 and end at 6:00 following day
next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC)
next_end = (last_start+delta).replace(tzinfo=UTC)
else: # If previous period started at 14:30, next period will start at 6:00 next day and end at 14:30
next_start = (last_start+delta).set(hour=6, minute=0).replace(tzinfo=UTC)
next_end = (last_start+delta).replace(tzinfo=UTC)
else: # This is the first ever run on the regular schedule. First data interval will always start at 6:00 and end at 16:30
next_start = restriction.earliest
if next_start is None: # No start_date. Don’t schedule.
return None
if not restriction.catchup: # If the DAG has catchup=-dFalse, today is the earliest to consider.
next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC)
next_end = next_start.set(hour=16, minute=30).replace(tzinfo=UTC)
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG’s scheduled end; don’t schedule.
return DagRunInfo.interval(start=next_start, end=next_end)
Пройдемся по логике этого кода:
Если был предыдущий запуск DAG:
Если предыдущий запуск DAG начался в 6:00, то следующий запуск DAG должен начаться в 16:30 и закончиться в 6:00 следующего дня.
Если предыдущий запуск DAG начался в 16:30, то запуск DAG должен начаться в 6:00 на следующий день и заканчивается в 16:30 на следующий день.
Если это первый запуск DAG:
Проверить дату начала. Если таковой нет, то DAG не может быть запланирован.
Проверить, является ли
catchup=False
. Если это так, то самой ранней датой для рассмотрения должна быть текущая дата. В противном случае это дата начала DAG.Мы установим, чтобы первый запуск DAG всегда начинался в 6:00, поэтому обновим время начала интервала до 6:00, а окончания - до 16:30.
Если у DAG есть дата окончания, не планируем запуск DAG после истечения этой даты.
Затем мы определяем интервал данных для запускаемых вручную запусков DAG, определяя метод infer_manual_data_interval
. Код выглядит следующим образом:
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
delta = timedelta(days=1)
# If time is between 6:00 and 16:30, period ends at 6am and starts at 16:30 previous day
if run_after >= run_after.set(hour=6, minute=0) and run_after<= run_after.set(hour=16, minute=30):
start = (run_after-delta).set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
# If time is after 16:30 but before midnight, period is between 6:00 and 16:30 the same day
elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23:
start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
# If time is after midnight but before 6:00, period is between 6:00 and 16:30 the previous day
else:
start = (run_after-delta).set(hour=6, minute=0).replace(tzinfo=UTC)
end = (run_after-delta).set(hour=16, minute=30).replace(tzinfo=UTC)
return DataInterval(start=start, end=end)
Этот метод определяет, каков самый последний полный интервал данных, основанный на текущем времени. Существует три сценария:
Текущее время находится между 6:00 и 16:30: В этом случае интервал данных составляет от 16:30 предыдущего дня до 6:00 текущего дня.
Текущее время после 16:30, но до полуночи: в этом случае интервал данных составляет с 6:00 до 16:30 текущего дня.
Текущее время - после полуночи, но до 6:00: В этом случае интервал данных составляет с 6:00 до 16:30 предыдущего дня.
Нам нужно учитывать периоды времени в одном и том же временном интервале (с 6:00 до 16:30) в разные дни, отличные от дня запуска DAG, что требует трех наборов логики. При определении пользовательских расписаний всегда имейте в виду, на чем должен основываться последний полный интервал данных, когда должен выполняться DAG.
Теперь мы можем взять эти два метода и объединить их в класс Timetable
, который будет составлять наш плагин Air flow. Полный пользовательский плагин расписания приведен ниже:
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
delta = timedelta(days=1)
# If time is between 6:00 and 16:30, period ends at 6am and starts at 16:30 previous day
if run_after >= run_after.set(hour=6, minute=0) and run_after <= run_after.set(hour=16, minute=30):
start = (run_after-delta).set(hour=16, hour=30, second=0).replace(tzinfo=UTC)
end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
# If time is after 16:30 but before midnight, period is between 6:00 and 16:30 the same day
elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23:
start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
# If time is after midnight but before 6:00, period is between 6:00 and 16:30 the previous day
else:
start = (run_after-delta).set(hour=6, minute=0).replace(tzinfo=UTC)
end = (run_after-delta).set(hour=16, minute=30).replace(tzinfo=UTC)
return DataInterval(start=start, end=end)
def next_dagrun_info(self, *,
last_automated_data_interval: Optional[DataInterval],
restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
delta = timedelta(days=1)
if last_start.hour == 6: # If previous period started at 6:00, next period will start at 16:30 and end at 6:00 following day
next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC)
next_end = (last_start+delta).replace(tzinfo=UTC)
else: # If previous period started at 14:30, next period will start at 6:00 next day and end at 14:30
next_start = (last_start+delta).set(hour=6, minute=0).replace(tzinfo=UTC)
next_end = (last_start+delta).replace(tzinfo=UTC)
else: # This is the first ever run on the regular schedule. First data interval will always start at 6:00 and end at 16:30
next_start = restriction.earliest
if next_start is None: # No start_date. Don’t schedule.
return None
if not restriction.catchup: # If the DAG has catchup=False, today is the earliest to consider.
next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC)
next_end = next_start.set(hour=16, minute=30).resplace(tzinfo=UTC)
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG’s scheduled end; don’t schedule.
return DagRunInfo.interval(start=next_start, end=next_end)
class UnevenIntervalsTimetablePlugin(AirflowPlugin):
name = "uneven_intervals_timetable_plugin"
timetables = [UnevenIntervalsTimetable]
Обратите внимание, что поскольку расписания являются плагинами, вам нужно будет перезапустить Airflow и веб-сервер после их добавления или обновления.
В DAG мы можем затем импортировать пользовательский плагин timetable
и использовать его для планирования DAG, установив параметр расписания:
from uneven_intervals_timetable import UnevenIntervalsTimetable
with DAG(
dag_id="example_timetable_dag",
start_date=datetime(2021, 10, 9),
max_active_runs=1,
timetable=UnevenIntervalsTimetable(),
default_args={
“retries”: 1,
“retry_delay”: timedelta(minutes=3),
},
catchup=True
) as dag:
Глядя на древовидное представление (Tree View) в пользовательском интерфейсе (начиная с версии 2.3, оно было заменено на GridView), мы видим, что этот DAG запускался дважды в день в 6:00 и 16:30 с даты начала 10/9/2021.
Следующий запуск по расписанию запланирован на интервал, начинающийся 10/12/2021 в 16:30 и заканчивающийся на следующий день в 6:00. Этот запуск будет запущен в конце интервала передачи данных, то есть после 10/13/2021 6:00.
Если мы запустим DAG вручную после 16:30, но до полуночи, мы увидим, что интервал данных для этого запуска был между 6:00 и 16:30 этого дня, как и ожидалось.
Это простое расписание, которое может быть легко скорректировано для других параметров использования. В целом, расписания можно полностью настраивать, если реализованы вышеприведенные методы.
Примечание: Будьте осторожны при реализации логики расписания, чтобы ваш метод next_dagrun_info
не возвращал data_interval_start
, который был бы раньше, чем start_date
вашего DAG . Это приведет к тому, что задачи не будут выполняться для этого запуска DAG.
Текущие ограничения
При внедрении пользовательских timetables необходимо учитывать некоторые ограничения:
Методы расписания должны возвращать один и тот же результат каждый раз, когда они вызываются (например, избегайте таких вещей, как HTTP-запросы). Они не предназначены для реализации запуска на основе событий.
Расписания анализируются планировщиком при создании запусков Dag, поэтому избегайте медленного или длинного кода, который может повлиять на производительность Airflow.