Вас приветствует команда Data Sapience, и в сегодняшней публикации мы расскажем о реализации процедурного расширения для работы с MPP-движками Lakehouse-платформы данных Data Ocean Nova, которое стало доступным для пользователей.

В материале пойдет речь о возможностях, применимости и сценариях использования процедурного языка в аналитической платформе данных.
Мотивация
Наличие процедурного расширения в legacy-СУБД, таких как Oracle, MS SQL Server или Postgres\Greenplum, является довольно распространенной причиной не рассматривать Lakehouse-решения при выборе новой системы аналитического хранилища данных. Не каждый даже в мыслях готов представить, как процедурный SQL-код, который создавался и использовался годами в масштабах предприятия, в один момент станет недоступным. Вся экспертиза и накопленный опыт в момент обнулятся, и придется начинать все с начала, создавая новые пайплайны запуска и обработки данных с применением, например, airflow и dbt или других аналогов, позволяющих реализовать условные операторы или генерацию SQL-кода.
Признаться, я сам считаю такой подход правильным: использовать платформо-независимые инструменты генерации и оркестрации вроде Airflow, тем более если новый фреймворк создавать максимально не завязанным на очередной SQL диалект. Если в будущем через несколько лет вы решите сменить технологию еще раз, то это будет сделать гораздо проще и быстрее.
В то же время, очевидно, что разработка нового фреймворка потребует значительное время, ресурсы и знания в новой технической области. Кроме того, даже после перехода на стек airflow/dbt часть функциональных возможностей, которые есть у процедурных расширений, все равно останутся нереализуемыми. И главное, чего недостает по словам самих пользователей, – возврат результирующего датасета как результата вызова процедурного кода на клиентскую сторону ждет та же участь.
Представьте, что у вас есть несколько десятков или даже сотен сервисов в legacy системе-потребителе данных из КХД, работающих по следующему принципу:
Клиентская стороны вызывает процедуру с набором сессионных переменных;
Внутри процедуры происходит генерация исполняемого кода в зависимости от набора входных переменных;
SQL-код выполняет DML-операции по выборке и расчету данных, созданию временных объектов, обновлению перманентных объектов БД как результат всех манипуляций;
На клиентскую сторону возвращается результирующий SELECT/CURSOR перманентного объекта БД.
Именно с такой ситуацией столкнулись наши пользователи, и проблема усугубилась тем, что владельцы с��рвисов-потребителей не могут или не готовы менять на своей стороне устоявшийся годами функционал.
Вроде бы с выходом Spark 4 и добавлением его в состав платформы данных процедурное расширение появилось, и, казалось бы, вот оно счастье. Но, с другой стороны, Spark все же не про BI-доступ с быстрым откликом и высокой производительностью.
В итоге нам ничего не оставалось, как взяться за реализацию процедурного расширения для MPP SQL-движков платформы Impala и StarRocks.
Возможности процедурного расширения Nova Lakehouse Procedure SQL (LPSQL)
*Информация о текущих возможностях актуальна на момент публикации
В первом релизе процедурного расширения доступен следующий функционал:
Сохранение хранимых процедур в метасловаре системы для их последующего вызова;
Параметризация запуска хранимых процедур (входные параметры);
Работа с переменными: объявление, присвоение значения, в том числе из SQL-запросов;
Динамический SQL через конструкцию EXECUTE;
Работа с числовыми циклами FOR LOOP и WHILE END;
Работа с явными и неявными курсорами;
Возврат переменных из процедуры на клиентскую часть;
Возврат датасета через SELECT из процедуры на клиентскую часть;
Специальные функции для работы со словарем данных для использования в генерации динамических запросов;
Условные операторы;
Обработка исключений.
Примеры использования
Давайте посмотрим, как это выглядит на практике, и соберем несколько примеров, демонстрирующих основные возможности.
CREATE PROCEDURE demo_proc(IN X INT)
--PRC Example 1
--Declare vars
DECLARE row_nums INT;
BEGIN
--Set up session parameters
EXECUTE IMMEDIATE 'set mem_limit = 100m';
EXECUTE IMMEDIATE 'set mt_dop = 2';
--ReCreate temporary table
DROP TABLE IF EXISTS default.temp_table;
CREATE TABLE default.temp_table (
product_id_int BIGINT,
product_id_char STRING,
name STRING
);
--Create loop with counter
FOR i IN 1..X LOOP
--lets do some data transformation and multiply rows according to the counter
INSERT INTO default.temp_table
SELECT * FROM default.product;
END LOOP;
--Lets count total number of multiplied rows and put it into the variable
SELECT COUNT(*) INTO row_nums FROM default.temp_table;
--Return total number of rows from the procedure
SELECT row_nums as Result;
END;Для валидации и сохранения процедуры на настоящий момент необходимо использовать команду LPSQL с текстом кода, заключенного в символ кавычек.
LPSQP “текст процедуры”;
Вызов сохраненной процедуры выполняется командой CALL
LPSQL “CALL demo_proc(3);”Рассмотрим второй пример, выводящий список таблиц в базе данных. В нем используется ц��кл не на основе целочисленного автоинкрементального счетчика, а на основе курсора и возвращение dataset’а из SELECT-оператора на клиентскую сторону как результата работы.
CREATE PROCEDURE show_table_cursor()
--Пример динамического SQL запроса с циклом FOR LOOP
--Обход списка таблиц в схеме и запись результата в таблицу
BEGIN
DECLARE c1 CURSOR;
--ReCreate temp table
DROP TABLE IF EXISTS default.cursor_result_table;
CREATE TABLE default.cursor_result_table
(
TABLE_NAME STRING
) STORED AS PARQUET;
-- Курсор по результатам SHOW TABLES IN default
FOR c1 IN ('SHOW TABLES IN default') LOOP
-- c.name — имя таблицы из результата SHOW TABLES
EXECUTE IMMEDIATE
'INSERT INTO default.cursor_result_table VALUES(\"' c1.name '\")';
END LOOP;
-- Вывод результата
SELECT * FROM default.cursor_result_table order by 1;
END;Проверка результата

Теперь реализуем пример с вложенными циклами, возвращением курсора в переменные сессии и динамического SQL с этими переменными.
-- Пример реализации вложенного курсора
CREATE PROCEDURE create_runtime_dictiоnary()
BEGIN
DECLARE name STRING;
DECLARE Column STRING;
DECLARE Column_Type STRING;
DECLARE rn INT;
--Очистка/Создание таблицы результатов
EXECUTE IMMEDIATE 'DROP TABLE IF EXISTS default.cursor_result_table';
EXECUTE IMMEDIATE 'CREATE TABLE default.cursor_result_table
(
TABLE_NAME STRING,
COLUMN_NAME STRING,
COLUMN_TYPE STRING,
DIST_ROWS INT
) STORED AS PARQUET';
-- Объявляем курсор через строку запроса
DECLARE c1 CURSOR FOR 'SHOW TABLES IN default';
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Выход, если данные кончились
-- Вложенный курсор (динамический)
BEGIN
DECLARE c2 CURSOR FOR 'SHOW COLUMN STATS default.' || name;
OPEN c2;
LOOP
FETCH c2 INTO Column, Column_Type, rn;
IF SQLCODE <> 0 THEN LEAVE; END IF;
EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES (''' name ''', ''' Column ''', ''' Column_Type ''', '|| rn ||')';
END LOOP;
CLOSE c2;
END;
END LOOP;
CLOSE c1;
END;
END;Создадим еще одну процедуру с вложенным вызовом предыдущей, которая возвращает список таблиц в базе, имеющих число уникальных строк в любом поле больше, чем входной параметр.
CREATE PROCEDURE show_tables(n INT)
--Пример вложенного вызова
BEGIN
--Drop view
EXECUTE 'DROP VIEW IF EXISTS default.V_show_table';
--Create session temp dictionary
CALL runtime_dict();
--Lets get list of tables with more than n distinct rows of any column
EXECUTE 'CREATE VIEW default.V_show_table AS SELECT DISTINCT table_name FROM default.cursor_result_table WHERE dist_rows >' || n;
SELECT * FROM default.V_show_table;
END;Теперь проверим результаты и запросим таблицы с числом уникальных значений больше 10000.

Хватит с нас абстрактных примеров. Пора уже перейти к решению самой типовой задачи в хранилище данных – реализации простого загрузчика по сценарию Slow Changing Dimensions Type I с помощью процедурного кода.
CREATE or replace PROCEDURE scd1_load(schema_nm string, tabname string, PK string)
--schema_nm - схема целевой таблицы
--tabname - целевая таблица
--PK - первичный ключ. Перечисление через запятую если ключ составной
BEGIN
DECLARE Column STRING
DECLARE UPD_FLDS string; --Список колонок для обновления
DECLARE INS_FLDS string; --Список колонок для вставки
DECLARE PK_FLDS string; --Выражение для равенства первичных ключей
DECLARE EQ_FLDS string; --Список бизнес полей для сравнения
DECLARE MERGE_SQL string; --Merge скрипт для загрузки целевой таблицы
DECLARE buf = '';
PK_FLDS = '1=1';
counter = 1;
tar_tab = schema_nm||'.'||tabname;
int_tab = schema_nm||'.INT_'||tabname;
LOOP --В цикле формируем равенство для первичных ключей
select SPLIT_PART(PK, ',',counter) into buf;
IF buf = '' THEN LEAVE; END IF;
PK_FLDS = PK_FLDS ' AND old.'buf||' = new.'||buf;
counter = counter+1;
END LOOP;
--Технические поля
DECLARE task_id_fld = 'tech_task_id'; --Идентификатор текущей загрузки.
DECLARE del_fld = 'tech_deleted_flg'; --Поле логического удаления
DECLARE chng_dttm_fld = 'tech_changed_dttm'; --Дата и время изменения записи
DECLARE c1 CURSOR FOR 'SHOW COLUMN STATS '|| tar_tab;
OPEN c1;
LOOP
FETCH c1 INTO Column, Column_Type, rn;
IF SQLCODE <> 0 THEN LEAVE; END IF;
IF Column != task_id_fld and Column != del_fld and Column != chng_dttm_fld
THEN
INS_FLDS = INS_FLDS '\tnew.'Column||', \n';
UPD_FLDS = UPD_FLDS '\told.'Column||' = new.'||Column||',\n';
EQ_FLDS = EQ_FLDS '\told.'Column||' != new.'||Column||' OR ';
END IF;
END LOOP;
MERGE_SQL = '
MERGE INTO '||tar_tab||' as old
using '||int_tab||' as new
on '||PK_FLDS||'
WHEN MATCHED AND new.'||del_fld||' !=1 AND ('||EQ_FLDS||' 1 = 0)
THEN UPDATE SET
'||UPD_FLDS||'\told.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT),
\told.'||del_fld||' = 0,
\told.'||chng_dttm_fld||' = CURRENT_TIMESTAMP()
WHEN MATCHED AND new.'||del_fld||' = 1 AND old.'||del_fld||' = 0
THEN UPDATE SET
old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT),
old.'||del_fld||' = 1,
old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP()
WHEN NOT MATCHED BY SOURCE AND old.'||del_fld||' = 0
THEN UPDATE SET
old.'||del_fld||' = 1,
old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP(),
old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT)
WHEN NOT MATCHED AND new.'||del_fld||' != 1
THEN INSERT VALUES (
'||INS_FLDS||'CURRENT_TIMESTAMP(),
CAST(UNIX_TIMESTAMP() AS INT),
new.'||del_fld||'
)
';
execute MERGE_SQL;
END;Пример вызова:
LPSQL "CALL scd1_load('DDS', 'ACCOUNT_TAB', 'id, fld1');";Подготовим еще один пример – рутину по сбору табличной статистики внутри базы данных. Процедура имеет два параметра: принудительный расчет статистики или сбор только по таблицам, у которых она отсутствует и ранее сбор не выполнялся, и параметр инкрементального расчета или полного расчета по выбору. Кстати, у себя в платформе данных мы реализовали инкрементальный сбор статистик для Iceberg-таблиц, который позволяет существенно экономить время и ресурсы вычислительного кластера.
CREATE or REPLACE PROCEDURE stats_missing_proc(cur_schema string, proc_mode int, inc_mode int )
--cur_schema - схема для проверки таблиц
--proc_mode - режим работы процедуры 0 - принудительный сбор | 1 - сбор только по таблицам без статистики
--inc_mode - 1 - инкрементальная статистика | 0 - полная статистика
BEGIN
DECLARE name string;
DECLARE clmn string;
DECLARE tp string;
DECLARE dv int;
DECLARE nlls int;
-- Техническая таблица
create table if not exists default.tech_compactor_proc
(
tabname string,
got_stats int
)
stored as parquet;
-- Таблица журналирования
create table if not exists default.tech_compactor_logs
(
tabname string,
query string,
dttm timestamp
)
stored as parquet;
truncate table default.tech_compactor_proc;
truncate table default.tech_compactor_logs;
-- Открываем курсор для прохождения по всем объектам схемы
DECLARE c1 CURSOR FOR 'SHOW TABLES IN '||cur_schema;
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Выход, если данные кончились
-- Открываем вложенный курсор для получения статистики по каждому объекту
DECLARE c2 CURSOR FOR 'show column stats '||cur_schema||'.'||name;
OPEN c2;
LOOP
FETCH c2 INTO clmn, tp, dv, nlls;
IF SQLCODE <> 0 THEN LEAVE;
END IF;
IF (dv < 0 AND proc_mode = 1) OR (proc_mode = 0) THEN
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_proc SELECT ''' name ''', '||dv||'';
END IF;
END LOOP;
END LOOP;
-- Обработка исключений на случай если в схеме есть представления
EXCEPTION WHEN OTHERS THEN
END;
-- Сбор полной статистики
IF inc_mode = 0 THEN
DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc'
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Выход по завершению
EXECUTE IMMEDIATE 'COMPUTE STATS '||cur_schema||'.'||name;
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE STATS '||cur_schema||'.'||name||''', now()';
END LOOP;
END IF;
-- Сбор инкрементальной статистики
IF inc_mode = 1 THEN
DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc'
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Выход по завершению
EXECUTE IMMEDIATE 'COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name;
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name||''', now()';
END LOOP;
END IF;
END;Заключение
Как вы можете видеть, процедурное расширение имеет обширные возможности самостоятельной разработки вспомогательных пользовательских решений с низким порогом входа, особенно для тех, кто переносит свой опыт с традиционных систем вычислений. Ознакомиться с примерами данной работы можно на нашем канале.
В следующем промышленном релизе платформы будет добавлена возможность использовать процедурный код для процессингового движка StarRocks. Нам бы хотелось, чтобы у конечного пользователя появился выбор движка исполнения без необходимости переписывать сам код, если SQL-операторы в нем универсальные. Также планируем расширить функционал совместимости с популярными процедурными диалектами T-SQL и PLSQL, сделать доступным API метастора для прямой работы со словарем данных движками в процедурах, реализовать аналог хорошо знакомого всем ораклистам представления ALL_SOURCES и добавить вывод информационных сообщений в OUTPUT-журнал.
Параллельно мы проводим исследования и разработку транспайлера запросов из Postgres и Greenplum в MPP-движки Data Ocean, чтобы не просто облегчить миграцию функционала в платформу, а сделать ее ненужной, так как на стороне клиентских приложений не потребуется никаких изменений и они будут “думать”, что по-прежнему посылают запросы и получают ответы из Greenplum и Postgres.
Следите за обновлениями, подписывайтесь на наш телеграм-канал и VKВидео.
