Вас приветствует команда Data Sapience, и в сегодняшней публикации мы расскажем о реализации процедурного расширения для работы с MPP-движками Lakehouse-платформы данных Data Ocean Nova, которое стало доступным для пользователей.

В материале пойдет речь о возможностях, применимости и сценариях использования процедурного языка в аналитической платформе данных.

Мотивация

Наличие процедурного расширения в legacy-СУБД, таких как Oracle, MS SQL Server или Postgres\Greenplum, является довольно распространенной причиной не рассматривать Lakehouse-решения при выборе новой системы аналитического хранилища данных. Не каждый даже в мыслях готов представить, как процедурный SQL-код, который создавался и использовался годами в масштабах предприятия, в один момент станет недоступным. Вся экспертиза и накопленный опыт в момент обнулятся, и придется начинать все с начала, создавая новые пайплайны запуска и обработки данных с применением, например, airflow и dbt или других аналогов, позволяющих реализовать условные операторы или генерацию SQL-кода.

Признаться, я сам считаю такой подход правильным: использовать платформо-независимые инструменты генерации и оркестрации вроде Airflow, тем более если новый фреймворк создавать максимально не завязанным на очередной SQL диалект. Если в будущем через несколько лет вы решите сменить технологию еще раз, то это будет сделать гораздо проще и быстрее.

В то же время, очевидно, что разработка нового фреймворка потребует значительное время, ресурсы и знания в новой технической области. Кроме того, даже после перехода на стек airflow/dbt часть функциональных возможностей, которые есть у процедурных расширений, все равно останутся нереализуемыми. И главное, чего недостает по словам самих пользователей, – возврат результирующего датасета как результата вызова процедурного кода на клиентскую сторону ждет та же участь.

Представьте, что у вас есть несколько десятков или даже сотен сервисов в legacy системе-потребителе данных из КХД, работающих по следующему принципу:

  • Клиентская стороны вызывает процедуру с набором сессионных переменных;

  • Внутри процедуры происходит генерация исполняемого кода в зависимости от набора входных переменных;

  • SQL-код выполняет DML-операции по выборке и расчету данных, созданию временных объектов, обновлению перманентных объектов БД как результат всех манипуляций;

  • На клиентскую сторону возвращается результирующий SELECT/CURSOR перманентного объекта БД.

Именно с такой ситуацией столкнулись наши пользователи, и проблема усугубилась тем, что владельцы с��рвисов-потребителей не могут или не готовы менять на своей стороне устоявшийся годами функционал.

Вроде бы с выходом Spark 4 и добавлением его в состав платформы данных процедурное расширение появилось, и, казалось бы, вот оно счастье. Но, с другой стороны, Spark все же не про BI-доступ с быстрым откликом и высокой производительностью.

В итоге нам ничего не оставалось, как взяться за реализацию процедурного расширения для MPP SQL-движков платформы Impala и StarRocks.

Возможности процедурного расширения Nova Lakehouse Procedure SQL (LPSQL)

*Информация о текущих возможностях актуальна на момент публикации

В первом релизе процедурного расширения доступен следующий функционал:

  • Сохранение хранимых процедур в метасловаре системы для их последующего вызова;

  • Параметризация запуска хранимых процедур (входные параметры);

  • Работа с переменными: объявление, присвоение значения, в том числе из SQL-запросов;

  • Динамический SQL через конструкцию EXECUTE;

  • Работа с числовыми циклами FOR LOOP и WHILE END;

  • Работа с явными и неявными курсорами;

  • Возврат переменных из процедуры на клиентскую часть;

  • Возврат датасета через SELECT из процедуры на клиентскую часть;

  • Специальные функции для работы со словарем данных для использования в генерации динамических запросов;

  • Условные операторы;

  • Обработка исключений.

Примеры использования

Давайте посмотрим, как это выглядит на практике, и соберем несколько примеров, демонстрирующих основные возможности.

CREATE PROCEDURE demo_proc(IN X INT)
--PRC Example 1
--Declare vars
DECLARE row_nums INT;
BEGIN
	
	--Set up session parameters
	EXECUTE IMMEDIATE 'set mem_limit = 100m';
	EXECUTE IMMEDIATE 'set mt_dop = 2';
	
	--ReCreate temporary table
	DROP TABLE IF EXISTS default.temp_table;
	CREATE TABLE default.temp_table (
		product_id_int BIGINT,
		product_id_char STRING,
	  	name STRING
	);
	--Create loop with counter 
	FOR i IN 1..X LOOP
		--lets do some data transformation and multiply rows according to the counter
		INSERT INTO default.temp_table
		SELECT * FROM default.product;
	END LOOP;
	
	--Lets count total number of multiplied rows and put it into the variable
	SELECT COUNT(*) INTO row_nums FROM default.temp_table;
	
	--Return total number of rows from the procedure
	SELECT row_nums as Result;
END;

Для валидации и сохранения процедуры на настоящий момент необходимо использовать команду LPSQL с текстом кода, заключенного в символ кавычек.

LPSQP “текст процедуры”;

Вызов сохраненной процедуры выполняется командой CALL

LPSQL “CALL demo_proc(3);”

Рассмотрим второй пример, выводящий список таблиц в базе данных. В нем используется ц��кл не на основе целочисленного автоинкрементального счетчика, а на основе курсора и возвращение dataset’а из SELECT-оператора на клиентскую сторону как результата работы.

CREATE PROCEDURE show_table_cursor()

--Пример динамического SQL запроса с циклом FOR LOOP
--Обход списка таблиц в схеме и запись результата в таблицу

BEGIN

 DECLARE c1 CURSOR;

	--ReCreate temp table
 	DROP TABLE IF EXISTS default.cursor_result_table;
 	CREATE TABLE default.cursor_result_table
	(
   TABLE_NAME STRING
 	) STORED AS PARQUET;

 -- Курсор по результатам SHOW TABLES IN default
 FOR c1 IN ('SHOW TABLES IN default') LOOP
   -- c.name — имя таблицы из результата SHOW TABLES
   EXECUTE IMMEDIATE
     'INSERT INTO default.cursor_result_table VALUES(\"'  c1.name  '\")';

 END LOOP;
 -- Вывод результата

 SELECT * FROM default.cursor_result_table order by 1;
END;

Проверка результата

Рис. Результат работы вызова процедуры show_table_cursor
Рис. Результат работы вызова процедуры show_table_cursor

Теперь реализуем пример с вложенными циклами, возвращением курсора в переменные сессии и динамического SQL с этими переменными. 

--  Пример реализации вложенного курсора

CREATE PROCEDURE create_runtime_dictiоnary()

BEGIN
 DECLARE name STRING;
 DECLARE Column STRING;
 DECLARE Column_Type STRING;
 DECLARE rn INT;

  --Очистка/Создание таблицы результатов
 EXECUTE IMMEDIATE 'DROP TABLE IF EXISTS default.cursor_result_table';
 EXECUTE IMMEDIATE 'CREATE TABLE default.cursor_result_table
 (
   TABLE_NAME STRING,
   COLUMN_NAME STRING,
	COLUMN_TYPE STRING,
	DIST_ROWS INT
 ) STORED AS PARQUET';

 -- Объявляем курсор через строку запроса
 DECLARE c1 CURSOR FOR  'SHOW TABLES IN default';
 OPEN c1;
 LOOP
   FETCH c1 INTO name;
   IF SQLCODE <> 0 THEN LEAVE;
	END IF; -- Выход, если данные кончились

   -- Вложенный курсор (динамический)
   BEGIN
     DECLARE c2 CURSOR FOR  'SHOW COLUMN STATS default.' || name;

     OPEN c2;

     LOOP
       FETCH c2 INTO Column, Column_Type, rn;
       IF SQLCODE <> 0 THEN LEAVE; END IF;
       EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES ('''  name  ''', '''  Column  ''', '''  Column_Type  ''', '|| rn ||')';
     END LOOP;

     CLOSE c2;

   END;

 END LOOP;
CLOSE c1;
END;

END;

Создадим еще одну процедуру с вложенным вызовом предыдущей, которая возвращает список таблиц в базе, имеющих число уникальных строк в любом поле больше, чем входной параметр.

CREATE PROCEDURE show_tables(n INT)
--Пример вложенного вызова

BEGIN

	--Drop view
	EXECUTE  'DROP VIEW IF EXISTS default.V_show_table';
	
	--Create session temp dictionary
	CALL runtime_dict();

	--Lets get list of tables with more than n distinct rows of any column

	EXECUTE  'CREATE VIEW default.V_show_table AS SELECT DISTINCT table_name FROM default.cursor_result_table WHERE dist_rows >' || n;

SELECT * FROM  default.V_show_table;

END;

Теперь проверим результаты и запросим таблицы с числом уникальных значений больше 10000.

Рис. Результат работы вызова процедуры show_tables
Рис. Результат работы вызова процедуры show_tables

Хватит с нас абстрактных примеров. Пора уже перейти к решению самой типовой задачи в хранилище данных – реализации простого загрузчика по сценарию Slow Changing Dimensions Type I с помощью процедурного кода.

CREATE or replace PROCEDURE scd1_load(schema_nm string, tabname string, PK string)

--schema_nm - схема целевой таблицы
--tabname - целевая таблица
--PK - первичный ключ. Перечисление через запятую если ключ составной

BEGIN

	DECLARE Column STRING
	DECLARE UPD_FLDS string; --Список колонок для обновления
	DECLARE INS_FLDS string; --Список колонок для вставки
	DECLARE PK_FLDS string; --Выражение для равенства первичных ключей
	DECLARE EQ_FLDS string; --Список бизнес полей для сравнения
	DECLARE MERGE_SQL string; --Merge скрипт для загрузки целевой таблицы

	DECLARE buf = '';
	PK_FLDS = '1=1';

	counter = 1;

	tar_tab = schema_nm||'.'||tabname;
	int_tab = schema_nm||'.INT_'||tabname;

	LOOP --В цикле формируем равенство для первичных ключей

		select SPLIT_PART(PK, ',',counter) into buf;
		IF buf = '' THEN LEAVE; END IF;
		PK_FLDS = PK_FLDS  ' AND old.'buf||' = new.'||buf;
		counter = counter+1;

	END LOOP;

	--Технические поля
	DECLARE task_id_fld = 'tech_task_id'; --Идентификатор текущей загрузки.
	DECLARE del_fld = 'tech_deleted_flg'; --Поле логического удаления
	DECLARE chng_dttm_fld = 'tech_changed_dttm'; --Дата и время изменения записи
	DECLARE c1 CURSOR FOR  'SHOW COLUMN STATS '|| tar_tab;

   OPEN c1;

	LOOP
		FETCH c1 INTO Column, Column_Type, rn;
		IF SQLCODE <> 0 THEN LEAVE; END IF;
		IF Column != task_id_fld and Column != del_fld and Column != chng_dttm_fld
		THEN
			INS_FLDS = INS_FLDS  '\tnew.'Column||', \n';
			UPD_FLDS = UPD_FLDS  '\told.'Column||' = new.'||Column||',\n';
			EQ_FLDS = EQ_FLDS    '\told.'Column||' != new.'||Column||' OR ';
		END IF;
	END LOOP;

	MERGE_SQL = '
		MERGE INTO '||tar_tab||' as old
		using '||int_tab||' as new
		on '||PK_FLDS||'
		WHEN MATCHED AND new.'||del_fld||' !=1 AND ('||EQ_FLDS||' 1 = 0)
		THEN UPDATE SET
		'||UPD_FLDS||'\told.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT),
		\told.'||del_fld||' = 0,
		\told.'||chng_dttm_fld||' = CURRENT_TIMESTAMP()
		WHEN MATCHED AND new.'||del_fld||' = 1 AND old.'||del_fld||' = 0
		THEN UPDATE SET
			old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT),
			old.'||del_fld||' = 1,
			old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP()
		WHEN NOT MATCHED BY SOURCE AND old.'||del_fld||' = 0
		THEN UPDATE SET
			old.'||del_fld||' = 1,
			old.'||chng_dttm_fld||' = CURRENT_TIMESTAMP(),
			old.'||task_id_fld||' = CAST(UNIX_TIMESTAMP() AS INT)
		WHEN NOT MATCHED AND new.'||del_fld||' != 1
		THEN INSERT VALUES (
		'||INS_FLDS||'CURRENT_TIMESTAMP(),
		CAST(UNIX_TIMESTAMP() AS INT),
			new.'||del_fld||'
		)
		';

	execute MERGE_SQL;

END;

Пример вызова:

LPSQL "CALL scd1_load('DDS', 'ACCOUNT_TAB', 'id, fld1');";

Подготовим еще один пример – рутину по сбору табличной статистики внутри базы данных. Процедура имеет два параметра: принудительный расчет статистики или сбор только по таблицам, у которых она отсутствует и ранее сбор не выполнялся, и параметр инкрементального расчета или полного расчета по выбору. Кстати, у себя в платформе данных мы реализовали инкрементальный сбор статистик для Iceberg-таблиц, который позволяет существенно экономить время и ресурсы вычислительного кластера.

CREATE or REPLACE PROCEDURE stats_missing_proc(cur_schema string, proc_mode int, inc_mode int )

--cur_schema - схема для проверки таблиц
--proc_mode - режим работы процедуры 0 - принудительный сбор | 1 - сбор только по таблицам без статистики
--inc_mode - 1 - инкрементальная статистика | 0 - полная статистика

BEGIN

	DECLARE name string;
	DECLARE clmn string;
	DECLARE tp string;
	DECLARE dv int;
	DECLARE nlls int;

	-- Техническая таблица
  	create table if not exists default.tech_compactor_proc
	(
		tabname string,
		got_stats int
	)
	stored as parquet;

	-- Таблица журналирования
	create table if not exists default.tech_compactor_logs
	(
		tabname string,
		query string,
		dttm timestamp
	)
	stored as parquet;

	truncate table default.tech_compactor_proc;
	truncate table default.tech_compactor_logs;

	--	Открываем курсор для прохождения по всем объектам схемы		
	DECLARE c1 CURSOR FOR  'SHOW TABLES IN '||cur_schema;
	OPEN c1;
	  LOOP
	    FETCH c1 INTO name;
	    IF SQLCODE <> 0 THEN LEAVE;
		END IF; -- Выход, если данные кончились

		--	Открываем вложенный курсор для получения статистики по каждому объекту
		DECLARE c2 CURSOR FOR 'show column stats '||cur_schema||'.'||name;		
		OPEN  c2;
			LOOP
				FETCH c2 INTO clmn, tp, dv, nlls;
				IF SQLCODE <> 0 THEN LEAVE;
				END IF;
				IF (dv < 0 AND proc_mode = 1) OR (proc_mode = 0) THEN
					EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_proc SELECT ''' name ''', '||dv||'';
				END IF;
			END LOOP;

		END LOOP;

		-- Обработка исключений на случай если в схеме есть представления
		EXCEPTION WHEN OTHERS THEN
		END;

	-- Сбор полной статистики
	IF inc_mode = 0 THEN
		DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc'

		OPEN c1;
		LOOP
			FETCH c1 INTO name;
		    IF SQLCODE <> 0 THEN LEAVE;
			END IF; -- Выход по завершению		
			EXECUTE IMMEDIATE 'COMPUTE STATS '||cur_schema||'.'||name;
			EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE STATS '||cur_schema||'.'||name||''', now()';
		END LOOP;

	END IF;

	-- Сбор инкрементальной статистики
	IF inc_mode = 1 THEN
		DECLARE c1 CURSOR FOR 'select distinct tabname from default.tech_compactor_proc'
		OPEN c1;
		LOOP
			FETCH c1 INTO name;
		    IF SQLCODE <> 0 THEN LEAVE;
			END IF; -- Выход по завершению		

			EXECUTE IMMEDIATE 'COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name;
			EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT '''||cur_schema||'.'||name||''', ''COMPUTE INCREMENTAL STATS '||cur_schema||'.'||name||''', now()';

		END LOOP;
	END IF;

END;

Заключение

Как вы можете видеть, процедурное расширение имеет обширные возможности самостоятельной разработки вспомогательных пользовательских решений с низким порогом входа, особенно для тех, кто переносит свой опыт с традиционных систем вычислений. Ознакомиться с примерами данной работы можно на нашем канале

В следующем промышленном релизе платформы будет добавлена возможность использовать процедурный код для процессингового движка StarRocks. Нам бы хотелось, чтобы у конечного пользователя появился выбор движка исполнения без необходимости переписывать сам код, если SQL-операторы в нем универсальные. Также планируем расширить функционал совместимости с популярными процедурными диалектами T-SQL и PLSQL, сделать доступным API метастора для прямой работы со словарем данных движками в процедурах, реализовать аналог хорошо знакомого всем ораклистам представления  ALL_SOURCES и добавить вывод информационных сообщений в OUTPUT-журнал.

Параллельно мы проводим исследования и разработку транспайлера запросов из Postgres и Greenplum в MPP-движки Data Ocean, чтобы не просто облегчить миграцию функционала в платформу, а сделать ее ненужной, так как на стороне клиентских приложений не потребуется никаких изменений и они будут “думать”, что по-прежнему посылают запросы и получают ответы из Greenplum и Postgres.

Следите за обновлениями, подписывайтесь на наш телеграм-канал и VKВидео.