
Оглавление
Предистория
Здесь будет рассказано почему это всё получилось и коротко про бизнес-кейс, если не интересно, то переходи сразу к следующему разделу :)
Существуют такие метрики, которые выглядят просто, но очень хорошо отображают суть бизнеса в какой-то момент.
Такие метрики как: MAU, DAU, сумма продаж и прочее дают стейкхолдером хорошее понимание что сейчас с бизнесом.
Для дата-аналитика это выглядит так: дата, название метрики, значение.
Для дата-инженера это выглядит как семь кругов ада: извлечение из источника, очистка, обогащение, агрегация, расчет бизнес-логики, снова агрегация, опять очистка и только тогда метрика готова попасть "на стол". (количество кругов ада различается от команды к команде и от бизнес-задачи, но суть вы поняли :))
Я постоянно пытаюсь как-то автоматизировать свою работу и упростить жизнь и себе, и людям.
И тут появился хороший шанс это сделать.
Пришел бизнес с примерно таким запросом: "У нас существует более ста метрик, которые имеют вид: дата, название метрики, значение. Мы хотели бы это всё автоматизировать, потому что это собирается частично руками, частично какими-то скриптами и у этого всего разные владельцы. А мы хотим всё хранить в одном месте. Ну и самое главное – мы хотим версионировать все свои метрики, чтобы мы знали когда, зачем и почему изменилась та или иная метрика" – на самом этот запрос звучал так: "нам нужно хранить метрики и знать когда, что и почему с ними происходило" :)
И я начал думать.
На ум пришла сразу же идея – использовать SCD.
Сразу захотелось создать такую таблицу с типом "II", в которой будет метрика, логика, комментарии и прочее. У каждой строки будет дата создания, дата "удаления".
А ещё я очень хотел не только создать такую таблицу, но и иметь возможность генерировать DAG через эту таблицу
И я это сделал.
Об этом и пойдет дальше речь.
Если что, то весь к��д, все коммиты и прочее находятся в данном git-репозитории.
Используемые технологии:
MacBook Air (M1, 2020)
Оперативная память 16 Gb
macOS Monterey 12.6
Docker 4.22.0
Docker resources:
CPUs: 2
Memory: 4 Gb
Swap: 4 Gb
Python3.11
Подготовка стенда
Для начала скачаем себе docker-compose.yaml с официального сайт Airflow:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
Если его запустить, то мы получим там Python3.7. Но хотелось бы что-то побыстрее, поэтому давайте сделаем себе Python3.11. Для этого нужно создать Dockerfile со следующей командой:
FROM apache/airflow:2.6.3-python3.11
Затем в docker-compose.yaml изменяем стандартный образ на наш (строка 53):
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3} build: .
Всё, готово. Теперь давайте соберем наш проект. Для этого выполним команду:
docker-compose up -d
* Если вы уже выполняли команду docker-compose up -d, то необходимо сначала выполнить команду docker-compose build и затем снова docker-compose up -d.
Ждем, когда скачаются все образы и поднимутся все контейнеры. После сборки Airflow он будет доступен по адресу http://0.0.0.0:8080/home
Создание виртуального окружения
Теперь нужно создать виртуальное окружение, чтобы корректно работала подсветка синтаксиса и работали все подсказки.
Для этого нужно выполнить команду:
python3.11 -m venv venv && \ ~/_code/github/scd_dag_factory source venv/bin/activate && \ pip install --upgrade pip && \ pip install -r requirements.txt
В наше виртуальное окружение установятся все зависимости, которые необходимы для работы данного проекта.
Создание сервиса с базой данных в docker-compose
Для нашего проекта понадобится база данных (БД). Я по классике выбрал PostgreSQL (в оригинальном проекте был развернут инстанс GreenPlum 6).
Для того, чтобы корректно работала БД в нашей сборке мы добавим в конец docker-compose.yaml следующую команду:
test_db: image: postgres restart: always ports: - "1:5432" environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres
После добавления этой команды необходимо заново перезапустить сборку командой:
docker-compose up -d
Теперь всё готово для работы над проектом
Для начала напишем простой DAG, который позволит проверить работу коннекта к нашей ранее созданной БД.
DAG, который проверяет коннект к БД в docker-сборке
from datetime import datetime, timedelta import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook # Конфигурация DAG OWNER = 'korsak0v' DAG_ID = 'check_pg' LOCAL_TZ = pendulum.timezone('Europe/Moscow') # Описание возможных ключей для default_args # https://github.com/apache/airflow/blob/343d38af380afad2b202838317a47a7b1687f14f/airflow/example_dags/tutorial.py#L39 args = { 'owner': OWNER, 'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ), 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), } def check_pg_connect(**context): """""" pg = PostgresHook('test_db') df = pg.get_pandas_df('SELECT 1 AS one') if len(df) == 1: print(True) with DAG( dag_id=DAG_ID, schedule_interval='10 0 * * *', default_args=args, tags=['check_pg_connect', 'test'], concurrency=1, max_active_tasks=1, max_active_runs=1, ) as dag: start = EmptyOperator( task_id='start', ) check_pg_connect = PythonOperator( task_id='check_pg_connect', python_callable=check_pg_connect, ) end = EmptyOperator( task_id='end', ) start >> check_pg_connect >> end
Запустим его и посмотрим на него. Если раны горят зеленым, то всё идет по плану.
Далее давайте напишем наш типовой DAG, который будет собирать метрики по типу: дата, метрика, значение.
Типовой DAG, который будет собирать какую-то метрику (simple_dag)
from datetime import datetime, timedelta import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.postgres.operators.postgres import PostgresOperator # Конфигурация DAG OWNER = 'korsak0v' DAG_ID = 'simple_dag' LOCAL_TZ = pendulum.timezone('Europe/Moscow') # Названия коннекторов к PG PG_CONNECT = 'test_db' # Используемые таблицы в DAG PG_TARGET_SCHEMA = 'dm' PG_TARGET_TABLE = 'fct_sales' PG_TMP_SCHEMA = 'stg' PG_TMP_TABLE = f'tmp_{PG_TARGET_TABLE}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}' INDEX_KPI = 1 sql_query = ''' SELECT ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date, (random()*100)::int AS value, 1 AS kpi_id ''' LONG_DESCRIPTION = '# LONG_DESCRIPTION' SHORT_DESCRIPTION = 'SHORT_DESCRIPTION' args = { 'owner': OWNER, 'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ), 'catchup': True, 'retries': 3, 'retry_delay': timedelta(hours=1), } with DAG( dag_id=DAG_ID, schedule_interval='10 0 * * *', default_args=args, tags=['dm'], description=SHORT_DESCRIPTION, concurrency=1, max_active_tasks=1, max_active_runs=1, ) as dag: dag.doc_md = LONG_DESCRIPTION start = EmptyOperator( task_id='start', ) drop_tmp_before = PostgresOperator( task_id='drop_tmp_before', sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''', postgres_conn_id=PG_CONNECT ) create_tmp = PostgresOperator( task_id='create_tmp', sql=f''' CREATE TABLE {PG_TMP_SCHEMA}.{PG_TMP_TABLE} AS { sql_query.format( start_date="{{ data_interval_start.format('YYYY-MM-DD') }}", end_date="{{ data_interval_end.format('YYYY-MM-DD') }}" ) }; ''', postgres_conn_id=PG_CONNECT ) delete_from_target = PostgresOperator( task_id='delete_from_target', sql=f''' DELETE FROM {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE} WHERE date IN ( SELECT date FROM {PG_TMP_SCHEMA}.{PG_TMP_TABLE} ) ''', postgres_conn_id=PG_CONNECT ) insert_from_tmp_to_target = PostgresOperator( task_id='insert_from_tmp_to_target', sql=f''' INSERT INTO {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}("date", value, kpi_id) SELECT "date", value, kpi_id FROM {PG_TMP_SCHEMA}.{PG_TMP_TABLE} ''', postgres_conn_id=PG_CONNECT ) drop_tmp_after = PostgresOperator( task_id='drop_tmp_after', sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''', postgres_conn_id=PG_CONNECT ) end = EmptyOperator( task_id='end', ) start >> drop_tmp_before >> create_tmp >> delete_from_target >> insert_from_tmp_to_target >> drop_tmp_after >> end
В генерируемых значениях есть уже колонка kpi_id. Пока не обращаем на неё внимание. О ней чуть позже.
Перед запуском DAG давайте создадим таблицу, в которую будут писаться метрики.
DDL код для создания таблицы фактов
CREATE SCHEMA dm; CREATE SCHEMA stg; DROP TABLE IF EXISTS dm.fct_sales; CREATE TABLE dm.fct_sales ( id bigserial PRIMARY KEY, "date" date NOT NULL, kpi_id int2 NOT NULL, value int4 NOT NULL );
После этого мы можем запустить DAG и мы увидим как появляются данные в нашей таблице.

Уже хорошо. Мы, на текущий момент, имеем DAG, который собирает какую-то метрику (давайте примем за условность, что метрика реально собирается, хоть и значения рандомного генерируются).
Теперь мы можем перейти к созданию SCD-таблицы с метриками.
Создание SCD-таблицы
Немного повторюсь, я хочу в этой таблице хранить всю информацию, которая касается метрик. Соответсвенно я хотел бы иметь полную информацию о ней: название, бизнес-логику, комментарии аналитиков, комментарии дата-инженеров и прочее.
А самое главное – я хочу из этой информации генерировать DAG, поэтому нужно определить значения, которые не могут быть пустыми (NOT NULL).
Давайте создадим такую таблицу следующим скриптом:
DDL код для SCD-таблицы
CREATE TABLE public.dim_kpi_dag_gen_config ( id serial4 NOT NULL, kpi_id int4 NOT NULL, dag_id varchar(255) NOT NULL, "owner" varchar(30) NOT NULL, start_date varchar(40) NOT NULL, metric_name_en varchar(255) NOT NULL, sql_query text NOT NULL, short_description_md varchar(255) NULL, long_description_md text NULL, cron varchar(50) NOT NULL, sensors varchar(255) NULL, tags varchar(255) NOT NULL, metric_line varchar(255) NULL, "source" varchar(255) NULL, bi_logic varchar(255) NULL, comment_pa text NULL, comment_de text NULL, is_actual bool NULL DEFAULT TRUE, created_at timestamptz NULL DEFAULT now(), changed_at timestamptz NULL, pg_environment varchar(10) NOT NULL DEFAULT 'prod'::character varying, airflow_environment varchar(10) NOT NULL DEFAULT 'dev'::character varying, CONSTRAINT dim_kpi_dag_gen_config_pkey PRIMARY KEY (id) )
И давайте сразу же создадим таблицу, в которой будут храниться все наши собранные метрики:
DDL код для таблицы фактов (таблица для всех собираемых метрик)
CREATE TABLE public.fct_dm_kpi ( "date" date NULL, value float8 NULL, kpi_id int4 NULL );
Создание инструмента для наполнения SCD-таблицы
Ниже я покажу MVP-вариант, который может заполнять SCD-таблицу. На своем проекте, с помощью другой команды, мы прикрутили к нему веб-интерфейс.
SCD-инструмент, который позволяет актуализировать и заполнять таблицу
from connectors_to_databases import PostgreSQL pg = PostgreSQL( port=1 ) TABLE = 'dim_kpi_dag_gen_config' def gen_insert_sql_for_kpi_id(dict_kpi: dict = None) -> str: """ Генерирует скрипт для вставки данных в SCD. Определяет есть ли такой ключ. Если нет, то делает вставку с нужными данными, указанными в dict_kpi. Если есть, то делает вставку с нужными данными, указанными в dict_kpi и дублирует информацию из прошлых строк. @param dict_kpi: Словарь с описанием kpi. @return: Строку для вставки значений в SCD. """ # Проверка наличия kpi_id в таблице df_check = pg.execute_to_df(f''' SELECT kpi_id FROM {TABLE} WHERE kpi_id = {dict_kpi['kpi_id']} ''') # Проверяем есть ли такой kpi_id в таблице if len(df_check) >= 1: # В запросе исключаем те поля, которые генерируется сами через `DEFAULT` query = f''' SELECT column_name FROM information_schema.columns WHERE table_name = '{TABLE}' AND column_name NOT IN ( 'id', 'created_at', 'changed_at', 'is_actual', {', '.join(f"'{i}'" for i in dict_kpi)} ) ''' df = pg.execute_to_df(query) # noqa: PD901 insert_sql_column_current = ', '.join(value for value in df.column_name) insert_sql_column_modified = insert_sql_column_current + f''', {', '.join(i for i in dict_kpi)}''' list_values = [] for value in dict_kpi.values(): # Обработка одинарных кавычек в значениях. Они встречаются при указании дат. if "'" in str(value): value = value.replace("'", "''") list_values.append(f"'{value}'") elif value is None: list_values.append('NULL') else: list_values.append(f"'{value}'") insert_sql_column_values = insert_sql_column_current + f''', {', '.join(list_values)}''' sql_insert = f''' INSERT INTO {TABLE} ( {insert_sql_column_modified} ) SELECT {insert_sql_column_values} FROM {TABLE} WHERE is_actual IS TRUE AND kpi_id = {dict_kpi['kpi_id']}; ''' else: # Если нет такого kpi_id в таблице, то генерируем вставку значений из словаря columns = ', '.join(value for value in dict_kpi) list_values = [] for value in dict_kpi.values(): if "'" in str(value): value = value.replace("'", "''") list_values.append(f"'{value}'") elif value is None: list_values.append('NULL') else: list_values.append(f"'{value}'") values = ', '.join(list_values) sql_insert = f''' INSERT INTO {TABLE}({columns}) VALUES ({values}); ''' return sql_insert def scd_dim_kpi(dict_kpi: dict = None) -> None: """ Основная функция, которая принимает на вход словарь с описанием kpi. Каждый ключ – это поле в таблице SCD. Каждое значение – это значение поля в таблице SCD. @param dict_kpi: Словарь с описанием kpi по выбранным колонкам. @return: Ничего не возвращает, выполняет SQL-скрипт на вставку данных в SCD. """ # Обновление changed_at в предыдущей актуальной записи update_changed_at_for_kpi_id = f''' UPDATE {TABLE} SET changed_at = NOW() WHERE kpi_id = {dict_kpi['kpi_id']} AND is_actual IS TRUE; ''' # Вставка новой записи с обновленными значениями полей insert_new_values_for_kpi_id = gen_insert_sql_for_kpi_id(dict_kpi=dict_kpi) # Обновление is_actual для каждого kpi_id update_is_actual_for_kpi_id = f''' UPDATE {TABLE} SET is_actual = false WHERE kpi_id = {dict_kpi['kpi_id']} AND id <> ( SELECT MAX(id) FROM {TABLE} WHERE kpi_id = {dict_kpi['kpi_id']} ); ''' # Собираем SQL-скрипт из разных кусков, чтобы он прошел в одной транзакции sql_query = update_changed_at_for_kpi_id + insert_new_values_for_kpi_id + update_is_actual_for_kpi_id print(sql_query) # noqa: T201 pg.execute_script(sql_query)
И теперь давайте вызовем нашу основную функцию scd_dim_kpi, чтобы получить первую запись с метрикой в нашей таблице.
# Пример использования new_values = { 'kpi_id': 1, 'dag_id': 'test_1', 'metric_name_en': 'test_1', 'owner': 'korsak0v', 'start_date': '2021-01-01', 'cron': '10 0 * * *', 'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''', 'sql_query': ''' SELECT date, count(values) AS value FROM fct_some_table_with_random_values WHERE date BETWEEN '{start_date}' AND '{end_date}' GROUP BY 1 ''', } # Вызов функции scd_dim_kpi( dict_kpi=new_values )
Вот что мы увидим в БД:

Отлично, метрика находится в БД и с этого момента можно сказать начинается её версионирование. Каждое изменение будет фиксироваться в БД отдельной строкой и мы сможем отследить все изменения.
Давайте для примера изменим поле comment_de:
new_values = { 'kpi_id': 1, 'comment_de': 'Сделали так, потому что потому' } # Вызов функции scd_dim_kpi( dict_kpi=new_values )
Получим в БД такую информацию: kpi_id не изменился, изменилась информация в dim_kpi_dag_gen_config и также видно, когда была изменена метрика, ну и самое главное – флаг is_actual переместился на последнюю запись по данному kpi_id.

Теперь мы готовы генерировать DAG из этой таблицы.
Генерация DAG через SCD-таблицу
Давайте оставим simple_dag без изменений, но возьмем его за основу и сделаем из него функцию, которая будет возвращать DAG на основании полученных атрибутов.
DAG генератор DAG
from datetime import datetime, timedelta import pendulum from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor from airflow.operators.empty import EmptyOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.providers.postgres.hooks.postgres import PostgresHook def create_kpi_of_dag( owner: str = None, dag_id: str = None, pg_target_schema: str = None, pg_target_table: str = None, index_kpi: str = None, pg_environment: str = None, airflow_environment: str = None, long_description: str = None, short_description: str = None, start_date: str = None, cron_expression: str = None, tags: str = None, sensors: str = None, sql_query: str = None, ) -> DAG: """ Функция, которая генерирует типовой DAG для получения метрики. Вся логика описана и прокомментирована внутри функции. Некоторые моменты обработаны исключительно для функции, чтобы обработать какие-то атрибуты и получить желаемый эффект. @param owner: Владелец DAG. @param dag_id: Название DAG. @param pg_target_schema: Целевая схема в PostgreSQL. @param pg_target_table: Целевая таблица в PostgreSQL. @param index_kpi: ID показателя. @param pg_environment: Окружение PostgreSQL. @param airflow_environment: Окружение Airflow. @param long_description: Полное описание отчета. @param short_description: Короткое описание отчета. @param start_date: Дата начала работы DAG. @param cron_expression: Cron. @param tags: Tags. @param sensors: Sensors. @param sql_query: SQL-запрос для получения метрики. @return: Возвращает DAG. """ # Конфигурация DAG local_tz = pendulum.timezone('Europe/Moscow') # Используемые таблицы в DAG pg_target_schema = pg_target_schema pg_target_table = pg_target_table pg_tmp_schema = 'stg' pg_tmp_table = f'tmp_{dag_id}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}' # Названия коннекторов к GP pg_connect = 'test_db_dev' if pg_environment == 'dev' else 'test_db' # Сделана заглушка атрибута. Это можно использовать для указания разных сценариев в зависимости от окружения airflow_environment = airflow_environment # Дата приходит в формате str и после парсинга, мы можем получить дату и любые элементы даты parse_date = pendulum.parse(start_date) args = { 'owner': owner, 'start_date': datetime(parse_date.year, parse_date.month, parse_date.day, tzinfo=local_tz), 'catchup': True, 'depends_on_past': True, 'retries': 3, 'retry_delay': timedelta(hours=1), } # Tags приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list raw_tags = list(tags.split(',')) tags_ = [] for i in raw_tags: tags_.append( # noqa: PERF401 i.replace("'", "") .replace(" ", '') .replace("[", "") .replace("]", "") ) # Sensors приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list if sensors: raw_sensors = list(sensors.split(',')) sensors_ = [] for i in raw_sensors: sensors_.append( # noqa: PERF401 i.replace("'", "") .replace(' ', '') .replace("[", "") .replace("]", "") ) else: sensors_ = None with DAG( dag_id=dag_id, schedule_interval=cron_expression, default_args=args, tags=tags_, description=short_description, concurrency=1, max_active_tasks=1, max_active_runs=1, ) as dag: dag.doc_md = long_description start = EmptyOperator( task_id='start', ) # Если есть sensors, то мы создаем задачи с сенсорами, иначе создаем одну пустышку if sensors_: sensors_task = [ ExternalTaskSensor( task_id=f'sensor_{dag}', external_dag_id=dag, allowed_states=['success'], mode='reschedule', timeout=360000, # длительность работы сенсора poke_interval=600 # частота проверки ) for dag in sensors_ ] else: sensors_task = [EmptyOperator(task_id=f'empty_{value}') for value in range(1)] drop_tmp_before = PostgresOperator( task_id='drop_tmp_before', sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''', postgres_conn_id=pg_connect ) create_tmp = PostgresOperator( task_id='create_tmp', sql=f''' CREATE TABLE {pg_tmp_schema}.{pg_tmp_table} AS { sql_query.format( start_date="{{ data_interval_start.format('YYYY-MM-DD') }}", end_date="{{ data_interval_end.format('YYYY-MM-DD') }}" ) }; ''', postgres_conn_id=pg_connect ) delete_from_target = PostgresOperator( task_id='delete_from_target', sql=f''' DELETE FROM {pg_target_schema}.{pg_target_table} WHERE date IN ( SELECT date FROM {pg_tmp_schema}.{pg_tmp_table} WHERE kpi_id = {index_kpi} ) AND kpi_id = {index_kpi} ''', postgres_conn_id=pg_connect ) insert_from_tmp_to_target = PostgresOperator( task_id='insert_from_tmp_to_target', sql=f''' INSERT INTO {pg_target_schema}.{pg_target_table}("date", value, kpi_id) SELECT "date", value, {index_kpi} AS kpi_id FROM {pg_tmp_schema}.{pg_tmp_table} ''', postgres_conn_id=pg_connect ) drop_tmp_after = PostgresOperator( task_id='drop_tmp_after', sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''', postgres_conn_id=pg_connect ) end = EmptyOperator( task_id='end', ) start >> sensors_task >> drop_tmp_before >> create_tmp >> delete_from_target >>\ insert_from_tmp_to_target >> drop_tmp_after >> end return dag # build a dag from dag config def generator_of_morning_kpi_dag_to_gp() -> None: """ Функция получает список config из БД и генерирует DAG's на основании функции `create_kpi_of_dag`. Итерируется по config и каждый раз выполняет функцию `create_kpi_of_dag`, которая возвращает DAG. @return: None """ pg_hook = PostgresHook(postgres_conn_id='test_db') df = pg_hook.get_pandas_df( # noqa: PD901 ''' SELECT kpi_id, dag_id, "owner", sql_query, start_date, pg_environment, airflow_environment, short_description_md, long_description_md, cron, sensors, tags FROM dim_kpi_dag_gen_config WHERE is_actual IS TRUE ORDER BY id; ''' ) for i in range(len(df)): create_kpi_of_dag( owner=df.iloc[i].owner, dag_id=df.iloc[i].dag_id, pg_target_schema='public', pg_target_table='fct_dm_kpi', index_kpi=df.iloc[i].kpi_id, pg_environment=df.iloc[i].pg_environment, airflow_environment=df.iloc[i].airflow_environment, long_description=df.iloc[i].long_description_md, short_description=df.iloc[i].short_description_md, start_date=df.iloc[i].start_date, cron_expression=df.iloc[i].cron, tags=df.iloc[i].tags, sensors=df.iloc[i].sensors, sql_query=df.iloc[i].sql_query, ) generator_of_morning_kpi_dag_to_gp()
После добавления данного DAG мы сразу же видим результат – в нашем веб-интерфейсе появился DAG с ранее созданной метрикой:

Давайте запустим наш сгенерированный DAG и посмотрим на результат, но в начале отредактируем SQL-запрос под необходимый нам формат.
Изменение sql_query для DAG test_1
new_values = { 'kpi_id': 1, 'sql_query': ''' SELECT ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date, (random()*100)::int AS value ''', } # Вызов функции scd_dim_kpi( dict_kpi=new_values )
Всё, DAG работает и что-то собирает. Давайте сделаем для примера ещё несколько DAG.

Я создал пять дополнительных DAG по следующему шаблону (для демонстрации):
Hidden text
# Пример использования new_values = { 'kpi_id': 6, 'dag_id': 'test_6', 'metric_name_en': 'test_6', 'owner': 'korsak0v', 'start_date': '2000-01-01', 'cron': '15 14 1 * *', 'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''', 'sql_query': ''' SELECT ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date, (random()*100)::int AS value ''', } # Вызов функции scd_dim_kpi( dict_kpi=new_values )
Всё работает корректно. В БД записи есть:

В Airflow DAG корректно отображаются и работают.

Давайте добавим ещё сенсоры для демонстрации. Я добавил новый DAG и если посмотреть на его граф, то мы можем увидеть EmptyOperator – empty_0. Сейчас это просто заглушка на случай если у нас нет сенсоров на другие DAG. При дагране он выполнится менее чем за секунду и наш пайплайн продолжит корректно работать.

Давайте добавим список сенсоров:
Добавление списка сенсоров для изменения графа DAG test_7
new_values = { 'kpi_id': 7, 'sensors': '''['test_1', 'test_2', 'test_3', 'test_4', 'test_5', 'test_6']''', } # Вызов функции scd_dim_kpi( dict_kpi=new_values )
И получим такой граф. Теперь вместо нашей заглушки мы имеем список сенсоров, которые будут проверять отработали ли какие-то DAG или нет.

Ну, и в конце давайте посмотрим, что насобирали наши сгенерированные DAG:

В дальнейшем мы сможем сделать JOIN между таблицей фактов и справочником, чтобы получить информацию по ��аждой из метрик.
Заключение
Я постарался описать максимально подробно свой кейс. Возможно, он является узконаправленным, но тут я рассказал об очередном генераторе DAG.
Также я не отрицаю, что это не идеально, но и идеала не существует ;)
Мог какие-то моменты упустить/упростить/не описать подробно. За всем этим можете обращаться в комментарии.
Важные моменты, на которые стоит обратить внимание:
Данная реализация не генерирует файл, а генерирует только объекты, поэтому во всех DAG будет исходный код нашего DAG, в котором происходит вся генерация. Если вам нужны физически созданные DAG, то можете прочитать мою предыдущую статью.
Если некорректно передать какие-то атрибуты в функцию, то сломаются все DAG. Если к примеру вместо даты попадет какая-то строка, то произойдет исключение, функция не отработает и ни один DAG не будет создан. Но вы можете это улучшить при помощи обработки исключений и прочего.
В данной реализации
kpi_idнужно подставлять самостоятельно. В моем варианте не реализован автоинкремент. Но как я говорил ранее этот продукт доработан и другая команда реализовала уже это на backend.Я показал самую простую реализацию данного инструмента. Вы можете настраивать его под себя, с обработкой каждого атрибута, использовать фреймворки, использовать самописные обработчики и прочие.
Можно сделать дополнительную нормализацию данных, сделать дополнительные справочники, привязать ORM-модели и реализовать сразу какой-то backend, чтобы не заполнять это руками.
В демонстрационном DAG реализована идемпотентность, которая позволяет грузить метрики снапшотами за каждый день. Вы можете это регулировать также под свои нужды.
Мой кейс решает определенную бизнес-задачу. Бизнес рад и я рад :)
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.
