Привет, Хабр!
Методология Data Vault была разработана Дэном Линстедом в конце 1990-х годов и предлагает гибкий, масштабируемый и проверяемый способ управления данными. Data Vault сочетает в себе самые лучшие черты нормализованных моделей данных и звездных схем.
В этой статье мы рассмотрим эту методологию и как с помощью нее проектировать DWH на примере.
Основные компоненты
Hubs (хабы)
Hubs представляют собой центральные таблицы, которые хранят бизнес-ключи. Бизнес-ключи идентифицируют основные бизнес-сущности, типо как клиенты, продукты или транзакции. Каждый бизнес-ключ является уникальным и неизменяемым идентификатором, который связывает данные между различными системами и модулями.
Хабы состоят из:
Бизнес-ключ — уникальный идентификатор бизнес-объекта.
Суррогатный ключ — внутренний идентификатор, используемый для связи хаба с другими таблицами.
Источник записи — информация о системе, из которой был загружен бизнес-ключ.
Пример хаба - таблица "Customer" H_Customer, где бизнес-ключом будет уникальный идентификатор клиента Customer_ID, а суррогатный ключ используется для внутренних связей.
Links (ссылки)
Links представляют отношения между хабами. Т.е таблицы, которые фиксируют взаимодействия или транзакции между бизнес-ключами. Основные элементы links:
Бизнес-ключи — идентификаторы из связанных хабов.
Суррогатный ключ — уникальный ключ для каждой комбинации бизнес-ключей.
Дата и время загрузки — метка времени, указывающая, когда запись была впервые загружена.
Источник записи — информация о системе, из которой была загружена связь.
Ссылки дают возможность добавлять новые отношения между хабами без изменения существующей структуры. Например, ссылка между таблицами "Customer" и "Product" может фиксировать транзакции покупки.
Satellites (сателлиты)
Спутники содержат дескриптивные атрибуты, связанные с бизнес-ключами или отношениями в хабах и ссылках. Спутники позволяют хранить исторические данные, фиксируя изменения атрибутов во времени. Основные элементы спутников:
Идентификатор хаба или ссылки — уникальный ключ, связанный с хабом или ссылкой.
Атрибуты — дескриптивные данные, такие как имя клиента, адрес, характеристики продукта и т.д.
Дата начала и окончания — временные метки, указывающие период действия данных.
Источник записи — информация о системе, из которой были загружены данные.
Например, спутник для хаба "Customer" может содержать атрибуты, как имя клиента, адрес и номер телефона.
Этапы создания Data Vault на примере
Допустим, есть компания, которая занимается электронной коммерцией и она хочет построить хранилище данных для анализа покупательских данных и управления запасами.
Первый шаг - идентификация ключевых бизнес-объектов и создание соответствующих хабов. В нашем случае это будут клиенты, продукты и заказы. Хабы будут содержать уникальные идентификаторы для каждого бизнес-объекта.
Создаем хабы для клиентов:
CREATE TABLE dv.H_Customer ( Customer_HK VARCHAR(64) PRIMARY KEY, -- Hash Key Customer_ID VARCHAR(64) NOT NULL, -- Business Key Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp Record_Source VARCHAR(64) NOT NULL -- Record Source );
Customer_HK является хэш-ключом, созданным на основе бизнес-ключа Customer_ID.
Следующий шаг — создание ссылок для фиксации отношений между бизнес-объектами. Например, ссылка между клиентами и заказами.
Создаем ссылки между клиентами и заказами:
CREATE TABLE dv.L_Customer_Order ( Customer_Order_HK VARCHAR(64) PRIMARY KEY, -- Hash Key Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer Order_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Order Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp Record_Source VARCHAR(64) NOT NULL -- Record Source );
Customer_Order_HK — хэш-ключ, созданный на основе комбинации ключей Customer_HK и Order_HK.
Спутники позволят содержать дескриптивные атрибуты, связанные с бизнес-объектами. Создадим спутники для хранения информации о клиентах:
CREATE TABLE dv.S_Customer ( Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp End_DTS TIMESTAMP, -- End Timestamp Record_Source VARCHAR(64) NOT NULL, -- Record Source Customer_Name VARCHAR(255), -- Customer Name Customer_Email VARCHAR(255), -- Customer Email Customer_Phone VARCHAR(20) -- Customer Phone );
Customer_HK используется для связи со соответствующим хабом, а столбцы Customer_Name, Customer_Email и Customer_Phone хранят дескриптивную информацию о клиентах.
После определения структуры необходимо загрузить данные в хабы, ссылки и спутники. Для этого используется процесс ETL.
ETL-процесс для загрузки данных в хаб клиентов:
INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source) SELECT MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK, -- Creating the Hash Key Customer_ID, NOW() AS Load_DTS, 'SourceSystem1' AS Record_Source FROM staging.Customer WHERE NOT EXISTS ( SELECT 1 FROM dv.H_Customer WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID );
Важно мониторить изменения. Для этого используются столбцы Load_DTS и End_DTS в спутниках.
Пример обновления спутника при изменении данных о клиентах:
UPDATE dv.S_Customer SET End_DTS = NOW() WHERE Customer_HK = :Customer_HK AND End_DTS IS NULL; INSERT INTO dv.S_Customer (Customer_HK, Load_DTS, Record_Source, Customer_Name, Customer_Email, Customer_Phone) VALUES (:Customer_HK, NOW(), 'SourceSystem1', :Customer_Name, :Customer_Email, :Customer_Phone);
Этот процесс сначала закрывает текущую запись, установив End_DTS, а затем вставляет новую запись с обновленными данными.
Бизнес-слой добавляется поверх сырых данных для включения бизнес-правил, вычислений и агрегатов. слой помогает обеспечить более удобный доступ к данным и повышает их полезность для аналитики и отчетности.
Пример создания бизнес-слоя для вычисления жизненной ценности клиента:
CREATE TABLE dv.B_Customer_CLV ( Customer_HK VARCHAR(64) NOT NULL, -- Foreign Key to H_Customer CLV DECIMAL(18, 2), -- Calculated Lifetime Value Load_DTS TIMESTAMP NOT NULL, -- Load Timestamp Record_Source VARCHAR(64) NOT NULL -- Record Source ); INSERT INTO dv.B_Customer_CLV (Customer_HK, CLV, Load_DTS, Record_Source) SELECT H_Customer.Customer_HK, SUM(O.Order_Amount) AS CLV, -- Summing the order amounts to calculate CLV NOW() AS Load_DTS, 'BusinessCalculation' AS Record_Source FROM dv.H_Customer AS H_Customer JOIN dv.L_Customer_Order AS L_Customer_Order ON H_Customer.Customer_HK = L_Customer_Order.Customer_HK JOIN dv.S_Order AS S_Order ON L_Customer_Order.Order_HK = S_Order.Order_HK GROUP BY H_Customer.Customer_HK;
Создали новую таблицу в бизнес-слое, которая вычисляет и хранит жизненную ценность клиентов, основываясь на данных о заказах.
Можно автоматизировать процессы ETL, чтобы минимизировать ручной труд. Например, с помощью Apache Airflow:
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from datetime import datetime def load_customer_data(**kwargs): # загрузка данных в хаб клиентов import psycopg2 from psycopg2 import sql import hashlib conn = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="your_host", port="your_port" ) cursor = conn.cursor() # SQL запрос для загрузки данных в хаб клиентов load_customer_sql = """ INSERT INTO dv.H_Customer (Customer_HK, Customer_ID, Load_DTS, Record_Source) SELECT MD5(CONCAT(Customer_ID, Record_Source)) AS Customer_HK, Customer_ID, NOW() AS Load_DTS, 'SourceSystem1' AS Record_Source FROM staging.Customer WHERE NOT EXISTS ( SELECT 1 FROM dv.H_Customer WHERE dv.H_Customer.Customer_ID = staging.Customer.Customer_ID ); """ cursor.execute(load_customer_sql) conn.commit() cursor.close() conn.close() def load_order_data(**kwargs): # загрузка данных в хаб заказов import psycopg2 from psycopg2 import sql import hashlib conn = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="your_host", port="your_port" ) cursor = conn.cursor() # SQL запрос для загрузки данных в хаб заказов load_order_sql = """ INSERT INTO dv.H_Order (Order_HK, Order_ID, Load_DTS, Record_Source) SELECT MD5(CONCAT(Order_ID, Record_Source)) AS Order_HK, Order_ID, NOW() AS Load_DTS, 'SourceSystem1' AS Record_Source FROM staging.Order WHERE NOT EXISTS ( SELECT 1 FROM dv.H_Order WHERE dv.H_Order.Order_ID = staging.Order.Order_ID ); """ cursor.execute(load_order_sql) conn.commit() cursor.close() conn.close() default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2024, 5, 21), 'email_on_failure': False, 'email_on_retry': False, } dag = DAG('data_vault_etl', default_args=default_args, schedule_interval='@daily') start = DummyOperator(task_id='start', dag=dag) load_customers = PythonOperator( task_id='load_customer_data', provide_context=True, python_callable=load_customer_data, dag=dag ) load_orders = PythonOperator( task_id='load_order_data', provide_context=True, python_callable=load_order_data, dag=dag ) end = DummyOperator(task_id='end', dag=dag) start >> load_customers >> load_orders >> end
Можно интегрировать Grafana и Prometheus.
Пример настройки мониторинга с использованием Grafana и Prometheus:
Установка Prometheus для сбора метрик:
global: scrape_interval: 15s scrape_configs: - job_name: 'airflow' static_configs: - targets: ['localhost:8080']
Настройка Grafana для визуализации данных из Prometheus:
{ "datasource": "Prometheus", "query": "sum(rate(airflow_task_duration_seconds_sum[1m])) by (task)" }
В общих чертах проектирование выглядит так.
Не забывайте документировать все процессы, архитектуру и используемые инструменты.
А больше про инструменты архитектуры можно узнать в рамках практических онлайн-курсов. Подробнее в каталоге.
