Как стать автором
Обновить

Пишем движок SQL на Spark. Часть 8: CREATE FUNCTION

Уровень сложностиСредний
Время на прочтение12 мин
Количество просмотров1K
В предыдущих сериях ( 1 2 3 4 5 6 7 Ы ) рассмотрели, как написать на Java собственный интерпретатор объектно-ориентированного диалекта SQL, заточенный на задачи подготовки и трансформации наборов данных, и работающий как тонкая прослойка поверх Spark RDD API.

Штука получилась довольно продвинутая, с поддержкой императивщины типа циклов/ветвлений/переменных, и даже с поддержкой пользовательских процедур. И в плане этой самой императивщины расширяемая: может импортировать функции из Java classpath, равно как и операторы выражений. То есть, если необходимо, можно написать функцию на Java, или определить новый оператор, и использовать потом в любом выражении на SQL.


Круто? Ещё как круто. Но как-то однобоко. Если в языке у нас поддерживаются функции, то почему бы не дать нашим пользователям определять их самостоятельно? Вот прямо через CREATE FUNCTION? Тем более, что вся необходимая для этого инфраструктура уже вовсю присутствует. Да и процедуры на уровне интерпретатора у нас уже поддерживаются ведь…



Функция для затравки.


Но на самом деле не так всё просто.


Если обратиться к предыдущей части, то выяснится, что процедура на SQL — это кусок AST, грубо выдернутый из исходника и каждый раз интерпретируемый заново с новым контекстом переменных. А импортируемая из Java classpath функция дёргается путём прямого вызова «синглтона» соответствующего «функционального» класса с передачей ему параметров.


Для нужд пользовательских функций оба эти подхода не подходят. Интерпретация AST штука крайне медленная, и если функцию надо вызвать для каждой из многих миллионов записей в датасете, например, в списке выражений SELECT, то она будет тормозить как не в себя.


На примере:


CREATE FUNCTION polyDescr() RECORD AS
   RETURN POLY_VERTICES() || ' vertices, area: ' || POLY_AREA || 'sq.m, perimeter: '
       || POLY_PERIMETER() || 'm, ' || POLY_HOLES() || ' holes';

SELECT name, category, polyDescr() AS descr FROM polygons INTO described_polygons;

Если полигонов в датасете пара сотен тыщ, то пробегать по AST функции polyDescr() эти самые пару сотен тыщ раз будет несколько, кгхмм… накладно.


Для процедур-то такое терпимо, потому что они вызываются на уроне скрипта глобально, и не так часто (и вообще, цикломатика у нас в интерпретаторе контролируется, и по умолчанию на злоупотребления будет ругаться и требовать явного указания глубины вложенности циклов и максимального количества повторов).


Значит, репарсинг AST не прокатывает. Какие есть ещё варианты? Скомпилировать код функции на SQL в класс Java, и вызывать его напрямую, как это делается с функциями из classpath?


Ну, для такого придётся использовать кодогенерацию в байткод Java.


Только я вам сразу скажу — кодогенерация это ОТСТОЙ.

Генерировать байткод для JVM напрямую — противно, неудобно, и чревато целой уймой всяких fringe cases, которые фиг предусмотришь. И по моему собственному опыту (который насчитывает уже 25 лет), если использовать что-нибудь типа классическое, типа библиотеки javassist — это вообще лучший способ увязнуть в разработке если не навечно, то надолго, и результат будет не то что бы удовлетворительный. Оно всё древнее, следовательно, весьма поганенькое в плане юзабилити, а соответствующий пропозал для современного API кодогенерации всё ещё остаётся на уровне пропозала, и доберётся до продакшена в какой-нибудь Java 30. Что-то больно долго ждать…


Даже если по(д)смотреть, как этсамая кодогенерация реализована на уровне того же Spark SQL, то там авторы явно испытывают такое же отвращение к классической Java кодогенерации. Но их решение ИМХО ещё менее вменяемое: в коде на Scala генерируется исходный текст кода на Java, и пропускается через JANINO — игрушечный компилятор в байткод. Быстрый, но с поддержкой Java на уровне 7 версии, и то крайне кривенькой и ущербной. И вот эти скомпилированные классы подгружаются в текущую JVM, откуда и дёргаются. Дикое извращение, просто потому что так всё равно проще и предсказуемее, чем по классике писать инструкции в байткод напрямую.


Поэтому мы не будем использовать кодогенерацию ни в каком виде.


Мы выберем этакий средний путь: интерпретатор, но не по AST, а по предварительно подготовленной объектной модели. По производительности он будет лишь чуть-чуть медленнее честной кодогенерации (потому что отлично JIT-ится), зато по сложности имплементации куда проще. Тем более, что у нас уже есть опыт: выражения у нас уже вычисляются простенькой стековой виртуальной машиной, и мы спокойно можем добавить ещё одну, но заточенную уже не для формул, а для императивщины.


Вопрос тут в чём — насколько обширное подмножество языка будет поддерживаться в пользовательских функциях. Императивщина (сиречь LET/IF/LOOP) явно нужна вся, а кроме неё?


Ну, начнём, а там видно будет.


Для начала, определим синтаксис для CREATE FUNCTION в ANTLR (парсер у нас на нём):


create_func
 : ( K_CREATE ( S_OR K_REPLACE )? )? K_FUNCTION func ( S_OPEN_PAR proc_param ( S_COMMA proc_param )* S_CLOSE_PAR )?
  K_RECORD?
  K_AS? ( K_RETURN? expression | K_BEGIN func_stmts K_END K_FUNCTION? )
 ;

В переводе на чуть более человекообразный, это у нас


[CREATE [OR REPLACE]] FUNCTION function_name[([parameters])] [RECORD] [AS]
    RETURN expression;
[CREATE [OR REPLACE]] FUNCTION function_name[([parameters])] [RECORD] [AS] BEGIN
    control_flow_statements...
END [FUNCTION];

То есть, функцию можно определить либо как именованную формулу, которая делает RETURN результата выражения, в котором используется переменные, созданные из формальных параметров, либо как именованный блок императивного кода, заданного между BEGIN и END, и в котором может быть несколько RETURN в разных ветках. Точнее, каждая ветка должна оканчиваться своим RETURN.


То есть, нужен оператор RETURN, имеющий смысла только в контексте тела функции. Какие ещё нюансы надо предусмотреть?


Например, ключевое слово RECORD. Оно означает, что функция будет иметь доступ к полям записи, если используется в контексте SELECT, который итерируется по всем записям датасета (как в примере с polyDescr()). Без этого ключевого слова функция может быть использована и вне контекста SELECT, например, просто в верхнеуровневом коде. Прямо как та самая daysPerMonth() с картинки из шапки поста:


CREATE FUNCTION daysPerMonth(@year, @month) AS BEGIN
   IF $month IN [4,6,9,11] THEN RETURN 30; END;
   IF $month == 2 THEN
      IF ($year % 400 == 0) OR ($year % 4 == 0) AND ($year % 100 <> 0) THEN RETURN 29;
      ELSE RETURN 28; END;
   END;
   RETURN 31;
END;

Тут у нас доступа к полям записей нет, и мы можем дёргать эту функцию из любого контекста. А если спецификатор RECORD присутствует, то надо как-то неявно (например, через стек) передавать соответствующую запись и её ключ, чтобы внутри можно было использовать встроенные функции, которые позволяют добраться до ключа текущей записи, или копаться внутри её объекта.


Но вроде пока бы ничего сложного. Попробуем имплементировать?


Попробуем. Но сначала в язык придётся добавить исключения. Потому что с пользовательскими функциями очень логично добавляется такая бизнесовая функциональность, как проверка параметров на валидность, например. Что делать, если в daysPerMonth() передан 25-й месяц? Хорошо было бы выкинуть ошибку, обозвать пользователя нехорошими словами, или сделать что-то подобное.


Окей, нам теперь точно нужна возможность аварийно завершить выполнение с выдачей сообщения об ошибке. Ну или хотя бы возможность написать сообщение в лог.


Прикидываемся Ораклом, и добавляем оператор языка RAISE:


raise_stmt
 : K_RAISE T_MSGLVL? expression
 ;

Или чуть более развёрнуто,


RAISE [Level] 'Сообщение об исключительной ситуации';

Где Level у нас INFO (сообщение пишется в stdout), WARNING (в stderr), или ERROR (не только отругаться, но сразу аварийно завершиться). Плюс синонимы, такие как DEBUG для INFO и EXCEPTION для ERROR.


Добавим в функцию:


CREATE FUNCTION daysPerMonth(@year, @month) AS BEGIN
   IF $month NOT IN RANGE[1,12] THEN RAISE ERROR 'Invalid month #'  || $month; END;
   IF $month IN [4,6,9,11] THEN RETURN 30; END;
   IF $month == 2 THEN
      IF ($year % 400 == 0) OR ($year % 4 == 0) AND ($year % 100 <> 0) THEN RETURN 29;
      ELSE RETURN 28; END;
   END;
   RETURN 31;
END;

Итого, для пользовательских функций у нас получается необходимым и достаточным следующий список операторов:


enum Statement {
  LET, // — определить переменную,
  LOOP, // — итератор по массиву,
  IF, // — ветвление,
  RETURN, // — возврат значения (новый),
  RAISE; // — выброс исключения (новый).
}

Всего-то пять. Это совсем немного, и соответствующий стековый (и, понятное дело, рекурсивный) конечный автомат получается весьма компактным:


private static class CallContext {
    private final Object key;
    private final DataRecord<?> rec;

    boolean returnReached = false;
    Object returnValue = null;

    public CallContext(Object key, DataRecord<?> rec) {
        this.key = key;
        this.rec = rec;
    }

    void eval(List<StatementItem> items, VariablesContext vc) {
        for (StatementItem fi : items) {
            if (returnReached) {
                return;
            }

            switch (fi.statement) {
                case RETURN: {
                    returnValue = Expressions.eval(key, rec, fi.expression, vc);
                    returnReached = true;
                    return;
                }
                case LET: {
                    vc.put(fi.control, Expressions.eval(key, rec, fi.expression, vc));
                    break;
                }
                case IF: {
                    if (Expressions.bool(key, rec, fi.expression, vc)) {
                        eval(fi.mainBranch, vc);
                    } else {
                        if (fi.elseBranch != null) {
                            eval(fi.elseBranch, vc);
                        }
                    }
                    break;
                }
                case LOOP: {
                    Object expr = Expressions.eval(key, rec, fi.expression, vc);
                    boolean loop = expr != null;

                    Object[] loopValues = null;
                    if (loop) {
                        loopValues = new ArrayWrap(expr).data();

                        loop = loopValues.length > 0;
                    }

                    if (loop) {
                        VariablesContext vvc = new VariablesContext(vc);
                        for (Object loopValue : loopValues) {
                            if (returnReached) {
                                return;
                            }

                            vvc.put(fi.control, loopValue);
                            eval(fi.mainBranch, vvc);
                        }
                    } else {
                        if (fi.elseBranch != null) {
                            eval(fi.elseBranch, vc);
                        }
                    }
                    break;
                }
                case RAISE: {
                    Object msg = Expressions.eval(key, rec, fi.expression, vc);

                    switch (MsgLvl.get(fi.control)) {
                        case INFO -> System.out.println(msg);
                        case WARNING -> System.err.println(msg);
                        default -> {
                            returnReached = true;
                            throw new RaiseException(String.valueOf(msg));
                        }
                    }
                    break;
                }
            }
        }
    }
}

Что тут происходит?


Всё просто. В экземпляр CallContext при его создании может попасть запись датасета вместе со своим ключом (в случае функции, определённой со спецификатором RECORD), либо null-ы в обоих этих полях, если функция глобального контекста.


Поле returnReached нужно для того, чтобы в вызывающей логике можно было определить, что надо либо возвращать значение, которое было ранее положено в returnValue (по умолчанию null), либо выкидывать исключение, потому операторы закончились, а в соответствующей ветке так и не было встречено RETURN.


Ну, а стек операторов и контекст переменных текущего вызова передаются прямиком в eval(), который и проходит по нему от начала до конца. (Вообще говоря, стек операторов передавать на каждый вызов несколько избыточно — функция у нас штука иммутабельная, и от вызова к вызову не будет меняться. Но это такой задел на будущее на самом деле. А то мало ли вдруг захочется кешировать результаты, и что там ещё обычно делают SQL движки в целях оптимизации :)


«Экземпляр оператора» определён в общем виде следующим образом:


public static class StatementItem implements Serializable {
    final Statement statement;
    final String control;
    final List<Expressions.ExprItem<?>> expression;
    final List<StatementItem> mainBranch;
    final List<StatementItem> elseBranch;

    private StatementItem(Statement statement, String control, List<Expressions.ExprItem<?>> expression, List<StatementItem> mainBranch, List<StatementItem> elseBranch) {
        this.statement = statement;
        this.control = control;
        this.expression = expression;
        this.mainBranch = mainBranch;
        this.elseBranch = elseBranch;
    }

    @Override
    public String toString() {
        return statement.name() + ((control != null) ? " $" + control : "");
    }
}

Statement это тот самый enum.


В поле control складывается либо имя управляющей переменной для LOOP/LET, либо уровень для RAISE.


В expression попадает управляющее выражение. Оно есть в каждом операторе. Для IF это буль, для LOOP массив, для LET собственно формула переменной, а для RETURN/RAISE выражение результата.


Рекурсивные списки mainBranch и elseBranch имеют смысл только для IF и LOOP (да, в LOOP у нас тоже есть опциональное ELSE, выполняемое, если в итерируемом массиве ноль элементов).


Для каждого из StatementItem можно сделать простенький builder function, равно как и для экземпляра функции целиком, но код там настолько тривиальный, что я его цитировать не буду.


Сама же функция-обёртка наследуется от того же самого объекта, описывающего функции, импортируемые из classpath. Для функции уровня RECORD она выглядит таким образом (а без него ещё проще):


private static class RecordFunction extends Function.WholeRecord<Object, DataRecord<?>> {
    protected final String name;
    protected final String descr;
    protected final ListOrderedMap<String, Param> params;
    protected final List<StatementItem> items;
    protected final VariablesContext vc;

    public RecordFunction(String name, String descr, ListOrderedMap<String, Param> params,
                          List<StatementItem> items, VariablesContext vc) {
        this.name = name;
        this.descr = descr;
        this.params = params;
        this.items = items;
        this.vc = vc;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public String descr() {
        return descr;
    }

    @Override
    public Object call(Deque<Object> args) {
        VariablesContext thisCall = new VariablesContext(vc);
        Object key = args.pop();
        DataRecord<?> rec = (DataRecord<?>) args.pop();
        for (int i = 0; i < params.size(); i++) {
            Object a = args.pop();
            thisCall.put(params.get(i), (a == null) ? params.getValue(i).defaults : a);
        }

        CallContext cc = new CallContext(key, rec);
        cc.eval(items, thisCall);
        if (cc.returnReached) {
            return cc.returnValue;
        }
        throw new RuntimeException("Called function " + name + " with no RETURN");
    }
}

Тут у нас перед тем, как дёрнуть CallContext, из стека извлекаются те самые неявные параметры с записью датасета и её ключом, а также проставляются дефолтные значения для тех параметров, в которые был передан NULL.


После чего он и дёргается. А потом проверяется, а был ли RETURN. И если был, то возвращаем значение. Это универсальная обёртка, и ей вообще пофигу, какой стек операторов пришёл из парсера, какие были аргументы и т.п.


С точки же зрения интерпретатора выражений такая функция по поведению ничем не будет отличаться от того, что импортируется из classpath, и в соответствующем классе, отвечающем за вычисление выражений, тоже ничего менять не нужно.


Ну и последнее из важного. Надо подвязать это дело в самый верхний уровень интерпретатора, который проходит по всем операторам скрипта. Ну, это делается в одной-единственной точке:


Портяночка Java кода
private void createFunction(TDL.Create_funcContext ctx) {
    String funcName = resolveName(ctx.func().L_IDENTIFIER());

    if (Functions.FUNCTIONS.containsKey(funcName)) {
        throw new InvalidConfigurationException("Attempt to CREATE FUNCTION which overrides pluggable \"" + funcName + "\"");
    }

    if ((ctx.K_REPLACE() == null) && library.functions.containsKey(funcName)) {
        throw new InvalidConfigurationException("FUNCTION " + funcName + " has already been defined. Offending definition at line " + ctx.K_CREATE().getSymbol().getLine());
    }

    boolean recordLevel = ctx.K_RECORD() != null;
    List<TDLFunction.StatementItem> items;
    if (ctx.K_BEGIN() == null) {
        items = List.of(TDLFunction.funcReturn(expression(ctx.expression().children, recordLevel ? ExpressionRules.RECORD : ExpressionRules.LOOSE)));
    } else {
        items = funcStatements(ctx.func_stmts().func_stmt(), recordLevel ? ExpressionRules.RECORD : ExpressionRules.LOOSE);
    }

    TDLFunction.Builder func = TDLFunction.builder(funcName, items, variables);
    buildParams(ctx.proc_param(), func);

    library.functions.put(funcName, recordLevel ? func.recordLevel() : func.loose());
}

private List<TDLFunction.StatementItem> funcStatements(List<TDL.Func_stmtContext> stmts, ExpressionRules rules) {
    List<TDLFunction.StatementItem> items = new ArrayList<>();

    for (TDL.Func_stmtContext funcStmt : stmts) {
        if (funcStmt.let_func() != null) {
            items.add(TDLFunction.funcLet(resolveName(funcStmt.let_func().var_name().L_IDENTIFIER()),
                    expression(funcStmt.let_func().expression().children, rules)
            ));
        }
        if (funcStmt.if_func() != null) {
            items.add(TDLFunction.funcIf(expression(funcStmt.if_func().expression().children, rules),
                    funcStatements(funcStmt.if_func().func_stmts(0).func_stmt(), rules),
                    (funcStmt.if_func().func_stmts(1) != null)
                            ? funcStatements(funcStmt.if_func().func_stmts(1).func_stmt(), rules)
                            : null
            ));
        }
        if (funcStmt.loop_func() != null) {
            items.add(TDLFunction.funcLoop(resolveName(funcStmt.loop_func().var_name().L_IDENTIFIER()),
                    expression(funcStmt.loop_func().expression().children, rules),
                    funcStatements(funcStmt.loop_func().func_stmts(0).func_stmt(), rules),
                    (funcStmt.loop_func().func_stmts(1) != null)
                            ? funcStatements(funcStmt.loop_func().func_stmts(1).func_stmt(), rules)
                            : null
            ));
        }
        if (funcStmt.return_func() != null) {
            items.add(TDLFunction.funcReturn(expression(funcStmt.return_func().expression().children, rules)));
        }
        if (funcStmt.raise_stmt() != null) {
            String lvl = (funcStmt.raise_stmt().T_MSGLVL() != null) ? funcStmt.raise_stmt().T_MSGLVL().getText() : null;
            items.add(TDLFunction.raise(lvl, expression(funcStmt.raise_stmt().expression().children, rules)));
        }
    }

    return items;
}

Тут у нас сначала происходит проверка на переопределение функции из classpath (чего мы не можем просто так допустить), а затем на наличие OR REPLACE (если надо сделать замену). После чего в засисимости от спецификатора RECORD функция билдится с соответствующими ExpressionRules, и используется билдер для простого RETURN, или сложного тела в BEGIN/END.


(Функция funcStatements() выглядит как кусок хтонической жести, но это издержки используемого парсера ANTLR, с ним такое write only безобразие вполне в порядке нормы.)


Пользовательские функции помещаются в тот же самый объект Library уровня скрипта, который мы когда-то ранее завели для хранения пользовательских процедур. Хороший пример заранее продуманной расширяемости.


Для полноты картины в то же самое место, которое делает DROP PROCEDURE, можно ещё и DROP FUNCTION добавить, ведь там всё ровно такое же — и удаляется пользовательская функция из той же самой Library.


Ну и во всех остальных слоях движка, таких как REPL, придётся добавлять соответствующие маленькие кусочки кода, благо, большая часть из них это просто копи-пейст того, что было добавлено для процедур с заменой CREATE/DROP PROCEDURE на CREATE/DROP FUNCTION. См. предыдущие статьи цикла.


Короче. Всего-то понадобилось порядка 300 с чем-то строк кода, и наш маленький диалект SQL стал не только ещё чуть более взрослым, но и в разы более могучим с точки зрения функциональности, доступной конечным пользователям.


Но тут будет ещё чем заняться когда-нибудь в будущем, поэтому stay tuned!


Исходники: https://github.com/PastorGL/datacooker-etl
Официальная группа в телеге: https://t.me/data_cooker_etl


Замечание для странных людей, которые триггерятся от слова «грант» в тегах

Да, разработка описываемого проекта не могла начаться без привлечения грантового финансирования. Также да, данная публикация действительно идёт в официальную отчётность по гранту, как и все остальные статьи в серии. И нет, вас абсолютно не касаются другие его условия, а код выложен в публичный доступ AS IS.

Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
0
Комментарии0

Публикации

Работа

Java разработчик
176 вакансий
Data Scientist
45 вакансий

Ближайшие события