Всем привет! Это моя первая публикация на Хабре, поэтому строго не судите. Хочу поделиться с вами историей о том, как я делал Data Vault руками... или custom migrate a Data Vault c нотками Data Vault 2.0. Достаточно интересный способ провести время, но для начала углубимся в краткий экскурс.
Data Vault Modeling — процесс преобразования обычной модели данных в бизнес‑ориентированную модель для хранения информации, основанную на бизнес ключах.
Органика Data Vault:
Hub — отдельная таблица, описывающая уникальный бизнес ключ. В моем случае состоящая из полей:
1. Хеш - бизнес ключ в формате md5 |
2. Источник выгрузки, можно вывести код источника, но я взял саму таблицу =) |
3. Время загрузки данных в таблицу Hub |
4. Бизнес ключ. |
2. Satellite — описательная информация хаба
1. Хеш хаба |
2. Время загрузки данных в таблицу Satellite |
3. Дата действия записи в Satellite (SCD2) |
4. Источник, откуда берется выгрузка |
5. Атрибут Satellite |
3. Link — отношение между единицами бизнеса (ключами хабов)
1. Хеш линка (для комплексного просмотра изменений при SCD2 - состоит из скрещения хешей хаба) |
2. Время загрузки данных в таблицу Link |
3. Дата действия записи в Link (SCD2)(ПРИ НАЛИЧИИ) |
4. Источник, откуда берется выгрузка |
5. Хеш первого хаба |
6. Хеш второго хаба |
7. ... третьего при наличии и тд. |
В качестве примера была использована демонстрационная база данных, которую можно выгрузить в открытом доступе или глянуть здесь.
Перед формированием DataVault, необходимо возвести наглядную модель, в моем случае я использовал drawio от google. Это не займет много времени, но поможет при формировании хранилища.
Итак, поехали!
Открываем скрипт в консоли psql под нужным пользователем, у меня по дефолту.
psql -f demo_small_YYYYMMDD.sql -U postgres
Тем самым получим базу данных demo с данными об авиаперевозках по России. Объем данных небольшой, как раз подойдет для примера.
Для начала нам необходимо настроить репликацию (надо это вам или нет, решайте сами, но я сделал). Во избежании проблем связанных с доступами, распределением нагрузок и целостности данных.
Шаг 1. Репликация.
Для репликации данных в другую схему создадим новое расширение postgres_fdw, которое позволяет постгре держать связь с внешними данными из других серверов.
Как формируется реплика
/* -- Создаем расширение*/
DROP EXTENSION IF EXISTS postgres_fdw CASCADE;
CREATE EXTENSION postgres_fdw;
/*-- Создание сервера, который использует расширение в качестве обертки сторонних данных*/
CREATE SERVER IF NOT EXISTS foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', port '5432', dbname 'demo');
CREATE USER MAPPING IF NOT EXISTS FOR postgres
SERVER foreign_server
OPTIONS (user 'postgres', password 'postgres');
/*-- Создадим схему foreign_tables, в которую будем лить реплику*/
DROP SCHEMA IF EXISTS foreign_tables;
CREATE SCHEMA IF NOT EXISTS foreign_tables;
IMPORT FOREIGN SCHEMA bookings -- название схемы исходника
FROM SERVER foreign_server
INTO foreign_tables;
Перед тем как создать сервер, поднимаем новую бд с именем DWHDataVault. Код исполнялся именно в ней. Итог 1 шага, мы получили реплицированные данные из исходника, но есть костыль... куда без них, ну или фича, кому как удобно.
Реплика переносит данные в виде внешних таблиц, к которым нельзя подвязать внешние ключи. Поэтому переливаем данные из слоя реплицирования foreign_tables в табличный, т.к. у внешних таблиц нет полного функционала табличной части...
Миграция в табличный слой
/*сперва создадим слой Stage для хранения функциональных таблиц*/
DROP SCHEMA IF EXISTS Stage CASCADE; CREATE SCHEMA IF NOT EXISTS Stage;
/*aircrafts*/
DROP TABLE IF EXISTS Stage.aircrafts CASCADE;
CREATE TABLE Stage.aircrafts AS TABLE foreign_tables.aircrafts;
/*aircrafts_data*/
DROP TABLE IF EXISTS Stage.aircrafts_data CASCADE;
CREATE TABLE Stage.aircrafts_data AS TABLE foreign_tables.aircrafts_data;
/*boarding_passes*/
DROP TABLE IF EXISTS Stage.boarding_passes CASCADE;
CREATE TABLE Stage.boarding_passes AS TABLE foreign_tables.boarding_passes;
/*airports*/
DROP TABLE IF EXISTS Stage.airports CASCADE;
CREATE TABLE Stage.Airports AS TABLE foreign_tables.airports;
-- Что касается таблицы airports_data, в исходнике есть поле coordinates, в котором по дефолту тип данных point, для дальнейшего преобразования таблицы в органику DataVault необходимо переопределить тип
/*airports_data*/
DROP TABLE IF EXISTS Stage.airports_data CASCADE;
CREATE TABLE Stage.airports_data AS TABLE foreign_tables.airports_data;
ALTER TABLE Stage.airports_data ALTER COLUMN coordinates TYPE VARCHAR;
/*bookings*/
DROP TABLE IF EXISTS Stage.bookings CASCADE;
CREATE TABLE Stage.bookings AS TABLE foreign_tables.bookings;
/*flights*/
DROP TABLE IF EXISTS Stage.flights CASCADE;
CREATE TABLE Stage.flights AS TABLE foreign_tables.flights;
/*seats*/
DROP TABLE IF EXISTS Stage.seats CASCADE;
CREATE TABLE Stage.seats AS TABLE foreign_tables.seats;
/*ticket_flights*/
DROP TABLE IF EXISTS Stage.ticket_flights CASCADE;
CREATE TABLE Stage.ticket_flights AS TABLE foreign_tables.ticket_flights;
/*tickets*/
DROP TABLE IF EXISTS Stage.tickets CASCADE;
CREATE TABLE Stage.tickets AS TABLE foreign_tables.tickets;
Шаг 2. Подготовка DDL для организации DataVault
Для начала необходимо определить схему, для миграция данных.
Схема для составляющих DataVault
DROP SCHEMA IF EXISTS data_vault CASCADE;
CREATE SCHEMA IF NOT EXISTS data_vault;
DDL для таблиц Hub
Первичным ключом в таблицах будет захешированный бизнес ключ.
Для начала удалим таблицы, если такие имеются.
Удаление таблиц
DROP TABLE IF EXISTS data_vault.Hub_flights CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_airports_data CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_aircrafts_data CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_seats CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_ticket_flights CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_boarding_passes CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_tickets CASCADE;
DROP TABLE IF EXISTS data_vault.Hub_bookings CASCADE;
Напишем функцию для формирования таблиц Hub. Функция принимает на вход два параметра:
1. Название таблицы Hub
2. Бизнес ключ
Функция DDL Hub
CREATE OR REPLACE FUNCTION data_vault.ddl_hub_table(
table_name TEXT , -- название таблицы Hub
business_key TEXT) -- какие колонки надо вытащить из таблицы источника указывается с форматом
RETURNS VOID AS $$
DECLARE
qwery TEXT;
BEGIN -- проверка на наличие данных в параметрах функции
IF
(table_name IS NULL) OR (business_key IS NULL)
THEN RAISE EXCEPTION 'Не заполнены параметры';
END IF;
qwery:= 'CREATE TABLE IF NOT EXISTS data_vault.' || table_name || '(' ||
'Hash_key Varchar(33) PRIMARY KEY,
record_sourse Varchar(20) NOT NULL,
Load_date timestamp NOT NULL ,' ||
business_key || ' NOT NULL);';
EXECUTE qwery;
END;
$$ LANGUAGE plpgsql;
Запрос без функции выглядит так.
CREATE TABLE IF NOT EXISTS data_vault.Hub_flights(
Hash_key Varchar(33) PRIMARY KEY,
record_sourse Varchar(20) NOT NULL,
Load_date timestamp NOT NULL,
flight_id bigint NOT NULL);
Код формирования DDL HUB
-- flights
SELECT data_vault.ddl_hub_table('Hub_flights', 'flight_id bigint')
-- airports_data
SELECT data_vault.ddl_hub_table('Hub_airports_data', 'airport_code bpchar(3)')
-- aircrafts_data
SELECT data_vault.ddl_hub_table('Hub_aircrafts_data', 'aircraft_code bpchar(3)')
-- seats
SELECT data_vault.ddl_hub_table('Hub_seats', 'aircraft_code_seat_no Varchar(20)')
-- ticket_flights
SELECT data_vault.ddl_hub_table('Hub_ticket_flights', 'ticket_no_flight_id Varchar(25)')
-- boarding_passes
SELECT data_vault.ddl_hub_table('Hub_boarding_passes', 'ticket_no_flight_id Varchar(25)')
-- tickets
SELECT data_vault.ddl_hub_table('Hub_tickets', 'ticket_no Varchar(20)')
-- bookings
SELECT data_vault.ddl_hub_table('Hub_bookings', 'book_ref Varchar(15)')
DDL для таблиц Link
DDL для Link прописан вручную, так как структура у таблиц может быть индивидуальной.
В некоторых таблицах, обработка данных ведется без хеширования, поэтому привязать Link к Satellite на этапе DDL не представляется возможным, но в таких случаях REFERENCES осуществляется на этапе обработки.
-- Для начала удалим таблицы, если такие имеются.
DROP TABLE IF EXISTS data_vault.Link_flights_airoport_data_departure CASCADE;
DROP TABLE IF EXISTS data_vault.Link_flights_airoport_data_arrival CASCADE;
DROP TABLE IF EXISTS data_vault.Link_flights_ticket_flights CASCADE;
DROP TABLE IF EXISTS data_vault.Link_ticket_flights_boarding_passes CASCADE;
DROP TABLE IF EXISTS data_vault.Link_ticket_flights_tickets CASCADE;
DROP TABLE IF EXISTS data_vault.Link_tickets_bookings CASCADE;
DROP TABLE IF EXISTS data_vault.Link_flights_aircrafts_data CASCADE;
DROP TABLE IF EXISTS data_vault.Link_seats_aircrafts_data CASCADE;
Код формирования DDL LINK
-- flights_airoport_data_departure
CREATE TABLE IF NOT EXISTS data_vault.Link_flights_airoport_data_departure(
Link_flights_airoport_data_departure_Hashkey Varchar(33),
load_date timestamp,
load_end_date timestamp,
record_sourse Varchar(20),
Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key),
Hub_airoport_data_Hash_key Varchar(33) REFERENCES data_vault.Hub_airports_data(hash_key)
);
-- flights_airoport_arrival_airport
CREATE TABLE IF NOT EXISTS data_vault.Link_flights_airoport_data_arrival(
Link_flights_airoport_data_arrival_Hashkey Varchar(33),
load_date timestamp,
load_end_date timestamp,
record_sourse Varchar(20),
Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key),
Hub_airoport_data_Hash_key Varchar(33) REFERENCES data_vault.Hub_airports_data(hash_key)
);
-- flights_ticket_flights
CREATE TABLE IF NOT EXISTS data_vault.Link_flights_ticket_flights(
Link_flights_ticket_flights_Hashkey Varchar(33),
load_date timestamp,
load_end_date timestamp,
record_sourse Varchar(30),
Hub_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_flights(hash_key),
ticket_no Varchar(33) , -- удалить потом, для инсерта первого
Hub_ticket_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_ticket_flights(hash_key));
-- ticket_flights_boarding_passes
CREATE TABLE IF NOT EXISTS data_vault.Link_ticket_flights_boarding_passes(
Link_ticket_flights_boarding_passes Varchar(40) ,
load_date timestamp,
load_end_date timestamp,
record_sourse Varchar(33),
ticket_no Varchar(33),
Hub_boarding_passes_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/,
Hub_ticket_flights_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/
);
-- ticket_flights_tickets
CREATE TABLE IF NOT EXISTS data_vault.Link_ticket_flights_tickets(
Link_ticket_flights_tickets Varchar(40),
load_date timestamp,
record_sourse Varchar(33), -- источники
Hub_ticket_flights_Hash_key Varchar(33) REFERENCES data_vault.Hub_ticket_flights(hash_key),
Hub_tickets_Hash_key Varchar(33) REFERENCES data_vault.Hub_tickets(hash_key)
);
-- tickets_bookings
CREATE TABLE IF NOT EXISTS data_vault.Link_tickets_bookings(
Link_tickets_bookings Varchar(33) PRIMARY KEY,
load_date timestamp,
load_end_date timestamp,
record_sourse Varchar(30),
Hub_ticket_Hash_key Varchar(33) REFERENCES data_vault.Hub_tickets(hash_key),
Hub_bookings_Hash_key Varchar(33) REFERENCES data_vault.Hub_bookings(hash_key)
);
-- flights_aircrafts_data
CREATE TABLE IF NOT EXISTS data_vault.Link_flights_aircrafts_data(
Link_flights_aircrafts_data_Hashkey Varchar(33),
load_date timestamp,
record_sourse Varchar(22),
Hub_flights_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/,
Hub_aircrafts_data_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/
);
-- seats_aircrafts_data
CREATE TABLE IF NOT EXISTS data_vault.Link_seats_aircrafts_data(
Link_seats_aircrafts_data_Hashkey Varchar(33),
load_date timestamp,
record_sourse Varchar(20),
Hub_aircrafts_data_Hash_key Varchar(33) /*REFERENCES в данном случае добавлены в коде с обработкой*/,
Hub_seats_Hash_key Varchar(33) /*REFERENCES в данном случае добавлен в коде с обработкой*/
);
DDL для таблиц Satellite
Очищаем таблицы, если такие есть
-- Чистим чистим
DROP TABLE IF exists data_vault.Sattelite_aircrafts_data_range;
DROP TABLE IF exists data_vault.Sattelite_aircrafts_data_model;
DROP TABLE IF exists data_vault.Sattelite_seats_fare_conditions;
DROP TABLE IF exists data_vault.Sattelite_airport_data_timezone;
DROP TABLE IF exists data_vault.Sattelite_airport_data_coordinates;
DROP TABLE IF exists data_vault.Sattelite_airport_data_airport_name;
DROP TABLE IF exists data_vault.Sattelite_airport_data_city;
DROP TABLE IF exists data_vault.Sattelite_ticket_flights_amount;
DROP TABLE IF exists data_vault.Sattelite_ticket_flights_fare_conditions;
DROP TABLE IF exists data_vault.Sattelite_boarding_passes_boarding_no;
DROP TABLE IF exists data_vault.Sattelite_boarding_passes_seat_no;
DROP TABLE IF exists data_vault.Sattelite_tickets_book_ref;
DROP TABLE IF exists data_vault.Sattelite_tickets_contact_data;
DROP TABLE IF exists data_vault.Sattelite_tickets_passenger_id;
DROP TABLE IF exists data_vault.Sattelite_tickets_passenger_name;
DROP TABLE IF exists data_vault.Sattelite_bookings_book_date;
DROP TABLE IF exists data_vault.Sattelite_bookings_total_amount;
DROP TABLE IF exists data_vault.Sattelite_flights_flight_no;
DROP TABLE IF exists data_vault.Sattelite_flights_scheduled_departure;
DROP TABLE IF exists data_vault.Sattelite_flights_scheduled_arrival;
DROP TABLE IF exists data_vault.Sattelite_flights_departure_airport;
DROP TABLE IF exists data_vault.Sattelite_flights_arrival_airport;
DROP TABLE IF exists data_vault.Sattelite_flights_status;
DROP TABLE IF exists data_vault.Sattelite_flights_aircraft_code;
DROP TABLE IF exists data_vault.Sattelite_flights_actual_departure;
DROP TABLE IF exists data_vault.Sattelite_flights_actual_arrival;
```
При формировании таблиц, возведем функцию, которая принимает на вход три параметра, первый — название таблицы Satellite, название хеша Hub, атрибут:
Функция DDL Satellite
CREATE OR REPLACE FUNCTION data_vault.ddl_Sattelite_table(
table_name TEXT, -- название таблицы сателлита
hub_hash TEXT, -- название хеша хаба, к которому относится сателлит
atrib TEXT) -- какие колонки надо вытащить из таблицы источника указывается вместе с форматом
RETURNS VOID AS $$
DECLARE
qwery TEXT;
BEGIN -- проверка на наличие данных в параметрах функции
IF
(table_name IS NULL) OR (hub_hash IS NULL) OR (atrib IS NULL)
THEN RAISE EXCEPTION 'Не заполнены параметры';
END IF;
qwery:= 'CREATE TABLE IF NOT EXISTS data_vault.' || table_name || '(' ||
hub_hash || '_Hash_key Varchar(33)' || ' REFERENCES data_vault.' || hub_hash || '(Hash_key),' ||
'load_date timestamp, load_end_date timestamp, record_sourse Varchar(20), ' || atrib || ', ' ||
'PRIMARY KEY (' || hub_hash || '_Hash_key ' || ', load_date));';
EXECUTE qwery;
END;
$$ LANGUAGE plpgsql;
Формирование таблиц Satellite:
Код формирование DDL Satellite
/*Sattelite_aircrafts_data*/
-- range
SELECT data_vault.ddl_Sattelite_table('Sattelite_aircrafts_data_range', 'Hub_aircrafts_data', 'range int4');
--model
SELECT data_vault.ddl_Sattelite_table('Sattelite_aircrafts_data_model', 'Hub_aircrafts_data', 'model TEXT');
/*Sattelite_seats*/
-- fare_conditions
SELECT data_vault.ddl_Sattelite_table('Sattelite_seats_fare_conditions', 'Hub_seats', 'fare_conditions Varchar(15)');
/*Sattelite_airoport_data*/
-- timezone
SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_timezone', 'Hub_airport_data', 'timezone Varchar(34)');
-- coordinates
SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_coordinates', 'Hub_airport_data', 'coordinates Varchar(50)');
-- airport_name
SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_airport_name', 'Hub_airport_data', 'airport_name text');
-- city
SELECT data_vault.ddl_Sattelite_table('Sattelite_airport_data_city', 'Hub_airport_data', 'city text');
/*Sattelite_ticket_flights*/
-- amount
SELECT data_vault.ddl_Sattelite_table('Sattelite_ticket_flights_amount', 'Hub_ticket_flights', 'amount numeric(10, 2)');
-- fare_conditions
SELECT data_vault.ddl_Sattelite_table('Sattelite_ticket_flights_fare_conditions', 'Hub_ticket_flights', 'fare_conditions Varchar(10)');
/*Sattelite_boarding_passes*/
-- boarding_no
SELECT data_vault.ddl_Sattelite_table('Sattelite_boarding_passes_boarding_no', 'Hub_boarding_passes', 'boarding_no int4');
-- seat_no
SELECT data_vault.ddl_Sattelite_table('Sattelite_boarding_passes_seat_no', 'Hub_boarding_passes', 'seat_no varchar(4)');
/*Sattelite_tickets*/
-- book_ref
SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_book_ref', 'Hub_ticket', 'book_ref bpchar(6)');
-- contact_data
SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_contact_data', 'Hub_ticket', 'contact_data jsonb');
-- passenger_id
SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_contact_data', 'Hub_ticket', 'passenger_id varchar(20)');
-- passenger_name
SELECT data_vault.ddl_Sattelite_table('Sattelite_tickets_passenger_name', 'Hub_ticket', 'passenger_name text');
/*Sattelite_bookings*/
-- book_date
SELECT data_vault.ddl_Sattelite_table('Sattelite_bookings_book_date', 'Hub_bookings', 'book_date timestamptz');
-- total_amount
SELECT data_vault.ddl_Sattelite_table('Sattelite_bookings_total_amount', 'Hub_bookings', 'total_amount numeric(10, 2)');
/*Sattelite_flights*/
-- flight_no
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_flight_no', 'Hub_flights', 'flight_no bpchar(6)');
-- scheduled_departure
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_scheduled_departure', 'Hub_flights', 'scheduled_departure timestamptz');
-- scheduled_arrival
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_scheduled_arrival', 'Hub_flights', 'scheduled_arrival timestamptz');
-- departure_airport
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_departure_airport', 'Hub_flights', 'departure_airport bpchar(3)');
-- arrival_airport
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_arrival_airport', 'Hub_flights', 'arrival_airport bpchar(3)');
-- status
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_status', 'Hub_flights', 'status varchar(20)');
-- aircraft_code
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_aircraft_code', 'Hub_flights', 'aircraft_code bpchar(3)');
-- actual_departure
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_actual_departure', 'Hub_flights', 'actual_departure timestamptz');
-- actual_arrival
SELECT data_vault.ddl_Sattelite_table('Sattelite_flights_actual_arrival', 'Hub_flights', 'actual_arrival timestamptz');
К итогам второго шага отнесем формирования ddl для сателлитов, хабов и линков и связи между ними путем формирования ограничений через REFERENCES. Остались заключительные шаги, с насыщением таблиц и постобработкой.
Шаг 3. Перенос данных из stage хранилища в DataVault.
После залива данных в схему stage из временных таблиц foreign_tables и формирования ddl всех составляющих DataVault. Данные необходимо разложить по Hub, Link и Satellite.
Начнем с Hub, напишем функцию, для миграции из схемы stage в схему data_vault.
Функция для переноса данных в Hub
CREATE OR REPLACE FUNCTION data_vault.insert_hub_table(
table_name TEXT , -- название таблицы хаба
business_key TEXT, -- какие колонки надо вытащить из таблицы источника указывается вместе с форматом
sourse TEXT, -- источник откуда тянутся данные
times TEXT, -- время наполнения таблицы данными
sourse_table TEXT) -- полное обозначение источника для запроса
RETURNS VOID AS $$
DECLARE
qwery TEXT;
BEGIN -- проверка на наличие данных в параметрах функции
IF
(table_name IS NULL) OR (business_key IS NULL)
THEN RAISE EXCEPTION 'Не заполнены параметры';
END IF;
qwery:= 'INSERT INTO data_vault.' || table_name || '(Hash_key, record_sourse, Load_date, ' || (regexp_split_to_array(business_key, '::'))[1] || ')' ||
' SELECT DISTINCT MD5(' || business_key || '), ' ||
sourse ||' , ' ||
times ||' , ' ||
(regexp_split_to_array(business_key, '::'))[1] ||
' FROM ' || sourse_table || ';';
EXECUTE qwery;
END;
$$ LANGUAGE plpgsql;
Так выглядит запрос описанный в функции.
INSERT INTO data_vault.Hub_flights (Hash_key, record_sourse, Load_date, flight_id)
SELECT DISTINCT
MD5(flight_id::varchar),
'flights',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
flight_id
FROM stage.flights;
Определим функцию для всех хабов.
Код наполнения данными таблиц Hab
/*Hub_flights*/
SELECT data_vault.insert_hub_table('Hub_flights',
'flight_id::varchar',
'''flights''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.flights')
/*Hub_airports_data*/
SELECT data_vault.insert_hub_table('Hub_airports_data',
'airport_code::varchar',
'''airports_data''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.airports_data')
/*Hub_aircrafts_data*/
SELECT data_vault.insert_hub_table('Hub_aircrafts_data',
'aircraft_code::varchar',
'''aircrafts_data''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.aircrafts_data')
/*Hub_seats*/
SELECT data_vault.insert_hub_table('Hub_seats',
'concat(aircraft_code, seat_no)',
'''seats''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.seats')
/*Hub_ticket_flights*/
SELECT data_vault.insert_hub_table('Hub_ticket_flights',
'(concat(ticket_no, '&', flight_id))::varchar',
'''ticket_flights''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.ticket_flights')
/*Hub_boarding_passes*/
SELECT data_vault.insert_hub_table('Hub_boarding_passes',
'(concat(ticket_no, '&', flight_id))::varchar',
'''boarding_passes''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.boarding_passes')
/*Hub_tickets*/
SELECT data_vault.insert_hub_table('Hub_tickets',
'(ticket_no)::varchar',
'''tickets''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.tickets')
/*Hub_bookings*/
SELECT data_vault.insert_hub_table('Hub_bookings',
'(book_ref)::varchar',
'''bookings''',
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$,
'stage.bookings')
Для наполнения таблиц Link я выкачу простынь, потому что нецелесообразно писать функцию, тк в параметрах будет много переменных.
Код наполнения данными таблиц Link
/*Link_flights_airoport_data_departure - departure_airport - аэропорт отправления*/
INSERT INTO data_vault.Link_flights_airoport_data_departure
(Link_flights_airoport_data_departure_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT DISTINCT
concat(flight_id, COALESCE(sf.departure_airport, sa.airport_code)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
MD5(flight_id :: varchar),
MD5(sa.airport_code)
FROM stage.flights sf
LEFT JOIN stage.airports_data sa
ON sf.departure_airport = sa.airport_code;
/*Link_flights_airoport_data - arrival_airport - аэропорт прибытия*/
INSERT INTO data_vault.Link_flights_airoport_data_arrival
(Link_flights_airoport_data_arrival_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT DISTINCT
concat(flight_id, COALESCE(sf.departure_airport, sa.airport_code)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
MD5(flight_id :: varchar),
MD5(sa.airport_code)
FROM stage.flights sf
LEFT JOIN stage.airports_data sa
ON sf.arrival_airport = sa.airport_code;
/*Link_flights_ticket_flights*/
INSERT INTO data_vault.Link_flights_ticket_flights (Link_flights_ticket_flights_Hashkey, load_date, load_end_date, record_sourse, hub_flights_hash_key, ticket_no, hub_ticket_flights_hash_key)
SELECT DISTINCT
(concat(tf.ticket_no, '&', f.flight_id)::varchar),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'flights_ticket_flights',
md5(f.flight_id :: varchar),
md5(tf.ticket_no),
MD5((concat(tf.ticket_no, '&', f.flight_id))::varchar)
FROM stage.ticket_flights tf
JOIN stage.flights f ON f.flight_id = tf.flight_id;
/*Link_ticket_flights_boarding_passes*/
INSERT INTO data_vault.Link_ticket_flights_boarding_passes (Link_ticket_flights_boarding_passes, load_date, load_end_date, record_sourse, ticket_no, Hub_boarding_passes_Hash_key, Hub_ticket_flights_Hash_key)
SELECT
DISTINCT concat(concat(tf.ticket_no, '&' ,tf.flight_id), concat(bp.ticket_no, '&' ,bp.flight_id)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_flights_boarding_passes',
md5(bp.ticket_no),
concat(bp.ticket_no, '&',bp.flight_id),
concat(tf.ticket_no, '&',tf.flight_id)
FROM stage.boarding_passes bp
LEFT JOIN stage.ticket_flights tf
USING(ticket_no, flight_id);
/*Link_ticket_flights_tickets*/
INSERT INTO data_vault.Link_ticket_flights_tickets (Link_ticket_flights_tickets, load_date, record_sourse, Hub_ticket_flights_Hash_key, Hub_tickets_Hash_key)
SELECT DISTINCT
concat(concat(tf.ticket_no, '&', tf.flight_id), t.ticket_no),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'ticket_flights_tickets',
md5(concat(tf.ticket_no, '&', tf.flight_id)),
md5(t.ticket_no)
FROM stage.ticket_flights tf
full JOIN stage.tickets t
ON t.ticket_no = tf.ticket_no;
/*Link_ticket_bookings*/
INSERT INTO data_vault.Link_tickets_bookings (Link_tickets_bookings, load_date, load_end_date, record_sourse, Hub_ticket_Hash_key, Hub_bookings_Hash_key)
SELECT concat(ticket_no, book_ref),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_bookings',
md5(ticket_no),
md5(book_ref)
FROM stage.tickets t
FULL JOIN stage.bookings b USING(book_ref);
/*Link_flights_aircrafts_data*/
INSERT INTO data_vault.Link_flights_aircrafts_data (Link_flights_aircrafts_data_Hashkey, load_date, record_sourse, Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key)
SELECT DISTINCT
concat(f.aircraft_code, ad.aircraft_code),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'flights_aircrafts_data',
f.aircraft_code,
ad.aircraft_code
FROM stage.flights f
FULL JOIN stage.aircrafts_data ad
ON f.aircraft_code = ad.aircraft_code;
/*Link_seats_aircrafts_data*/
INSERT INTO data_vault.Link_seats_aircrafts_data(Link_seats_aircrafts_data_Hashkey, load_date, record_sourse, Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key)
SELECT DISTINCT
concat(ad.aircraft_code, concat(s.aircraft_code, s.seat_no)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'seats_aircrafts_data',
ad.aircraft_code,
concat(s.aircraft_code, s.seat_no)
FROM stage.aircrafts_data ad
JOIN stage.seats s USING(aircraft_code);
Для заполнения данных в таблицах-спутниках написана функция, поскольку для каждого атрибута основной таблицы была создана отдельная таблица-спутник, которых в итоге получилось более двадцати. Зачем на каждый атрибут (желательно) делать свой сателлит.
Функция для заполнения данными таблиц Satellite
/*Функция*/
CREATE OR REPLACE FUNCTION data_vault.insert_data_from_table(
table_name TEXT , -- название таблицы саттелита
source_table_columns TEXT, -- какие колонки надо вытащить из таблицы источника
source_table_name TEXT, -- имя таблицы откуда тянутся данные
hash TEXT, -- из чего будет состоять хеш
times TEXT, -- время записи данных в таблицу
sourse_col TEXT) -- название источника
RETURNS VOID AS $$
DECLARE
qwery TEXT;
BEGIN -- проверка на наличие информации в параметрах
IF (table_name IS NULL) OR (source_table_columns IS NULL) OR (source_table_name IS NULL) OR (hash IS NULL) OR (times IS NULL) OR (sourse_col IS NULL)
THEN RAISE EXCEPTION 'Не заполнены параметры';
END IF;
qwery:= 'INSERT INTO ' || table_name ||
' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,' || ' load_end_date' || ' ,' || ' record_sourse' || ', '
|| (regexp_split_to_array(source_table_columns, ','))[2] || ') ' || ' SELECT DISTINCT md5(' || hash || ') , '
|| times || ', ' || '''1111-11-11 11:11:11'':: timestamp'|| ' , ' || sourse_col || ', ' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || source_table_name;
EXECUTE qwery;
END;
$$ LANGUAGE plpgsql;
Определяем параметры функции к сателлитам (формат иерархии: Название таблицы как в исходнике, атрибуты таблицы, которые являются сателлитом):
Код заполнения данными таблиц Satellite
-- Sattelite_aircrafts_data
--#####################################################################################################
/*Аircrafts_data_range*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_aircrafts_data_range' , -- название таблицы саттелита
'Hub_aircrafts_data_Hash_key,range', -- какие колонки надо вытащить из таблицы источника
'stage.aircrafts_data', -- имя таблицы откуда тянутся данные
'aircraft_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''aircrafts_data''');
-- SELECT * FROM data_vault.Sattelite_aircrafts_data_range;
/*Аircrafts_data_model*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_aircrafts_data_model' , -- название таблицы саттелита
'Hub_aircrafts_data_Hash_key,model', -- какие колонки надо вытащить из таблицы источника
'stage.aircrafts_data', -- имя таблицы откуда тянутся данные
'aircraft_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''aircrafts_data''');
-- SELECT * FROM data_vault.Sattelite_aircrafts_data_model;
-- Sattelite_seats
--#####################################################################################################
/*fare_conditions*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_seats_fare_conditions' ,-- название таблицы саттелита
'Hub_seats_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника
'stage.seats', -- имя таблицы откуда тянутся данные
'concat(aircraft_code, seat_no)', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''seats''');
-- SELECT * FROM data_vault.Sattelite_seats_fare_conditions;
-- Sattelite_airoport_data
--#####################################################################################################
/*timezone*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_timezone' ,-- название таблицы саттелита
'Hub_airport_data_Hash_key,timezone', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_timezone;
/*coordinates*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_coordinates' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,coordinates', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_coordinates;
/*airport_name*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_airport_name' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,airport_name', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_airport_name;
/*city*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_airport_data_city' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,city', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_city;
-- Sattelite_ticket_flights
--#####################################################################################################
/*amount*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_ticket_flights_amount' , -- название таблицы саттелита
'Hub_ticket_flights_Hash_key,amount', -- какие колонки надо вытащить из таблицы источника
'stage.ticket_flights', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_ticket_flights_amount;
/*fare_conditions*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_ticket_flights_fare_conditions' , -- название таблицы саттелита
'Hub_ticket_flights_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника
'stage.ticket_flights', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_ticket_flights_fare_conditions;
-- Sattelite_boarding_passes
--#####################################################################################################
/*boarding_no*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_boarding_passes_boarding_no' , -- название таблицы саттелита
'Hub_boarding_passes_Hash_key,boarding_no', -- какие колонки надо вытащить из таблицы источника
'stage.boarding_passes', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''boarding_passes''');
-- SELECT * FROM data_vault.Sattelite_boarding_passes_boarding_no;
/*seat_no*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_boarding_passes_seat_no' , -- название таблицы саттелита
'Hub_boarding_passes_Hash_key,seat_no', -- какие колонки надо вытащить из таблицы источника
'stage.boarding_passes', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''boarding_passes''');
-- SELECT * FROM data_vault.Sattelite_boarding_passes_seat_no;
-- Sattelite_tickets
--#####################################################################################################
/*book_ref*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_book_ref' , -- название таблицы саттелита
'Hub_ticket_Hash_key,book_ref', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_book_ref;
/*contact_data*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_contact_data' , -- название таблицы саттелита
'Hub_ticket_Hash_key,contact_data', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_contact_data;
/*passenger_id*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_passenger_id' , -- название таблицы саттелита
'Hub_ticket_Hash_key,passenger_id', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_passenger_id;
/*passenger_name*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_tickets_passenger_name' , -- название таблицы саттелита
'Hub_ticket_Hash_key,passenger_name', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_passenger_name;
-- Sattelite_bookings
--#####################################################################################################
/*book_date*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_bookings_book_date' , -- название таблицы саттелита
'Hub_bookings_Hash_key,book_date', -- какие колонки надо вытащить из таблицы источника
'stage.bookings', -- имя таблицы откуда тянутся данные
$$(book_ref)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''bookings''');
-- SELECT * FROM data_vault.Sattelite_bookings_book_date;
/*total_amount*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_bookings_total_amount' , -- название таблицы саттелита
'Hub_bookings_Hash_key,total_amount', -- какие колонки надо вытащить из таблицы источника
'stage.bookings', -- имя таблицы откуда тянутся данные
$$(book_ref)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''bookings''');
-- SELECT * FROM data_vault.Sattelite_bookings_total_amount;
-- Sattelite_flights
--#####################################################################################################
/*flight_no*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_flight_no' , -- название таблицы саттелита
'Hub_flights_Hash_key,flight_no', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_flight_no;
/*scheduled_departure*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_scheduled_departure' , -- название таблицы саттелита
'Hub_flights_Hash_key,scheduled_departure', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_scheduled_departure;
/*scheduled_arrival*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_scheduled_arrival' , -- название таблицы саттелита
'Hub_flights_Hash_key,scheduled_arrival', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_scheduled_arrival;
/*departure_airport*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_departure_airport' , -- название таблицы саттелита
'Hub_flights_Hash_key,departure_airport', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_departure_airport;
/*arrival_airport*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_arrival_airport' , -- название таблицы саттелита
'Hub_flights_Hash_key,arrival_airport', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_arrival_airport;
/*status*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_status' , -- название таблицы саттелита
'Hub_flights_Hash_key,status', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_status;
/*aircraft_code*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_aircraft_code' , -- название таблицы саттелита
'Hub_flights_Hash_key,aircraft_code', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_aircraft_code;
/*actual_departure*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_actual_departure' , -- название таблицы саттелита
'Hub_flights_Hash_key,actual_departure', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_actual_departure;
/*actual_arrival*/
SELECT data_vault.insert_data_from_table('data_vault.Sattelite_flights_actual_arrival' , -- название таблицы саттелита
'Hub_flights_Hash_key,actual_arrival', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_actual_arrival;
В схеме DataVault собраны и наполнены все три составляющих — Hub, Link, Satellite.
Шаг 4. Жизненный цикл DataVault.
Необходимость обработки данных в таблицах DataVault обосновывается изменениями, обновлениями или дополнениями информации в таблицах исходниках. Например, конструкция таблиц Hub должна оставаться неизменной, т.е. данные не могут подвергаться изменениям, только добавляться. В Link и Satellite наоборот, таблицы имеют в своем составе атрибут load_end_date — любая запись имеет свой срок жизни, если в источнике изменился атрибут, то в линке или сателлите обновится поле load_end_date и появится новая строка, согласно идеологии Дена Линстеда — идейного вдохновителя и основателя данного подхода.
Обработка для хабов примечательна тем, что данные сопоставляются с источником, если таких нет в хабе, то добавляем.
Обработка Hub
/*Hub_flights*/
INSERT INTO data_vault.Hub_flights (Hash_key, record_sourse, Load_date, flight_id)
SELECT DISTINCT
MD5(flight_id::varchar),
'flights',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
flight_id
FROM stage.flights f
LEFT JOIN data_vault.Hub_flights hf USING(flight_id)
WHERE load_date IS null;
/*Hub_airports_data*/
INSERT INTO data_vault.Hub_airports_data (Hash_key, record_sourse, Load_date, airport_code)
SELECT DISTINCT
MD5(airport_code::varchar),
'airports_data',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
airport_code
FROM stage.airports_data ad
LEFT JOIN data_vault.Hub_airports_data hf USING(airport_code)
WHERE load_date IS null;
/*Hub_aircrafts_data*/
INSERT INTO data_vault.Hub_aircrafts_data (Hash_key, record_sourse, Load_date, aircraft_code)
SELECT DISTINCT
MD5(aircraft_code::varchar),
'aircrafts_data',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
aircraft_code
FROM stage.aircrafts_data ad
LEFT JOIN data_vault.Hub_aircrafts_data hf USING(aircraft_code)
WHERE load_date IS null;
/*Hub_seats*/
INSERT INTO data_vault.Hub_seats (Hash_key, record_sourse, Load_date, aircraft_code_seat_no)
SELECT DISTINCT
MD5(concat(aircraft_code, seat_no)),
'seats',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
concat(aircraft_code, seat_no) aircraft_code_seat_no
FROM stage.seats ad
LEFT JOIN data_vault.Hub_seats hf ON concat(ad.aircraft_code, ad.seat_no) = hf.aircraft_code_seat_no
WHERE load_date IS null;
/*Hub_ticket_flights*/
INSERT INTO data_vault.Hub_ticket_flights (Hash_key, record_sourse, Load_date, ticket_no_flight_id)
SELECT DISTINCT
MD5(concat(ticket_no, '&', flight_id)::varchar),
'ticket_flights',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
concat(ticket_no, '&', flight_id)
FROM stage.ticket_flights еа
LEFT JOIN data_vault.Hub_ticket_flights htf ON (concat(еа.ticket_no, '&', еа.flight_id)) = ticket_no_flight_id
WHERE load_date IS null;
/*Hub_boarding_passes*/
INSERT INTO data_vault.Hub_boarding_passes (Hash_key, record_sourse, Load_date, ticket_no_flight_id)
SELECT DISTINCT
MD5(concat(ticket_no, '&', flight_id)::varchar),
'boarding_passes',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
concat(ticket_no, '&', flight_id)
FROM stage.ticket_flights еа
LEFT JOIN data_vault.Hub_ticket_flights htf ON (concat(еа.ticket_no, '&', еа.flight_id)) = ticket_no_flight_id
WHERE load_date IS null;
/*Hub_tickets*/
INSERT INTO data_vault.Hub_tickets (Hash_key, record_sourse, Load_date, ticket_no)
SELECT DISTINCT
MD5((ticket_no)::varchar),
'tickets',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
ticket_no
FROM stage.tickets еа
LEFT JOIN data_vault.Hub_tickets htf USING(ticket_no)
WHERE load_date IS null;
/*Hub_bookings*/
INSERT INTO data_vault.Hub_bookings (Hash_key, record_sourse, Load_date, book_ref)
SELECT DISTINCT
MD5((book_ref)::varchar),
'bookings',
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
book_ref
FROM stage.tickets еа
LEFT JOIN data_vault.Hub_bookings htf USING(book_ref)
WHERE load_date IS null;
Можно конечно не мостить простынь, а найти более изящное решение, но я не стал.
Самым трудозатратным на этом пути было прописать обработку таблиц link, так как каждая таблица имеет свою структуру — упростить обработку у меня не вышло, выкатываю очередную простынь.
Обработка Link
/*###########################
обработка - departure_airport
#############################*/
/*проверка на наличие изменение связей - если поменяют название аэропорта*/
DROP TABLE IF EXISTS data_vault.for_lfadp_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfadp_update_time AS
SELECT DISTINCT lfadd.hub_flights_hash_key,
lfadd.hub_airoport_data_hash_key,
MD5(hf.flight_id :: varchar) AS new_flight_id,
MD5(hf.departure_airport :: varchar) AS new_airport_name,
hf.flight_id new_flight,
hf.departure_airport new_airport,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM stage.flights hf
LEFT JOIN data_vault.Link_flights_airoport_data_departure lfadd ON md5(hf.flight_id::varchar) = lfadd.hub_flights_hash_key
WHERE (CASE WHEN upper(replace(lfadd.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.departure_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0;
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_airoport_data_departure a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfadp_update_time flut
WHERE a.hub_flights_hash_key =
flut.new_flight_id;
-- добавляем строку в таблицу Link_flights_airoport_data_departure новую
INSERT INTO data_vault.Link_flights_airoport_data_departure
(Link_flights_airoport_data_departure_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT
concat(new_flight, new_airport),
new_fix_time + INTERVAL '1 second',
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
new_flight_id,
new_airport_name
FROM data_vault.for_lfadp_update_time;
DROP TABLE IF EXISTS data_vault.for_lfadp_update_time;
/*обработка departure_airport завершена*/
/*###########################
обработка - arrival_airport
#############################*/
/*проверка на наличие изменение связей - если поменяют название аэропорта*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time AS
SELECT DISTINCT lfada.hub_flights_hash_key,
lfada.hub_airoport_data_hash_key,
MD5(hf.flight_id :: varchar) AS new_flight_id,
MD5(hf.arrival_airport :: varchar) AS new_airport_name,
hf.flight_id new_flight,
hf.arrival_airport new_airport,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM stage.flights hf
LEFT JOIN data_vault.Link_flights_airoport_data_arrival lfada ON md5(hf.flight_id::varchar) = lfada.hub_flights_hash_key
WHERE (CASE WHEN upper(replace(lfada.hub_airoport_data_hash_key, ' ', '')) = upper(replace(md5(hf.arrival_airport::varchar), ' ', '')) THEN 1 ELSE 0 END) = 0;
-- SELECT * FROM data_vault.for_lfada_update_time
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_airoport_data_arrival a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfada_update_time flut
WHERE a.hub_flights_hash_key =
flut.new_flight_id;
INSERT INTO data_vault.Link_flights_airoport_data_arrival
(Link_flights_airoport_data_arrival_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, Hub_airoport_data_Hash_key)
SELECT
concat(new_flight_id, new_airport_name),
new_fix_time + INTERVAL '1 second',
'1111-11-11 11:11:11' :: timestamp,
'flights_airport_data',
new_flight_id,
new_airport_name
FROM data_vault.for_lfada_update_time;
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
/*обработка arrival_airport завершена*/
/*################################
обработка - flights_ticket_flights
##################################*/
/*проверка если отменили рейс flight_id и перенесли билет на новый рейс - такое же может быть , значит load end date будет актуален*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
WITH
new AS (SELECT distinct
replace(concat(tf.ticket_no, tf.flight_id), ' ', '') new_data
, tf.ticket_no
, tf.flight_id n_flight_id
FROM stage.ticket_flights tf
LEFT JOIN data_vault.Link_flights_ticket_flights lftf
ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '')
WHERE LENGTH(replace(concat(lftf.ticket_no, lftf.Hub_flights_Hash_key), ' ', '')) = 0),
old AS (SELECT distinct
lftf.ticket_no
, Hub_flights_Hash_key o_flight_id
FROM data_vault.Link_flights_ticket_flights lftf
LEFT JOIN stage.ticket_flights tf
ON replace(concat(md5(tf.ticket_no), md5(tf.flight_id :: varchar)), ' ', '') = replace(concat(lftf.ticket_no , lftf.Hub_flights_Hash_key), ' ', '')
WHERE LENGTH(replace(concat(tf.ticket_no, tf.flight_id), ' ', '')) = 0)
SELECT md5(NEW.ticket_no) ticket_no,
concat(OLD.ticket_no, o_flight_id) AS OLD_ticket_no_flight_id,
o_flight_id AS OLD_flight_id,
concat(NEW.ticket_no, '&', n_flight_id) AS NEW_ticket_no_flight_id,
md5(n_flight_id ::varchar) AS NEW_flight_id,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp AS new_fix_time
FROM old JOIN NEW ON OLD.ticket_no = NEW.ticket_no;
-- SELECT * FROM data_vault.for_lfada_update_time;
-- обновляем время в линке на новое, если сменилось название аэропорта в исходнике
UPDATE data_vault.Link_flights_ticket_flights a
SET load_end_date = flut.new_fix_time
FROM data_vault.for_lfada_update_time flut
WHERE concat(a.ticket_no, a.hub_flights_hash_key) =
flut.OLD_ticket_no_flight_id;
-- добавляем строку в таблицу Link_flights_ticket_flights новую, которую добавили в ticket_flights
INSERT INTO data_vault.Link_flights_ticket_flights (Link_flights_ticket_flights_Hashkey, load_date, load_end_date, record_sourse, Hub_flights_Hash_key, ticket_no, Hub_ticket_flights_Hash_key)
SELECT flut.NEW_ticket_no_flight_id,
to_char(now() - INTERVAL '1 second', 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'flights_ticket_flights',
md5(NEW_flight_id),
md5(ticket_no),
md5(flut.NEW_ticket_no_flight_id)
FROM data_vault.for_lfada_update_time flut;
/*обработка flights_ticket_flights завершена*/
/*########################################
обработка - ticket_flights_boarding_passes
##########################################*/
-- так как данные в таблице связаны с хабами , а обработка линка идет с не зашифрованными данными т.е. без md5, то удалим привязку
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_bp_fk;
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes DROP CONSTRAINT IF EXISTS Hub_tf_fk;
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
-- проверка на новые данные , если их нет то добавляем
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
WITH FIRST as(SELECT concat(bp.ticket_no, '&', bp.flight_id) AS concat_bp,
concat(tf.ticket_no, '&', tf.flight_id) AS concat_tf,
bp.ticket_no
FROM stage.boarding_passes bp
LEFT JOIN stage.ticket_flights tf USING(ticket_no, flight_id))
SELECT concat_bp,
concat_tf,
hub_boarding_passes_hash_key,
Hub_ticket_flights_Hash_key,
f.ticket_no
FROM FIRST f
LEFT JOIN data_vault.Link_ticket_flights_boarding_passes flut
ON f.concat_bp = Hub_ticket_flights_Hash_key AND f.concat_tf = Hub_boarding_passes_Hash_key
WHERE load_date IS null;
-- вставляем в таблицу обновленные элементы с источника, вставятся все поля если
INSERT INTO data_vault.Link_ticket_flights_boarding_passes (Link_ticket_flights_boarding_passes,
load_date, load_end_date, record_sourse,
ticket_no, Hub_boarding_passes_Hash_key,
Hub_ticket_flights_Hash_key)
SELECT concat(flut.concat_tf , flut.concat_bp),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_flights_boarding_passes',
md5(ticket_no),
flut.concat_bp,
flut.concat_tf
FROM data_vault.for_lfada_update_time flut;
-- если один из хешей не подгрузился, догружаем его, вытащив из уже известного, так как хеши таблиц boarding_passes и ticket_flights одинаковы
-- обновляем данные столбца ticket_flights_boarding_passes с хешем , если он будет не заполнен
UPDATE data_vault.Link_ticket_flights_boarding_passes a
SET (hub_boarding_passes_hash_key,
Hub_ticket_flights_Hash_key,
link_ticket_flights_boarding_passes) =
(concat(ticket_nom, '&', flight_id),
concat(ticket_nom, '&', flight_id),
concat(concat(ticket_nom, '&', flight_id), concat(ticket_nom, '&', flight_id)))
FROM
(SELECT CASE WHEN
length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2
THEN substring(Hub_ticket_flights_Hash_key FROM 1 FOR position('&' IN Hub_ticket_flights_Hash_key) - 1)
ELSE '0'
END) < 1
THEN substring(hub_boarding_passes_hash_key FROM 1 FOR position('&' IN hub_boarding_passes_hash_key) - 1)
ELSE 'ошибка: необходима проверка данных' END ticket_nom,
CASE WHEN
length(CASE WHEN length(Hub_ticket_flights_Hash_key) < 2 OR length(hub_boarding_passes_hash_key) < 2
THEN substring(Hub_ticket_flights_Hash_key FROM position('&' IN Hub_ticket_flights_Hash_key) + 1)
ELSE '0'
END) < 1
THEN substring(hub_boarding_passes_hash_key FROM position('&' IN hub_boarding_passes_hash_key) + 1 )
ELSE 'ошибка: необходима проверка данных' END flight_id,
link_ticket_flights_boarding_passes,load_date,load_end_date,record_sourse,ticket_no,hub_boarding_passes_hash_key,Hub_ticket_flights_Hash_key
FROM data_vault.Link_ticket_flights_boarding_passes
WHERE length(hub_boarding_passes_hash_key) < 2
OR length(Hub_ticket_flights_Hash_key) < 2) flut
WHERE a.hub_boarding_passes_hash_key =
(concat(flut.ticket_nom, '&', flut.flight_id))
OR a.Hub_ticket_flights_Hash_key = (concat(flut.ticket_nom, '&', flut.flight_id));
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_ticket_flights_boarding_passes a
SET (hub_boarding_passes_hash_key,
hub_ticket_flights_hash_key) = (md5(b.hub_boarding_passes_hash_key), md5(b.hub_ticket_flights_hash_key))
FROM data_vault.Link_ticket_flights_boarding_passes b
WHERE (a.link_ticket_flights_boarding_passes = b.link_ticket_flights_boarding_passes)
AND (length(Hub_boarding_passes_Hash_key) < 30 OR (length(Hub_ticket_flights_Hash_key) < 30));
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_bp_fk FOREIGN KEY (Hub_boarding_passes_Hash_key) REFERENCES data_vault.Hub_boarding_passes(hash_key);
ALTER TABLE data_vault.Link_ticket_flights_boarding_passes ADD CONSTRAINT Hub_tf_fk FOREIGN KEY (Hub_ticket_flights_Hash_key) REFERENCES data_vault.Hub_ticket_flights(hash_key);
/*обработка ticket_flights_boarding_passes завершена*/
/*################################
обработка - ticket_flights_tickets
##################################*/
/*обработка - обновляем данные в таблице, если в одном из источников данные не сгенирились, то вставляем их из другого источника*/
DROP TABLE IF EXISTS data_vault.for_lfada_update_time;
-- проверка на новые данные , если их нет то добавляем
CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
SELECT *
FROM data_vault.Link_ticket_flights_tickets ltft
FULL JOIN (SELECT concat(concat(tf.ticket_no, '&', tf.flight_id), t.ticket_no) link,
md5(concat(tf.ticket_no, '&', tf.flight_id)) new_hub_ticket_flights_hash_key,
md5(t.ticket_no) T_ticket_no,
t.ticket_no tticket_no,
md5(tf.ticket_no) TF_ticket_no,
tf.ticket_no tfticket_no,
md5(tf.flight_id :: varchar) flight_id,
tf.flight_id tflight_id
FROM stage.ticket_flights tf
full JOIN stage.tickets t
ON t.ticket_no = tf.ticket_no) we
ON we.link = ltft.Link_ticket_flights_tickets
WHERE link_ticket_flights_tickets IS NULL;
-- SELECT * FROM data_vault.for_lfada_update_time;
-- вставка данных в таблицу линк из таблиц источников, если их там нет
INSERT INTO data_vault.Link_ticket_flights_tickets (Link_ticket_flights_tickets, load_date, record_sourse, Hub_ticket_flights_Hash_key, Hub_tickets_Hash_key)
SELECT concat(COALESCE(tticket_no, tfticket_no), '&', CASE WHEN tflight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE tflight_id :: varchar END, COALESCE(tticket_no, tfticket_no)),
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'ticket_flights_tickets',
CASE WHEN length(new_hub_ticket_flights_hash_key) > 2
THEN new_hub_ticket_flights_hash_key
ELSE concat(COALESCE(t_ticket_no, tf_ticket_no), '&', CASE WHEN flight_id :: varchar IS NULL THEN 'нет flight_id в ticket_flights' ELSE flight_id :: varchar END)
END,
COALESCE(t_ticket_no, tf_ticket_no)
FROM data_vault.for_lfada_update_time;
/*обработка ticket_flights_tickets завершена*/
/*#########################
обработка - ticket_bookings
##########################*/
-- если клиент сдал билет, следовательно запись в таблице удалится, а значит необходимо сделать пометку времени
UPDATE data_vault.Link_tickets_bookings a
SET load_end_date = to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp
FROM (WITH
new_data AS (
SELECT concat(ticket_no, book_ref) Link_tickets_bookings ,
ticket_no,
book_ref
FROM stage.tickets t
FULL JOIN stage.bookings b USING(book_ref))
SELECT ltb.Link_tickets_bookings
FROM data_vault.Link_tickets_bookings ltb
left JOIN new_data nd USING (Link_tickets_bookings)
WHERE nd.Link_tickets_bookings IS NULL) b
WHERE a.link_tickets_bookings = b.Link_tickets_bookings;
-- добавляем новые данные из источника в таблицу линк
INSERT INTO data_vault.Link_tickets_bookings (Link_tickets_bookings, load_date, load_end_date, record_sourse, Hub_ticket_Hash_key, Hub_bookings_Hash_key)
WITH new_data AS (
SELECT concat(ticket_no, book_ref) Link_tickets_bookings ,
ticket_no,
book_ref
FROM stage.tickets t
FULL JOIN stage.bookings b USING(book_ref))
SELECT nd.Link_tickets_bookings,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'1111-11-11 11:11:11' :: timestamp,
'ticket_bookings',
md5(nd.ticket_no),
md5(nd.book_ref)
FROM data_vault.Link_tickets_bookings ltb
RIGHT JOIN new_data nd USING (Link_tickets_bookings)
WHERE load_date IS NULL;
/*обработка ticket_bookings завершена*/
/*################################
обработка - flights_aircrafts_data
#################################*/
-- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена
ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_f_fk;
ALTER TABLE data_vault.Link_flights_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk;
-- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились
INSERT INTO data_vault.Link_flights_aircrafts_data (Link_flights_aircrafts_data_Hashkey, load_date, record_sourse, Hub_flights_Hash_key, Hub_aircrafts_data_Hash_key)
WITH
new_data AS (
SELECT DISTINCT
concat(f.aircraft_code, ad.aircraft_code) link_flights_aircrafts_data_hashkey,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'flights_aircrafts_data',
f.aircraft_code AS f_aircraft_code,
ad.aircraft_code AS ad_aircraft_code
FROM stage.flights f
FULL JOIN stage.aircrafts_data ad
ON f.aircraft_code = ad.aircraft_code)
SELECT CASE WHEN length(lfad.link_flights_aircrafts_data_hashkey) < 5 OR -- тк коды должны быть идентичны, если нет данных в одной таблице , то подтягиваем их из другой
length(nd.link_flights_aircrafts_data_hashkey) < 5 THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code))
ELSE COALESCE(nd.link_flights_aircrafts_data_hashkey, lfad.link_flights_aircrafts_data_hashkey)
END,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'flights_aircrafts_data',
COALESCE(f_aircraft_code, ad_aircraft_code),
COALESCE(f_aircraft_code, ad_aircraft_code)
FROM data_vault.Link_flights_aircrafts_data lfad
RIGHT JOIN new_data nd ON lfad.link_flights_aircrafts_data_hashkey = nd.link_flights_aircrafts_data_hashkey
WHERE lfad.link_flights_aircrafts_data_hashkey IS NULL;
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_flights_aircrafts_data a
SET (Hub_flights_Hash_key,
Hub_aircrafts_data_Hash_key) = (md5(b.Hub_flights_Hash_key), md5(b.Hub_aircrafts_data_Hash_key))
FROM data_vault.Link_flights_aircrafts_data b
WHERE (a.Link_flights_aircrafts_data_Hashkey = b.Link_flights_aircrafts_data_Hashkey)
AND (length(Hub_flights_Hash_key) < 30 OR (length(Hub_aircrafts_data_Hash_key) < 30));
-- восстанавливаем привязку таблицы к новым данным
ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_f_fk FOREIGN KEY (Hub_flights_Hash_key) REFERENCES data_vault.Hub_flights(hash_key);
ALTER TABLE data_vault.Link_flights_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key);
/*обработка flights_aircrafts_data завершена*/
/*##############################
обработка - seats_aircrafts_data
###############################*/
-- при добавлении новых данных удаляем привязку к хабам, так как обработка идет без хеширования, после обработки привязка будет восстановлена
ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_ad_fk;
ALTER TABLE data_vault.Link_seats_aircrafts_data DROP CONSTRAINT IF EXISTS Hub_s_fk;
-- Насыщение таблицы новыми данными из источников, предусмотрен момент если данные в одном источнике есть, а в другом не подгрузились
INSERT INTO data_vault.Link_seats_aircrafts_data(Link_seats_aircrafts_data_Hashkey, load_date, record_sourse, Hub_aircrafts_data_Hash_key, Hub_seats_Hash_key)
WITH
new_data AS (
SELECT DISTINCT
concat(ad.aircraft_code, concat(s.aircraft_code, s.seat_no)) Link_seats_aircrafts_data_Hashkey,
ad.aircraft_code AS f_aircraft_code,
concat(s.aircraft_code, s.seat_no) AS ad_aircraft_code
FROM stage.aircrafts_data ad
FULL JOIN stage.seats s USING(aircraft_code))
SELECT
CASE
WHEN length(nd.link_seats_aircrafts_data_hashkey) < 6
THEN concat(COALESCE(f_aircraft_code, ad_aircraft_code), COALESCE(f_aircraft_code, ad_aircraft_code))
ELSE COALESCE(nd.Link_seats_aircrafts_data_Hashkey, lsad.Link_seats_aircrafts_data_Hashkey) END ,
to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp,
'seats_aircrafts_data',
COALESCE(f_aircraft_code, ad_aircraft_code),
COALESCE(f_aircraft_code, ad_aircraft_code)
FROM data_vault.Link_seats_aircrafts_data lsad
RIGHT JOIN new_data nd ON lsad.Link_seats_aircrafts_data_Hashkey = nd.Link_seats_aircrafts_data_Hashkey
WHERE load_date IS null;
-- хеш состоит из 32 символов , обновляем данные у которых длинна символов меньше 30, т.е. при последующих обработках захешированные данные, которые уже лежат в линке не будут подвержены переводу в формат md5. Обработка идет без хеширования, для перевода данных в формат md5 мы берем только те данные, у которых длина строки менее 30, т.е. новые данные
UPDATE data_vault.Link_seats_aircrafts_data a
SET (Hub_aircrafts_data_Hash_key,
Hub_seats_Hash_key) = (md5(b.Hub_aircrafts_data_Hash_key), md5(b.Hub_seats_Hash_key))
FROM data_vault.Link_seats_aircrafts_data b
WHERE (a.Link_seats_aircrafts_data_Hashkey = b.Link_seats_aircrafts_data_Hashkey)
AND (length(a.Hub_seats_Hash_key) < 30 OR (length(a.Hub_aircrafts_data_Hash_key) < 30));
-- восстанавливаем привязку таблицы к новым данным
ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_ad_fk FOREIGN KEY (Hub_aircrafts_data_Hash_key) REFERENCES data_vault.Hub_aircrafts_data(hash_key);
ALTER TABLE data_vault.Link_seats_aircrafts_data ADD CONSTRAINT Hub_s_fk FOREIGN KEY (Hub_seats_Hash_key) REFERENCES data_vault.Hub_seats(hash_key);
Хабы и линки обработаны, остались саттелиты.
Для обработки саттелитов была написана функция.
Функция для обработки Satellite
CREATE OR REPLACE FUNCTION data_vault.update_data_from_table(
table_name TEXT , -- название таблицы саттелита
source_table_columns TEXT, -- какие колонки надо вытащить из таблицы источника
source_table_name TEXT, -- имя таблицы откуда тянутся данные
hash TEXT, -- из чего будет состоять хеш
times TEXT, -- время записи данных в таблицу
sourse_col TEXT) -- название источника
RETURNS VOID AS $$
DECLARE
qwery TEXT;
BEGIN -- проверка на наличие информации в параметрах
IF
(table_name IS NULL) OR (source_table_columns IS NULL) OR (source_table_name IS NULL) OR (hash IS NULL) OR (times IS NULL) OR (sourse_col IS NULL)
THEN RAISE EXCEPTION 'Не заполнены параметры';
END IF;
-- добавление только новых записей из источника
qwery:= 'INSERT INTO ' || table_name || ' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,load_end_date' || ' ,record_sourse , ' || (regexp_split_to_array(source_table_columns, ','))[2] || ') ' ||
'WITH new_data as (SELECT DISTINCT md5(' || hash || ') ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM ' || source_table_name || ') ' ||
'SELECT ' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ', ' || times || ', ' || '''1111-11-11 11:11:11'':: timestamp' || ' , ' || sourse_col || ' , ' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[2] ||
' FROM ' || table_name || ' tn ' || 'RIGHT JOIN new_data nd ON (' || 'tn.' || (regexp_split_to_array(source_table_columns, ','))[1] || ') = (' || 'nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ')' ||
' WHERE tn.load_date IS NULL;' ||
-- обновление значений и показателя Load_end_date уже имеющихся аттрибутов, при изменении аттрибута в источнике
' DROP TABLE IF EXISTS data_vault.for_lfada_update_time; ' ||
' CREATE TABLE IF NOT EXISTS data_vault.for_lfada_update_time as
WITH
new_data AS (SELECT DISTINCT
md5(' || hash || ') ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' ||
(regexp_split_to_array(source_table_columns, ','))[2] ||
' FROM ' || source_table_name || ' ) ' ||
', need_swap AS (SELECT nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' ,' ||
' nd.' || (regexp_split_to_array(source_table_columns, ','))[2] ||
' FROM ' || table_name || ' tn ' ||
'RIGHT JOIN new_data nd ON (concat(tn.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , tn.' || (regexp_split_to_array(source_table_columns, ','))[2] || ')) = (concat(nd.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , nd.' || (regexp_split_to_array(source_table_columns, ','))[2] || ')) ' ||
' WHERE tn.load_date IS NULL)' ||
' SELECT ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , ' ||
' load_date, load_end_date, record_sourse, need_swap.' || (regexp_split_to_array(source_table_columns, ','))[2] ||
' FROM ' || table_name || ' nd' ||
' JOIN need_swap USING(' || (regexp_split_to_array(source_table_columns, ','))[1] || '); ' ||
-- обновление поля смены даты load_end_date
' UPDATE ' || table_name || ' firsts ' ||
' SET load_end_date = ' || times ||
' FROM data_vault.for_lfada_update_time flut ' ||
' WHERE flut.' || (regexp_split_to_array(source_table_columns, ','))[1] || ' = firsts.' || (regexp_split_to_array(source_table_columns, ','))[1] ||' ;'
-- добавление новой записи
' INSERT INTO ' || table_name || ' (' || (regexp_split_to_array(source_table_columns, ','))[1] || ', load_date' || ' ,load_end_date' || ' ,record_sourse , ' || (regexp_split_to_array(source_table_columns, ','))[2] || ') ' ||
' SELECT ' || (regexp_split_to_array(source_table_columns, ','))[1] || ' , ' ||
times || ',' || ' load_end_date, record_sourse, ' || (regexp_split_to_array(source_table_columns, ','))[2] || ' FROM data_vault.for_lfada_update_time;';
EXECUTE qwery;
END;
$$ LANGUAGE plpgsql;
Обозначим обработку саттелитов.
Обработка Satellite
-- Satellite_aircrafts_data
--#####################################################################################################
/*Аircrafts_data_range*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_aircrafts_data_range' , -- название таблицы саттелита
'Hub_aircrafts_data_Hash_key,range', -- какие колонки надо вытащить из таблицы источника
'stage.aircrafts_data', -- имя таблицы откуда тянутся данные
'aircraft_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''aircrafts_data''');
-- SELECT * FROM data_vault.Sattelite_aircrafts_data_range;
/*Аircrafts_data_model*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_aircrafts_data_model' , -- название таблицы саттелита
'Hub_aircrafts_data_Hash_key,model', -- какие колонки надо вытащить из таблицы источника
'stage.aircrafts_data', -- имя таблицы откуда тянутся данные
'aircraft_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''aircrafts_data''');
-- SELECT * FROM data_vault.Sattelite_aircrafts_data_model;
-- Sattelite_seats
--#####################################################################################################
/*fare_conditions*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_seats_fare_conditions' ,-- название таблицы саттелита
'Hub_seats_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника
'stage.seats', -- имя таблицы откуда тянутся данные
'concat(aircraft_code, seat_no)', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''seats''');
-- SELECT * FROM data_vault.Sattelite_seats_fare_conditions;
-- Sattelite_airoport_data
--#####################################################################################################
/*timezone*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_timezone' ,-- название таблицы саттелита
'Hub_airport_data_Hash_key,timezone', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_timezone;
/*coordinates*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_coordinates' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,coordinates', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_coordinates;
/*airport_name*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_airport_name' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,airport_name', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_airport_name;
/*city*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_airport_data_city' , -- название таблицы саттелита
'Hub_airport_data_Hash_key,city', -- какие колонки надо вытащить из таблицы источника
'stage.airports_data', -- имя таблицы откуда тянутся данные
'airport_code::varchar', -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, --время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_airport_data_city;
-- Sattelite_ticket_flights
--#####################################################################################################
/*amount*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_ticket_flights_amount' , -- название таблицы саттелита
'Hub_ticket_flights_Hash_key,amount', -- какие колонки надо вытащить из таблицы источника
'stage.ticket_flights', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_ticket_flights_amount;
/*fare_conditions*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_ticket_flights_fare_conditions' , -- название таблицы саттелита
'Hub_ticket_flights_Hash_key,fare_conditions', -- какие колонки надо вытащить из таблицы источника
'stage.ticket_flights', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''airoport_data''');
-- SELECT * FROM data_vault.Sattelite_ticket_flights_fare_conditions;
-- Sattelite_boarding_passes
--#####################################################################################################
/*boarding_no*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_boarding_passes_boarding_no' , -- название таблицы саттелита
'Hub_boarding_passes_Hash_key,boarding_no', -- какие колонки надо вытащить из таблицы источника
'stage.boarding_passes', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''boarding_passes''');
-- SELECT * FROM data_vault.Sattelite_boarding_passes_boarding_no;
/*seat_no*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_boarding_passes_seat_no' , -- название таблицы саттелита
'Hub_boarding_passes_Hash_key,seat_no', -- какие колонки надо вытащить из таблицы источника
'stage.boarding_passes', -- имя таблицы откуда тянутся данные
$$(concat(ticket_no, '&', flight_id))::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''boarding_passes''');
-- SELECT * FROM data_vault.Sattelite_boarding_passes_seat_no;
-- Sattelite_tickets
--#####################################################################################################
/*book_ref*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_book_ref' , -- название таблицы саттелита
'Hub_ticket_Hash_key,book_ref', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_book_ref;
/*contact_data*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_contact_data' , -- название таблицы саттелита
'Hub_ticket_Hash_key,contact_data', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_contact_data;
/*passenger_id*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_passenger_id' , -- название таблицы саттелита
'Hub_ticket_Hash_key,passenger_id', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_passenger_id;
/*passenger_name*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_tickets_passenger_name' , -- название таблицы саттелита
'Hub_ticket_Hash_key,passenger_name', -- какие колонки надо вытащить из таблицы источника
'stage.tickets', -- имя таблицы откуда тянутся данные
$$(ticket_no)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''tickets''');
-- SELECT * FROM data_vault.Sattelite_tickets_passenger_name;
-- Sattelite_bookings
--#####################################################################################################
/*book_date*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_bookings_book_date' , -- название таблицы саттелита
'Hub_bookings_Hash_key,book_date', -- какие колонки надо вытащить из таблицы источника
'stage.bookings', -- имя таблицы откуда тянутся данные
$$(book_ref)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''bookings''');
-- SELECT * FROM data_vault.Sattelite_bookings_book_date;
/*total_amount*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_bookings_total_amount' , -- название таблицы саттелита
'Hub_bookings_Hash_key,total_amount', -- какие колонки надо вытащить из таблицы источника
'stage.bookings', -- имя таблицы откуда тянутся данные
$$(book_ref)::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''bookings''');
-- SELECT * FROM data_vault.Sattelite_bookings_total_amount;
-- Sattelite_flights
--#####################################################################################################
/*flight_no*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_flight_no' , -- название таблицы саттелита
'Hub_flights_Hash_key,flight_no', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_flight_no;
/*scheduled_departure*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_scheduled_departure' , -- название таблицы саттелита
'Hub_flights_Hash_key,scheduled_departure', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_scheduled_departure;
/*scheduled_arrival*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_scheduled_arrival' , -- название таблицы саттелита
'Hub_flights_Hash_key,scheduled_arrival', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_scheduled_arrival;
/*departure_airport*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_departure_airport' , -- название таблицы саттелита
'Hub_flights_Hash_key,departure_airport', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_departure_airport;
/*arrival_airport*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_arrival_airport' , -- название таблицы саттелита
'Hub_flights_Hash_key,arrival_airport', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_arrival_airport;
/*status*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_status' , -- название таблицы саттелита
'Hub_flights_Hash_key,status', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_status;
/*aircraft_code*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_aircraft_code' , -- название таблицы саттелита
'Hub_flights_Hash_key,aircraft_code', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_aircraft_code;
/*actual_departure*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_actual_departure' , -- название таблицы саттелита
'Hub_flights_Hash_key,actual_departure', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_actual_departure;
/*actual_arrival*/
SELECT data_vault.update_data_from_table('data_vault.Sattelite_flights_actual_arrival' , -- название таблицы саттелита
'Hub_flights_Hash_key,actual_arrival', -- какие колонки надо вытащить из таблицы источника
'stage.flights', -- имя таблицы откуда тянутся данные
$$flight_id::varchar$$, -- хеш
$$to_char(now(), 'YYYY-MM-DD HH24:MI:SS') :: timestamp$$, -- время залития
'''flights''');
-- SELECT * FROM data_vault.Sattelite_flights_actual_arrival;
На этом этапе можно четко сказать, что у нас построена схема модели данных DataVault составленная в тулзе гугла. Наполнены хабы, линки и сателлиты и прописан код к их обработке. Можно строить витрины!