
Всем привет! Это моя первая публикация на Хабре, поэтому строго не судите. Хочу поделиться с вами историей о том, как я делал Data Vault руками... или custom migrate a Data Vault c нотками Data Vault 2.0. Достаточно интересный способ провести время, но для начала углубимся в краткий экскурс.
Data Vault Modeling — процесс преобразования обычной модели данных в бизнес‑ориентированную модель для хранения информации, основанную на бизнес ключах.
Органика Data Vault:
Hub — отдельная таблица, описывающая уникальный бизнес ключ. В моем случае состоящая из полей:
1. Хеш - бизнес ключ в формате md5 |
2. Источник выгрузки, можно вывести код источника, но я взял саму таблицу =) |
3. Время загрузки данных в таблицу Hub |
4. Бизнес ключ. |
2. Satellite — описательная информация хаба
1. Хеш хаба |
2. Время загрузки данных в таблицу Satellite |
3. Дата действия записи в Satellite (SCD2) |
4. Источник, откуда берется выгрузка |
5. Атрибут Satellite |
3. Link — отношение между единицами бизнеса (ключами хабов)
1. Хеш линка (для комплексного просмотра изменений при SCD2 - состоит из скрещения хешей хаба) |
2. Время загрузки данных в таблицу Link |
3. Дата действия записи в Link (SCD2)(ПРИ НАЛИЧИИ) |
4. Источник, откуда берется выгрузка |
5. Хеш первого хаба |
6. Хеш второго хаба |
7. ... третьего при наличии и тд. |
В качестве примера была использована демонстрационная база данных, которую можно выгрузить в открытом доступе или глянуть здесь.
Перед формированием DataVault, необходимо возвести наглядную модель, в моем случае я использовал drawio от google. Это не займет много времени, но поможет при формировании хранилища.
Итак, поехали!
Открываем скрипт в консоли psql под нужным пользователем, у меня по дефолту.
psql -f demo_small_YYYYMMDD.sql -U postgres
Тем самым получим базу данных demo с данными об авиаперевозках по России. Объем данных небольшой, как раз подойдет для примера.
Для начала нам необходимо настроить репликацию (надо это вам или нет, решайте сами, но я сделал). Во избежании проблем связанных с доступами, распределением нагрузок и целостности данных.
Шаг 1. Репликация.

Для репликации данных в другую схему создадим новое расширение postgres_fdw, которое позволяет постгре держать связь с внешними данными из других серверов.
Как формируется реплика
/* -- Создаем расширение*/ DROP EXTENSION IF EXISTS postgres_fdw CASCADE; CREATE EXTENSION postgres_fdw; /*-- Создание сервера, который использует расширение в качестве обертки сторонних данных*/ CREATE SERVER IF NOT EXISTS foreign_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'localhost', port '5432', dbname 'demo'); CREATE USER MAPPING IF NOT EXISTS FOR postgres SERVER foreign_server OPTIONS (user 'postgres', password 'postgres'); /*-- Создадим схему foreign_tables, в которую будем лить реплику*/ DROP SCHEMA IF EXISTS foreign_tables; CREATE SCHEMA IF NOT EXISTS foreign_tables; IMPORT FOREIGN SCHEMA bookings -- название схемы исходника FROM SERVER foreign_server INTO foreign_tables;
Перед тем как создать сервер, поднимаем новую бд с именем DWHDataVault. Код исполнялся именно в ней. Итог 1 шага, мы получили реплицированные данные из исходника, но есть костыль... куда без них, ну или фича, кому как удобно.
Реплика переносит данные в виде внешних таблиц, к которым нельзя подвязать внешние ключи. Поэтому переливаем данные из слоя реплицирования foreign_tables в табличный, т.к. у внешних таблиц нет полного функционала табличной части...
Миграция в табличный слой
/*сперва создадим слой Stage для хранения функциональных таблиц*/ DROP SCHEMA IF EXISTS Stage CASCADE; CREATE SCHEMA IF NOT EXISTS Stage; /*aircrafts*/ DROP TABLE IF EXISTS Stage.aircrafts CASCADE; CREATE TABLE Stage.aircrafts AS TABLE foreign_tables.aircrafts; /*aircrafts_data*/ DROP TABLE IF EXISTS Stage.aircrafts_data CASCADE; CREATE TABLE Stage.aircrafts_data AS TABLE foreign_tables.aircrafts_data; /*boarding_passes*/ DROP TABLE IF EXISTS Stage.boarding_passes CASCADE; CREATE TABLE Stage.boarding_passes AS TABLE foreign_tables.boarding_passes; /*airports*/ DROP TABLE IF EXISTS Stage.airports CASCADE; CREATE TABLE Stage.Airports AS TABLE foreign_tables.airports; -- Что касается таблицы airports_data, в исходнике есть поле coordinates, в котором по дефолту тип данных point, для дальнейшего преобразования таблицы в органику DataVault необходимо п��реопределить тип /*airports_data*/ DROP TABLE IF EXISTS Stage.airports_data CASCADE; CREATE TABLE Stage.airports_data AS TABLE foreign_tables.airports_data; ALTER TABLE Stage.airports_data ALTER COLUMN coordinates TYPE VARCHAR; /*bookings*/ DROP TABLE IF EXISTS Stage.bookings CASCADE; CREATE TABLE Stage.bookings AS TABLE foreign_tables.bookings; /*flights*/ DROP TABLE IF EXISTS Stage.flights CASCADE; CREATE TABLE Stage.flights AS TABLE foreign_tables.flights; /*seats*/ DROP TABLE IF EXISTS Stage.seats CASCADE; CREATE TABLE Stage.seats AS TABLE foreign_tables.seats; /*ticket_flights*/ DROP TABLE IF EXISTS Stage.ticket_flights CASCADE; CREATE TABLE Stage.ticket_flights AS TABLE foreign_tables.ticket_flights; /*tickets*/ DROP TABLE IF EXISTS Stage.tickets CASCADE; CREATE TABLE Stage.tickets AS TABLE foreign_tables.tickets;
Шаг 2. Подготовка DDL для организации DataVault
Для начала необходимо определить схему, для миграция данных.
Схема для составляющих DataVault
DROP SCHEMA IF EXISTS data_vault CASCADE; CREATE SCHEMA IF NOT EXISTS data_vault;
DDL для таблиц Hub
Первичным ключом в таблицах будет захешированный бизнес ключ.
Для начала удалим таблицы, если такие имеются.
Удаление таблиц
DROP TABLE IF EXISTS data_vault.Hub_flights CASCADE; DROP TABLE IF EXISTS data_vault.Hub_airports_data CASCADE; DROP TABLE IF EXISTS data_vault.Hub_aircrafts_data CASCADE; DROP TABLE IF EXISTS data_vault.Hub_seats CASCADE; DROP TABLE IF EXISTS data_vault.Hub_ticket_flights CASCADE; DROP TABLE IF EXISTS data_vault.Hub_boarding_passes CASCADE; DROP TABLE IF EXISTS data_vault.Hub_tickets CASCADE; DROP TABLE IF EXISTS data_vault.Hub_bookings CASCADE;
Напишем функцию для формирования таблиц Hub. Функция принимает на вход два параметра:
1. Название таблицы Hub
2. Бизнес ключ
Функция DDL Hub
CREATE OR REPLACE FUNCTION data_vault.ddl_hub_table( table_name TEXT , -- название таблицы Hub business_key TEXT) -- какие колонки надо вытащить из таблицы источника указывается с форматом RETURNS VOID AS $$ DECLARE qwery TEXT; BEGIN -- проверка на наличие данных в параметрах функции IF (table_name IS NULL) OR (business_key IS NULL) THEN RAISE EXCEPTION 'Не заполнены параметры'; END IF; qwery:= 'CREATE TABLE IF NOT EXISTS data_vault.' || table_name || '(' || 'Hash_key Varchar(33) PRIMARY KEY, record_sourse Varchar(20) NOT NULL, Load_date timestamp NOT NULL ,' || business_key || ' NOT NULL);'; EXECUTE qwery; END; $$ LANGUAGE plpgsql;
Запрос без функции выглядит так.
CREATE TABLE IF NOT EXISTS data_vault.Hub_flights( Hash_key Varchar(33) PRIMARY KEY, record_sourse Varchar(20) NOT NULL, Load_date timestamp NOT NULL, flight_id bigint NOT NULL);
Код формирования DDL HUB
-- flights SELECT data_vault.ddl_hub_table('Hub_flights', 'flight_id bigint') -- airports_data SELECT data_vault.ddl_hub_table('Hub_airports_data', 'airport_code bpchar(3)') -- aircrafts_data SELECT data_vault.ddl_hub_table('Hub_aircrafts_data', 'aircraft_code bpchar(3)') -- seats SELECT data_vault.ddl_hub_table('Hub_seats', 'aircraft_code_seat_no Varchar(20)') -- ticket_flights SELECT data_vault.ddl_hub_table('Hub_ticket_flights', 'ticket_no_flight_id Varchar(25)') -- boarding_passes SELECT data_vault.ddl_hub_table('Hub_boarding_passes', 'ticket_no_flight_id Varchar(25)') -- tickets SELECT data_vault.ddl_hub_table('Hub_tickets', 'ticket_no Varchar(20)') -- bookings SELECT data_vault.ddl_hub_table('Hub_bookings', 'book_ref Varchar(15)')
DDL для таблиц Link
DDL для Link прописан вручную, так как структура у таблиц может быть индивидуальной.
В некоторых таблицах, обработка данных ведется без хеширования, поэтому привязать Link к Satellite на этапе DDL не представляется возможным, но в таких случаях REFERENCES осуществляется на этапе обработки.
-- Для начала удалим таблицы, если такие имеются. DROP TABLE IF EXISTS data_vault.Link_flights_airoport_data_departure CASCADE; DROP TABLE IF EXISTS data_vault.Link_flights_airoport_data_arrival CASCADE; DROP TABLE IF EXISTS data_vault.Link_flights_ticket_flights CASCADE; DROP TABLE IF EXISTS data_vault.Link_ticket_flights_boarding_passes CASCADE; DROP TABLE IF EXISTS data_vault.Link_ticket_flights_tickets CASCADE; DROP TABLE IF EXISTS data_vault.Link_tickets_bookings CASCADE; DROP TABLE IF EXISTS data_vault.Link_flights_aircrafts_data CASCADE; DROP TABLE IF EXISTS data_vault.Link_seats_aircrafts_data CASCADE;
Код формирования DDL LINK
-- flights_airoport_data_departure CREATE TABLE IF NOT EXISTS data_vault.Link_flights_airoport_data_departure( Link_flights_airoport_data_departure_Hashkey Varchar(33), load_date timestamp, load_end_date timestamp, record_sourse Varchar(20), Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key), Hub_airoport_data_Hash_key Varchar(33) REFERENCES data_vault.Hub_airports_data(hash_key) ); -- flights_airoport_arrival_airport CREATE TABLE IF NOT EXISTS data_vault.Link_flights_airoport_data_arrival( Link_flights_airoport_data_arrival_Hashkey Varchar(33), load_date timestamp, load_end_date timestamp, record_sourse Varchar(20), Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key), Hub_airoport_data_Hash_key Varchar(33) REFERENCES data_vault.Hub_airports_data(hash_key) ); -- flights_ticket_flights CREATE TABLE IF NOT EXISTS data_vault.Link_flights_ticket_flights( Link_flights_ticket_flights_Hashkey Varchar(33), load_date timestamp, load_end_date timestamp, record_sourse Varchar(30), Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key), ticket_no Varchar(33) , -- удалить потом, для инсерта первого Hub_ticket_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_ticket_flights(hash_key)); -- ticket_flights_boarding_passes CREATE TABLE IF NOT EXISTS data_vault.Link_ticket_flights_boarding_passes( Link_ticket_flights_boarding_passes Varchar(40) , load_date timestamp, load_end_date timestamp, record_sourse Varchar(33), ticket_no Varchar(33), Hub_boarding_passes_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/, Hub_ticket_flights_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/ ); -- ticket_flights_tickets CREATE TABLE IF NOT EXISTS data_vault.Link_ticket_flights_tickets( Link_ticket_flights_tickets Varchar(40), load_date timestamp, record_sourse Varchar(33), -- источники Hub_ticket_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_ticket_flights(hash_key), Hub_tickets_Hash_key Varchar(33) REFERENCES data_vault.Hub_tickets(hash_key) ); -- tickets_bookings CREATE TABLE IF NOT EXISTS data_vault.Link_tickets_bookings( Link_tickets_bookings Varchar(33) PRIMARY KEY, load_date timestamp, load_end_date timestamp, record_sourse Varchar(30), Hub_ticket_Hash_key Varchar(33) REFERENCES data_vault.Hub_tickets(hash_key), Hub_bookings_Hash_key Varchar(33) REFERENCES data_vault.Hub_bookings(hash_key) ); -- flights_aircrafts_data CREATE TABLE IF NOT EXISTS data_vault.Link_flights_aircrafts_data( Link_flights_aircrafts_data_Hashkey Varchar(33), load_date timestamp, record_sourse Varchar(22), Hub_flights_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/, Hub_aircrafts_data_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/ ); -- seats_aircrafts_data CREATE TABLE IF NOT EXISTS data_vault.Link_seats_aircrafts_data( Link_seats_aircrafts_data_Hashkey Varchar(33), load_date timestamp, record_sourse Varchar(20), Hub_aircrafts_data_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/, Hub_seats_Hash_key Varchar(33) /*REFERENCES в данном случае добавлен в коде с обработкой*/ );
DDL для таблиц Satellite
Очищаем таблицы, если такие есть
-- Чистим чистим DROP TABLE IF exists data_vault.Sattelite_aircrafts_data_range; DROP TABLE IF exists data_vault.Sattelite_aircrafts_data_model; DROP TABLE IF exists data_vault.Sattelite_seats_fare_conditions; DROP TABLE IF exists data_vault.Sattelite_airport_data_timezone; DROP TABLE IF exists data_vault.Sattelite_airport_data_coordinates; DROP TABLE IF exists data_vault.Sattelite_airport_data_airport_name; DROP TABLE IF exists data_vault.Sattelite_airport_data_city; DROP TABLE IF exists data_vault.Sattelite_ticket_flights_amount; DROP TABLE IF exists data_vault.Sattelite_ticket_flights_fare_conditions; DROP TABLE IF exists data_vault.Sattelite_boarding_passes_boarding_no; DROP TABLE IF exists data_vault.Sattelite_boarding_passes_seat_no; DROP TABLE IF exists data_vault.Sattelite_tickets_book_ref; DROP TABLE IF exists data_vault.Sattelite_tickets_contact_data; DROP TABLE IF exists data_vault.Sattelite_tickets_passenger_id; DROP TABLE IF exists data_vault.Sattelite_tickets_passenger_name; DROP TABLE IF exists data_vault.Sattelite_bookings_book_date; DROP TABLE IF exists data_vault.Sattelite_bookings_total_amount; DROP TABLE IF exists data_vault.Sattelite_flights_flight_no; DROP TABLE IF exists data_vault.Sattelite_flights_scheduled_departure; DROP TABLE IF exists data_vault.Sattelite_flights_scheduled_arrival; DROP TABLE IF exists data_vault.Sattelite_flights_departure_airport; DROP TABLE IF exists data_vault.Sattelite_flights_arrival_airport; DROP TABLE IF exists data_vault.Sattelite_flights_status; DROP TABLE IF exists data_vault.Sattelite_flights_aircraft_code; DROP TABLE IF exists data_vault.Sattelite_flights_actual_departure; DROP TABLE IF exists data_vault.Sattelite_flights_actual_arrival; ```
При формировании таблиц, возведем функцию, которая принимает на вход три параметра, первый — название таблицы Satellite, название хеша Hub, атрибут:
Функция DDL Satellite
CREATE OR REPLACE FUNCTION data_vault.ddl_Sattelite_table( table_name TEXT, -- название таблицы сателлита hub_hash TEXT, -- название хеша хаба, к которому относится сателлит atrib TEXT) -- какие колонки надо вытащить из таблицы источника указывается вместе с форматом RETURNS VOID AS $$ DECLARE qwery TEXT; BEGIN -- проверка на наличие данных в параметрах функции IF (table_name IS NULL) OR (hub_hash IS NULL) OR (atrib IS NULL) THEN RAISE EXCEPTION 'Не заполнены параметры'; END IF; qwery:= 'CREATE TABLE IF NOT EXISTS data_vault.' || table_name || '(' || hub_hash || '_Hash_key Varchar(33)' || ' REFERENCES data_vault.' || hub_hash || '(Hash_key),' || 'load_date timestamp, load_end_date timestamp, record_sourse Varchar(20), ' || atrib || ', ' || 'PRIMARY KEY (' || hub_hash || '_Hash_key ' || ', load_date));'; EXECUTE qwery; END; $$ LANGUAGE plpgsql;
Формирование таблиц Satellite:
Код формирование DDL Satellite
/*Sattelite_aircrafts_data*/ -- range SELECT data_vault.ddl_Sattelite_table('Sattelite_aircrafts_data_range', 'Hub_aircrafts_data', 'range int4'); --model SELECT data_vault.ddl_Sattelite_table('Sattelite_aircrafts_data_model', 'Hub_aircrafts_data', 'model TEXT'); /*Sattelite_seats*/ -- fare_conditions SELECT data_vault.ddl_Sattelite_table('Sattelite_seats_fare_conditions', 'Hub_seats', 'fare_conditions Varchar(15)'); /*Sattelite_airoport_data*/ -- timezone SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_timezone', 'Hub_airport_data', 'timezone Varchar(34)'); -- coordinates SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_coordinates', 'Hub_airport_data', 'coordinates Varchar(50)'); -- airport_name SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_airport_name', 'Hub_airport_data', 'airport_name text'); -- city SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_city', 'Hub_airport_data', 'city text'); /*Sattelite_ticket_flights*/ -- amount SELECT data_vault.ddl_Sattelite_table('Sattelite_ticket_flights_amount', 'Hub_ticket_flights', 'amount numeric(10, 2)'); -- fare_conditions SELECT data_vault.ddl_Sattelite_table('Sattelite_ticket_flights_fare_conditions', 'Hub_ticket_flights', 'fare_conditions Varchar(10)'); /*Sattelite_boarding_passes*/ -- boarding_no SELECT data_vault.ddl_Sattelite_table('Sattelite_boarding_passes_boarding_no', 'Hub_boarding_passes', 'boarding_no int4'); -- seat_no SELECT data_vault.ddl_Sattelite_table('Sattelite_boarding_passes_seat_no', 'Hub_boarding_passes', 'seat_no varchar(4)'); /*Sattelite_tickets*/ -- book_ref SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_book_ref', 'Hub_ticket', 'book_ref bpchar(6)'); -- contact_data SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_contact_data', 'Hub_ticket', 'contact_data jsonb'); -- passenger_id SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_contact_data', 'Hub_ticket', 'passenger_id varchar(20)'); -- passenger_name SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_passenger_name', 'Hub_ticket', 'passenger_name text'); /*Sattelite_bookings*/ -- book_date SELECT data_vault.ddl_Sattelite_table('Sattelite_bookings_book_date', 'Hub_bookings', 'book_date timestamptz'); -- total_amount SELECT data_vault.ddl_Sattelite_table('Sattelite_bookings_total_amount', 'Hub_bookings', 'total_amount numeric(10, 2)'); /*Sattelite_flights*/ -- flight_no SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_flight_no', 'Hub_flights', 'flight_no bpchar(6)'); -- scheduled_departure SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_scheduled_departure', 'Hub_flights', 'scheduled_departure timestamptz'); -- scheduled_arrival SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_scheduled_arrival', 'Hub_flights', 'scheduled_arrival timestamptz'); -- departure_airport SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_departure_airport', 'Hub_flights', 'departure_airport bpchar(3)'); -- arrival_airport SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_arrival_airport', 'Hub_flights', 'arrival_airport bpchar(3)'); -- status SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_status', 'Hub_flights', 'status varchar(20)'); -- aircraft_code SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_aircraft_code', 'Hub_flights', 'aircraft_code bpchar(3)'); -- actual_departure SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_actual_departure', 'Hub_flights', 'actual_departure timestamptz'); -- actual_arrival SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_actual_arrival', 'Hub_flights', 'actual_arrival timestamptz');
К итогам второго шага отнесем формирования ddl для сателлитов, хабов и линков и связи между ними путем формирования ограничений через REFERENCES. Остались заключительные шаги, с насыщением таблиц и постобработкой.
Шаг 3. Перенос данных из stage хранилища в DataVault.
После залива данных в схему stage из временных таблиц foreign_tables и формирования ddl всех составляющих DataVault. Данные необходимо разложить по Hub, Link и Satellite.
Начнем с Hub, напишем функцию, для миграции из схемы stage в схему data_vault.
Функция для переноса данных в Hub
CREATE OR REPLACE FUNCTION data_vault.insert_hub_table( table_name TEXT , -- название таблицы хаба business_key TEXT, -- какие колонки надо вытащить из таблицы источника указывается вместе с форматом sourse TEXT, -- источник откуда тянутся данные times TEXT, -- время наполнения таблицы данными sourse_table TEXT) -- полное обозначение источника для запроса RETURNS VOID AS $$ DECLARE qwery TEXT; BEGIN -- проверка на наличие данных в параметрах функции IF (table_name IS NULL) OR (business_key IS NULL) THEN RAISE EXCEPTION 'Не заполнены параметры'; END IF; qwery:= 'INSERT INTO data_vault.' || table_name || '(Hash_key, record_sourse, Load_date, ' || (regexp_split_to_array(business_key, '::'))[1] || ')' || ' SELECT DISTINCT MD5(' || business_key || '), ' || sourse ||' , ' || times ||' , ' || (regexp_split_to_array(business_key, '::'))[1] || ' FROM ' || sourse_table || ';'; EXECUTE qwery; END; $$ LANGUAGE plpgsql;
Так выглядит запрос описанный в функции.
INSERT INTO data_vault.Hub_flights (Hash_key, record_sourse, Load_date, flight_id) SELECT DISTINCT MD5(flight_id::varchar), 'flights', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, flight_id FROM stage.flights;
Определим функцию для всех хабов.
Код наполнения данными таблиц Hab
/*Hub_flights*/ SELECT data_vault.insert_hub_table('Hub_flights', 'flight_id::varchar', '''flights''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.flights') /*Hub_airports_data*/ SELECT data_vault.insert_hub_table('Hub_airports_data', 'airport_code::varchar', '''airports_data''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.airports_data') /*Hub_aircrafts_data*/ SELECT data_vault.insert_hub_table('Hub_aircrafts_data', 'aircraft_code::varchar', '''aircrafts_data''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.aircrafts_data') /*Hub_seats*/ SELECT data_vault.insert_hub_table('Hub_seats', 'concat(aircraft_code, seat_no)', '''seats''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.seats') /*Hub_ticket_flights*/ SELECT data_vault.insert_hub_table('Hub_ticket_flights', '(concat(ticket_no, '&', flight_id))::varchar', '''ticket_flights''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.ticket_flights') /*Hub_boarding_passes*/ SELECT data_vault.insert_hub_table('Hub_boarding_passes', '(concat(ticket_no, '&', flight_id))::varchar', '''boarding_passes''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.boarding_passes') /*Hub_tickets*/ SELECT data_vault.insert_hub_table('Hub_tickets', '(ticket_no)::varchar', '''tickets''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.tickets') /*Hub_bookings*/ SELECT data_vault.insert_hub_table('Hub_bookings', '(book_ref)::varchar', '''bookings''', $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, 'stage.bookings')
Для наполнения таблиц Link я выкачу простынь, потому что нецелесообразно писать функцию, тк в параметрах будет много переменных.
Код наполнения данными таблиц Link
/*Link_flights_airoport_data_departure - departure_airport - аэропорт отправления*/ INSERT INTO data_vault.Link_flights_airoport_data_departure (Link_flights_airoport_data_departure_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key) SELECT DISTINCT concat(flight_id, COALESCE(sf.departure_airport, sa.airport_code)), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'flights_airport_data', MD5(flight_id :: varchar), MD5(sa.airport_code) FROM stage.flights sf LEFT JOIN stage.airports_data sa ON sf.departure_airport = sa.airport_code; /*Link_flights_airoport_data - arrival_airport - аэропорт прибытия*/ INSERT INTO data_vault.Link_flights_airoport_data_arrival (Link_flights_airoport_data_arrival_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key) SELECT DISTINCT concat(flight_id, COALESCE(sf.departure_airport, sa.airport_code)), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'flights_airport_data', MD5(flight_id :: varchar), MD5(sa.airport_code) FROM stage.flights sf LEFT JOIN stage.airports_data sa ON sf.arrival_airport = sa.airport_code; /*Link_flights_ticket_flights*/ INSERT INTO data_vault.Link_flights_ticket_flights (Link_flights_ticket_flights_Hashkey, load_date, load_end_date, record_sourse, hub_flights_hash_key, ticket_no, hub_ticket_flights_hash_key) SELECT DISTINCT (concat(tf.ticket_no, '&', f.flight_id)::varchar), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'flights_ticket_flights', md5(f.flight_id :: varchar), md5(tf.ticket_no), MD5((concat(tf.ticket_no, '&', f.flight_id))::varchar) FROM stage.ticket_flights tf JOIN stage.flights f ON f.flight_id = tf.flight_id; /*Link_ticket_flights_boarding_passes*/ INSERT INTO data_vault.Link_ticket_flights_boarding_passes (Link_ticket_flights_boarding_passes, load_date, load_end_date, record_sourse, ticket_no, Hub_boarding_passes_Hash_key, Hub_ticket_flights_Hash_key) SELECT DISTINCT concat(concat(tf.ticket_no, '&' ,tf.flight_id), concat(bp.ticket_no, '&' ,bp.flight_id)), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'ticket_flights_boarding_passes', md5(bp.ticket_no), concat(bp.ticket_no, '&',bp.flight_id), concat(tf.ticket_no, '&',tf.flight_id) FROM stage.boarding_passes bp LEFT JOIN stage.ticket_flights tf USING(ticket_no, flight_id); /*Link_ticket_flights_tickets*/ INSERT INTO data_vault.Link_ticket_flights_tickets (Link_ticket_flights_tickets, load_date, record_sourse, Hub_ticket_flights_Hash_key, Hub_tickets_Hash_key) SELECT DISTINCT concat(concat(tf.ticket_no, '&', tf.flight_id), t.ticket_no), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'ticket_flights_tickets', md5(concat(tf.ticket_no, '&', tf.flight_id)), md5(t.ticket_no) FROM stage.ticket_flights tf full JOIN stage.tickets t ON t.ticket_no = tf.ticket_no; /*Link_ticket_bookings*/ INSERT INTO data_vault.Link_tickets_bookings (Link_tickets_bookings, load_date, load_end_date, record_sourse, Hub_ticket_Hash_key, Hub_bookings_Hash_key) SELECT concat(ticket_no, book_ref), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'ticket_bookings', md5(ticket_no), md5(book_ref) FROM stage.tickets t FULL JOIN stage.bookings b USING(book_ref); /*Link_flights_aircrafts_data*/ INSERT INTO data_vault.Link_flights_aircrafts_data (Link_flights_aircrafts_data_Hashkey, load_date, record_sourse, Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key) SELECT DISTINCT concat(f.aircraft_code, ad.aircraft_code), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'flights_aircrafts_data', f.aircraft_code, ad.aircraft_code FROM stage.flights f FULL JOIN stage.aircrafts_data ad ON f.aircraft_code = ad.aircraft_code; /*Link_seats_aircrafts_data*/ INSERT INTO data_vault.Link_seats_aircrafts_data(Link_seats_aircrafts_data_Hashkey, load_date, record_sourse, Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key) SELECT DISTINCT concat(ad.aircraft_code, concat(s.aircraft_code, s.seat_no)), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'seats_aircrafts_data', ad.aircraft_code, concat(s.aircraft_code, s.seat_no) FROM stage.aircrafts_data ad JOIN stage.seats s USING(aircraft_code);
Для заполнения данных в таблицах-спутниках написана функция, поскольку для каждого атрибута основной таблицы была создана отдельная таблица-спутник, которых в итоге получилось более двадцати. Зачем на каждый атрибут (желательно) делать свой сателлит.
Функция для заполнения данными таблиц Satellite
/*Функция*/ CREATE OR REPLACE FUNCTION data_vault.insert_data_from_table( table_name TEXT , -- название таблицы саттелита source_table_columns TEXT, -- какие колонки надо вытащить из таблицы источника source_table_name TEXT, -- имя таблицы откуда тянутся данные hash TEXT, -- из чего будет состоять хеш times TEXT, -- время записи данных в таблицу sourse_col TEXT) -- название источника RETURNS VOID AS $$ DECLARE qwery TEXT; BEGIN -- проверка на наличие информации в параметрах IF (table_name IS NULL) OR (source_table_columns IS NULL) OR (source_table_name IS NULL) OR (hash IS NULL) OR (times IS NULL) OR (sourse_col IS NULL) THEN RAISE EXCEPTION 'Не заполнены параметры'; END IF; qwery:= 'INSERT INTO ' || table_name || ' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,' || ' load_end_date' || ' ,' || ' record_sourse' || ', ' || (regexp_split_to_array(source_table_columns, ','))[2] || ') ' || ' SELECT DISTINCT md5(' || hash || ') , ' || times || ', ' || '''1111-11-11 11:11:11'':: timestamp'|| ' , ' || sourse_col || ', ' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || source_table_name; EXECUTE qwery; END; $$ LANGUAGE plpgsql;
Определяем параметры функции к сателлитам (формат иерархии: Название таблицы как в исходнике, атрибуты таблицы, которые являются сателлитом):
Код заполнения данными таблиц Satellite
-- Sattelite_aircrafts_data --##################################################################################################### /*Аircrafts_data_range*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_aircrafts_data_range' , -- название таблицы саттелита 'Hub_aircrafts_data_Hash_key,range', -- какие колонки надо вытащить из таблицы источника 'stage.aircrafts_data', -- имя таблицы откуда тянутся данные 'aircraft_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''aircrafts_data'''); -- SELECT * FROM data_vault.Sattelite_aircrafts_data_range; /*Аircrafts_data_model*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_aircrafts_data_model' , -- название таблицы саттелита 'Hub_aircrafts_data_Hash_key,model', -- какие колонки надо вытащить из таблицы источника 'stage.aircrafts_data', -- имя таблицы откуда тянутся данные 'aircraft_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''aircrafts_data'''); -- SELECT * FROM data_vault.Sattelite_aircrafts_data_model; -- Sattelite_seats --##################################################################################################### /*fare_conditions*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_seats_fare_conditions' ,-- название таблицы саттелита 'Hub_seats_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника 'stage.seats', -- имя таблицы откуда тянутся данные 'concat(aircraft_code, seat_no)', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''seats'''); -- SELECT * FROM data_vault.Sattelite_seats_fare_conditions; -- Sattelite_airoport_data --##################################################################################################### /*timezone*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_timezone' ,-- название таблицы саттелита 'Hub_airport_data_Hash_key,timezone', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_timezone; /*coordinates*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_coordinates' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,coordinates', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_coordinates; /*airport_name*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_airport_name' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,airport_name', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_airport_name; /*city*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_city' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,city', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_city; -- Sattelite_ticket_flights --##################################################################################################### /*amount*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_ticket_flights_amount' , -- название таблицы саттелита 'Hub_ticket_flights_Hash_key,amount', -- какие колонки надо вытащить из таблицы источника 'stage.ticket_flights', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_ticket_flights_amount; /*fare_conditions*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_ticket_flights_fare_conditions' , -- название таблицы саттелита 'Hub_ticket_flights_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника 'stage.ticket_flights', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_ticket_flights_fare_conditions; -- Sattelite_boarding_passes --##################################################################################################### /*boarding_no*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_boarding_passes_boarding_no' , -- название таблицы саттелита 'Hub_boarding_passes_Hash_key,boarding_no', -- какие колонки надо вытащить из таблицы источника 'stage.boarding_passes', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''boarding_passes'''); -- SELECT * FROM data_vault.Sattelite_boarding_passes_boarding_no; /*seat_no*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_boarding_passes_seat_no' , -- название таблицы саттелита 'Hub_boarding_passes_Hash_key,seat_no', -- какие колонки надо вытащить из таблицы источника 'stage.boarding_passes', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''boarding_passes'''); -- SELECT * FROM data_vault.Sattelite_boarding_passes_seat_no; -- Sattelite_tickets --##################################################################################################### /*book_ref*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_book_ref' , -- название таблицы саттелита 'Hub_ticket_Hash_key,book_ref', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_book_ref; /*contact_data*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_contact_data' , -- название таблицы саттелита 'Hub_ticket_Hash_key,contact_data', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_contact_data; /*passenger_id*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_passenger_id' , -- название таблицы саттелита 'Hub_ticket_Hash_key,passenger_id', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_passenger_id; /*passenger_name*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_passenger_name' , -- название таблицы саттелита 'Hub_ticket_Hash_key,passenger_name', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_passenger_name; -- Sattelite_bookings --##################################################################################################### /*book_date*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_bookings_book_date' , -- название таблицы саттелита 'Hub_bookings_Hash_key,book_date', -- какие колонки надо вытащить из таблицы источника 'stage.bookings', -- имя таблицы откуда тянутся данные $$(book_ref)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''bookings'''); -- SELECT * FROM data_vault.Sattelite_bookings_book_date; /*total_amount*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_bookings_total_amount' , -- название таблицы саттелита 'Hub_bookings_Hash_key,total_amount', -- какие колонки надо вытащить из таблицы источника 'stage.bookings', -- имя таблицы откуда тянутся данные $$(book_ref)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''bookings'''); -- SELECT * FROM data_vault.Sattelite_bookings_total_amount; -- Sattelite_flights --##################################################################################################### /*flight_no*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_flight_no' , -- название таблицы саттелита 'Hub_flights_Hash_key,flight_no', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_flight_no; /*scheduled_departure*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_scheduled_departure' , -- название таблицы саттелита 'Hub_flights_Hash_key,scheduled_departure', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_scheduled_departure; /*scheduled_arrival*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_scheduled_arrival' , -- название таблицы саттелита 'Hub_flights_Hash_key,scheduled_arrival', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_scheduled_arrival; /*departure_airport*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_departure_airport' , -- название таблицы саттелита 'Hub_flights_Hash_key,departure_airport', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_departure_airport; /*arrival_airport*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_arrival_airport' , -- название таблицы саттелита 'Hub_flights_Hash_key,arrival_airport', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_arrival_airport; /*status*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_status' , -- название таблицы саттелита 'Hub_flights_Hash_key,status', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_status; /*aircraft_code*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_aircraft_code' , -- название таблицы саттелита 'Hub_flights_Hash_key,aircraft_code', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_aircraft_code; /*actual_departure*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_actual_departure' , -- название таблицы саттелита 'Hub_flights_Hash_key,actual_departure', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_actual_departure; /*actual_arrival*/ SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_actual_arrival' , -- название таблицы саттелита 'Hub_flights_Hash_key,actual_arrival', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_actual_arrival;
В схеме DataVault собраны и наполнены все три составляющих — Hub, Link, Satellite.
Шаг 4. Жизненный цикл DataVault.
Необходимость обработки данных в таблицах DataVault обосновывается изменениями, обновлениями или дополнениями информации в таблицах исходниках. Например, конструкция таблиц Hub должна оставаться неизменной, т.е. данные не могут подвергаться изменениям, только добавляться. В Link и Satellite наоборот, таблицы имеют в своем составе атрибут load_end_date — любая запись имеет свой срок жизни, если в источнике изменился атрибут, то в линке или сателлите обновится поле load_end_date и появится новая строка, согласно идеологии Дена Линстеда — идейного вдохновителя и основателя данного подхода.
Обработка для хабов примечательна тем, что данные сопоставляются с источником, если таких нет в хабе, то добавляем.
Обработка Hub
/*Hub_flights*/ INSERT INTO data_vault.Hub_flights (Hash_key, record_sourse, Load_date, flight_id) SELECT DISTINCT MD5(flight_id::varchar), 'flights', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, flight_id FROM stage.flights f LEFT JOIN data_vault.Hub_flights hf USING(flight_id) WHERE load_date IS null; /*Hub_airports_data*/ INSERT INTO data_vault.Hub_airports_data (Hash_key, record_sourse, Load_date, airport_code) SELECT DISTINCT MD5(airport_code::varchar), 'airports_data', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, airport_code FROM stage.airports_data ad LEFT JOIN data_vault.Hub_airports_data hf USING(airport_code) WHERE load_date IS null; /*Hub_aircrafts_data*/ INSERT INTO data_vault.Hub_aircrafts_data (Hash_key, record_sourse, Load_date, aircraft_code) SELECT DISTINCT MD5(aircraft_code::varchar), 'aircrafts_data', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, aircraft_code FROM stage.aircrafts_data ad LEFT JOIN data_vault.Hub_aircrafts_data hf USING(aircraft_code) WHERE load_date IS null; /*Hub_seats*/ INSERT INTO data_vault.Hub_seats (Hash_key, record_sourse, Load_date, aircraft_code_seat_no) SELECT DISTINCT MD5(concat(aircraft_code, seat_no)), 'seats', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, concat(aircraft_code, seat_no) aircraft_code_seat_no FROM stage.seats ad LEFT JOIN data_vault.Hub_seats hf ON concat(ad.aircraft_code, ad.seat_no) = hf.aircraft_code_seat_no WHERE load_date IS null; /*Hub_ticket_flights*/ INSERT INTO data_vault.Hub_ticket_flights (Hash_key, record_sourse, Load_date, ticket_no_flight_id) SELECT DISTINCT MD5(concat(ticket_no, '&', flight_id)::varchar), 'ticket_flights', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, concat(ticket_no, '&', flight_id) FROM stage.ticket_flights еа LEFT JOIN data_vault.Hub_ticket_flights htf ON (concat(еа.ticket_no, '&', еа.flight_id)) = ticket_no_flight_id WHERE load_date IS null; /*Hub_boarding_passes*/ INSERT INTO data_vault.Hub_boarding_passes (Hash_key, record_sourse, Load_date, ticket_no_flight_id) SELECT DISTINCT MD5(concat(ticket_no, '&', flight_id)::varchar), 'boarding_passes', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, concat(ticket_no, '&', flight_id) FROM stage.ticket_flights еа LEFT JOIN data_vault.Hub_ticket_flights htf ON (concat(еа.ticket_no, '&', еа.flight_id)) = ticket_no_flight_id WHERE load_date IS null; /*Hub_tickets*/ INSERT INTO data_vault.Hub_tickets (Hash_key, record_sourse, Load_date, ticket_no) SELECT DISTINCT MD5((ticket_no)::varchar), 'tickets', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, ticket_no FROM stage.tickets еа LEFT JOIN data_vault.Hub_tickets htf USING(ticket_no) WHERE load_date IS null; /*Hub_bookings*/ INSERT INTO data_vault.Hub_bookings (Hash_key, record_sourse, Load_date, book_ref) SELECT DISTINCT MD5((book_ref)::varchar), 'bookings', to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, book_ref FROM stage.tickets еа LEFT JOIN data_vault.Hub_bookings htf USING(book_ref) WHERE load_date IS null;
Можно конечно не мостить простынь, а найти более изящное решение, но я не стал.
Самым трудозатратным на этом пути было прописать обработку таблиц link, так как каждая таблица имеет свою структуру — упростить обработку у меня не вышло, выкатываю очередную простынь.
Обработка Link
/*########################### обработка - departure_airport #############################*/ /*проверка на наличие изменение связей - если поменяют название аэропорта*/ DROP TABLE IF EXISTS data_vault.for_lfadp_update_time; CREATE TABLE IF NOT EXISTS data_vault.for_lfadp_update_time AS SELECT DISTINCT lfadd.hub_flights_hash_key, lfadd.hub_airoport_data_hash_key, MD5(hf.flight_id :: varchar) AS new_flight_id, MD5(hf.departure_airport :: varchar) AS new_airport_name, hf.flight_id new_flight, hf.departure_airport new_airport, to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time FROM stage.flights hf LEFT JOIN data_vault.Link_flights_airoport_data_departure lfadd ON md5(hf.flight_id::varchar) = lfadd.hub_flights_hash_key WHERE (CASE WHEN upper(replace(lfadd.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.departure_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0; -- обновляем время в линке на новое, если сменилось название аэропорта в исходнике UPDATE data_vault.Link_flights_airoport_data_departure a SET load_end_date = flut.new_fix_time FROM data_vault.for_lfadp_update_time flut WHERE a.hub_flights_hash_key = flut.new_flight_id; -- добавляем строку в таблицу Link_flights_airoport_data_departure новую INSERT INTO data_vault.Link_flights_airoport_data_departure (Link_flights_airoport_data_departure_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key) SELECT concat(new_flight, new_airport), new_fix_time + INTERVAL '1 second', '1111-11-11 11:11:11' :: timestamp, 'flights_airport_data', new_flight_id, new_airport_name FROM data_vault.for_lfadp_update_time; DROP TABLE IF EXISTS data_vault.for_lfadp_update_time; /*обработка departure_airport завершена*/ /*########################### обработка - arrival_airport #############################*/ /*проверка на наличие изменение связей - если поменяют название аэропорта*/ DROP TABLE IF EXISTS data_vault.for_lfada_update_time; CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time AS SELECT DISTINCT lfada.hub_flights_hash_key, lfada.hub_airoport_data_hash_key, MD5(hf.flight_id :: varchar) AS new_flight_id, MD5(hf.arrival_airport :: varchar) AS new_airport_name, hf.flight_id new_flight, hf.arrival_airport new_airport, to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time FROM stage.flights hf LEFT JOIN data_vault.Link_flights_airoport_data_arrival lfada ON md5(hf.flight_id::varchar) = lfada.hub_flights_hash_key WHERE (CASE WHEN upper(replace(lfada.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.arrival_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0; -- SELECT * FROM data_vault.for_lfada_update_time -- обновляем время в линке на новое, если сменилось название аэропорта в исходнике UPDATE data_vault.Link_flights_airoport_data_arrival a SET load_end_date = flut.new_fix_time FROM data_vault.for_lfada_update_time flut WHERE a.hub_flights_hash_key = flut.new_flight_id; INSERT INTO data_vault.Link_flights_airoport_data_arrival (Link_flights_airoport_data_arrival_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key) SELECT concat(new_flight_id, new_airport_name), new_fix_time + INTERVAL '1 second', '1111-11-11 11:11:11' :: timestamp, 'flights_airport_data', new_flight_id, new_airport_name FROM data_vault.for_lfada_update_time; DROP TABLE IF EXISTS data_vault.for_lfada_update_time; /*обработка arrival_airport завершена*/ /*################################ обработка - flights_ticket_flights ##################################*/ /*проверка если отменили рейс flight_id и перенесли билет на новый рейс - такое же может быть , значит load end date будет актуален*/ DROP TABLE IF EXISTS data_vault.for_lfada_update_time; CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as WITH new AS (SELECT distinct replace(concat(tf.ticket_no, tf.flight_id), ' ', '') new_data , tf.ticket_no , tf.flight_id n_flight_id FROM stage.ticket_flights tf LEFT JOIN data_vault.Link_flights_ticket_flights lftf ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '') WHERE LENGTH(replace(concat(lftf.ticket_no, lftf.Hub_flights_Hash_key), ' ', '')) = 0), old AS (SELECT distinct lftf.ticket_no , Hub_flights_Hash_key o_flight_id FROM data_vault.Link_flights_ticket_flights lftf LEFT JOIN stage.ticket_flights tf ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '') WHERE LENGTH(replace(concat(tf.ticket_no, tf.flight_id), ' ', '')) = 0) SELECT md5(NEW.ticket_no) ticket_no, concat(OLD.ticket_no, o_flight_id) AS OLD_ticket_no_flight_id, o_flight_id AS OLD_flight_id, concat(NEW.ticket_no, '&', n_flight_id) AS NEW_ticket_no_flight_id, md5(n_flight_id ::varchar) AS NEW_flight_id, to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time FROM old JOIN NEW ON OLD.ticket_no = NEW.ticket_no; -- SELECT * FROM data_vault.for_lfada_update_time; -- обновляем время в линке на новое, если сменилось название аэропорта в исходнике UPDATE data_vault.Link_flights_ticket_flights a SET load_end_date = flut.new_fix_time FROM data_vault.for_lfada_update_time flut WHERE concat(a.ticket_no, a.hub_flights_hash_key) = flut.OLD_ticket_no_flight_id; -- добавляем строку в таблицу Link_flights_ticket_flights новую, которую добавили в ticket_flights INSERT INTO data_vault.Link_flights_ticket_flights (Link_flights_ticket_flights_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, ticket_no, Hub_ticket_flights_Hash_key) SELECT flut.NEW_ticket_no_flight_id, to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'flights_ticket_flights', md5(NEW_flight_id), md5(ticket_no), md5(flut.NEW_ticket_no_flight_id) FROM data_vault.for_lfada_update_time flut; /*обработка flights_ticket_flights завершена*/ /*######################################## обработка - ticket_flights_boarding_passes ##########################################*/ -- так как данные в таблице связаны с хабами , а обработка линка идет с не зашифрованными данными т.е. без md5, то удалим привязку ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_bp_fk; ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_tf_fk; DROP TABLE IF EXISTS data_vault.for_lfada_update_time; -- проверка на новые данные , если их нет то добавляем CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as WITH FIRST as(SELECT concat(bp.ticket_no, '&', bp.flight_id) AS concat_bp, concat(tf.ticket_no, '&', tf.flight_id) AS concat_tf, bp.ticket_no FROM stage.boarding_passes bp LEFT JOIN stage.ticket_flights tf USING(ticket_no, flight_id)) SELECT concat_bp, concat_tf, hub_boarding_passes_hash_key, Hub_ticket_flights_Hash_key, f.ticket_no FROM FIRST f LEFT JOIN data_vault.Link_ticket_flights_boarding_passes flut ON f.concat_bp = Hub_ticket_flights_Hash_key AND f.concat_tf = Hub_boarding_passes_Hash_key WHERE load_date IS null; -- вставляем в таблицу обновленные элементы с источника, вставятся все поля если INSERT INTO data_vault.Link_ticket_flights_boarding_passes (Link_ticket_flights_boarding_passes, load_date, load_end_date, record_sourse, ticket_no, Hub_boarding_passes_Hash_key, Hub_ticket_flights_Hash_key) SELECT concat(flut.concat_tf , flut.concat_bp), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'ticket_flights_boarding_passes', md5(ticket_no), flut.concat_bp, flut.concat_tf FROM data_vault.for_lfada_update_time flut; -- если один из хешей не подгрузился, догружаем его, вытащив из уже известного, так как хеши таблиц boarding_passes и ticket_flights одинаковы -- обновляем данные столбца ticket_flights_boarding_passes с хешем , если он будет не заполнен UPDATE data_vault.Link_ticket_flights_boarding_passes a SET (hub_boarding_passes_hash_key, Hub_ticket_flights_Hash_key, link_ticket_flights_boarding_passes) = (concat(ticket_nom, '&', flight_id), concat(ticket_nom, '&', flight_id), concat(concat(ticket_nom, '&', flight_id), concat(ticket_nom, '&', flight_id))) FROM (SELECT CASE WHEN length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2 THEN substring(Hub_ticket_flights_Hash_key FROM 1 FOR position('&' IN Hub_ticket_flights_Hash_key) - 1) ELSE '0' END) < 1 THEN substring(hub_boarding_passes_hash_key FROM 1 FOR position('&' IN hub_boarding_passes_hash_key) - 1) ELSE 'ошибка: необходима проверка данных' END ticket_nom, CASE WHEN length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2 THEN substring(Hub_ticket_flights_Hash_key FROM position('&' IN Hub_ticket_flights_Hash_key) + 1) ELSE '0' END) < 1 THEN substring(hub_boarding_passes_hash_key FROM position('&' IN hub_boarding_passes_hash_key) + 1 ) ELSE 'ошибка: необходима проверка данных' END flight_id, link_ticket_flights_boarding_passes,load_date,load_end_date,record_sourse,ticket_no,hub_boarding_passes_hash_key,Hub_ticket_flights_Hash_key FROM data_vault.Link_ticket_flights_boarding_passes WHERE length(hub_boarding_passes_hash_key) < 2 OR length(Hub_ticket_flights_Hash_key) < 2) flut WHERE a.hub_boarding_passes_hash_key = (concat(flut.ticket_nom, '&', flut.flight_id)) OR a.Hub_ticket_flights_Hash_key = (concat(flut.ticket_nom, '&', flut.flight_id)); -- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные UPDATE data_vault.Link_ticket_flights_boarding_passes a SET (hub_boarding_passes_hash_key, hub_ticket_flights_hash_key) = (md5(b.hub_boarding_passes_hash_key), md5(b.hub_ticket_flights_hash_key)) FROM data_vault.Link_ticket_flights_boarding_passes b WHERE (a.link_ticket_flights_boarding_passes = b.link_ticket_flights_boarding_passes) AND (length(Hub_boarding_passes_Hash_key) < 30 OR (length(Hub_ticket_flights_Hash_key) < 30)); ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_bp_fk FOREIGN KEY (Hub_boarding_passes_Hash_key) REFERENCES data_vault.Hub_boarding_passes(hash_key); ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_tf_fk FOREIGN KEY (Hub_ticket_flights_Hash_key) REFERENCES data_vault.Hub_ticket_flights(hash_key); /*обработка ticket_flights_boarding_passes завершена*/ /*################################ обработка - ticket_flights_tickets ##################################*/ /*обработка - обновляем данные в таблице, если в одном из источников данные не сгенирились, то вставляем их из другого источника*/ DROP TABLE IF EXISTS data_vault.for_lfada_update_time; -- проверка на новые данные , если их нет то добавляем CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as SELECT * FROM data_vault.Link_ticket_flights_tickets ltft FULL JOIN (SELECT concat(concat(tf.ticket_no, '&', tf.flight_id), t.ticket_no) link, md5(concat(tf.ticket_no, '&', tf.flight_id)) new_hub_ticket_flights_hash_key, md5(t.ticket_no) T_ticket_no, t.ticket_no tticket_no, md5(tf.ticket_no) TF_ticket_no, tf.ticket_no tfticket_no, md5(tf.flight_id :: varchar) flight_id, tf.flight_id tflight_id FROM stage.ticket_flights tf full JOIN stage.tickets t ON t.ticket_no = tf.ticket_no) we ON we.link = ltft.Link_ticket_flights_tickets WHERE link_ticket_flights_tickets IS NULL; -- SELECT * FROM data_vault.for_lfada_update_time; -- вставка данных в таблицу линк из таблиц источников, если их там нет INSERT INTO data_vault.Link_ticket_flights_tickets (Link_ticket_flights_tickets, load_date, record_sourse, Hub_ticket_flights_Hash_key, Hub_tickets_Hash_key) SELECT concat(COALESCE(tticket_no, tfticket_no), '&', CASE WHEN tflight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE tflight_id :: varchar END, COALESCE(tticket_no, tfticket_no)), to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'ticket_flights_tickets', CASE WHEN length(new_hub_ticket_flights_hash_key) > 2 THEN new_hub_ticket_flights_hash_key ELSE concat(COALESCE(t_ticket_no, tf_ticket_no), '&', CASE WHEN flight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE flight_id :: varchar END) END, COALESCE(t_ticket_no, tf_ticket_no) FROM data_vault.for_lfada_update_time; /*обработка ticket_flights_tickets завершена*/ /*######################### обработка - ticket_bookings ##########################*/ -- если клиент сдал билет, следовательно запись в таблице удалится, а значит необходимо сделать пометку времени UPDATE data_vault.Link_tickets_bookings a SET load_end_date = to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp FROM (WITH new_data AS ( SELECT concat(ticket_no, book_ref) Link_tickets_bookings , ticket_no, book_ref FROM stage.tickets t FULL JOIN stage.bookings b USING(book_ref)) SELECT ltb.Link_tickets_bookings FROM data_vault.Link_tickets_bookings ltb left JOIN new_data nd USING (Link_tickets_bookings) WHERE nd.Link_tickets_bookings IS NULL) b WHERE a.link_tickets_bookings = b.Link_tickets_bookings; -- добавляем новые данные из источника в таблицу линк INSERT INTO data_vault.Link_tickets_bookings (Link_tickets_bookings, load_date, load_end_date, record_sourse, Hub_ticket_Hash_key, Hub_bookings_Hash_key) WITH new_data AS ( SELECT concat(ticket_no, book_ref) Link_tickets_bookings , ticket_no, book_ref FROM stage.tickets t FULL JOIN stage.bookings b USING(book_ref)) SELECT nd.Link_tickets_bookings, to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, '1111-11-11 11:11:11' :: timestamp, 'ticket_bookings', md5(nd.ticket_no), md5(nd.book_ref) FROM data_vault.Link_tickets_bookings ltb RIGHT JOIN new_data nd USING (Link_tickets_bookings) WHERE load_date IS NULL; /*обработка ticket_bookings завершена*/ /*################################ обработка - flights_aircrafts_data #################################*/ -- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_f_fk; ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk; -- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились INSERT INTO data_vault.Link_flights_aircrafts_data (Link_flights_aircrafts_data_Hashkey, load_date, record_sourse, Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key) WITH new_data AS ( SELECT DISTINCT concat(f.aircraft_code, ad.aircraft_code) link_flights_aircrafts_data_hashkey, to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'flights_aircrafts_data', f.aircraft_code AS f_aircraft_code, ad.aircraft_code AS ad_aircraft_code FROM stage.flights f FULL JOIN stage.aircrafts_data ad ON f.aircraft_code = ad.aircraft_code) SELECT CASE WHEN length(lfad.link_flights_aircrafts_data_hashkey) < 5 OR -- тк коды должны быть идентичны, если нет данных в одной таблице , то подтягиваем их из другой length(nd.link_flights_aircrafts_data_hashkey) < 5 THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code)) ELSE COALESCE(nd.link_flights_aircrafts_data_hashkey, lfad.link_flights_aircrafts_data_hashkey) END, to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'flights_aircrafts_data', COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code) FROM data_vault.Link_flights_aircrafts_data lfad RIGHT JOIN new_data nd ON lfad.link_flights_aircrafts_data_hashkey = nd.link_flights_aircrafts_data_hashkey WHERE lfad.link_flights_aircrafts_data_hashkey IS NULL; -- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные UPDATE data_vault.Link_flights_aircrafts_data a SET (Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key) = (md5(b.Hub_flights_Hash_key), md5(b.Hub_aircrafts_data_Hash_key)) FROM data_vault.Link_flights_aircrafts_data b WHERE (a.Link_flights_aircrafts_data_Hashkey = b.Link_flights_aircrafts_data_Hashkey) AND (length(Hub_flights_Hash_key) < 30 OR (length(Hub_aircrafts_data_Hash_key) < 30)); -- восстанавливаем привязку таблицы к новым данным ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_f_fk FOREIGN KEY (Hub_flights_Hash_key) REFERENCES data_vault.Hub_flights(hash_key); ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key); /*обработка flights_aircrafts_data завершена*/ /*############################## обработка - seats_aircrafts_data ###############################*/ -- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk; ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_s_fk; -- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились INSERT INTO data_vault.Link_seats_aircrafts_data(Link_seats_aircrafts_data_Hashkey, load_date, record_sourse, Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key) WITH new_data AS ( SELECT DISTINCT concat(ad.aircraft_code, concat(s.aircraft_code, s.seat_no)) Link_seats_aircrafts_data_Hashkey, ad.aircraft_code AS f_aircraft_code, concat(s.aircraft_code, s.seat_no) AS ad_aircraft_code FROM stage.aircrafts_data ad FULL JOIN stage.seats s USING(aircraft_code)) SELECT CASE WHEN length(nd.link_seats_aircrafts_data_hashkey) < 6 THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code)) ELSE COALESCE(nd.Link_seats_aircrafts_data_Hashkey, lsad.Link_seats_aircrafts_data_Hashkey) END , to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp, 'seats_aircrafts_data', COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code) FROM data_vault.Link_seats_aircrafts_data lsad RIGHT JOIN new_data nd ON lsad.Link_seats_aircrafts_data_Hashkey = nd.Link_seats_aircrafts_data_Hashkey WHERE load_date IS null; -- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные UPDATE data_vault.Link_seats_aircrafts_data a SET (Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key) = (md5(b.Hub_aircrafts_data_Hash_key), md5(b.Hub_seats_Hash_key)) FROM data_vault.Link_seats_aircrafts_data b WHERE (a.Link_seats_aircrafts_data_Hashkey = b.Link_seats_aircrafts_data_Hashkey) AND (length(a.Hub_seats_Hash_key) < 30 OR (length(a.Hub_aircrafts_data_Hash_key) < 30)); -- восстанавливаем привязку таблицы к новым данным ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key); ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_s_fk FOREIGN KEY (Hub_seats_Hash_key) REFERENCES data_vault.Hub_seats(hash_key);
Хабы и линки обработаны, остались саттелиты.
Для обработки саттелитов была написана функция.
Функция для обработки Satellite
CREATE OR REPLACE FUNCTION data_vault.update_data_from_table( table_name TEXT , -- название таблицы саттелита source_table_columns TEXT, -- какие колонки надо вытащить из таблицы источника source_table_name TEXT, -- имя таблицы откуда тянутся данные hash TEXT, -- из чего будет состоять хеш times TEXT, -- время записи данных в таблицу sourse_col TEXT) -- название источника RETURNS VOID AS $$ DECLARE qwery TEXT; BEGIN -- проверка на наличие информации в параметрах IF (table_name IS NULL) OR (source_table_columns IS NULL) OR (source_table_name IS NULL) OR (hash IS NULL) OR (times IS NULL) OR (sourse_col IS NULL) THEN RAISE EXCEPTION 'Не заполнены параметры'; END IF; -- добавление только новых записей из источника qwery:= 'INSERT INTO ' || table_name || ' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,load_end_date' || ' ,record_sourse , ' || (regexp_split_to_array(source_table_columns, ','))[2] || ') ' || 'WITH new_data as (SELECT DISTINCT md5(' || hash || ') ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || source_table_name || ') ' || 'SELECT ' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ', ' || times || ', ' || '''1111-11-11 11:11:11'':: timestamp' || ' , ' || sourse_col || ' , ' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || table_name || ' tn ' || 'RIGHT JOIN new_data nd ON (' || 'tn.' || (regexp_split_to_array(source_table_columns, ','))[1] || ') = (' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ')' || ' WHERE tn.load_date IS NULL;' || -- обновление значений и показателя Load_end_date уже имеющихся аттрибутов, при изменении аттрибута в источнике ' DROP TABLE IF EXISTS data_vault.for_lfada_update_time; ' || ' CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as WITH new_data AS (SELECT DISTINCT md5(' || hash || ') ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || source_table_name || ' ) ' || ', need_swap AS (SELECT nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' || ' nd.' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || table_name || ' tn ' || 'RIGHT JOIN new_data nd ON (concat(tn.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , tn.' || (regexp_split_to_array(source_table_columns, ','))[2] || ')) = (concat(nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , nd.' || (regexp_split_to_array(source_table_columns, ','))[2] || ')) ' || ' WHERE tn.load_date IS NULL)' || ' SELECT ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , ' || ' load_date, load_end_date, record_sourse, need_swap.' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || table_name || ' nd' || ' JOIN need_swap USING(' || (regexp_split_to_array(source_table_columns, ','))[1] || '); ' || -- обновление поля смены даты load_end_date ' UPDATE ' || table_name || ' firsts ' || ' SET load_end_date = ' || times || ' FROM data_vault.for_lfada_update_time flut ' || ' WHERE flut.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' = firsts.' || (regexp_split_to_array(source_table_columns, ','))[1] ||' ;' -- добавление новой записи ' INSERT INTO ' || table_name || ' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,load_end_date' || ' ,record_sourse , ' || (regexp_split_to_array(source_table_columns, ','))[2] || ') ' || ' SELECT ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , ' || times || ',' || ' load_end_date, record_sourse, ' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM data_vault.for_lfada_update_time;'; EXECUTE qwery; END; $$ LANGUAGE plpgsql;
Обозначим обработку саттелитов.
Обработка Satellite
-- Satellite_aircrafts_data --##################################################################################################### /*Аircrafts_data_range*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_aircrafts_data_range' , -- название таблицы саттелита 'Hub_aircrafts_data_Hash_key,range', -- какие колонки надо вытащить из таблицы источника 'stage.aircrafts_data', -- имя таблицы откуда тянутся данные 'aircraft_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''aircrafts_data'''); -- SELECT * FROM data_vault.Sattelite_aircrafts_data_range; /*Аircrafts_data_model*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_aircrafts_data_model' , -- название таблицы саттелита 'Hub_aircrafts_data_Hash_key,model', -- какие колонки надо вытащить из таблицы источника 'stage.aircrafts_data', -- имя таблицы откуда тянутся данные 'aircraft_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''aircrafts_data'''); -- SELECT * FROM data_vault.Sattelite_aircrafts_data_model; -- Sattelite_seats --##################################################################################################### /*fare_conditions*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_seats_fare_conditions' ,-- название таблицы саттелита 'Hub_seats_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника 'stage.seats', -- имя таблицы откуда тянутся данные 'concat(aircraft_code, seat_no)', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''seats'''); -- SELECT * FROM data_vault.Sattelite_seats_fare_conditions; -- Sattelite_airoport_data --##################################################################################################### /*timezone*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_timezone' ,-- название таблицы саттелита 'Hub_airport_data_Hash_key,timezone', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_timezone; /*coordinates*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_coordinates' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,coordinates', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_coordinates; /*airport_name*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_airport_name' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,airport_name', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_airport_name; /*city*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_city' , -- название таблицы саттелита 'Hub_airport_data_Hash_key,city', -- какие колонки надо вытащить из таблицы источника 'stage.airports_data', -- имя таблицы откуда тянутся данные 'airport_code::varchar', -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_airport_data_city; -- Sattelite_ticket_flights --##################################################################################################### /*amount*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_ticket_flights_amount' , -- название таблицы саттелита 'Hub_ticket_flights_Hash_key,amount', -- какие колонки надо вытащить из таблицы источника 'stage.ticket_flights', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_ticket_flights_amount; /*fare_conditions*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_ticket_flights_fare_conditions' , -- название таблицы саттелита 'Hub_ticket_flights_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника 'stage.ticket_flights', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''airoport_data'''); -- SELECT * FROM data_vault.Sattelite_ticket_flights_fare_conditions; -- Sattelite_boarding_passes --##################################################################################################### /*boarding_no*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_boarding_passes_boarding_no' , -- название таблицы саттелита 'Hub_boarding_passes_Hash_key,boarding_no', -- какие колонки надо вытащить из таблицы источника 'stage.boarding_passes', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''boarding_passes'''); -- SELECT * FROM data_vault.Sattelite_boarding_passes_boarding_no; /*seat_no*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_boarding_passes_seat_no' , -- название таблицы саттелита 'Hub_boarding_passes_Hash_key,seat_no', -- какие колонки надо вытащить из таблицы источника 'stage.boarding_passes', -- имя таблицы откуда тянутся данные $$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''boarding_passes'''); -- SELECT * FROM data_vault.Sattelite_boarding_passes_seat_no; -- Sattelite_tickets --##################################################################################################### /*book_ref*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_book_ref' , -- название таблицы саттелита 'Hub_ticket_Hash_key,book_ref', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_book_ref; /*contact_data*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_contact_data' , -- название таблицы саттелита 'Hub_ticket_Hash_key,contact_data', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_contact_data; /*passenger_id*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_passenger_id' , -- название таблицы саттелита 'Hub_ticket_Hash_key,passenger_id', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_passenger_id; /*passenger_name*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_passenger_name' , -- название таблицы саттелита 'Hub_ticket_Hash_key,passenger_name', -- какие колонки надо вытащить из таблицы источника 'stage.tickets', -- имя таблицы откуда тянутся данные $$(ticket_no)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''tickets'''); -- SELECT * FROM data_vault.Sattelite_tickets_passenger_name; -- Sattelite_bookings --##################################################################################################### /*book_date*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_bookings_book_date' , -- название таблицы саттелита 'Hub_bookings_Hash_key,book_date', -- какие колонки надо вытащить из таблицы источника 'stage.bookings', -- имя таблицы откуда тянутся данные $$(book_ref)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''bookings'''); -- SELECT * FROM data_vault.Sattelite_bookings_book_date; /*total_amount*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_bookings_total_amount' , -- название таблицы саттелита 'Hub_bookings_Hash_key,total_amount', -- какие колонки надо вытащить из таблицы источника 'stage.bookings', -- имя таблицы откуда тянутся данные $$(book_ref)::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''bookings'''); -- SELECT * FROM data_vault.Sattelite_bookings_total_amount; -- Sattelite_flights --##################################################################################################### /*flight_no*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_flight_no' , -- название таблицы саттелита 'Hub_flights_Hash_key,flight_no', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_flight_no; /*scheduled_departure*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_scheduled_departure' , -- название таблицы саттелита 'Hub_flights_Hash_key,scheduled_departure', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_scheduled_departure; /*scheduled_arrival*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_scheduled_arrival' , -- название таблицы саттелита 'Hub_flights_Hash_key,scheduled_arrival', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_scheduled_arrival; /*departure_airport*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_departure_airport' , -- название таблицы саттелита 'Hub_flights_Hash_key,departure_airport', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_departure_airport; /*arrival_airport*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_arrival_airport' , -- название таблицы саттелита 'Hub_flights_Hash_key,arrival_airport', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_arrival_airport; /*status*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_status' , -- название таблицы саттелита 'Hub_flights_Hash_key,status', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_status; /*aircraft_code*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_aircraft_code' , -- название таблицы саттелита 'Hub_flights_Hash_key,aircraft_code', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_aircraft_code; /*actual_departure*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_actual_departure' , -- название таблицы саттелита 'Hub_flights_Hash_key,actual_departure', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_actual_departure; /*actual_arrival*/ SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_actual_arrival' , -- название таблицы саттелита 'Hub_flights_Hash_key,actual_arrival', -- какие колонки надо вытащить из таблицы источника 'stage.flights', -- имя таблицы откуда тянутся данные $$flight_id::varchar$$, -- хеш $$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития '''flights'''); -- SELECT * FROM data_vault.Sattelite_flights_actual_arrival;
На этом этапе можно четко сказать, что у нас построена схема модели данных DataVault составленная в тулзе гугла. Наполнены хабы, линки и сателлиты и прописан код к их обработке. Можно строить витрины!
