Как стать автором
Обновить

Таблица-справочник – генератор DAG? А что так можно было?

Уровень сложностиСредний
Время на прочтение16 мин
Количество просмотров7.5K

Оглавление

  1. Предистория

  2. Используемые технологии

  3. Подготовка стенда

  4. Теперь всё готово для работы над проектом

  5. Создание SCD-таблицы

  6. Создание инструмента для наполнения SCD-таблицы

  7. Генерация DAG через SCD-таблицу

  8. Заключение

Предистория

Здесь будет рассказано почему это всё получилось и коротко про бизнес-кейс, если не интересно, то переходи сразу к следующему разделу :)

Существуют такие метрики, которые выглядят просто, но очень хорошо отображают суть бизнеса в какой-то момент.

Такие метрики как: MAU, DAU, сумма продаж и прочее дают стейкхолдером хорошее понимание что сейчас с бизнесом.

Для дата-аналитика это выглядит так: дата, название метрики, значение.

Для дата-инженера это выглядит как семь кругов ада: извлечение из источника, очистка, обогащение, агрегация, расчет бизнес-логики, снова агрегация, опять очистка и только тогда метрика готова попасть "на стол". (количество кругов ада различается от команды к команде и от бизнес-задачи, но суть вы поняли :))

Я постоянно пытаюсь как-то автоматизировать свою работу и упростить жизнь и себе, и людям.

И тут появился хороший шанс это сделать.

Пришел бизнес с примерно таким запросом: "У нас существует более ста метрик, которые имеют вид: дата, название метрики, значение. Мы хотели бы это всё автоматизировать, потому что это собирается частично руками, частично какими-то скриптами и у этого всего разные владельцы. А мы хотим всё хранить в одном месте. Ну и самое главное – мы хотим версионировать все свои метрики, чтобы мы знали когда, зачем и почему изменилась та или иная метрика" – на самом этот запрос звучал так: "нам нужно хранить метрики и знать когда, что и почему с ними происходило" :)

И я начал думать.

На ум пришла сразу же идея – использовать SCD.

Сразу захотелось создать такую таблицу с типом "II", в которой будет метрика, логика, комментарии и прочее. У каждой строки будет дата создания, дата "удаления".

А ещё я очень хотел не только создать такую таблицу, но и иметь возможность генерировать DAG через эту таблицу

И я это сделал.

Об этом и пойдет дальше речь.

Если что, то весь код, все коммиты и прочее находятся в данном git-репозитории.

Используемые технологии:

  • MacBook Air (M1, 2020)

    • Оперативная память 16 Gb

    • macOS Monterey 12.6

  • Docker 4.22.0

    • Docker resources:

      • CPUs: 2

      • Memory: 4 Gb

      • Swap: 4 Gb

  • Python3.11

Подготовка стенда

Для начала скачаем себе docker-compose.yaml с официального сайт Airflow:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'

Если его запустить, то мы получим там Python3.7. Но хотелось бы что-то побыстрее, поэтому давайте сделаем себе Python3.11. Для этого нужно создать Dockerfile со следующей командой:

FROM apache/airflow:2.6.3-python3.11

Затем в docker-compose.yaml изменяем стандартный образ на наш (строка 53):

  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3}
  build: .

Всё, готово. Теперь давайте соберем наш проект. Для этого выполним команду:

docker-compose up -d

* Если вы уже выполняли команду docker-compose up -d, то необходимо сначала выполнить команду docker-compose build и затем снова docker-compose up -d.

Ждем, когда скачаются все образы и поднимутся все контейнеры. После сборки Airflow он будет доступен по адресу http://0.0.0.0:8080/home

Создание виртуального окружения

Теперь нужно создать виртуальное окружение, чтобы корректно работала подсветка синтаксиса и работали все подсказки.

Для этого нужно выполнить команду:

python3.11 -m venv venv && \                                                                                                                                                     ~/_code/github/scd_dag_factory  
source venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt

В наше виртуальное окружение установятся все зависимости, которые необходимы для работы данного проекта.

Создание сервиса с базой данных в docker-compose

Для нашего проекта понадобится база данных (БД). Я по классике выбрал PostgreSQL (в оригинальном проекте был развернут инстанс GreenPlum 6).

Для того, чтобы корректно работала БД в нашей сборке мы добавим в конец docker-compose.yaml следующую команду:

  test_db:
    image: postgres
    restart: always
    ports:
      - "1:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres

После добавления этой команды необходимо заново перезапустить сборку командой:

docker-compose up -d

Теперь всё готово для работы над проектом

Для начала напишем простой DAG, который позволит проверить работу коннекта к нашей ранее созданной БД.

DAG, который проверяет коннект к БД в docker-сборке
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'check_pg'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')

# Описание возможных ключей для default_args
# https://github.com/apache/airflow/blob/343d38af380afad2b202838317a47a7b1687f14f/airflow/example_dags/tutorial.py#L39
args = {
    'owner': OWNER,
    'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
}


def check_pg_connect(**context):
    """"""
    pg = PostgresHook('test_db')

    df = pg.get_pandas_df('SELECT 1 AS one')

    if len(df) == 1:
        print(True)

with DAG(
        dag_id=DAG_ID,
        schedule_interval='10 0 * * *',
        default_args=args,
        tags=['check_pg_connect', 'test'],
        concurrency=1,
        max_active_tasks=1,
        max_active_runs=1,
) as dag:

    start = EmptyOperator(
        task_id='start',
    )

    check_pg_connect = PythonOperator(
        task_id='check_pg_connect',
        python_callable=check_pg_connect,
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> check_pg_connect >> end

Запустим его и посмотрим на него. Если раны горят зеленым, то всё идет по плану.

Далее давайте напишем наш типовой DAG, который будет собирать метрики по типу: дата, метрика, значение.

Типовой DAG, который будет собирать какую-то метрику (simple_dag)
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Конфигурация DAG
OWNER = 'korsak0v'
DAG_ID = 'simple_dag'
LOCAL_TZ = pendulum.timezone('Europe/Moscow')

# Названия коннекторов к PG
PG_CONNECT = 'test_db'

# Используемые таблицы в DAG
PG_TARGET_SCHEMA = 'dm'
PG_TARGET_TABLE = 'fct_sales'
PG_TMP_SCHEMA = 'stg'
PG_TMP_TABLE = f'tmp_{PG_TARGET_TABLE}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'
INDEX_KPI = 1

sql_query = '''
SELECT 
	('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
	(random()*100)::int AS value,
	1 AS kpi_id
'''

LONG_DESCRIPTION = '# LONG_DESCRIPTION'

SHORT_DESCRIPTION = 'SHORT_DESCRIPTION'


args = {
    'owner': OWNER,
    'start_date': datetime(2023, 1, 1, tzinfo=LOCAL_TZ),
    'catchup': True,
    'retries': 3,
    'retry_delay': timedelta(hours=1),
}

with DAG(
        dag_id=DAG_ID,
        schedule_interval='10 0 * * *',
        default_args=args,
        tags=['dm'],
        description=SHORT_DESCRIPTION,
        concurrency=1,
        max_active_tasks=1,
        max_active_runs=1,
) as dag:
    dag.doc_md = LONG_DESCRIPTION

    start = EmptyOperator(
        task_id='start',
    )

    drop_tmp_before = PostgresOperator(
        task_id='drop_tmp_before',
        sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
        postgres_conn_id=PG_CONNECT
    )

    create_tmp = PostgresOperator(
        task_id='create_tmp',
        sql=f'''
        CREATE TABLE {PG_TMP_SCHEMA}.{PG_TMP_TABLE} AS
        {
            sql_query.format(
                start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
                end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
            )
        };
        ''',
        postgres_conn_id=PG_CONNECT
    )

    delete_from_target = PostgresOperator(
        task_id='delete_from_target',
        sql=f'''
        DELETE FROM {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}
        WHERE 
            date IN (
                SELECT 
                    date 
                FROM 
                    {PG_TMP_SCHEMA}.{PG_TMP_TABLE}
                )
        ''',
        postgres_conn_id=PG_CONNECT
    )

    insert_from_tmp_to_target = PostgresOperator(
        task_id='insert_from_tmp_to_target',
        sql=f'''
        INSERT INTO {PG_TARGET_SCHEMA}.{PG_TARGET_TABLE}("date", value, kpi_id)
        SELECT 
            "date", 
            value,
            kpi_id
        FROM 
            {PG_TMP_SCHEMA}.{PG_TMP_TABLE}
        ''',
        postgres_conn_id=PG_CONNECT
    )

    drop_tmp_after = PostgresOperator(
        task_id='drop_tmp_after',
        sql=f'''DROP TABLE IF EXISTS {PG_TMP_SCHEMA}.{PG_TMP_TABLE}''',
        postgres_conn_id=PG_CONNECT
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> drop_tmp_before >> create_tmp >> delete_from_target >> insert_from_tmp_to_target >> drop_tmp_after >> end
    

В генерируемых значениях есть уже колонка kpi_id. Пока не обращаем на неё внимание. О ней чуть позже.

Перед запуском DAG давайте создадим таблицу, в которую будут писаться метрики.

DDL код для создания таблицы фактов
CREATE SCHEMA dm;

CREATE SCHEMA stg;

DROP TABLE IF EXISTS dm.fct_sales;

CREATE TABLE dm.fct_sales (
	id bigserial PRIMARY KEY,
	"date" date NOT NULL,
	kpi_id int2 NOT NULL,
	value int4 NOT NULL
);

После этого мы можем запустить DAG и мы увидим как появляются данные в нашей таблице.

Уже хорошо. Мы, на текущий момент, имеем DAG, который собирает какую-то метрику (давайте примем за условность, что метрика реально собирается, хоть и значения рандомного генерируются).

Теперь мы можем перейти к созданию SCD-таблицы с метриками.

Создание SCD-таблицы

Немного повторюсь, я хочу в этой таблице хранить всю информацию, которая касается метрик. Соответсвенно я хотел бы иметь полную информацию о ней: название, бизнес-логику, комментарии аналитиков, комментарии дата-инженеров и прочее.

А самое главное – я хочу из этой информации генерировать DAG, поэтому нужно определить значения, которые не могут быть пустыми (NOT NULL).

Давайте создадим такую таблицу следующим скриптом:

DDL код для SCD-таблицы
CREATE TABLE public.dim_kpi_dag_gen_config (
	id serial4 NOT NULL,
	kpi_id int4 NOT NULL,
	dag_id varchar(255) NOT NULL,
	"owner" varchar(30) NOT NULL,
	start_date varchar(40) NOT NULL,
	metric_name_en varchar(255) NOT NULL,
	sql_query text NOT NULL,
	short_description_md varchar(255) NULL,
	long_description_md text NULL,
	cron varchar(50) NOT NULL,
	sensors varchar(255) NULL,
	tags varchar(255) NOT NULL,
	metric_line varchar(255) NULL,
	"source" varchar(255) NULL,
	bi_logic varchar(255) NULL,
	comment_pa text NULL,
	comment_de text NULL,
	is_actual bool NULL DEFAULT TRUE,
	created_at timestamptz NULL DEFAULT now(),
	changed_at timestamptz NULL,
	pg_environment varchar(10) NOT NULL DEFAULT 'prod'::character varying,
	airflow_environment varchar(10) NOT NULL DEFAULT 'dev'::character varying,
	CONSTRAINT dim_kpi_dag_gen_config_pkey PRIMARY KEY (id)
)

И давайте сразу же создадим таблицу, в которой будут храниться все наши собранные метрики:

DDL код для таблицы фактов (таблица для всех собираемых метрик)
CREATE TABLE public.fct_dm_kpi (
	"date" date NULL,
	value float8 NULL,
	kpi_id int4 NULL
);

Создание инструмента для наполнения SCD-таблицы

Ниже я покажу MVP-вариант, который может заполнять SCD-таблицу. На своем проекте, с помощью другой команды, мы прикрутили к нему веб-интерфейс.

SCD-инструмент, который позволяет актуализировать и заполнять таблицу
from connectors_to_databases import PostgreSQL


pg = PostgreSQL(
    port=1
)


TABLE = 'dim_kpi_dag_gen_config'


def gen_insert_sql_for_kpi_id(dict_kpi: dict = None) -> str:
    """
    Генерирует скрипт для вставки данных в SCD.

    Определяет есть ли такой ключ.
    Если нет, то делает вставку с нужными данными, указанными в dict_kpi.
    Если есть, то делает вставку с нужными данными, указанными в dict_kpi и дублирует информацию из прошлых строк.

    @param dict_kpi: Словарь с описанием kpi.
    @return: Строку для вставки значений в SCD.
    """

    # Проверка наличия kpi_id в таблице
    df_check = pg.execute_to_df(f'''
    SELECT
        kpi_id
    FROM
        {TABLE}
    WHERE
        kpi_id = {dict_kpi['kpi_id']}
    ''')

    # Проверяем есть ли такой kpi_id в таблице
    if len(df_check) >= 1:
        # В запросе исключаем те поля, которые генерируется сами через `DEFAULT`
        query = f'''
            SELECT 
                column_name 
            FROM 
                information_schema.columns 
            WHERE 
                table_name = '{TABLE}'
                AND column_name NOT IN (
                    'id', 'created_at', 'changed_at', 'is_actual', 
                    {', '.join(f"'{i}'" for i in dict_kpi)}
                )
        '''

        df = pg.execute_to_df(query)  # noqa: PD901

        insert_sql_column_current = ', '.join(value for value in df.column_name)
        insert_sql_column_modified = insert_sql_column_current + f''', {', '.join(i for i in dict_kpi)}'''

        list_values = []

        for value in dict_kpi.values():
            # Обработка одинарных кавычек в значениях. Они встречаются при указании дат.
            if "'" in str(value):
                value = value.replace("'", "''")
                list_values.append(f"'{value}'")
            elif value is None:
                list_values.append('NULL')
            else:
                list_values.append(f"'{value}'")

        insert_sql_column_values = insert_sql_column_current + f''', {', '.join(list_values)}'''

        sql_insert = f'''
        INSERT INTO {TABLE}
        (
            {insert_sql_column_modified}
        )
        SELECT 
            {insert_sql_column_values} 
        FROM 
            {TABLE}
        WHERE
            is_actual IS TRUE
            AND kpi_id = {dict_kpi['kpi_id']};
        '''
    else:
        # Если нет такого kpi_id в таблице, то генерируем вставку значений из словаря
        columns = ', '.join(value for value in dict_kpi)

        list_values = []

        for value in dict_kpi.values():
            if "'" in str(value):
                value = value.replace("'", "''")
                list_values.append(f"'{value}'")
            elif value is None:
                list_values.append('NULL')
            else:
                list_values.append(f"'{value}'")

        values = ', '.join(list_values)

        sql_insert = f'''
            INSERT INTO {TABLE}({columns})
            VALUES ({values});
            '''

    return sql_insert


def scd_dim_kpi(dict_kpi: dict = None) -> None:
    """
    Основная функция, которая принимает на вход словарь с описанием kpi.

    Каждый ключ – это поле в таблице SCD.
    Каждое значение – это значение поля в таблице SCD.

    @param dict_kpi: Словарь с описанием kpi по выбранным колонкам.
    @return: Ничего не возвращает, выполняет SQL-скрипт на вставку данных в SCD.
    """

    # Обновление changed_at в предыдущей актуальной записи
    update_changed_at_for_kpi_id = f'''
    UPDATE {TABLE} 
    SET 
        changed_at = NOW() 
    WHERE 
        kpi_id = {dict_kpi['kpi_id']} 
        AND is_actual IS TRUE;
    '''

    # Вставка новой записи с обновленными значениями полей
    insert_new_values_for_kpi_id = gen_insert_sql_for_kpi_id(dict_kpi=dict_kpi)

    # Обновление is_actual для каждого kpi_id
    update_is_actual_for_kpi_id = f'''
    UPDATE {TABLE} 
    SET 
        is_actual = false 
    WHERE 
        kpi_id = {dict_kpi['kpi_id']} 
        AND id <> (
            SELECT MAX(id) 
            FROM {TABLE} 
            WHERE kpi_id = {dict_kpi['kpi_id']}
        );
    '''

    # Собираем SQL-скрипт из разных кусков, чтобы он прошел в одной транзакции
    sql_query = update_changed_at_for_kpi_id + insert_new_values_for_kpi_id + update_is_actual_for_kpi_id

    print(sql_query)  # noqa: T201
    pg.execute_script(sql_query)

И теперь давайте вызовем нашу основную функцию scd_dim_kpi, чтобы получить первую запись с метрикой в нашей таблице.

# Пример использования
new_values = {
    'kpi_id': 1,
    'dag_id': 'test_1',
    'metric_name_en': 'test_1',
    'owner': 'korsak0v',
    'start_date': '2021-01-01',
    'cron': '10 0 * * *',
    'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
    'sql_query': '''
    SELECT
        date,
        count(values) AS value
    FROM
        fct_some_table_with_random_values
    WHERE
        date BETWEEN '{start_date}' AND '{end_date}'
    GROUP BY
        1
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Вот что мы увидим в БД:

Первая созданная метрика в БД
Первая созданная метрика в БД

Отлично, метрика находится в БД и с этого момента можно сказать начинается её версионирование. Каждое изменение будет фиксироваться в БД отдельной строкой и мы сможем отследить все изменения.

Давайте для примера изменим поле comment_de:

new_values = {
    'kpi_id': 1,
    'comment_de': 'Сделали так, потому что потому'
}

# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Получим в БД такую информацию: kpi_id не изменился, изменилась информация в dim_kpi_dag_gen_config и также видно, когда была изменена метрика, ну и самое главное – флаг is_actual переместился на последнюю запись по данному kpi_id.

Теперь мы готовы генерировать DAG из этой таблицы.

Генерация DAG через SCD-таблицу

Давайте оставим simple_dag без изменений, но возьмем его за основу и сделаем из него функцию, которая будет возвращать DAG на основании полученных атрибутов.

DAG генератор DAG
from datetime import datetime, timedelta

import pendulum

from airflow import DAG

from airflow.sensors.external_task import ExternalTaskSensor

from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

from airflow.providers.postgres.hooks.postgres import PostgresHook


def create_kpi_of_dag(
        owner: str = None,
        dag_id: str = None,
        pg_target_schema: str = None,
        pg_target_table: str = None,
        index_kpi: str = None,
        pg_environment: str = None,
        airflow_environment: str = None,
        long_description: str = None,
        short_description: str = None,
        start_date: str = None,
        cron_expression: str = None,
        tags: str = None,
        sensors: str = None,
        sql_query: str = None,
) -> DAG:
    """
    Функция, которая генерирует типовой DAG для получения метрики.

    Вся логика описана и прокомментирована внутри функции.

    Некоторые моменты обработаны исключительно для функции, чтобы обработать какие-то атрибуты и получить желаемый
    эффект.

    @param owner: Владелец DAG.
    @param dag_id: Название DAG.
    @param pg_target_schema: Целевая схема в PostgreSQL.
    @param pg_target_table: Целевая таблица в PostgreSQL.
    @param index_kpi: ID показателя.
    @param pg_environment: Окружение PostgreSQL.
    @param airflow_environment: Окружение Airflow.
    @param long_description: Полное описание отчета.
    @param short_description: Короткое описание отчета.
    @param start_date: Дата начала работы DAG.
    @param cron_expression: Cron.
    @param tags: Tags.
    @param sensors: Sensors.
    @param sql_query: SQL-запрос для получения метрики.
    @return: Возвращает DAG.
    """
    # Конфигурация DAG
    local_tz = pendulum.timezone('Europe/Moscow')

    # Используемые таблицы в DAG
    pg_target_schema = pg_target_schema
    pg_target_table = pg_target_table
    pg_tmp_schema = 'stg'
    pg_tmp_table = f'tmp_{dag_id}_{{{{ data_interval_start.format("YYYY_MM_DD") }}}}'

    # Названия коннекторов к GP
    pg_connect = 'test_db_dev' if pg_environment == 'dev' else 'test_db'

    # Сделана заглушка атрибута. Это можно использовать для указания разных сценариев в зависимости от окружения
    airflow_environment = airflow_environment

    # Дата приходит в формате str и после парсинга, мы можем получить дату и любые элементы даты
    parse_date = pendulum.parse(start_date)

    args = {
        'owner': owner,
        'start_date': datetime(parse_date.year, parse_date.month, parse_date.day, tzinfo=local_tz),
        'catchup': True,
        'depends_on_past': True,
        'retries': 3,
        'retry_delay': timedelta(hours=1),
    }

    # Tags приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
    raw_tags = list(tags.split(','))
    tags_ = []

    for i in raw_tags:
        tags_.append(  # noqa: PERF401
            i.replace("'", "")
            .replace(" ", '')
            .replace("[", "")
            .replace("]", "")
        )

    # Sensors приходят в str формате, поэтому нужно их правильно "разобрать" и превратить в list
    if sensors:
        raw_sensors = list(sensors.split(','))
        sensors_ = []
        for i in raw_sensors:
            sensors_.append(  # noqa: PERF401
                i.replace("'", "")
                .replace(' ', '')
                .replace("[", "")
                .replace("]", "")
            )
    else:
        sensors_ = None

    with DAG(
            dag_id=dag_id,
            schedule_interval=cron_expression,
            default_args=args,
            tags=tags_,
            description=short_description,
            concurrency=1,
            max_active_tasks=1,
            max_active_runs=1,
    ) as dag:
        dag.doc_md = long_description

        start = EmptyOperator(
            task_id='start',
        )

        # Если есть sensors, то мы создаем задачи с сенсорами, иначе создаем одну пустышку
        if sensors_:
            sensors_task = [
                ExternalTaskSensor(
                    task_id=f'sensor_{dag}',
                    external_dag_id=dag,
                    allowed_states=['success'],
                    mode='reschedule',
                    timeout=360000,  # длительность работы сенсора
                    poke_interval=600  # частота проверки
                ) for dag in sensors_
            ]
        else:
            sensors_task = [EmptyOperator(task_id=f'empty_{value}') for value in range(1)]

        drop_tmp_before = PostgresOperator(
            task_id='drop_tmp_before',
            sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
            postgres_conn_id=pg_connect
        )

        create_tmp = PostgresOperator(
            task_id='create_tmp',
            sql=f'''
            CREATE TABLE {pg_tmp_schema}.{pg_tmp_table} AS
            {
                sql_query.format(
                    start_date="{{ data_interval_start.format('YYYY-MM-DD') }}",
                    end_date="{{ data_interval_end.format('YYYY-MM-DD') }}"
                )
            };
            ''',
            postgres_conn_id=pg_connect
        )

        delete_from_target = PostgresOperator(
            task_id='delete_from_target',
            sql=f'''
            DELETE FROM {pg_target_schema}.{pg_target_table}
            WHERE 
                date IN (
                    SELECT 
                        date 
                    FROM 
                        {pg_tmp_schema}.{pg_tmp_table}
                    WHERE
                        kpi_id = {index_kpi}
                    )
            AND kpi_id = {index_kpi}
            ''',
            postgres_conn_id=pg_connect
        )

        insert_from_tmp_to_target = PostgresOperator(
            task_id='insert_from_tmp_to_target',
            sql=f'''
            INSERT INTO {pg_target_schema}.{pg_target_table}("date", value, kpi_id)
            SELECT 
                "date", 
                value, 
                {index_kpi} AS kpi_id 
            FROM 
                {pg_tmp_schema}.{pg_tmp_table}
            ''',
            postgres_conn_id=pg_connect
        )

        drop_tmp_after = PostgresOperator(
            task_id='drop_tmp_after',
            sql=f'''DROP TABLE IF EXISTS {pg_tmp_schema}.{pg_tmp_table}''',
            postgres_conn_id=pg_connect
        )

        end = EmptyOperator(
            task_id='end',
        )

        start >> sensors_task >> drop_tmp_before >> create_tmp >> delete_from_target >>\
        insert_from_tmp_to_target >> drop_tmp_after >> end

    return dag


# build a dag from dag config
def generator_of_morning_kpi_dag_to_gp() -> None:
    """
    Функция получает список config из БД и генерирует DAG's на основании функции `create_kpi_of_dag`.

    Итерируется по config и каждый раз выполняет функцию `create_kpi_of_dag`, которая возвращает DAG.

    @return: None
    """
    pg_hook = PostgresHook(postgres_conn_id='test_db')

    df = pg_hook.get_pandas_df(  # noqa: PD901
        '''
        SELECT 
            kpi_id,
            dag_id,
            "owner",
            sql_query,
            start_date,
            pg_environment,
            airflow_environment,
            short_description_md,
            long_description_md,
            cron,
            sensors,
            tags
        FROM 
            dim_kpi_dag_gen_config
        WHERE 
            is_actual IS TRUE 
        ORDER BY 
            id;
        '''
    )

    for i in range(len(df)):
        create_kpi_of_dag(
            owner=df.iloc[i].owner,
            dag_id=df.iloc[i].dag_id,
            pg_target_schema='public',
            pg_target_table='fct_dm_kpi',
            index_kpi=df.iloc[i].kpi_id,
            pg_environment=df.iloc[i].pg_environment,
            airflow_environment=df.iloc[i].airflow_environment,
            long_description=df.iloc[i].long_description_md,
            short_description=df.iloc[i].short_description_md,
            start_date=df.iloc[i].start_date,
            cron_expression=df.iloc[i].cron,
            tags=df.iloc[i].tags,
            sensors=df.iloc[i].sensors,
            sql_query=df.iloc[i].sql_query,
        )


generator_of_morning_kpi_dag_to_gp()

После добавления данного DAG мы сразу же видим результат – в нашем веб-интерфейсе появился DAG с ранее созданной метрикой:

Давайте запустим наш сгенерированный DAG и посмотрим на результат, но в начале отредактируем SQL-запрос под необходимый нам формат.

Изменение sql_query для DAG test_1
new_values = {
    'kpi_id': 1,
    'sql_query': '''
    SELECT 
        ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
        (random()*100)::int AS value
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Всё, DAG работает и что-то собирает. Давайте сделаем для примера ещё несколько DAG.

Я создал пять дополнительных DAG по следующему шаблону (для демонстрации):

Hidden text
# Пример использования
new_values = {
    'kpi_id': 6,
    'dag_id': 'test_6',
    'metric_name_en': 'test_6',
    'owner': 'korsak0v',
    'start_date': '2000-01-01',
    'cron': '15 14 1 * *',
    'tags': '''['dm', 'pg', 'gen_dag', 'from_pg']''',
    'sql_query': '''
    SELECT 
        ('2023-'||((random()*11+1)::int)::varchar||'-'||((random()*27+1)::int)::varchar)::date AS date,
        (random()*100)::int AS value
    ''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

Всё работает корректно. В БД записи есть:

В Airflow DAG корректно отображаются и работают.

Давайте добавим ещё сенсоры для демонстрации. Я добавил новый DAG и если посмотреть на его граф, то мы можем увидеть EmptyOperatorempty_0. Сейчас это просто заглушка на случай если у нас нет сенсоров на другие DAG. При дагране он выполнится менее чем за секунду и наш пайплайн продолжит корректно работать.

Давайте добавим список сенсоров:

Добавление списка сенсоров для изменения графа DAG test_7
new_values = {
    'kpi_id': 7,
    'sensors': '''['test_1', 'test_2', 'test_3', 'test_4', 'test_5', 'test_6']''',
}


# Вызов функции
scd_dim_kpi(
    dict_kpi=new_values
)

И получим такой граф. Теперь вместо нашей заглушки мы имеем список сенсоров, которые будут проверять отработали ли какие-то DAG или нет.

Ну, и в конце давайте посмотрим, что насобирали наши сгенерированные DAG:

В дальнейшем мы сможем сделать JOIN между таблицей фактов и справочником, чтобы получить информацию по каждой из метрик.

Заключение

Я постарался описать максимально подробно свой кейс. Возможно, он является узконаправленным, но тут я рассказал об очередном генераторе DAG.

Также я не отрицаю, что это не идеально, но и идеала не существует ;)

Мог какие-то моменты упустить/упростить/не описать подробно. За всем этим можете обращаться в комментарии.

Важные моменты, на которые стоит обратить внимание: 

  1. Данная реализация не генерирует файл, а генерирует только объекты, поэтому во всех DAG будет исходный код нашего DAG, в котором происходит вся генерация. Если вам нужны физически созданные DAG, то можете прочитать мою предыдущую статью.

  2. Если некорректно передать какие-то атрибуты в функцию, то сломаются все DAG. Если к примеру вместо даты попадет какая-то строка, то произойдет исключение, функция не отработает и ни один DAG не будет создан. Но вы можете это улучшить при помощи обработки исключений и прочего.

  3. В данной реализации kpi_id нужно подставлять самостоятельно. В моем варианте не реализован автоинкремент. Но как я говорил ранее этот продукт доработан и другая команда реализовала уже это на backend.

  4. Я показал самую простую реализацию данного инструмента. Вы можете настраивать его под себя, с обработкой каждого атрибута, использовать фреймворки, использовать самописные обработчики и прочие.

  5. Можно сделать дополнительную нормализацию данных, сделать дополнительные справочники, привязать ORM-модели и реализовать сразу какой-то backend, чтобы не заполнять это руками.

  6. В демонстрационном DAG реализована идемпотентность, которая позволяет грузить метрики снапшотами за каждый день. Вы можете это регулировать также под свои нужды.

  7. Мой кейс решает определенную бизнес-задачу. Бизнес рад и я рад :)


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.

Теги:
Хабы:
Всего голосов 8: ↑4 и ↓4+3
Комментарии9

Публикации

Работа

Data Scientist
38 вакансий

Ближайшие события