Всем привет.
Речь пойдет о методике ETL-процесса. Очень мало информации об этой важной области работы с данными. Я много раз видел на проектах, что в командах нет общего не то что фреймворка, а даже методики по загрузке данных. Либо есть, но у каждого разработчика своя. И те, что есть, не универсальны и не учитывают некоторые реперные точки. Иногда даже журнал загрузок не ведется. Созрела идея создать, перенести свою методику в питон код и поделиться. Начнем.
Когда мы создаем ETL- процессы основные требования: идемпотентность, обработка ошибок, целостность данных, инкрементальность, производительность, мониторинг, происхождение данных, и тд. Может еще что, но и этого достаточно.
Основные понятийные объекты в тексте: tech_query_id, tech_batch_id.
tech_query_id – является штампом момента запуска pipeline, вручную или автоматически, не важно. Если запустили загрузку для терабайт данных, то и штамп будет всего один для этой конкретной загрузки. Штамп tech_query_id проставляется в каждой строке в самих таблицах с данными цели и в журнале. А так же проставляется tech_batch_id, который является обычным номером пакета. Понятно зачем пакеты.., если данных миллионы, мы их бьем по пакетам. Номер пакета для каждого request_id начинается с единицы. Эта комбинация является ключом в таб��ицах с данными для трассировки происхождения и ключом в таблице журнала загрузок для фиксации метаданных, дельтапоинта и любой полезной информации в json поле.
Скрытый текст


Это и есть основная главная мощнейшая идея. Дальше только навеска, развитие и кайф от порядка. Опробована на 100-х миллионов строк и 100-х Гб в постгрес, кликхаус. Полет нормальный.
Подробнее про потоки. Есть два типа тасков в потоке: интеграционный, когда данные грузятся из одной БД в другую, и дистрибутивный тип, когда данные распространяются внутри одной БД по слоям.
Интеграционный – работает по дельте(инкрементальная загрузка) бизнес данных, не делает преобразования, просто грузит как есть в базу данных цель. В целевую таблицу добавляет две технические колонки, номер запроса и номер пакета, то есть привязываем конкретный датасет к записи в журнале по ключу запрос, пакет.
Дистрибутивный – тоже дельта, но уже по пакетам. Мы по журналу смотрим какие пакеты еще не загружены в выбранную цель и грузим их. Таким образом можно распространять данные во многие цели по слоям, делать трансформации SQL, dbt и видеть куда улетел конкретный пакет по ключу tech_query_id, tech_batch_id. Обеспечиваем происхождение данных.

Ниже в скрытом тексте пример таблиц с данными классической схемы медальон с журналом. В таблицах отображены ситуации, описанные в данной методике. В бронзу пришло как есть с дублем и временем в виде int8, и присвоенным ключом пакета. Данных много, пакетов 100 шт. В журнале фиксируются метаданные для пакета: время начала/завершения загрузки, источник, цель, условия для дельты и дополнительная информация в виде json. Далее в сильвер средствами SQL производится трансформация: изменен тип времени и убран дубль. Все тот же пакет, только метаданные в журнале второй строкой и уже для сильвер слоя. Далее все то же - загрузка в витрину, с агрегацией до часа.
Скрытый текст




Удалив конкретный пакет из журнала, можно его автоматически перезагрузить. Но нужно это делать осторожно, соблюдая историческую последовательность.
Данный подход превращает ETL в прозрачную, управляемую систему. Благодаря единой сквозной нумерации, вы всегда можете провести аудит. Внедрение такой методики позволило нам стабильно обрабатывать данные в PostgreSQL и ClickHouse без потери контроля над происхождением данных.
Можно ознакомиться с реализацией ядра на python в моем открытом репозитории: Ссылка на репозиторий.
Цель статьи не сам репозиторий, а методика. Не судите строго. Сейчас в нем реализована полноценная поддержка PostgreSQL, ClickHouse. Передача данных между различными базами данных реализована через бинарные потоки в оперативной памяти (in-memory). Благодаря этому ядро остается универсальным и не зависит от специфики конкретных СУБД. Все трансформации данных происходят средствами SQL внутри цели.
Обратите внимание: репозиторий содержит только ядро системы. Бизнес-логику (расчет дельты, логику разбиения на пакеты и настройки коннекторов) необходимо реализовывать отдельно, используя ядро как фундамент. То есть это как питоновский пакет.
Буду рад конструктивной критике, вопросам и вашим кейсам применения подобных подходов. Хотелось бы обсудить, поспорить, чтобы доработать и улучшить фреймворк.
