Многие команды сейчас меняют проприетарное ПО на открытые аналоги. Под «открытостью» мы понимаем не только миграцию с платного софта на бесплатный, но и новый подход к построению data-платформ, где каждый продукт развивает свой сегмент платформы с помощью релевантного стека технологий.
Полноценных open-source-альтернатив, которые закрыли бы все наши потребности, не нашлось. Поэтому мы решили создать свой «мультитул» — low-code-фреймворк для генерации гетерогенных Airflow DAG с незамысловатым названием dag_generator.
Цель этой статьи — поделиться опытом внедрения подобного инструмента. Генерация выполняется по старинке, так что про ИИ здесь пока ничего не найдете.
Идея и задача
Наш генератор задумывался как универсальное средство, позволяющее бизнесу максимально быстро «влететь» в платформу и выводить продукт в production в конвейерном режиме.
Здесь расскажем о принципах работы фреймворка и его инфраструктуре. Углубляться в плюсы и минусы DDD и DataMesh не будем, но отметим: именно подход DataMesh к разделению платформы на домены и продукты стал для нас отправной точкой.
Ключевые цели
Типовые операции — без кода.
Сквозная валидация конфигураций.
Конвейерная поставка, которая ускоряет внедрение и сокращает TTM.
Оркестратором на платформе выбрали Apache Airflow. Однако архитектура dag_generator достаточно проста, чтобы при необходимости мигрировать на Dagster, Prefect, Luigi или другие аналоги.
По задумке фреймворк должен был решать ETL-задачи, запускать модели, обращаться в другие системы и быть легким и тиражируемым компонентом платформы.
Архитектура фреймворка
В первую очередь фреймворк должен быть универсальным простым! Но задачи на платформе разнородные и сложные. Значит — надо декомпозировать.

dag_generator состоит из двух логических частей.
Ядро — Python-приложение с логикой вызова из CLI, Jinja2-шаблонами для генерации DAG-файлов, модулями генерации и валидации.
Джобы — отдельные Python-приложения, каждое из которых содержит YAML-схему своих атрибутов и реализацию логики под конкретную задачу. Например, построить DAG по SQL-скрипту или dbt-проекту, запустить KubernetesPodOperator с нужными параметрами или просто отправить email.
Джоб
Джоб — самостоятельное приложение со своим репозиторием, CI/CD и релизным циклом.
Для создания очередного джоба нужно всего три шага:
отнаследоваться от базовой модели джоба dag_generator;
��писать атрибуты джоба;
написать реализацию, которая возвращает Operator`ы, чтобы Airflow смог построить дерево задач в DAG.
Типовая схема джоба по отправке email:
from dag_generator.framework.yaml_model import BaseDagJob, gen_metadata from marshmallow import fields, validate class EmailJob(BaseDagJob): job_name = fields.String(required=True, validate=validate.NoneOf(("EmailSend")), metadata=gen_metadata(1, "JOB_NAME", "Наименование джоба (обязательный параметр)")) job_type = fields.String(required=True, validate=validate.Equal("email_job"), metadata=gen_metadata(2, "email_job", "Тип джоба (обязательный параметр)")) email_to = fields.String(required=True, allow_none=False, metadata=gen_metadata(3, "example@mail.com", "Список получателей (обязательный параметр)")) subject = fields.String(required=True, metadata=gen_metadata(4, "Airflow test subject", "Тема письма (обязательный параметр)")) body = fields.String(required=True, metadata=gen_metadata(5, "Airflow test body", "Тело письма (HTML) (обязательный параметр).)) attachment_path = fields.String(required=False, metadata=gen_metadata(6, "/tmp/test.txt", "Путь до вложения (необязательный параметр)"))
Реализация отправки:
from typing import Dict import os from airflow.operators.email import EmailOperator from dag_generator.utils.email_notifications.email_alert import DagStatusAlert def EmailSend(config: Dict): job_name = config.get("job_name", "email_job") email_to = config.get("email_to") subject = config.get("subject") body = config.get("body") attachment_path = config.get("attachment_path") email_to = DagStatusAlert.validate_emails(email_to) if not email_to: raise ValueError("Не найдено корректных email адресов для отправки") if attachment_path and os.path.isfile(attachment_path): files=[attachment_path] else: files = None email_job = EmailOperator( task_id = job_name, to=email_to, subject=subject, html_content=body, files=files ) email_job
Подключение джобов из python packages в качестве provider
Подгружая реализацию джобов в dag_generator, мы используем ту же концепцию, что и Airflow при подгрузке провайдеров.
Все джобы dag_generator имеют такую же структуру проекта и настройки pyproject.toml, как родительское ядро:

Благодаря этому джобы при установке попадают в ту же директорию и становятся доступны для импорта. При запуске dag_generator ищет файлы job.yaml и подхватывает реализацию с конфигурацией:
def _discover_all_dag_generator_jobs(self) -> None: """ Поиск всех установленных джобов в dag_generator. Она ищет файлы job.yaml и регистрирует джобы, объявленные в них. """ try: import dag_generator.jobs except ImportError: logger.info("You have no jobs installed.") return seen = set() jobs_path = os.path.dirname(dag_generator.jobs.__file__) for path in os.listdir(jobs_path): try: path = os.path.join(jobs_path, path) if path not in seen: seen.add(path) self._add_jobs_info_from_files_on_path(path) except Exception as e: logger.warning(f"Error when loading 'job.yaml' files from {path} airflow sources: {e}")
Далее все джобы доступны для импорта по одинаковому пути. Вот пример импорта sql_parser_job джоба из ядра dag_generator:
from dag_generator.jobs.sql_parser.realization.sql_parser_job import run_sql_parser_job
Принцип работы
Базовый сценарий использования dag_generator выглядит так (в роли пользователя может выступать приложение):

Генерация шаблона: пользователь создает YAML-шаблон DAG с джобами и заполняет его значениями.
Генерация DAG: пользователь запускает генерацию DAG-файла, указывая заполненный YAML. Происходит его валидация.
Git: полученный DAG помещается в git-репозиторий.
CI: Webhook запускает фазу CI с тестированием и предварительной подготовкой артефактов.
CD: Запускается фаза CD, которая доставляет DAG и артефакты в среду исполнения Airflow.
Клиентская часть
В клиентской части dag_generator делает три вещи: генерирует YAML-шаблоны по схемам marshmallow, валидирует заполненные пользователем YAML и создает на их основе DAG-файлы.
Генерация YAML-файла
Пользователь заполняет YAML-файл (который тоже можно сгенерировать) и запускает создание DAG.
YAML-спецификация на примере SQL-джоба:
# Dag generator version: 1.4.1 dag_id: sql_parser_job_<replace_with_your_name> # Наименование дага (обязательный параметр). Должен быть уникальным в рамках одного Airflow. start_date: '2025-11-19' # Дата начала в формате YYYY-MM-DD (обязательный параметр). schedule: '@once' # Расписание дага (обязательный параметр, но можно в значении указать null) catchup: false # Флаг восприятия параметра start_date как времени фактического запуска дага (обязательный параметр). domain_name: DOMAIN_NAME # Имя домена. (обязательный параметр). jobs: # Список джобов. Могут комбинироваться различные типы джобов (обязательный параметр) - job_name: JOB_NAME # Наименование джоба (обязательный параметр) job_type: sql_parser_job # Тип джоба filepath: ./test/test.sql # Путь до корректного sql файла на сервере, который будет преобразован в даг. Если указан параметр sql_folder_variable, путь должен быть относительно папки (обязательный параметр) connection_name: connection_id # Глобальный ID коннектора в Airflow через который будут выполняться все запросы, если для конкретного запроса не указан локальный ID коннектора (обязательный параметр)
Генерация DAG-файла
Генератор валидирует введенные значения и, если все хорошо, с помощью Jinja-шаблона создает DAG-файл.
Пример сгенерированного DAG по YAML выше:
# dag_generator version: 1.4.1 import pendulum import os from airflow.decorators import dag, task_group from airflow.utils.session import provide_session from airflow.operators.empty import EmptyOperator @dag( dag_id=os.path.basename(__file__).replace('.pyc','').replace('.py',''), start_date=pendulum.parse('2025-11-14', tz='Europe/Moscow'), schedule='@once', catchup=False, params={} ) def create_dag(): from dag_generator.jobs.sql_parser.realization.sql_parser_job import run_sql_parser_job sql_parser_demo_job_config = {'autocommit': False, 'connection_name': 'my_airflow_db_conn', 'filepath': '/path_to_sql_file_on_airflow_server', 'job_name': 'sql_parser_demo_job', 'job_type': 'sql_parser_job', } @task_group(group_id='sql_parser_demo_job', tooltip='') def sql_parser_demo_job(config): run_sql_parser_job(config) sql_parser_demo_job = sql_parser_demo_job(sql_parser_demo_job_config) start = EmptyOperator(task_id="start") done = EmptyOperator(task_id="end") start >> sql_parser_demo_job >> done create_dag()

DAG у нас получился динамический, и это неслучайно. В начале разработки мы выбирали между двумя подходами (спойлер: остановились на втором).
Подход 1. Статические DAG`и: генерация кода каждого оператора.
Плюс: шедулеру не нужно выполнять динамическое построение в момент парсинга.
Но минусов больше. Сгенерировать в памяти DAG и Operator — нативная операция Airflow, а вот сгенерировать код по декларированию этих объектов — уже совсем другая история. DAG`и «раздуваются» и плохо читаются, появляется много бойлерплейта. А при изменении порядка, состава или сигнатуры операторов приходится перегенерировать и заново пушить DAG в репозиторий.
Подход 2. Динамические DAG`и: в коде декларируются только группы задач и их конфигурация
Дерево задач строится шедулером Airflow при парсинге DAG-файлов. Есть накладные расходы на парсинг и планирование. Зато получаем гибкость и разделение ответственности: DAG не нужно перегенерировать при правках в его артефактах. Например, SQL-скрипт, по которому sql_parser-джоб строит дерево задач, может дорабатываться независимо от DAG`а в отдельном процессе и репозитории. А CI/CD-pipeline уже доставит этот прототип и вызывающий его DAG на файловую систему под Airflow. Ну, и код DAG получается лаконичным и читаемым.
Для наших процессов этот вариант подошел лучше.
Генерация и валидация: преимущества marshmallow (почему не Pydantic?)
Marshmallow обладает для нас двумя ключевыми преимуществами.
Генерация и валидация в одном флаконе
В отличие от похожих библиотек, marshmallow при инициализации схемы не требует данных для валидации. Одну и ту же модель атрибутов джоба можно использовать для двух независимых задач:
генерация YAML-шаблона — модель, основываясь на своих метаданных, выгружает структуру джоба в шаблонный YAML-файл с комментариями;
валидация заполненного YAML — та же самая модель проверяет заполненный пользователем YAML на соответствие типам и бизнес-правилам.
Вспомогательные метаданные
В marshmallow-схеме есть extra-поле metadata для описательной части. Мы прописываем в нем порядковый номер атрибута в сгенерированном YAML, дефолтное значение и комментарий для пользователя. Заполненный YAML-файл сериализуется в JSON и передается как config в DAG для построения дерева задач.
Пример валидации атрибутов SQL-джоба с metadata-полями:
from marshmallow import fields, validate class SqlFileDescription(YamlGenerator): filepath = fields.String(required=True, metadata=gen_metadata(1, "./test/test.sql", "Путь до корректного sql файла на сервере, который будет преобразован в даг. Если указан параметр sql_folder_variable, путь должен быть относительно папки (обязательный параметр)")) sql_dialect = fields.String(required=False, validate=validate.OneOf(SQL_LINEAGE_DIALECTS), load_default="ansi", metadata=gen_metadata(2, "impala", "Название диалекта sql, нужен для парсинга lineage. По умолчанию ansi. (необязательный параметр)"))
Серверная часть
На Airflow-сервере dag_generator отвечает за оркестрацию описанных джобов и построение дерева задач (Tasks) в Airflow. Именно здесь конфиг из YAML десериализуется, и джобы создают соответствующие Operator`ы.
Основные джобы
sql_parser
Этот джоб принимает на вход SQL-скрипт для построения витрин данных и решает несколько задач.
Распределение запросов по задачам Airflow
Запросы в SQL-скрипте — это набор инструкций с разделителем. Задача парсера — распределить эти инструкции по задачам, указав параметры на SQL-сессию.
Сценарии распределения
Объединение нескольких запросов в одну Airflow-задачу. Полезно, когда запросы легковесные или тесно связаны между собой.
Разбиение одного тяжелого запроса на несколько задач. Полезно, когда надо посчитать большой объем по частям, не меняя сам запрос.
Запуск n нижеследующих запросов параллельно.
Один запрос в одну задачу — поведение по умолчанию.
Указание параметров сессии без нарушения «чистого» SQL
Мы добавили DSL с инструкциями для парсера в виде блочных комментариев. Так можно задавать логику распределения и конфигурацию (например, connection к БД, объем памяти, алгоритм сжатия, формат хранения), не ломая синтаксис SQL. Любой фрагмент или весь скрипт по-прежнему можно запустить в SQL-клиенте или IDE.
Аналитики, тестировщики и дата-инженеры работают с одним и тем же артефактом.
Формирование Data Lineage
С помощью библиотеки sqllineage мы автоматически извлекаем данные о lineage и отправляем их в централизованное хранилище.
Отслеживание фрагментов SQL-скрипта в интерфейсе Airflow
Сопровождать проще, когда все прозрачно. В интерфейсе видно содержимое конкретной задачи: из DSL-хинтов была проставлена память на сессию, а запросы drop, create и insert into объединены, чтобы избежать лишних connect/disconnect.

Использование переменных в запросах
Пользователи ссылаются в SQL-скрипте на переменные, указав перед ними спецсимвол (например, :schema_name). Это может быть имя схемы, дата в where-предикате, количество памяти и т. п. IDE поддерживают этот функционал, поэтому валидность SQL-скрипта не нарушается.
Парсер заменяет эти переменные на Airflow variables, dag params или xcom в Jinja-нотации.
Переменные в SQL-скрипте:
SELECT c1 FROM :my_work_schema.t1 WHERE DTTM >= :increment_date
Переменные после парсинга и генерации:
SELECT c1 FROM {{ dag_run.conf.get('my_work_schema', 'default_schema_name') }} WHERE DTTM >= {{ dag_run.conf.get('increment_date_value', 'default_dttm_value') }}
Пример DSL-разметки
Рассмотрим пример DSL-разметки фрагмента SQL-скрипта для Apache Impala:
/* { "pre_query_sql_settings": [ "SET mem_limit = 1G" ], "split_query": { "split_column": "agreement_id_int", "num_chunks": 3, "num_threads": 3 } } */ INSERT INTO TABLE fl_test.gen_agreement SELECT col1 ,... FROM very_big_table as t ; /* { "op_name": "fl_test_gen_agreement_compute_stats" } */ compute stats fl_test.gen_agreement;
В результате парсинга мы получим следующую картину в DAG`е Airflow:

В хинтах мы указали:
память на сессию для выполнения запроса;
разбивка расчета входного объема данных на три бакета (операция выполняется через остаток от деления int-ключа таблицы на n);
количество параллельных сессий, в которые нужно посчитать исходный запрос.
Парсер создал таск-группу, в которой посчитал три промежуточных объекта и залил из них данные в целевую таблицу, предварительно обнулив ее. В хинтах также можно задать ожидание успешного завершения всех параллельных тасок или же залить данные асинхронно.
В конце считаем статистику отдельной задачей.
dbt
dbt — популярный инструмент трансформации данных, позволяющий строить витрины данных с использованием шаблонизированного кода в парадигме командной разработки ПО.
dbt-проект состоит из множества «моделей», между которыми dbt автоматически строит граф зависимостей благодаря макроинструкциям в моделях. Но из коробки его нельзя разложить в дерево задач DAG`а Airflow. Отсюда и необходимость нашего джоба.
Изначально мы пошли по обманчиво легкому пути — раскладывали dbt-модели на задачи через BashOperator. Однако с ростом проектов (~300+ моделей) мы столкнулись с плавающими ошибками, конфликтами доступа �� manifest.json, рекурсивными зависимостями и костылями предварительной обработки в CI/CD.
В итоге мы перешли на готовую библиотеку cosmos от Astronomer.
Она предоставляет из коробки:
построение графа зависимостей моделей в DAG;
компиляцию dbt-проекта разными способами (dbt ls, dbt parse и т. д.);
ExecutionMode для разных сценариев (LOCAL, DOCKER, KUBERNETES);
DbtDag с управляемыми встроенными тестами;
режим запуска моделей в venv;
управление параллельностью выполнения через threads;
запуск подмножества моделей по тегам в одной задаче.
Пример сгенерированного DAG-а с dbt-джобом на основе cosmos:
import pendulum import os from airflow.decorators import dag from airflow.operators.empty import EmptyOperator @dag( dag_id=os.path.basename(__file__).replace('.pyc','').replace('.py',''), start_date=pendulum.parse('2025-04-22', tz='Europe/Moscow'), schedule=[Dataset('dag://dbt_parent_dataset')], catchup=False, max_active_runs=30 ) def create_dag(): from dag_generator.jobs.dbt_loader.dbt_single_task.realization import run_single_task big_dbt_cosmos_job_config = {'exclude': ['tag:view'], 'job_name': 'big_dbt_cosmos_job', 'job_type': 'dbt.model', 'pool': 'dbt', 'profile': {'airflow_conn_id': 'my_conn', 'name': 'my_name', 'schema': 'some_schema', 'store_type': 'db_type', 'target': 'dbt_target'}, 'project': '/my_dbt_project', 'retries': 1, 'select': ['tag:my_tags'] } big_dbt_cosmos_job = run_model(big_dbt_cosmos_job_config) start = EmptyOperator(task_id="start") done = EmptyOperator(task_id="done", trigger_rule="all_done", outlets=[Dataset(f"dag://{start.dag_id}/done")]) start >> dbt_kno_mark_cdl_ss_cdl_job >> done create_dag()

Представление DAG в Airflow
Под высокой нагрузкой
«Разгрузка» DAG`ов
Большой dbt-проект в интерфейсе Airflow — зрелище не для слабонервных. Да и сам Airflow страдает: Scheduler и Webserver начинают задыхаться.
Первое, что рекомендуется сделать, — начать дробить проект на отдельные DAG-и. Делить проект можно по тегам, которыми размечаются модели. Зависимости между такими DAG-ами удобно выстраивать через dataset-ы. Также можно отправлять DAGи в разные ресурсные пулы Airflow.
Агрегация задач
Второе, что мы сделали, — отдельный джоб с группировкой нескольких dbt-моделей в одну Airflow-задачу. Используя dynamic task mapping, мы распределяем батчи dbt-моделей по задачам.
Логика распределения определяется тегами и --select-командой. Таким образом, одна задача запускает подмножество моделей согласно dbt run --select — команде с одним из указанных пользователем тегов.
Фрагмент реализации dbt-джоба с группировкой по --select-правилам:
run_dbt = DbtRunOperator.partial( task_id=f"cosmos_{config['job_name']}", project_dir=str(PROJECT), profile_config=profile_config, exclude=exclude, install_deps=install_dbt_deps, emit_datasets=False, env=ENVS, invocation_mode=InvocationMode.SUBPROCESS, dbt_executable_path="dbt", trigger_rule=config.get("trigger_rule", "all_success"), do_xcom_push=config.get("do_xcom_push", False), owner="URISAD_NDP_ETL", **operator_args, ).expand(select=select) return run_dbt
На такой DAG уже гораздо приятнее смотреть:

kubernetes_job
Этот джоб — обертка вокруг KubernetesPodOperator.
Пользователю не нужно расписывать параметры kubernetes.client.model-объектов — достаточно указать имя образа, путь к приложению и аргументы. Если нужно — можно добавить requests и limits.
В этом джобе удобно запускать обособленные приложения (например, Python-модели, Java-вычисления и т. п.).
Поскольку наш Airflow развернут в Kubernetes, мы можем переиспользовать готовый шаблон пода для воркеров, применяя его к нашему оператору. Не нужно заново описывать тома, порты, секреты и прочее.
Делается это нехитрой функцией:
from kubernetes import config from kubernetes.client import CoreV1Api from kubernetes.client.models import V1Pod def get_pod_template( cm_name: str = "airflow-pod-template", namespace: str = os.getenv("AIRFLOW__KUBERNETES__NAMESPACE", "default"), template_key: str = "template.yaml", exit_on_fail: bool = True, **kwargs ) -> V1Pod: """ Function, that creates V1Pod class from ConfigMap :param str cm_name: name of the ConfigMap :param str namespace: object name and auth scope, such as for teams and projects :param str template_key: key for find V1Pod params in ConfigMap :param bool exit_on_fail: If True then function raise exception of fail. If False function returns empty V1Pod """ try: config.load_incluster_config() template = CoreV1Api().read_namespaced_config_map(name=cm_name, namespace=namespace) pod_spec_raw = template.data[template_key] pod_spec = yaml.safe_load(pod_spec_raw) pod = client.ApiClient()._ApiClient__deserialize(pod_spec, V1Pod) except (Exception, yaml.YAMLError) as e: if exit_on_fail: raise e else: print(f"Error during incluster config load and reading cm with api: {e}") pod = V1Pod() return pod
Затем просто подставляем в параметр KubernetesPodOperator распакованный словарь:
full_pod_spec=get_pod_template(**config.get('pod_template_params', {}))
И помним, что дебажить удобнее с dry_run().
Инфраструктура, в которой работает фреймворк
Скорость работы DAG`ов
Препятствие: наши Airflow расположены в Kubernetes, причем не только production, но и остальные, включая dev и test. Быстрым вариантом запуска казался KubernetesExecutor — он уже был настроен. Но этот тип экзекьютора поднимает Pod на каждую задачу. Для ETL-задач в виде тысяч SQL- или dbt-операторов — неэффективно.
Решение: мы перешли на гибридный KubernetesCeleryExecutor. Для ETL-задач указываем очередь Celery, а для обособленных задач — Kubernetes.
Такое деление значительно ускорило ETL DAG`и.
Утилизация ресурсов кластера
CeleryExecutor эффективен, только если воркеры персистентны и готовы принять задачу из очереди в работу. Следовательно, когда задач нет, ресурсы простаивают.
Для масштабирования Celery-воркеров может подойти KEDA. В отличие от HPA, который смотрит только на потребление вычислительных ресурсов, KEDA умеет ориентироваться на прикладные метрики — например, количество задач в очереди.
VPA теоретически мог бы помочь для задач KubernetesExecutor, но есть тонкости с перезапуском пода.

Когда Scheduler распарсит и сериализует DAG в указанную очередь, он либо отправляет задачи на выполнение в брокер Redis (для Celery), либо обращается по Kubernetes API для создания dedicated-пода на выполнение задачи (для Kubernetes).
Обновление библиотек
Для задач KubernetesExecutor библиотеки можно установить в момент поднятия пода запуском пакетного менеджера. Но для ETL-задач, которые стартуют на Celery, обновление библиотек означает редеплой пода и прерывание текущих задач.
Версии некоторых библиотек (в основном внутрибанковских) могут меняться ежедневно. Поэтому мы добавили технические DAG`и без расписания: они запускаются по событию обновления библиотек через REST API и устанавливают venv с актуальными версиями на персистентных разделах.
Заключение
Благодаря dag_generator мы создали стандарт для постановки типовых бизнес-задач на регламентное выполнение без написания кода. Что мы получили:
гибкость — любая команда может написать свой джоб для решения конкретной задачи;
скорость — low-code-подход и валидация ускорили TTM;
поддержку — динамическая генерация DAG и разделение артефактов упростили сопровождение;
расширяемость — никто не запрещает вручную дописывать код в сгенерированные DAG, что оставляет свободу в сложных случаях.
Наш опыт показал, что даже среди зрелых open-source-инструментов есть место для своего «мультитула», который затачивается под процессы конкретной команды.
