Всем привет.

Речь пойдет о методике ETL-процесса. Очень мало информации об этой важной области работы с данными. Я много раз видел на проектах, что в командах нет общего не то что фреймворка, а даже методики по загрузке данных. Либо есть, но у каждого разработчика своя. И те, что есть, не универсальны и не учитывают некоторые реперные точки. Иногда даже журнал загрузок не ведется. Созрела идея создать, перенести свою методику в питон код и поделиться. Начнем.

Когда мы создаем ETL- процессы основные требования: идемпотентность, обработка ошибок, целостность данных, инкрементальность, производительность, мониторинг, происхождение данных, и тд. Может еще что, но и этого достаточно.

Основные понятийные объекты в тексте: tech_query_id, tech_batch_id.

tech_query_id – является штампом момента запуска pipeline, вручную или автоматически, не важно. Если запустили загрузку для терабайт данных, то и штамп будет всего один для этой конкретной загрузки. Штамп tech_query_id проставляется в каждой строке в самих таблицах с данными цели и в журнале. А так же проставляется tech_batch_id, который является обычным номером пакета. Понятно зачем пакеты.., если данных миллионы, мы их бьем по пакетам. Номер пакета для каждого request_id  начинается с единицы. Эта комбинация является ключом в таб��ицах с данными для трассировки происхождения и ключом в таблице журнала загрузок для фиксации метаданных, дельтапоинта и любой полезной информации в json поле.

Скрытый текст
Таблица с техническими колонками
Таблица с техническими колонками
Журнал загрузок - два пакета, две строки
Журнал загрузок - два пакета, две строки

Это и есть основная главная мощнейшая идея. Дальше только навеска, развитие и кайф от порядка. Опробована на 100-х миллионов строк и 100-х Гб в постгрес, кликхаус. Полет нормальный.
Подробнее про потоки. Есть два типа тасков в потоке: интеграционный, когда данные грузятся из одной БД в другую, и дистрибутивный тип, когда данные распространяются внутри одной БД по слоям.
Интеграционный – работает по дельте(инкрементальная загрузка) бизнес данных, не делает преобразования, просто грузит как есть в базу данных цель. В целевую таблицу добавляет две технические колонки, номер запроса и номер пакета, то есть привязываем конкретный датасет к записи в журнале по ключу запрос, пакет.
Дистрибутивный – тоже дельта, но уже по пакетам. Мы по журналу смотрим какие пакеты еще не загружены в выбранную цель и грузим их. Таким образом можно распространять данные во многие цели по слоям, делать трансформации SQL, dbt и видеть куда улетел конкретный пакет по ключу tech_query_id, tech_batch_id.  Обеспечиваем происхождение данных.

Схема потоков и пакетов
Схема потоков и пакетов

Ниже в скрытом тексте пример таблиц с данными классической схемы медальон с журналом. В таблицах отображены ситуации, описанные в данной методике. В бронзу пришло как есть с дублем и временем в виде int8, и присвоенным ключом пакета. Данных много, пакетов 100 шт. В журнале фиксируются метаданные для пакета: время начала/завершения загрузки, источник, цель, условия для дельты и дополнительная информация в виде json.  Далее в сильвер средствами SQL производится трансформация: изменен тип времени и убран дубль. Все тот же пакет, только метаданные в журнале второй строкой и уже для сильвер слоя. Далее все то же - загрузка в витрину, с агрегацией до часа.

Скрытый текст
Слой bronze - данные в сыром виде
Слой bronze - данные в сыром виде
Слой silver - данные в детальном дедублицированом виде, изменены типы
Слой silver - данные в детальном дедублицированом виде, изменены типы
Слой gold - одна из витрин, данные агрегированном до часа виде
Слой gold - одна из витрин, данные агрегированном до часа виде
Журнал загрузок - источник, цель, json колонка для условий выборки
Журнал загрузок - источник, цель, json колонка для условий выборки

Удалив конкретный пакет из журнала, можно его автоматически перезагрузить. Но нужно это делать осторожно, соблюдая историческую последовательность.

Данный подход превращает ETL в прозрачную, управляемую систему. Благодаря единой сквозной нумерации, вы всегда можете провести аудит. Внедрение такой методики позволило нам стабильно обрабатывать данные в PostgreSQL и ClickHouse без потери контроля над происхождением данных.

Можно ознакомиться с реализацией ядра на python в моем открытом репозитории: Ссылка на репозиторий.

Цель статьи не сам репозиторий, а методика. Не судите строго. Сейчас в нем реализована полноценная поддержка PostgreSQL, ClickHouse. Передача данных между различными базами данных реализована через бинарные потоки в оперативной памяти (in-memory). Благодаря этому ядро остается универсальным и не зависит от специфики конкретных СУБД. Все трансформации данных происходят средствами SQL внутри цели.

Обратите внимание: репозиторий содержит только ядро системы. Бизнес-логику (расчет дельты, логику разбиения на пакеты и настройки коннекторов) необходимо реализовывать отдельно, используя ядро как фундамент. То есть это как питоновский пакет.

Буду рад конструктивной критике, вопросам и вашим кейсам применения подобных подходов. Хотелось бы обсудить, поспорить, чтобы доработать и улучшить фреймворк.