
Речь пойдет о моем пет-проекте - SSDF (super SQL data flow).
Когда-то я работал в одной компании, у нас было ХД на MSSQL и самописный велосипед для организации ETL/data flow; так и назывался - dataflow.
Выглядел он следующим образом (если описывать вкратце).
Пункт загрузки описывался как одна строка в таблице, основные поля - источник и назначение, например, источник - это view, назначение всегда таблица, ещё давайте упомянем поле горизонта загрузки. Dataflow генерировала и выполняла (для типа view) код удаления из назначения по горизонту и код вставки из вью. Были, конечно, и другие типы - процедура, скрипт, более сложные.
Таблица этих пунктов и являла собой поток данных, и, порядок прописывался жестко, так же был параллелизм (два и более под одним номером n выполняются одновременно).
Главное, что я из этого вынес - что так можно работать и работать вполне неплохо.
Плюсы:
развертывается такая система просто как SQL код на сервере БД
Она на SQL на котором пишет её пользователь, ему её легко понять и может даже модифицировать
работа через код + автоматизация некоторых действий по обслуживанию.
Разработка - просто инсерт/апдейт таблицы.
Можно обновлять статистику для назначения (такой функционал был, да)логи выполнения
история изменений (на триггерах)
В общем, мне это так нравилось, что я всегда хотел работать через dataflow, а если мне что-то не хватало - дорабатывать.
И вот, спустя какое‑то время, изучая PostgreSQL, я вернулся к идее разработать свою dataflow, с прибамбасами. Нужно же что‑то писать, чтобы изучать.
Основное, что не устраивала в имеющейся системе — очередность и параллелизм. Если первый раз очередность вы пропишите хорошо, то со временем она превращается в черти‑что, да и параллелизм вида «1, n‑параллельно — 2, 3, 4, k‑параллельно..» не самый лучший. Плюс, если что‑то падает или есть какие‑то разовые изменения (регламентные пропуски в загрузке данных из определенных источников, из‑за работ; или наоборот — источник интегрировал в себя новую систему и сейчас вываливает вам её данные в целом) — то очередность будет не оптимальная да и вообще — надо будет вмешиваться руками, чтобы остановить загрузку того, что идёт за тем чего на этот раз не было.
Поэтому я придумал такую штуку — описывать поток данных через зависимости между пунктами.
У каждого пункта есть назначения в качестве зависимостей.
А порядок вычисляем в моменте — то есть, то что без зависимостей идёт по 1м номером, все что зависит от 1го идёт под 2м, всё что зависит от 1+2 — под 3м и т. д.
Давайте уже вставим картинку и посмотрим как это выглядит:

Как видим, выглядит достаточно просто и логично.
Переходим к описанию велосипеда
(чтобы поставить себе, нужно завести в PostgreSQL базу “demo” и скачать с гитхаба 2 скрипта и применить их. Да, не забудьте поставить необходимые расширения. В гите всё написано)
Базово, расчет порядка выполнения реализован через вью:select * from df.vw_flow
;
для просмотра потока данных удобно им же и пользоваться, поскольку есть порядок, а основную таблицу df.flow использовать только для записи потока данных
Как можно обратить внимание, порядок реализован двумя полями vw_ordr и vw_ordr_max. Первое поле означает минимальный номер, по порядку которого пункт может быть выполнен, второе максимальный номер.
Зачем это нужно?
Для этого обратимся к параллелизму, как он работает. Есть ресурсы сервера, которые не бесконечные, поэтому степень параллелизма нужно ограничивать. Но как это делать? Можно задать втупую (думаю, и такой вариант нужен). А можно запрашивать необходимые ресурсы для каждого пункты перед его выполнением. Или ожидать, если их нет.
Так вот, программа, помимо прочего, имеет приоритет пунктов. Означает это, что в условиях того что пункты оказываются на одной стадии, в случае нехватки ресурсов будет выполнятся в первую очередь приоритетный пункт.
Из-за этого пункт может смещаться (откладываться) и для этого и нужно поле vw_ordr_max - чтобы не отложить его выше зависимых от него пунктов.
Есть два режима выполнения потока — strict и full. Первый подразумевает что все зависимости были выполнены, для запуска пункта, второй — хотя бы одна.
И отчетные даты. Каждое назначение имеет готовность за отчетную дату, это помимо того что полезно само по себе; используется для выполнения потока. Если зависимость находится вне исполняемого потока (например, он ограничен указанным процессом), то если эта зависимость «свежее» по отчетной дате чем назначение — это засчитывается за выполнение.
и что?
а ничего
нужно просто описать поток и поставить его на выполнение. Пример потока уже вы видели на картинке выше. Выполнение выглядит так:
call df.exec_flow('strict');
call df.exec_flow('full');
call df.exec_flow('strict', array['Collection']);
Смотреть за выполнением можно в таблицах df.batch_log и df.exec_log
Особенно интересна в этом плане первая таблица, так как туда помещается рассчитанный поток выполнения, там же в случае ошибок выключаются пункты. Вторая таблица - это лог выполнения сгенерированного SQL кода.
Предположим, поток был выполнен командой:
call df.exec_flow('full', array['Collection']);
у нас такой лог:select
bl.id
_batch ,
bl.id
, f."source", f.dest, bl.vw_ordr, bl.vw_ordr_max, bl.priority, bl.executed, bl.startdate, bl.enabled, bl.job
from df.batch_log bl
join df.flow f on
bl.id
=
f.id
where
bl.id
_batch = (select max(bl2.id_batch ) from df.batch_log bl2 )
order by startdate, bl.vw_ordr, bl.priority, bl.vw_ordr_max;

по startdate мы видим, в каком порядке выполнялись пункты.
Что можно ещё отметить:
запрос ресурсов я ещё не реализовал. Реализовал структуру под это, нужно лишь написать содержимое двух функций - и программа будет работать. Сейчас ресурсы бесконечные. Думаю на двумя путями реализации - туманным на основе реальной оценки реальных ресурсов и более реальным но несколько муторным в эксплуатации - на основе весов (оценить пункты по потреблению ресурсов и общее количество ресурсов, ресурсы прям так и рассматривать: ЦПУ, память, псп каждого диска. Хотя тоже момент - количество строк может быть разным сегодня и завтра.. эх..)
возможность реализации более сложных типов. Например, для dataflow в компании я реализовывал обновление справочника с апдейтом только изменившихся строк и сохранением даты первоначальной загрузки (дата обновления также имеется)
Статус проекта: sample project
Можно скачать, погонять. Главное - возможность ознакомится с идеей.
Скорее всего механизм вычисления потока ещё требует отладки. Типы загрузки требуют доработки, например в процедуру не передается горизонт, вопрос, нужно ли делать ли что-то с полем партицирования (в dataflow оно обрабатывалось отдельно, с другой стороны - можно партицировать по cond_col или навешивать простой констрейнт. Можно наверное отдельно - например, если партиции никак не связать с условием, а сортировать по партициям неохота, можно в цикле обработать каждую партицию. Я такое делал, только ручкам. Прирост, что называется, был :-)
В принципе все, как мне кажется я описал свой проект. Можно подытожить.
Почему я написал проект и статью - интересно узнать, как пипл смотрит на такой способ работы с потоком данных, насколько интересен такой проект. Какие фичи нужны. Подробно можно посмотреть на гитхабе (ссылка в начале).