
Привет, Хабр!
Меня зовут Марк Порошин, в DV Group я занимаюсь Data Science. Мы работаем с большим количеством данных, на данный момент приближаемся к 10тб данных на нашем кластере Greenplum. Так как бизнес достаточно молодой, требования заказчиков, аналитиков постоянно меняются, да и сама структура данных периодически дополняется, поэтому мы выбрали достаточно современную технологию построения Data Warehouse — DataVault. Данные методологии очень привлекателны своей гибкостью, однако ценой за эту гибкость будет огромное количество таблиц. Это приводит сразу к двух основным проблемам:
Нужна база данных, которая поддерживает и хорошо справляется с большим количеством join-ов;
Способ автоматизации инкрементального наполнения таблиц, поскольку руками прописывать SQL запросы очень трудоемко, а еще это чревато ошибками.
Здесь я расскажу про технологию, которую мы используем в DV Group — dbt(data build tool), она позволяет во многом справиться со второй проблемой и очень хорошо себя зарекомендовала в нашем проекте.
Настройка проекта
Знакомство с dbt начнем с тестового проекта. В качество целевой базы данных будем использовать postgres, которую я настроил локально на своей машине. Создаем папку проекта, я буду работать в PyCharm, это вовсе необязательно, тут каждый выбирает сам. Необходимо настроить окружение python3 и установить необходимые зависимости.
pip install dbt-core==1.1.0 dbt-postgres==1.1.0
После этого инициализируем dbt проект:
(venv) ➜ PostgresDBTIntro dbt init
11:32:18 Running with dbt=1.1.0
Enter a name for your project (letters, digits, underscore): dbt_postgres_intro
Which database would you like to use?
[1] postgres
(Don\'t see the one you want? https://docs.getdbt.com/docs/available-adapters)
Enter a number: 1
11:33:04
Your new dbt project "dbt_postgres_intro" was created!
На данный момент у вас в проекте должна появиться папка с таким же названием, что вы указали в качестве имени проекта.
Первые шаги в dbt
Давайте немного пройдем по файлам, которые появились после инициализации проекта.

Под номером один находится файл dbt_project.yml, в котором мы описываем структуру проекта, переменные(vars), дефолтные типы материализаций моделей. Также здесь можно прописать хуки on-run-start, on-run-end. К этим тонкостям мы вернемся позже, а сейчас рассмотрим файл под номером 2 my_first_dbt_model.sql
/*
Welcome to your first dbt model!
Did you know that you can also configure models directly within SQL files?
This will override configurations stated in dbt_project.yml
Try changing "table" to "view" below
*/
{{
config(materialized='table')
}}
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
/*
Uncomment the line below to remove records with null `id` values
*/
-- where id is not null
Пропускаем блок комментариев, экранированных с помощью /* ... */
и видим:
{{
config(materialized='table')
}}
DBT построен на основе Jinja, поэтому {{ ... }}
используются для экранирования кода. В нем вызываем macro(читай “функцию”) - config, в который передаем аргументы для конфигурации нашей модели. В данном случае у нас всего лишь один аргумент materialized
со значением 'table'.
Это значит, что в результате запуска модели “my_first_dbt_model”, должна быть создана (пересоздана) таблица с таким же названием, как и название файла.
Следом идет sql код для выбора данных:
select *
from source_data
Прежде чем запускать модель, нужно разобраться с еще одним моментом.Пока мы еще нигде не прописали креденшены для подключения к инстансу нашего Postgres’a. Это делается с помощью файла profiles.yml
. В моем случае он выглядит следующим образом:
config:
send_anonymous_usage_stats: False
use_colors: True
partial_parse: True
dbt_postgres_intro:
outputs:
dev:
type: postgres
threads: 3
host: localhost
port: 5432
user: markporoshin
pass: "<password>"
dbname: dbt_intro_db
schema: public
target: dev
Я разместил этот файл на одном уровне с файлом dbt_project.yml
, сделано это для удобства дальнейшего деплоя. DBT предлагает стандартное расположение файла со всеми конфигурациями (/Users/<user>/.dbt/profiles.yml
на mac os). Чтобы узнать ваше дефолтное расположение, можно просто попробовать запустить модель, и в логах dbt напишет, где он по дефолту ищет файл с конфигами подключения:
dbt run --project-dir ./ -m my_first_dbt_model
Если же вы расположите profiles.yml
также как я, вызов модели будет выглядеть следующим образом:
dbt run --project-dir ./ --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model
Здесь мы указываем расположением dbt проекта --project-dir ./
; путь к папке с файлом profiles.yml - --profiles-dir ./
; название профиля --profile dbt_postgres_intro
, поскольку у вас может быть несколько профилей в одном файле profiles.yml для разных проектов или разных окружений (например DEV, PROD)
При запуске модели для базы данных Postgres dbt дополнит его create table ... as ...
и мы получим следующий sql код для создания таблицы:
create table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as (
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
);
Остановимся здесь чуть подробнее. DBT создал нам табличку, но в названии почему-то присутствует постфикс __dbt_tmp
. Это связано с тем, что dbt создает таблицу в несколько этапов:
-- создание новой таблицы
create table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as (
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
);
-- если целевая таблица уже есть, переименуем ее в backup
alter table "dbt_intro_db"."public"."my_first_dbt_model" rename to "my_first_dbt_model__dbt_backup";
-- теперь переименуем новую таблицу в целевую
alter table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" rename to "my_first_dbt_model"
-- после того, как все предыдущие этапы прошли успешно, можем удалять backup
drop table if exists "dbt_intro_db"."public"."my_first_dbt_model__dbt_backup" cascade
dbt отслеживает успешность обновления таблицы, а если что-то пошло не так, возвращает все к “статусу кво”.
Проследить за тем, что именно делает dbt, при вызове модели, можно добавлением флага -d
:
dbt -d run --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model
Как работает dbt
Чуть подробнее остановимся на том, что происходит, когда вы запускаете модель. При выполнении dbt run
dbt выполняет следующие действия:
Парсит модели, макросы, тесты итд. На этом этапе не выполняются никакие sql запросы;
Компилирует и запускает файлы уже не содержащие Jinja код.
Это важно понимать, чтобы избегать ошибок. Полезно прочитать статью в документации.
Понимание этого факта может помочь в дебаге запуска моделей. После успешной компиляции, скомпилированный файл можно найти в папке target/compiled
(генерируется автоматически), а если была успешно пройдена стадия run в папке target/run
можно найти sql код который будет выполнен.
Магия jinja
Наконец-то мы можем перейти к самому “вкусному” в dbt, тому, что помогает избавиться от написания boilerplate кода и сильно упростить жизнь data engineer =).
Для начала создадим новую dbt модель, чтобы немного наполнить нашу базу данными:
{{
config(
materialized='table',
)
}}
select 1 as id, 'Nikita' as name, 'Analytics' as type
union
select 2 as id, 'Stanislav' as name, 'Analytics' as type
union
select 3 as id, 'Alex' as name, 'CTO' as type
union
select 4 as id, 'Artem' as name, 'DevOps' as type
union
select 5 as id, 'Artem' as name, 'DataScience' as type
union
select 6 as id, 'Victor' as name, 'Backend' as type
union
select 7 as id, 'Mark' as name, 'DataEngineer' as type
Переменные
В dbt существует два способа работать с переменными.
Во-первых, вы можете их указать в файле dbt_projects.yml:
vars:
developer_name: "Nikita"
Дальше использовать в модели:
{{
config(
materialized='view',
)
}}
select
id,
type
from {{ ref('developers') }} d
where d.name = '{{ var('developer_name') }}'
Здесь мы видим сразу несколько новых моментов. В качестве материализации мы выбрали тип 'view'
, это приводит к созданию не таблицы, а view. Дальше мы берем в качестве источника данных {{ ref('developers') }}
, то есть мы хотим, чтобы dbt нашел модель developers и сам подставил путь к ней (возможно, что модель лежит не в дефолтной схеме или для нее задан alias
, это все можно настроить в macro config). И последнее, в условии where
с помощью макроса var
обращаемся к глобальным переменным dbt и вытягиваем значение переменной developer_name
.
Во втором случае мы можем использовать локальные переменные:
{{
config(
materialized='view',
)
}}
{% set type = 'DevOps' %}
select
id,
name
from {{ ref('developers') }} d
where type = '{{ type }}'
Создаем переменную с помощью ключевого слова set
и экранизируем это все с помощью {% … %}
.
Сразу зафиксируем, что в dbt по документации существует три типа “экранизации”:
{{ ... }}
— для вывода переменных или результатов выполнения макросов в скомпилированный файл;{% ... %}
— для объявления переменных, циклов, условных операторов и т.д.;{# ... #}
— комментарии.
Я встречал использование {%- ... -%}
, кажется это тоже самое, что и обычные скобки с процентами.
Циклы
Я думаю уже примерно понятна логика и структура Jinja инъекций в dbt. Ниже приведен пример модели, в которой используются массив и цикл:
{{
config(
materialized='table',
)
}}
{%- set types = ['Analytics', 'DataScience'] -%}
select
id,
name
from
{{ ref('developers') }}
where type in (
{%- for type in types -%}
'{{ type }}'
{%- if not loop.last %},{% endif -%}
{%- endfor -%}
)
Использование вспомогательных запросов
Зачастую хочется выполнить какой-то вспомогательный запрос, прежде чем запускать саму модель. Например, в контексте наших данных, мы хотим сначала получить разработчиков, наименования которых начинаются с буквы ‘а’, сохранить их в переменную, а потом использовать в целевом запросе. Понятно, что это все можно прописать в самом запросе, но существуют задачи, когда такое решение получается либо не оптимальным, либо громоздким, а иногда и вовсе невозможным. Рассмотрим использование вспомогательных запросов на примере:
{{
config(
materialized='table'
)
}}
{% set names_start_with_a_query %}
select
name
from
{{ ref('developers') }}
where lower(name) like 'a%'
{% endset %}
{% set names_start_with_a = [] %}
{% if execute %}
{% set names_start_with_a = run_query(names_start_with_a_query).columns[0].values() %}
{% endif %}
{{ log(names_start_with_a, info=True) }}
select
id,
name,
type
from
{{ ref('developers') }}
{% if names_start_with_a != () %}
where name in (
{%- for name in names_start_with_a %}
'{{ name }}'
{%- if not loop.last %},{% endif -%}
{%- endfor -%}
)
{% endif %}
После блока с конфигурацией модели, мы определяем переменные, names_start_with_a_query
, names_start_with_a
в которые записываем вспомогательный запрос и пустой массив.
Следом идет условный оператор, где мы выполняем запрос находящийся в переменной names_start_with_a_query
и записываем результат в переменную names_start_with_a
. Однако необходимо чуть подробнее остановиться на том, зачем нам нужна обертка выполнения запроса в условный оператор. Все дело в уже упомянутом жизненном цикле выполнения модели. execute
— специальная переменная, которая имеет значение True, если выполнение модели(макроса и тд) в “execute” моде, это значит, что в данный момент уже прошла стадия парсинга и можно выполнять sql запросы.
Пользовательские macro
Необходимость написания собственных macro объясняется несколькими причинами: во-первых, это уменьшение дублирования кода, во-вторых декомпозиция и на самом деле теми же аргументами, зачем нужны функции во всех языках программирования.
Создадим в папке macros файл so_important_macro.sql
:
{% macro so_important_macro(number) %}
{% set so_important_query %}
select 1 as info
union
select 2 as info
{% endset %}
{%- set info = run_query(so_important_query).columns[0].values() -%}
{{ log('number ' + number|string, info=True) }}
{{ return(info) }}
{% endmacro %}
И дальше можем использовать его в нашей модели:
{{
config(
materialized='table'
)
}}
{% if execute %}
{% set info = so_important_macro(4) %}
{{ log(' info: ' + info|string, info=True) }}
{% endif %}
select 1 as id
В результате в логах мы получим следующее:
Running with dbt=1.1.0
Found 8 models, 4 tests, 0 snapshots, 0 analyses, 168 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
Concurrency: 3 threads (target='dev')
1 of 1 START table model public.test_macro ..................................... [RUN]
number 4
info: (Decimal('1'), Decimal('2'))
1 of 1 OK created table model public.test_macro ................................ [SELECT 1 in 0.15s]
Finished running 1 table model in 0.24s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Инкрементальная материализация
Наконец мы перешли к самому интересному=)
Такой тип материализации позволяет инкрементально наполнять таблицу. Рассмотрим сначала данный тип на синтетическом примере. Предположим, что мы хотим на каждый запуск модели добавлять в нее максимальное значение в таблице +1, если в таблице нет данных, тогда вставляем 1.
{{
config(
materialized='incremental'
)
}}
{% set data_to_insert = 1 %}
{% if is_incremental() %}
{% set max_number_query %}
select max(num) from {{ this }}
{% endset %}
{% set data_to_insert = run_query(max_number_query).columns[0].values()[0]|int + 1 %}
{% endif %}
{{ log('number to insert: ' + data_to_insert|string, info=True)}}
select {{ data_to_insert }} as num
Макро is_incremental
доступен для моделей с типом incremental
и он возвращает True, если таблица уже существует. Это необходимо в случае наличия рекурсии в запросе, например при дедубликации данных.
Рассмотрим, что произойдет, если мы запустим модель в первый раз. Макро is_incremental()
вернет False и в итоге будет создана таблица с одной строчкой со значением 1
.
Если после этого мы попробуем запустить модель еще раз, тогда is_incremental()
вернет True. Внутри условного оператора мы определяем sql запрос, который возвращает максимальное значение из текущей таблицы(this
— специальная переменная dbt, которая возвращает Relation
на текущую таблицу). Таким образом при втором запуске в таблицу будет вставлено значение 2
, в третий раз 3
и так далее.
Теперь рассмотрим реальный пример использования инкрементальной материализации с дедубликацией. Предположим, что у вас есть таблица-источник raw_source
, в которую периодически вставляются данные, но там могут встречаться дубликаты строчек. Для удобства, предположим, что существует поле id
, которое уникально для набора остальных атрибутов, т.е по этому полю можно дедублицировать. Мы же хотим создать таблицу, в которой будут храниться только уникальные значения.
Для начала создадим в папке models
файл source.yml
в котором мы опишем источники данных (таблицы, которые наполняются из внешних источников и не являются моделями dbt):
version: 2
sources:
- name: raw
schema: public
tables:
- name: raw_source
И опишем модель stage_source.sql
:
{{
config(
materialized='incremental'
)
}}
select distinct on (src.id)
src.*
from
{{ source('raw', 'raw_source') }} src
{% if is_incremental() %}
left join
{{ this }} dst
on src.id = dst.id
where dst.id is null
{% endif %}
При первичным запуске итоговый select
запрос будет выглядеть следующим образом:
select distinct on (src.id)
src.*
from
"dbt_intro_db"."public"."raw_source" src
Видно, что мы выбираем все данные из raw_source
и дедублицируем их по src.id
Если же мы попробуем запустить второй раз:
select distinct on (src.id)
src.*
from
"dbt_intro_db"."public"."raw_source" src
left join
"dbt_intro_db"."public"."stage_source" dst
on src.id = dst.id
where dst.id is null
Теперь же мы сначала пытаемся найти данные, которых еще нет в stage_source
и после этого дедублицируем их по ключу src.id
Документация
Очень приятным дополнением в dbt является автоматическая генерация документации. Если вы активно используете ref
, source
dbt может автоматически построить DAG связей. Сгенерировать документацию и запустить сервер с ui можно следующим образом:
dbt docs generate --profiles-dir ./ --profile dbt_postgres_intro
dbt docs serve --profiles-dir ./ --profile dbt_postgres_intro

Так же можно писать документацию моделей в файле schema.yml
лежащим на уровне моделей, тогда все это тоже будет красиво оформлено в ui:

Заключение
Надеюсь мне удалось вас заинтересовать замечательной технологией dbt, если вы о ней еще не слышали или рассказать что-то новое для тех, кто уже присматривался к ней.
В dbt есть возможность писать свои плагины, это значительно расширяет потенциал. Пишите в комментариях ваши замечания и предложения.
В мыслях есть планы рассказать, как с помощью dbt можно строить datavault на базе greenplum и не испытывать боль =) На хабре уже есть статья на эту тему, но я бы хотел ее расширить уделить внимание деталям, тому как оркестрировать это все с помощью Dagster и ошибкам, которые мы совершили:
Исходники: ссылка.
P.S.
В качестве бонуса, мы в DV Group немного доработали адаптер dbt-postgres
для greenplum, чтобы можно было выбирать поле дистрибьюции, сжатие и патриционирование: ссылка на GitHub.