Как стать автором
Обновить
828.09
OTUS
Цифровые навыки от ведущих экспертов

SQL на стероидах – 5 кейсов использования dbt + Jinja

Время на прочтение 10 мин
Количество просмотров 4.9K

SQL – это нескучно. С современными инструментами возможности языка кратно возросли. Мультитул для моделирования данных dbt, современные колоночные аналитические СУБД позволяют буквально творить с данными чудеса.

Меня зовут Артемий и я Analytics Engineer. И сегодня я подготовил небольшой экскурс в реальные и интересные сценарии использования гибридного SQL:

  • Операции Pivot и Unpivot для табличных данных

  • Генерирование суррогатного ключа и ключа конкатенации

  • Гибкая фильтрация записей из таблиц-источников

  • Автоматизация экспорта данных из Хранилища в S3

  • Валютные курсы, Continuous Integration, Data Quality

Операции Pivot и Unpivot для табличных данных

Операция pivot позволяет повернуть плоскую двумерную таблицу таким образом, что значения в строках образуют новые колонки. Операция unpivot позволит выполнить обратное действие и уменьшить размерность таблицы, превратив колонки в строки. Иначе это можно назвать манипуляциями со сводными таблицами (сross-tables).

Проще всего показать это на небольших примерах:
Input: orders

| size | color |
|------|-------|
| S    | red   |
| S    | blue  |
| S    | red   |
| M    | red   |


Pivot output:

| size | red | blue |
|------|-----|------|
| S    | 2   | 1    |
| M    | 1   | 0    |

--------------

Input: orders

| date       | size | color | status     |
|------------|------|-------|------------|
| 2017-01-01 | S    | red   | complete   |
| 2017-03-01 | S    | red   | processing |


Unpivot output:

| date       | status     | field_name | value |
|------------|------------|------------|-------|
| 2017-01-01 | complete   | size       | S     |
| 2017-01-01 | complete   | color      | red   |
| 2017-03-01 | processing | size       | S     |
| 2017-03-01 | processing | color      | red   |

Соответствующие макросы dbt, реализующие преобразования данных через SQL, позволяют просто и гибко сформировать результирующий набор данных требуемого формата, без привязки к используемой СУБД.

Реальная ситуация, в которой это может применяться – подготовка выгрузки по заранее оговоренной структуре для внешних пользователей. Например, выгрузка пользовательских атрибутов в формате ключ-значение.

Исходный набор данных:

Результирующий набор данных:

Макрос, используемый в SQL-скрипте выглядит довольно просто:

{{ dbt_utils.unpivot(

 relation=ref('dim_chauffeurs_labels'),

 cast_to='varchar',

 exclude=['actual_dt', 'chauffeur_id'],

 field_name='label',

 value_name='value'

) }}

Преимущество такого подхода в том, что сколько бы дополнительных атрибутов не появилось, это не потребует ни единого изменения в существующий код, что очень удобно.

Генерирование суррогатного ключа и ключа конкатенации

Использование суррогатных ключей – важнейший аспект любого хранилища данных. Это ключи, позволяющие однозначно идентифицировать строки и соединять таблицы фактов и измерений. Обычно я использую хэш-функции, подавая на вход значения бизнес-ключей из систем-источников. Эта практика также широко распространена в Data Vault 2.0 и позволяет иметь согласованные результаты между данными из различных источников.

Но иногда в случае составного ключа вы хотели бы сохранить интерпретируемость и читаемость (human-readable) всех значений. В частности это особенно актуально в маркетинговой аналитике, где ключ строки может содержать набор меток: utm_source, utm_medium, utm_campaign, ads_group_id, keyword_id

Удобочитаемые (human-readable) значения помогают быстро отслеживать источники и исправлять баги, в то время как хешированные суррогатные ключи используются для оптимальных соединений таблиц:

Генерация этих ключей выполняется с помощью макросов hash(), concat(), surrogate_key() (адаптер MS SQL):

hash(), concat(), surrogate_key() macros
------------------------

--- MSSQL hash macro ---

------------------------

{% macro hash(field) -%}

   HASHBYTES('SHA2_256', {{field}})

{%- endmacro %}

 

--------------------------

--- MSSQL concat macro ---

--------------------------

{% macro concat(fields) -%}

   concat({{ fields|join(', ') }})

{%- endmacro %}

 

 

--------------------------

--- Surrogate hash key ---

--------------------------

{%- macro surrogate_key(field_list) -%}

 

   {%- if varargs|length >= 1 %}

 

   {%- do exceptions.warn("Warning: the `surrogate_key` macro now takes a single list argument instead of multiple string arguments. Support for multiple string arguments will be deprecated in a future release of dbt-utils.") -%}

 

   {# first argument is not included in varargs, so add first element to field_list_xf #}

   {%- set field_list_xf = [field_list] -%}

 

   {%- for field in varargs %}

   {%- set _ = field_list_xf.append(field) -%}

   {%- endfor -%}

 

   {%- else -%}

 

   {# if using list, just set field_list_xf as field_list #}

   {%- set field_list_xf = field_list -%}

 

   {%- endif -%}

 

 

   {%- set fields = [] -%}

 

   {%- for field in field_list_xf -%}

 

       {%- set _ = fields.append(

           "coalesce(cast(" ~ field ~ " as NVARCHAR " ~ "), '')"

       ) -%}

 

       {%- if not loop.last %}

           {%- set _ = fields.append("'-'") -%}

       {%- endif -%}

 

   {%- endfor -%}

 

   {# {{ concat(fields) }} #}

   {{ mybi_dbt_core.hash(mybi_dbt_core.concat(fields)) }}

 

{%- endmacro -%}

Следует обратить особое внимание на следующие моменты:

  • Преобразование составных значений атрибутов в текстовую форму

  • Обработка значений null, если таковые присутствуют

  • Последовательность соединения атрибутов ключа

  • Использование разделителя между значениями ключа (тире, подчеркивания или любого другого)

Вызов макросов в коде выглядит следующим образом:

select

	  {{ surrogate_key(["dt", "campaign_id", "ads_id"]) }} as hash_key
  , {{ concat_key(["dt", "campaign_id", "ads_id"]) }} as concat_key

Актуальный код, который будет запущен в СУБД (скомпилированная версия):

select

	  HASHBYTES('SHA2_256', concat(coalesce(cast([dt] as NVARCHAR ), ''), '-', coalesce(cast([campaign_id] as NVARCHAR ), ''), '-', coalesce(cast([ads_id] as NVARCHAR ), ''))) as hash_key
	, concat(coalesce(cast([dt] as NVARCHAR ), ''), '-', coalesce(cast([campaign_id] as NVARCHAR ), ''), '-', coalesce(cast([ads_id] as NVARCHAR ), '')) as concat_key

Гибкая фильтрация записей из таблиц-источников

Как правило Хранилище Данных объединяет информацию из десятков или даже сотен источников. Каждый из них обладает своими особенностями и в разных случаях имеет смысл ограничивать выборки, обращаясь к исходным данным.

Имейте в виду следующие сценарии:

  • Чтение актуальных данных (deleted_flag is not true)

  • Формирование ограниченной выборки для разработки и Continuous Integration

  • Поиск дельты для инкрементального наполнения витрин

При этом помните, что названия колонок, по которым мы фильтруем данные могут отличаться от источника к источнику. Различаться могут также и типы данных – epoch (bigint) vs timestamp.

Все эти вводные хорошо ложатся на шаблонизированное формирование выражений в конструкции WHERE.

select

   ...

from {{ source('backend', 'orders') }}

{{ hevo_filter_rows(
   deleted_rows_filter=true,
   last_n_days_of_data=false,
   timestamp_column='created_at',
   incremental_column='__metadata_timestamp'   
) }}

Сам макрос может выглядеть следующим образом (с пояснениями):

macro hevo_filter_rows()
-- filter data for deleted rows; resize for dev, ci pipelines; apply incremental load filters

{% macro hevo_filter_rows(

   deleted_rows_filter=true,

   last_n_days_of_data=false,

   timestamp_column='created_at',

   incremental_column='__metadata_timestamp'

) -%}

  

   {#- prepare expression to filter deleted rows by flag __hevo__marked_deleted where it exists  -#}

   {%- set deleted_rows_expression = '__hevo__marked_deleted is not true' if deleted_rows_filter == true else '1 = 1' -%}

 

   {#- cast epoch to timestamp if necessary -#}

   {%- set timestamp_column = epoch_to_timestamp('__hevo__ingested_at')

       if timestamp_column == '__hevo__ingested_at' else timestamp_column -%}

 

   {#- prepare expression to filter rows to last 'development_days_of_data' (e.g. last 3 days) -#}  

   {% set get_dev_watermark_query = 'select dateadd(day, ' ~ -1 * var('development_days_of_data') ~ ', current_timestamp::date)' %}

   {# {{ log("get_dev_watermark_query: " ~ get_dev_watermark_query, info=True) }} #}

   {% if execute %}

       {% set dev_watermark = "'" ~ run_query(get_dev_watermark_query).columns[0][0] ~ "'" %}

   {% endif %}       

   {%- set dev_rows_expression = timestamp_column ~ ' >= ' ~ dev_watermark

       if target.name in ['dev', 'ci'] and last_n_days_of_data == true else '1 = 1' -%}

 

   {#- prepare expression to filter only delta rows on incremental build -#}

   {%- set incremental_expression =

       epoch_to_timestamp('__hevo__ingested_at') ~ '

           >= dateadd(h, -24, (select max(' ~ incremental_column ~ ') from ' ~ this ~ '))'

       if is_incremental() else '1 = 1' -%}

 

   {#- prepare final filter expression -#}

   where 1 = 1

       and {{ deleted_rows_expression }}

       and {{ dev_rows_expression }}

       and {{ incremental_expression }}

 

{%- endmacro -%}

Результат для первого (с нуля) формирования таблицы в dev-контуре:

select 

   ...

from {{ source('backend', 'orders') }}

where 1 = 1
   and __hevo__marked_deleted is not true
   and created_at >= '2021-08-10 00:00:00'
   and 1 = 1

 

Автоматизация экспорта данных из Хранилища в S3

А теперь давайте вспомним первый пример, где мы формировали unpivot-таблицу для внешних пользователей. Предположим, что в этот раз они обратились к нам с просьбой сформировать выгрузку не внутри Хранилища, но как плоские файлы в Object Storage (S3).

Вполне реальный сценарий. Мы могли бы начать думать об инструментах, которые способны это сделать, выстраивать пайплайны, выбирать языки программирования, но не следует привлекать новые сущности без крайней на то необходимости. Воспользуемся тем, что умеет сама СУБД.

Операция UNLOAD (синтаксис Amazon Redshift), выполняемая как post-hook модели dbt:

UNLOAD to S3
{{
   config(
       materialized='view',
       post_hook=[
           "
           {%- if target.name == 'prod' -%}
               UNLOAD ('SELECT actual_dt, chauffeur_id, label, value FROM {{ this.schema }}.{{ this.table }} ORDER BY actual_dt, chauffeur_id')
               TO 's3://my-analytics-bucket/dwh/unload/chauffeurs_labels'
               IAM_ROLE 'arn:aws:iam::88002000000:role/redshift-role'
               FORMAT AS CSV
               PARTITION BY (actual_dt) INCLUDE
               HEADER
               CLEANPATH
           {%- endif -%}
           "
       ]      
   )
}}
 

{{ dbt_utils.unpivot(
 relation=ref('dim_chauffeurs_labels'),
 cast_to='varchar',
 exclude=['actual_dt', 'chauffeur_id'],
 field_name='label',
 value_name='value'
) }}

Таким образом, на каждый регулярный расчет (раз в сутки, раз в час) будет сформирована и сохранена в S3 по указанному пути csv-выгрузка. А чтобы не хранить данные в СУБД, материализуем выгрузку как представление (view, виртуальная таблица).

8 файлов по количеству узлов в кластере (принцип MPP-базы данных)

Еще больше примеров

Ранее я писал о других интересных сценариях и использовании современного аналитического стека:

Интеграционные тесты для Хранилища Данных – Настраиваем Slim CI для DWH

  • Формирование окружения для CI

  • Гибкое управление схемой, использование идентификатора Pull Request

Курсы валют и аналитика – использование обменных курсов в Хранилище Данных

  • Макросы для работы с внешними данными (External Tables)

  • Автоматизация пересчета курсов валют на лету

​​Кто ответит за качество аналитики: QA для Хранилища Данных

  • Тестирование ожиданий (expectations) от данных

  • Формирование тестов на бизнес-логику

Практические навыки и живое общение

Между тем, если вас интересует не только чтение, но и практика вкупе с получением знаний из первых уст, ознакомьтесь с платформой ОТУС и занятиями, в рамках которых я и мои коллеги делимся всех тем, что умеем и любим делать.

Data Engineer – один из самых успешных тиражных курсов, в запусках которого я участвую уже более 2-х лет. К новому старту готовы кардинальные обновления по содержанию, используемым инструментам, инфраструктуре, включая выделенные вебинары на разбор домашних заданий.

Analytics Engineer – попытка закрыть потребность на людей-мультиинструменталистов, которые сильны и в понимании специфики бизнеса, моделировании и в инженерной части. Львиная доля курса посвящена современным аналитическим СУБД, BI-инструментам, практикам продвинутой аналитики и моделирования в dbt.

Спасибо за внимание!


Материал подготовлен в рамках курса Data Engineer. Если вас интересует развитие в сфере работы с данными с нуля до Pro, предлагаем узнать про специализацию Data Engineer.

Также приглашаем всех желающих на Demo-урок «Введение в оркестрацию». На этом занятии подробно разберем, что такое платформы оркестрации, какие решения есть сегодня на рынке и даже углубимся в практический пример использования одной из самых распространенных платформ на сегодня: Apache Airflow.

РЕГИСТРАЦИЯ

Теги:
Хабы:
+5
Комментарии 0
Комментарии Комментировать

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS