Pull to refresh

SSDF — велосипед для ETL на SQL

Level of difficultyMedium
Reading time5 min
Views394
ещё один велосипед автора
ещё один велосипед автора


Речь пойдет о моем пет-проекте - SSDF (super SQL data flow).

Когда-то я работал в одной компании, у нас было ХД на MSSQL и самописный велосипед для организации ETL/data flow; так и назывался - dataflow.

Выглядел он следующим образом (если описывать вкратце).

Пункт загрузки описывался как одна строка в таблице, основные поля - источник и назначение, например, источник - это view, назначение всегда таблица, ещё давайте упомянем поле горизонта загрузки. Dataflow генерировала и выполняла (для типа view) код удаления из назначения по горизонту и код вставки из вью. Были, конечно, и другие типы - процедура, скрипт, более сложные.

Таблица этих пунктов и являла собой поток данных, и, порядок прописывался жестко, так же был параллелизм (два и более под одним номером n выполняются одновременно).

Главное, что я из этого вынес - что так можно работать и работать вполне неплохо.

Плюсы:

  1. развертывается такая система просто как SQL код на сервере БД

  2. Она на SQL на котором пишет её пользователь, ему её легко понять и может даже модифицировать

  3. работа через код + автоматизация некоторых действий по обслуживанию.
    Разработка - просто инсерт/апдейт таблицы.
    Можно обновлять статистику для назначения (такой функционал был, да)

  4. логи выполнения

  5. история изменений (на триггерах)

В общем, мне это так нравилось, что я всегда хотел работать через dataflow, а если мне что-то не хватало - дорабатывать.

И вот, спустя какое‑то время, изучая PostgreSQL, я вернулся к идее разработать свою dataflow, с прибамбасами. Нужно же что‑то писать, чтобы изучать.

Основное, что не устраивала в имеющейся системе — очередность и параллелизм. Если первый раз очередность вы пропишите хорошо, то со временем она превращается в черти‑что, да и параллелизм вида «1, n‑параллельно — 2, 3, 4, k‑параллельно..» не самый лучший. Плюс, если что‑то падает или есть какие‑то разовые изменения (регламентные пропуски в загрузке данных из определенных источников, из‑за работ; или наоборот — источник интегрировал в себя новую систему и сейчас вываливает вам её данные в целом) — то очередность будет не оптимальная да и вообще — надо будет вмешиваться руками, чтобы остановить загрузку того, что идёт за тем чего на этот раз не было.

Поэтому я придумал такую штуку — описывать поток данных через зависимости между пунктами.

У каждого пункта есть назначения в качестве зависимостей.

А порядок вычисляем в моменте — то есть, то что без зависимостей идёт по 1м номером, все что зависит от 1го идёт под 2м, всё что зависит от 1+2 — под 3м и т. д.

Давайте уже вставим картинку и посмотрим как это выглядит:

Как видим, выглядит достаточно просто и логично.

Переходим к описанию велосипеда

(чтобы поставить себе, нужно завести в PostgreSQL базу “demo” и скачать с гитхаба 2 скрипта и применить их. Да, не забудьте поставить необходимые расширения. В гите всё написано)

Базово, расчет порядка выполнения реализован через вью:
select * from df.vw_flow;

для просмотра потока данных удобно им же и пользоваться, поскольку есть порядок, а основную таблицу df.flow использовать только для записи потока данных
Как можно обратить внимание, порядок реализован двумя полями vw_ordr и vw_ordr_max. Первое поле означает минимальный номер, по порядку которого пункт может быть выполнен, второе максимальный номер.

Зачем это нужно?

Для этого обратимся к параллелизму, как он работает. Есть ресурсы сервера, которые не бесконечные, поэтому степень параллелизма нужно ограничивать. Но как это делать? Можно задать втупую (думаю, и такой вариант нужен). А можно запрашивать необходимые ресурсы для каждого пункты перед его выполнением. Или ожидать, если их нет.

Так вот, программа, помимо прочего, имеет приоритет пунктов. Означает это, что в условиях того что пункты оказываются на одной стадии, в случае нехватки ресурсов будет выполнятся в первую очередь приоритетный пункт.

Из-за этого пункт может смещаться (откладываться) и для этого и нужно поле vw_ordr_max - чтобы не отложить его выше зависимых от него пунктов.

Есть два режима выполнения потока — strict и full. Первый подразумевает что все зависимости были выполнены, для запуска пункта, второй — хотя бы одна.

И отчетные даты. Каждое назначение имеет готовность за отчетную дату, это помимо того что полезно само по себе; используется для выполнения потока. Если зависимость находится вне исполняемого потока (например, он ограничен указанным процессом), то если эта зависимость «свежее» по отчетной дате чем назначение — это засчитывается за выполнение.

и что?
а ничего
нужно просто описать поток и поставить его на выполнение. Пример потока уже вы видели на картинке выше. Выполнение выглядит так:

call df.exec_flow('strict');

call df.exec_flow('full');

call df.exec_flow('strict', array['Collection']);

Смотреть за выполнением можно в таблицах df.batch_log и df.exec_log

Особенно интересна в этом плане первая таблица, так как туда помещается рассчитанный поток выполнения, там же в случае ошибок выключаются пункты. Вторая таблица - это лог выполнения сгенерированного SQL кода.

Предположим, поток был выполнен командой:

call df.exec_flow('full', array['Collection']);

у нас такой лог:
select bl.id_batch , bl.id, f."source", f.dest, bl.vw_ordr, bl.vw_ordr_max, bl.priority, bl.executed, bl.startdate, bl.enabled, bl.job
from df.batch_log bl
join df.flow f on bl.id = f.id
where bl.id_batch = (select max(bl2.id_batch ) from df.batch_log bl2 )
order by startdate, bl.vw_ordr, bl.priority, bl.vw_ordr_max;

по startdate мы видим, в каком порядке выполнялись пункты.

Что можно ещё отметить:

  • запрос ресурсов я ещё не реализовал. Реализовал структуру под это, нужно лишь написать содержимое двух функций - и программа будет работать. Сейчас ресурсы бесконечные. Думаю на двумя путями реализации - туманным на основе реальной оценки реальных ресурсов и более реальным но несколько муторным в эксплуатации - на основе весов (оценить пункты по потреблению ресурсов и общее количество ресурсов, ресурсы прям так и рассматривать: ЦПУ, память, псп каждого диска. Хотя тоже момент - количество строк может быть разным сегодня и завтра.. эх..)

  • возможность реализации более сложных типов. Например, для dataflow в компании я реализовывал обновление справочника с апдейтом только изменившихся строк и сохранением даты первоначальной загрузки (дата обновления также имеется)

Статус проекта: sample project

Можно скачать, погонять. Главное - возможность ознакомится с идеей.

Скорее всего механизм вычисления потока ещё требует отладки. Типы загрузки требуют доработки, например в процедуру не передается горизонт, вопрос, нужно ли делать ли что-то с полем партицирования (в dataflow оно обрабатывалось отдельно, с другой стороны - можно партицировать по cond_col или навешивать простой констрейнт. Можно наверное отдельно - например, если партиции никак не связать с условием, а сортировать по партициям неохота, можно в цикле обработать каждую партицию. Я такое делал, только ручкам. Прирост, что называется, был :-)

В принципе все, как мне кажется я описал свой проект. Можно подытожить.

Почему я написал проект и статью - интересно узнать, как пипл смотрит на такой способ работы с  потоком данных, насколько интересен такой проект. Какие фичи нужны. Подробно можно посмотреть на гитхабе (ссылка в начале).

Tags:
Hubs:
Rating0
Comments2

Articles