
Введение
Привет, Хабр!
Меня зовут Марк Порошин, я занимаюсь DataScience в DV Group. Недавно я уже рассказывал про то, как начать трансформировать данные с помощью dbt. Сегодня я решил поделиться, как мы в DV Group поженили dbt, Greenplum и DataVault, собрали все грабли, что могли; немного поконтрибьютили в open-source, но по итогу остались очень довольны результатом.
Расскажу сначала пару слов о том, что такое DataVault. DataVault - методология построения хранилища, предполагающая высокую нормализацию данных (3ая нормальная форма). Основными ее компонентами являются:
hub - “сущность” хранит только первичный и бизнес-ключ;
satellite - “свойства сущности”, относятся многие к одному с хабом и хранит свойства сущности;
link - “связь между сущностями” - отношение многие ко многим между сущностями (не обязательно двумя).
Чтобы стало чуть понятнее, давайте рассмотрим пример. Предположим мы хотим хранить информацию о запусках рекламных кампаний. У нас есть данные о том, когда клиенты запускали кампанию для каких-то товаров. Как же в этом случае может выглядеть ER диаграмма?

Можно заметить, что в сателлитах есть поле effective_from
и <entity>_hashdiff
, благодаря которому в DavaVault реализуется SCD2, это дает возможность реализовывать “версионность” данных.
Больше почитать про Data Vault можно здесь:
Прежде чем переходить к основной части, хочу дать поделиться статьей, потому что я начинал изучать эту тему именно с нее и во многом статьи будут пересекаться, но я хочу больше сконцентрироваться на деталях, а еще обсудить ошибки, которые мы совершили.
Постановка задачи
Из внешних источников данных мы периодически загружаем историю покупок пользователей в таблицу pure.pure_transactions
на Greenplum и хотим преобразовать ее в структуру Data Vault, т.е. разбить данные на хабы, линки и сателлиты. Преобразование происходит в 3 этапа.
Сначала нужно подготовить таблицу с данными, которые будут загружаться (мы будем выбирать данные за 1 день).
Далее необходимо обогатить данные всеми необходимыми хешами, но об этом дальше.
И, наконец, расщепить данные на сущности.

Таблица pure.pure_transactions
описывает историю покупок пользователей с некоторой метаинформацией. К сожалению, показать ее полностью я не могу, но в рамках статьи нам необходимы только следующие поля:
id транзакции(
transaction_id
);дата транзакции(
transaction_date
);цена товара(
price
);количество купленного товара(
quantity
);наименование товара(
product_name
);id категории товара(
cat_id
);
Мы выделили из этих данных две сущности
транзакция или строчка в чеке (
transaction_id
);товар (
product_name
);
Теперь когда у нас есть представление о том, чего мы хотим и какие у нас данные, перейдем к самому интересному.
Адаптер для greenplum
Прежде чем начать писать dbt-код, хочу немного рассказать про особенности работы с Greenplum. Greenplum — база построена на основе Postgres, поэтому синтаксис SQL запросов практически полностью совпадает, но есть ряд значительных отличий, которые будут использоваться в дальнейшем, и которые стали причиной реализации отдельного адаптера для dbt. Подробнее про это можно почитать здесь. А еще хочу поделиться интересным докладом, он будет полезен всем, кто начинает работать Greenplum.
Функциональность адаптера
Важная особенность Greenplum — возможность указать поле дистрибьюции. По этому полю Greenplum будет “раскладывать” данные по сегментам и по этому же полю будут самые эффективные join-ы. Указать параметр можно следующим образом:
{{
config(
...
distributed_by='<field_name>'
...
)
}}
Сжатие и колоночная ориентация
Greenplum предназначен для работы с большими данными, уменьшение времени на чтение/запись за счет сжатия является значительным фактором, который позволяет сократить время выполнения запроса. В dbt при использовании адаптера для Greenplum, это можно имплементировать следующим образом:
{{
config(
...
appendonly='true',
orientation='column',
compresstype='ZSTD',
compresslevel=4,
blocksize=32768
...
)
}}
Здесь мы указали параметр appendonly='true'
, он позволяет Greenplum создать таблицу оптимизированную для вставок. А еще мы добавили, что хотим использовать колоночную ориентацию orientation='column'
. И, наконец, указывали тип сжатия compresstype='ZSTD'
, который хотим использовать и его параметры compresslevel=4
, blocksize=32768
. Указанные значения являются параметрами по умолчанию — их можно не прописывать отдельно, если они для вас подходят.
Партиционирование
Последней важной особенностью является партиционирование, в postgres тоже есть эта возможность, но такой возможности нет у адаптера dbt-postgres(прошу поправить меня, если я ошибаюсь). Партиционирование позволяет разбить таблицу на несколько физических файлов по некоторому условию и читать только необходимые партиции. Из-за того что в Greenplum нельзя настраивать партиционирование, во время создания таблицы с помощью create table as select
, реализация этой фичи получилась не очень симпатичной. Требуется указать строчку с определением полей и строчку с определением партиционирования:
{% set fields_string %}
id int4 null,
incomingdate timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (incomingdate)
(
START ('2021-01-01'::timestamp) INCLUSIVE
END ('2023-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
...
fields_string=fields_string,
raw_partition=raw_partition,
default_partition_name='other_data'
...
)
}}
Построение DataVault
Raw
Сперва нужно выделить данные за день, для этого создали модель raw.raw_transaction
:
{{
config(
schema='raw',
materialized='table'
)
}}
with transaction_day_dedup as (
select * from (
select *,
row_number() over (
partition by pa."transaction_id"
order by pa."savetime" asc
) as rn
from {{ source('pure', 'pure_transactions') }} pa
where
'{{ var('raw_transactions')['start_date'] }}' <= transaction_date
and
transaction_date < '{{ var('raw_transactions')['end_date'] }}'
) as h
where rn = 1
)
select
"transaction_id" as transaction_id,
"transaction_date" as transaction_date,
"price" as price,
"quantity" as quantity,
"product_name" as product_name,
"cat_id" as cat_id,
...
'PURE_TRANSACTIONS' as record_source
from transaction_day_dedup ra
Здесь мы с помощью CTE выбирали данные за один день и дедублицировали по полю transaction_id
. После запуска модели в таблице raw.raw_transaction
у нас оказались данные за 1 день, если указать соответствующие переменные var('raw_transactions')['start_date']
и var('raw_transactions')['end_date']
:
vars:
raw_transactions:
start_date: '2022-01-01 00:00:00.0'
end_date: '2022-01-02 00:00:00.0'
Stage
К данным в таблице raw.raw_transaction
добавили первичные ключи, которые будут использоваться в сущностях DataVault.
Мы использовали пакет dbtvault (его пришлось немного доработать, чтобы он поддерживал последнюю версию dbt). Он позволяет сократить количество boilerplate кода.
Чтобы установить необходимые зависимости мы добавили в корень dbt проекта файл package.yml
со следующим содержанием:
packages:
- git: "https://github.com/markporoshin/dbtvault.git"
revision: develop
и вызвать команду:
dbt deps
После этого у вас появится папка dbt_packages, в которой будут находиться исходники установленных пакетов.
В модели stage_transactions
мы завели переменную yaml_metadata
и указываем поля, которые станут основой для ключей. Их существует два типа:
Первичные ключи сущностей: хабы и линки;
HASHDIFF — хеши для отслеживания изменений в свойствах сущности, которые строятся из полей сателлита.
{{
config(
schema='stage',
materialized='table',
)
}}
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
LOAD_DATE: (SAVETIME + 1 * INTERVAL '1 day')
EFFECTIVE_FROM: 'SAVETIME'
hashed_columns:
TRANSACTION_PK:
- 'transaction_id'
TRANSACTION_HASHDIFF:
is_hashdiff: true
columns:
- 'price'
- 'quantity'
- 'transaction_date'
PRODUCT_PK:
- 'product_name'
PRODUCT_HASHDIFF:
is_hashdiff: true
columns:
- 'cat_id'
LINK_TRANSACTION_PRODUCT_PK:
- 'transaction_id'
- 'product_name'
...
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
Результатом вызова модели получится таблица stage.stage_transactions
в базе данных со следующими полями:
transaction_id
transaction_date
price
quantity
product_name
cat_id
transaction_pk
transaction_hashdiff
product_pk
product_hashdiff
link_transaction_product_pk
load_date
effective_from
record_source
…
Теперь у нас есть все необходимые хеши для того чтобы строить хранилище в методологии DataVault.
Создание Хаба
Рассмотрим создание хаба на примере сущности “продукт”, из исходных данных у нас есть его название cleanedname
, которое является бизнес-ключом(natural key), на stage стадии мы создали первичный ключ product_pk
, а также поле classid
, которое является его свойством.
Модель хаба product будет выглядеть следующий образом:
{{
config(
schema='raw_vault',
materialized='incremental',
distributed_by='product_pk',
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_nk = "cleanedname" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
В начале мы описываем конфигурацию модели указываем схему, тип материализации и ключ дистрибьюции. После этого определяем переменные:
source_model
— таблицу источник, из которой будет происходить выгрузка данных для пополнения хаба;src_pk
— название поля, в котором хранится первичный ключ хаба;src_nk
— название поля, в котором хранится бизнес-ключ хаба;load_date
— название поля с датой загрузки данных;src_source
— название поля, в котором хранится наименование источника данных.
И вызываем макрос для генерации кода. В результате появится таблица с следующим DDL:
CREATE TABLE raw_vault.h_product (
product_pk text NULL,
cleanedname text NULL,
load_date text NULL,
record_source unknown NULL
)
WITH (
appendonly=true,
blocksize=32768,
orientation=column,
compresstype=zstd,
compresslevel=4
)
DISTRIBUTED BY (product_pk);
Рассмотрим еще один пример модели хаба. Дело в том, что сущность "транзакция"(сточка чека), в отличие от продукта однозначно относится со временем, когда совершили покупку, и хочется добавить поле incomingdate
для того чтобы реализовать партиционирование по нему. У нас на данный момент число транзакций превысило миллиард и обновление хаба без партиционирования занимает несколько часов.
{% set fields_string %}
transaction_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_id text NULL,
transaction_date timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='transaction_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_nk = "transaction_date" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["transaction_date"] -%}
{%- set partition_cause = "'" + var('h_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('h_transaction')['start_date'] + "'" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model,
src_extra=src_extra, partition_cause=partition_cause) }}
Это уже не совсем соответствует подходу DataVault, но следование ему в точности обходилось бы слишком дорого.
DDL модели h_transaction
:
CREATE TABLE raw_vault.h_transaction (
transaction_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_id text NULL,
transaction_date timestamp NULL
)
WITH (
appendonly=true,
blocksize=32768,
orientation=column,
compresstype=zstd,
compresslevel=4
)
DISTRIBUTED BY (transaction_pk)
PARTITION BY RANGE(transaction_date)
(
START ('2020-01-01 00:00:00'::timestamp without time zone) END ('2028-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval) WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')
COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4),
DEFAULT PARTITION extra WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')
COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)
);
Создание сателлита
У хаба продуктов есть свойство cat_id
, поэтому мы создали сателлит для его хранения:
{{
config(
schema='raw_vault',
materialized='incremental',
distributed_by='product_pk',
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "product_pk" -%}
{%- set src_hashdiff = "product_hashdiff" -%}
{%- set src_payload = ["cat_id, productname"] -%}
{%- set src_eff = "effective_from" -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
src_payload=src_payload, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model) }}
Модель выглядит практически также, как их хаб за исключением трех дополнительных переменных:
src_hashdiff
— название поля, хранящие хеш данного набора свойств;src_eff
— название поля, хранящее дату, с которой данный кортеж актуален;src_payload
— список полей, которые составляют свойства данной сущности.
Аналогично с хабом нам потребовалось внедрить партиционирования для сателлитов, для этого рассмотрим сателлит для сущности транзакция:
{% set fields_string %}
transaction_pk text NULL,
transaction_id text NULL,
transaction_hashdiff text NULL,
price float4 NULL,
quantity float4 NULL,
transaction_date timestamp NULL,
load_date text NULL,
record_source text NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='transaction_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "transaction_pk" -%}
{%- set src_hashdiff = "transaction_hashdiff" -%}
{%- set src_payload = [
"price",
"quantity",
"itemsum",
"transaction_id",
"transaction_date"
] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}
{{ config(schema='raw_vault') }}
{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
src_payload=src_payload, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model, partition_cause=partition_cause) }}
Создание линки
Теперь, когда у нас есть два хаба и сателлиты к ним, осталось только создать таблицу, связывающую их. В роли такой сущности в DavaVault служат ссылки.
{% set fields_string %}
link_transaction_product_pk text NULL,
transaction_pk text NULL,
product_pk text NULL,
load_date text NULL,
record_source text NULL,
transaction_date timestamp NULL
{% endset %}
{% set raw_partition %}
PARTITION BY RANGE (transaction_date)
(
START ('2020-01-01'::timestamp) INCLUSIVE
END ('2028-01-01'::timestamp) EXCLUSIVE
EVERY (INTERVAL '1 day'),
DEFAULT PARTITION extra
);
{% endset %}
{{
config(
schema='raw_vault',
materialized='incremental',
compresslevel=4,
distributed_by='link_transaction_product_pk',
fields_string=fields_string,
raw_partition=raw_partition
)
}}
{%- set source_model = "stage_transactions" -%}
{%- set src_pk = "link_transaction_product_pk" -%}
{%- set src_fk = ["transaction_pk", "product_pk"] -%}
{%- set src_ldts = "load_date" -%}
{%- set src_source = "record_source" -%}
{%- set src_extra = ["incomingdate"] -%}
{%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}
{{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model,
src_extra=src_extra, partition_cause=partition_cause) }}
В методологии DavaVault есть возможность создавать сателлиты для link, однако кажется, что если у связи есть свойства, значит можно выделить еще одну сущность. Кстати, насчет связей между более чем двумя сущностями — лично я пришел к выводу, что лучше избегать подобные структуры без острой необходимости, поскольку последующие запросы получаются нетривиальными особенно если в подобной связи встречаются NULL.
Выводы
Я надеюсь, что мне удалось вас убедить, что применение dbt в связке с datavault позволяет сильно облегчить построение хранилища DavaVault. Я буду очень рад замечаниям, вопросам и комментариям, а так же приглашаю присоединиться к улучшению адаптера для Greenplum =)
Хочу подвести небольшой итог и выделить основные рекомендации, которые я могу дать на основе совершенных ошибок:
Используйте партиционирование везде, где это возможно;
Изучите SQL запросы, которые компилирует dbtvault, поскольку иногда их можно оптимизировать в контексте вашей задачи;
В greenplum есть потрясающая фича
external table
так что можно отказаться от хранения исходных данных (pure
схема в статье).
У меня осталось непокрытой последняя тема о том, как автоматизировать процесс наполнения хранилища с помощью Dagster(развитие Airflow от его же создателей). Если тема актуальна, пишите в комментарии, я расскажу о ней.