Вас приветствует команда Data Sapience, и в сегодняшней публикации мы расскажем о реализации процедурного расширения для работы с MPP-движками Lakehouse-платформы данных Data Ocean Nova, которое стало доступным для пользователей.

В материале пойдет речь о возможностях, применимости и сценариях использования процедурного языка в аналитической платформе данных.
Мотивация
Наличие процедурного расширения в legacy-СУБД, таких как Oracle, MS SQL Server или Postgres\Greenplum, является довольно распространенной причиной не рассматривать Lakehouse-решения при выборе новой системы аналитического хранилища данных. Не каждый даже в мыслях готов представить, как процедурный SQL-код, который создавался и использовался годами в масштабах предприятия, в один момент станет недоступным. Вся экспертиза и накопленный опыт в момент обнулятся, и придется начинать все с начала, создавая новые пайплайны запуска и обработки данных с применением, например, airflow и dbt или других аналогов, позволяющих реализовать условные операторы или генерацию SQL-кода.
Признаться, я сам считаю такой подход правильным: использовать платформо-независимые инструменты генерации и оркестрации вроде Airflow, тем более если новый фреймворк создавать максимально не завязанным на очередной SQL диалект. Если в будущем через несколько лет вы решите сменить технологию еще раз, то это будет сделать гораздо проще и быстрее.
В то же время, очевидно, что разработка нового фреймворка потребует значительное время, ресурсы и знания в новой технической области. Кроме того, даже после перехода на стек airflow/dbt часть функциональных возможностей, которые есть у процедурных расширений, все равно останутся нереализуемыми. И главное, чего недостает по словам самих пользователей, – возврат результирующего датасета как результата вызова процедурного кода на клиентскую сторону ждет та же участь.
Представьте, что у вас есть несколько десятков или даже сотен сервисов в legacy системе-потребителе данных из КХД, работающих по следующему принципу:
Клиентская стороны вызывает процедуру с набором сессионных переменных;
Внутри процедуры происходит генерация исполняемого кода в зависимости от набора входных переменных;
SQL-код выполняет DML-операции по выборке и расчету данных, созданию временных объектов, обновлению перманентных объектов БД как результат всех манипуляций;
На клиентскую сторону возвращается результирующий SELECT/CURSOR перманентного объекта БД.
Именно с такой ситуацией столкнулись наши пользователи, и проблема усугубилась тем, что владельцы с��рвисов-потребителей не могут или не готовы менять на своей стороне устоявшийся годами функционал.
Вроде бы с выходом Spark 4 и добавлением его в состав платформы данных процедурное расширение появилось, и, казалось бы, вот оно счастье. Но, с другой стороны, Spark все же не про BI-доступ с быстрым откликом и высокой производительностью.
В итоге нам ничего не оставалось, как взяться за реализацию процедурного расширения для MPP SQL-движков платформы Impala и StarRocks.
Возможности процедурного расширения Nova Lakehouse Procedure SQL (LPSQL)
*Информация о текущих возможностях актуальна на момент публикации
В первом релизе процедурного расширения доступен следующий функционал:
Сохранение хранимых процедур в метасловаре системы для их последующего вызова;
Параметризация запуска хранимых процедур (входные параметры);
Работа с переменными: объявление, присвоение значения, в том числе из SQL-запросов;
Динамический SQL через конструкцию EXECUTE;
Работа с числовыми циклами FOR LOOP и WHILE END;
Работа с явными и неявными курсорами;
Возврат переменных из процедуры на клиентскую часть;
Возврат датасета через SELECT из процедуры на клиентскую часть;
Специальные функции для работы со словарем данных для использования в генерации динамических запросов;
Условные операторы;
Обработка исключений.
Примеры использования
Давайте посмотрим, как это выглядит на практике, и соберем несколько примеров, демонстрирующих основные возможности.
CREATE PROCEDURE demo_proc(IN X INT) --PRC Example 1 --Declare vars DECLARE row_nums INT; BEGIN --Set up session parameters EXECUTE IMMEDIATE 'set mem_limit = 100m'; EXECUTE IMMEDIATE 'set mt_dop = 2'; --ReCreate temporary table DROP TABLE IF EXISTS default.temp_table; CREATE TABLE default.temp_table ( product_id_int BIGINT, product_id_char STRING, name STRING ); --Create loop with counter FOR i IN 1..X LOOP --lets do some data transformation and multiply rows according to the counter INSERT INTO default.temp_table SELECT * FROM default.product; END LOOP; --Lets count total number of multiplied rows and put it into the variable SELECT COUNT(*) INTO row_nums FROM default.temp_table; --Return total number of rows from the procedure SELECT row_nums as Result; END;
Для валидации и сохранения процедуры на настоящий момент необходимо использовать команду LPSQL с текстом кода, заключенного в символ кавычек.
LPSQP “текст процедуры”;
Вызов сохраненной процедуры выполняется командой CALL
LPSQL “CALL demo_proc(3);”
Рассмотрим второй пример, выводящий список таблиц в базе данных. В нем используется ц��кл не на основе целочисленного автоинкрементального счетчика, а на основе курсора и возвращение dataset’а из SELECT-оператора на клиентскую сторону как результата работы.
CREATE PROCEDURE show_table_cursor() --Пример динамического SQL запроса с циклом FOR LOOP --Обход списка таблиц в схеме и запись результата в таблицу BEGIN DECLARE c1 CURSOR; --ReCreate temp table DROP TABLE IF EXISTS default.cursor_result_table; CREATE TABLE default.cursor_result_table ( TABLE_NAME STRING ) STORED AS PARQUET; -- Курсор по результатам SHOW TABLES IN default FOR c1 IN ('SHOW TABLES IN default') LOOP -- c.name — имя таблицы из результата SHOW TABLES EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES(\"' c1.name '\")'; END LOOP; -- Вывод результата SELECT * FROM default.cursor_result_table order by 1; END;
Проверка результата

Теперь реализуем пример с вложенными циклами, возвращением курсора в переменные сессии и динамического SQL с этими переменными.
-- Пример реализации вложенного курсора CREATE PROCEDURE create_runtime_dict() BEGIN DECLARE name STRING; DECLARE Column STRING; DECLARE Column_Type STRING; DECLARE rn INT; --Очистка/Создание таблицы результатов EXECUTE IMMEDIATE 'DROP TABLE IF EXISTS default.cursor_result_table'; EXECUTE IMMEDIATE 'CREATE TABLE default.cursor_result_table ( TABLE_NAME STRING, COLUMN_NAME STRING, COLUMN_TYPE STRING, DIST_ROWS INT ) STORED AS PARQUET'; -- Объявляем курсор через строку запроса DECLARE c1 CURSOR FOR 'SHOW TABLES IN default'; OPEN c1; LOOP FETCH c1 INTO name; IF SQLCODE <> 0 THEN LEAVE; END IF; -- Выход, если данные кончились -- Вложенный курсор (динамический) BEGIN DECLARE c2 CURSOR FOR 'SHOW COLUMN STATS default.' || name; OPEN c2; LOOP FETCH c2 INTO Column, Column_Type, rn; IF SQLCODE <> 0 THEN LEAVE; END IF; EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES (''' name ''', ''' Column ''', ''' Column_Type ''', '|| rn ||')'; END LOOP; CLOSE c2; END; END LOOP; CLOSE c1; END; END;
Создадим еще одну процедуру с вложенным вызовом предыдущей, которая возвращает список таблиц в базе, имеющих число уникальных строк в любом поле больше, чем входной параметр.
CREATE PROCEDURE show_tables(n INT) --Пример вложенного вызова BEGIN --Drop view EXECUTE 'DROP VIEW IF EXISTS default.V_show_table'; --Create session temp dictionary CALL runtime_dict(); --Lets get list of tables with more than n distinct rows of any column EXECUTE 'CREATE VIEW default.V_show_table AS SELECT DISTINCT table_name FROM default.cursor_result_table WHERE dist_rows >' || n; SELECT * FROM default.V_show_table; END;
Теперь проверим результаты и запросим таблицы с числом уникальных значений больше 10000.

Хватит с нас абстрактных примеров. Пора уже перейти к решению самой типовой задачи в хранилище данных – реализации простого загрузчика по сценарию Slow Changing Dimensions Type I с помощью процедурного кода.
CREATE or replace PROCEDURE scd1_load(schema_nm string, tabname string, PK string) --schema_nm - схема целевой таблицы --tabname - целевая таблица --PK - первичный ключ. Перечисление через запятую если ключ составной BEGIN DECLARE Column STRING DECLARE UPD_FLDS string; --Список колонок для обновления DECLARE INS_FLDS string; --Список колонок для вставки DECLARE PK_FLDS string; --Выражение для равенства первичных ключей DECLARE EQ_FLDS string; --Список бизнес полей для сравнения DECLARE MERGE_SQL string; --Merge скрипт для загрузки целевой таблицы DECLARE buf = ''; PK_FLDS = '1=1'; counter = 1; tar_tab = schema_nm||'.'||tabname; int_tab = schema_nm||'.INT_'||tabname; LOOP --В цикле формируем равенство для первичных ключей select SPLIT_PART(PK, ',',counter) into buf; IF buf = '' THEN LEAVE; END IF; PK_FLDS = PK_FLDS ' AND old.'buf||' = new.'||buf; counter = counter+1; END LOOP; --Технические поля DECLARE task_id_fld = 'tech_task_id'; --Идентификатор текущей загрузки. DECLARE del_fld = 'tech_deleted_flg'; --Поле логического удаления DECLARE chng_dttm_fld = 'tech_changed_dttm'; --Дата и время изменения записи DECLARE c1 CURSOR FOR 'SHOW COLUMN STATS '|| tar_tab; OPEN c1; LOOP FETCH c1 INTO Column, Column_Type, rn; IF SQLCODE <> 0 THEN LEAVE; END IF; IF Column != task_id_fld and Column != del_fld and Column != chng_dttm_fld THEN INS_FLDS = INS_FLDS '\tnew.'Column||', \n'; UPD_FLDS = UPD_FLDS '\told.'Column||' = new.'||Column||',\n'; EQ_FLDS = EQ_FLDS '\told.'Column||' != new.'||Column||' OR '; END IF; END LOOP; MERGE_SQL = ' MERGE INTO '||tar_tab||' as old using '||int_tab||' as new on '||PK_FLDS||' WHEN MATCHED AND new.'||del_fld||' !=1 AND ('||EQ_FLDS||' 1 = 0) THEN UPDATE SET '||UPD_FLDS||'\told.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT), \told.'||del_fld||' = 0, \told.'||chng_dttm_fld||' = CURRENT_TIMESTAMP() WHEN MATCHED AND new.'||del_fld||' = 1 AND old.'||del_fld||' = 0 THEN UPDATE SET old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT), old.'||del_fld||' = 1, old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP() WHEN NOT MATCHED BY SOURCE AND old.'||del_fld||' = 0 THEN UPDATE SET old.'||del_fld||' = 1, old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP(), old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT) WHEN NOT MATCHED AND new.'||del_fld||' != 1 THEN INSERT VALUES ( '||INS_FLDS||'CURRENT_TIMESTAMP(), CAST(UNIX_TIMESTAMP() AS INT), new.'||del_fld||' ) '; execute MERGE_SQL; END;
Пример вызова:
LPSQL "CALL scd1_load('DDS', 'ACCOUNT_TAB', 'id, fld1');";
Подготовим еще один пример – рутину по сбору табличной статистики внутри базы данных. Процедура имеет два параметра: принудительный расчет статистики или сбор только по таблицам, у которых она отсутствует и ранее сбор не выполнялся, и параметр инкрементального расчета или полного расчета по выбору. Кстати, у себя в платформе данных мы реализовали инкрементальный сбор статистик для Iceberg-таблиц, который позволяет существенно экономить время и ресурсы вычислительного кластера.
CREATE or REPLACE PROCEDURE stats_missing_proc(cur_schema string, proc_mode int, inc_mode int ) --cur_schema - схема для проверки таблиц --proc_mode - режим работы процедуры 0 - принудительный сбор | 1 - сбор только по таблицам без статистики --inc_mode - 1 - инкрементальная статистика | 0 - полная статистика BEGIN DECLARE name string; DECLARE clmn string; DECLARE tp string; DECLARE dv int; DECLARE nlls int; -- Техническая таблица create table if not exists default.tech_compactor_proc ( tabname string, got_stats int ) stored as parquet; -- Таблица журналирования create table if not exists default.tech_compactor_logs ( tabname string, query string, dttm timestamp ) stored as parquet; truncate table default.tech_compactor_proc; truncate table default.tech_compactor_logs; -- Открываем курсор для прохождения по всем объектам схемы DECLARE c1 CURSOR FOR 'SHOW TABLES IN '||cur_schema; OPEN c1; LOOP FETCH c1 INTO name; IF SQLCODE <> 0 THEN LEAVE; END IF; -- Выход, если данные кончились -- Открываем вложенный курсор для получения статистики по каждому объекту DECLARE c2 CURSOR FOR 'show column stats '||cur_schema||'.'||name; OPEN c2; LOOP FETCH c2 INTO clmn, tp, dv, nlls; IF SQLCODE <> 0 THEN LEAVE; END IF; IF (dv < 0 AND proc_mode = 1) OR (proc_mode = 0) THEN EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_proc SELECT ''' name ''', '||dv||''; END IF; END LOOP; END LOOP; -- Обработка исключений на случай если в схеме есть представления EXCEPTION WHEN OTHERS THEN END; -- Сбор полной статистики IF inc_mode = 0 THEN DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc' OPEN c1; LOOP FETCH c1 INTO name; IF SQLCODE <> 0 THEN LEAVE; END IF; -- Выход по завершению EXECUTE IMMEDIATE 'COMPUTE STATS '||cur_schema||'.'||name; EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE STATS '||cur_schema||'.'||name||''', now()'; END LOOP; END IF; -- Сбор инкрементальной статистики IF inc_mode = 1 THEN DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc' OPEN c1; LOOP FETCH c1 INTO name; IF SQLCODE <> 0 THEN LEAVE; END IF; -- Выход по завершению EXECUTE IMMEDIATE 'COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name; EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name||''', now()'; END LOOP; END IF; END;
Заключение
Как вы можете видеть, процедурное расширение имеет обширные возможности самостоятельной разработки вспомогательных пользовательских решений с низким порогом входа, особенно для тех, кто переносит свой опыт с традиционных систем вычислений. Ознакомиться с примерами данной работы можно на нашем канале.
В следующем промышленном релизе платформы будет добавлена возможность использовать процедурный код для процессингового движка StarRocks. Нам бы хотелось, чтобы у конечного пользователя появился выбор движка исполнения без необходимости переписывать сам код, если SQL-операторы в нем универсальные. Также планируем расширить функционал совместимости с популярными процедурными диалектами T-SQL и PLSQL, сделать доступным API метастора для прямой работы со словарем данных движками в процедурах, реализовать аналог хорошо знакомого всем ораклистам представления ALL_SOURCES и добавить вывод информационных сообщений в OUTPUT-журнал.
Параллельно мы проводим исследования и разработку транспайлера запросов из Postgres и Greenplum в MPP-движки Data Ocean, чтобы не просто облегчить миграцию функционала в платформу, а сделать ее ненужной, так как на стороне клиентских приложений не потребуется никаких изменений и они будут “думать”, что по-прежнему посылают запросы и получают ответы из Greenplum и Postgres.
Следите за обновлениями, подписывайтесь на наш телеграм-канал и VKВидео.
