Как стать автором
Обновить

Lambda-функции в SQL… дайте подумать

Время на прочтение 13 мин
Количество просмотров 9.6K
image

О чем будет статья, и так понятно из названия.

Кроме того, автор объяснит, зачем с его точки зрения это нужно, а также расскажет, что SUBJ не просто модная технология, но и «дело вдвойне нужное — как приятное, так и полезное».

Всегда интересно посмотреть, как несколько талантливых людей делают что-нибудь (язык программирования, почему нет), точно осознавая, какую проблему они решают и какие задачи перед собой ставят. А также испытывают своё творение на себе самих. Не сравнить с монументальными творениями гигантских комитетов, которые во главу угла помещают поддержание стройности мироздания, кто как его понимает.

Сравните, к примеру, судьбу FORTRAN и PL/1. Кто сейчас вообще вспомнит про этот PL/1.

С этой точки зрения язык AWK, например, очень удачен. Стоит сказать, что в его названии A- это Альфред Ахо, один из авторов Dragon Book, W — Питер Вайнбергер, приложивший руку и к Fortran-77, К — Брайан Керниган, куда же без него. Предназначен язык для обработки ‘на лету’ текстовых потоков в ‘трубах’ между процессами.

Язык бестиповый (это не совсем так), синтаксисом очень похож на C, имеет возможности фильтрации, ассоциативные массивы, события начала/конца потока, событие новой строки…

Автору этот язык всегда импонировал еще и тем, что его интерпретатор не надо инсталлировать, под UNIX-подобными системами он всегда есть, а под Windows достаточно просто скопировать исполняемый файл и всё работает. Впрочем, это к делу не относится.

В процессе работы автору приходится довольно часто использовать связку SQL + AWK и вот почему. SQL- это всё же изначально декларативный язык, предназначенный для управления потоками данных. Он даёт очень ограниченные возможности работы с контекстом выполнения запроса в виде агрегатных функций.

Как, например, построить с помощью SQL двумерную гистограмму?

-- размер ячейки 100 x 100
SELECT 
    count(), round(x, -2) AS cx, 
    round(y, -2) AS cy 
FROM 
    samples 
GROUP BY 
    cx, xy

Но позвольте, использование GROUP BY подразумевает применение сортировки, а это не дешевое удовольствие, если у вас сотни миллионов (или даже более) строк.
UPD: в комментариях меня поправили, что это не совсем так (или совсем не так)
SQL-процессор имеет возможность исполнять агрегатные функции в процессе построения хэша по критерию группировки. Для этого необходимо, чтобы он обладал количеством свободной памяти, достаточной для размещения хэш-мапы в памяти.

Тогда контексты групп будут обновляться по мере чтения таблицы и в конце этого чтения мы уже будем иметь вычисленный результат.
Эту же технику можно распространить и на оконные функции (ниже), просто контекст будет «потолще».

В случае, когда количество групп заранее неизвестно или очень велико, SQL-процессор вынужден строить временный индекс и вторым проходом пробегать по нему.

В простых случаях, например, как здесь — простой COUNT, возможен универсальный вариант — временный индекс (cx,cy,count), тогда при небольшом числе групп он весь окажется в памяти на кэшированных страницах. В сложных случаях, оконных функциях, состояние группы становится нетривиальными и постоянно (де)сериализовать его совсем не то что доктор прописал.
Коротко: SQL-процессор прибегает к сортировке, когда не может оценить количество групп после GROUP BY. Впрочем, группировка по вычисляемым значениям это (зачастую) как раз тот самый случай.

Поэтому приходится делать что-то вроде:

psql -t -q -c ‘select x, y from samples’ | gawk -f mk_hist2d.awk

где mk_hist2d.awk накапливает в ассоциативном массиве статистику и выводит ее по завершении работы

# mk_hist2d.awk
{
  bucket[int($2*0.01), int($3*0.01)]+=$1;
}
END {
for (i=0; i < 500; i++)
  for (j=0; j < 500; j++)
  {
    if ((i, j) in bucket)
      print i*100." "j*100." "bucket[i, j];
    else
      print i*100." "j*100." 0";
  }
}

Есть одно НО — полный поток данных должен быть отправлен с сервера на рабочую машину, а это не так уж дёшево.

А можно ли как-то совместить приятное с полезным — накопить статистику во время выполнения SQL-запроса, но не прибегая к сортировке? Да, например, с помощью пользовательских агрегатных функций.

Пользовательские агрегатные функции


Subj присутствует в разных системах, везде сделан немного по-своему.

  1. PostgreSQL. Документация здесь. Подробнее здесь.
    Вот здесь вычисляют максимальный баланс по счету.
    А это пример, вычисляющий чего больше в булевой колонке — true или false.

    Выглядит это так —

    
    CREATE AGGREGATE mode(boolean) (
        SFUNC = mode_bool_state,
        STYPE = INT[],
        FINALFUNC = mode_bool_final,
        INITCOND = '{0,0}'
    );
    

    Здесь SFUNC — функция, которая вызывается для каждой строки в потоке,
    первый аргумент в ней — типа STYPE.

    FINALFUNC служит для финализации вычислений и возвращает значение агрегата.
    INITCOND — инициализация начального значения внутреннего состояния (STYPE), передаваемого первым аргументом.
    С учетом того, что функции могут быть написаны на C (а значит для внутреннего состояния можно использовать автоматически освобождаемую при закрытии запроса память), это очень мощный инструмент. За рамки его использования надо еще суметь выйти.
  2. MS SQL.
    Раньше (2000) приходилось до запроса создавать ActiveX объект, делать агрегацию с помощью этого объекта.
    Сейчас (2016+) это делается в среде CLR. Придется создать пользовательскую функцию, создать и зарегистрировать сборку. После чего можно создавать агрегат.
    Пример вычисления геометрического среднего, а также слияния строк: с дополнительными параметрами и user-defined типом для хранения промежуточного состояния.
  3. Oracle.
    В Oracle это делается с помощью Data Cartridge (интерфейса) ODCIAggregate.
    Для создания собственного агрегата необходимо написать пользовательский тип, реализующий 4 метода
    — инициализации (ODCIAggregateInitialize), статическая, должна создать экземпляр нужного типа и вернуть через параметр
    — итерации (ODCIAggregateIterate), вызывается на каждую строку данных
    — merge (ODCIAggregateMerge), используется для слияния параллельно выполненных агрегатов
    — финиш (ODCIAggregateTerminate) — выдача результата
    Примеры: 1, 2, 3, 4.
  4. DB2.
    В DB2 нет явного способа использовать пользовательские агрегаты.
    Но можно подсунуть стандартной функции (пусть, MAX) user defined type (на Java) и заставить систему выполнять запросы вида

    CREATE TYPE Complex AS (
          real DOUBLE,
          i DOUBLE )
    …
    CREATE TABLE complexNumbers (
       id  	INTEGER  NOT NULL  PRIMARY KEY,
       number  Complex )
    …
    SELECT sum..real, sum..i
    FROM   ( SELECT GetAggrResult(MAX(BuildComplexSum(number)))
             FROM   complexNumbers ) AS t(sum)
    

Что обращает на себя внимание во всех этих системах?

  • Так или иначе, вам потребуется создавать какие-то объекты в БД. Будь то AGGREGATE или TYPE. На это как минимум требуются соответствующие права. А всего-то хочется несколько чисел на коленке сложить.
  • Возможно, придётся писать что-то на другом языке, будь то C, C# или Java.
    Чтобы интегрировать написанное в систему, опять же потребуются права. А всего то хочется …
  • Трудности с инициализацией. Допустим, требуется считать гистограммы с разными размерами корзины. Казалось бы, чего проще — укажем нужный INITCOND при объявлении агрегата (PostgreSQL) и всего делов. Но тогда для каждого размера корзины потребуется свой агрегат, а для этого опять права нужны.

    Здесь можно прибегнуть к грязному трюку и подсунуть процессору union из строки инициализации (вперёд) и данных, конструировать контекст не в конструкторе, а при получении первой строки.
  • Тем не менее, пусть и с описанными ограничениями, пользовательские агрегаты позволяют вычислить всё что угодно.
  • Немаловажно, что агрегаты можно распараллеливать, про крайней мере PostgreSQL, и Oracle (Enterprise Edition) умеют это делать. Для этого, правда придется научиться сериализовать/десериализовать промежуточные состояния а также мёржить их, полученные из разных потоков.

Оконные функции


Оконные функции появились в стандарте SQL:2003. На данный момент их поддерживают все вышеупомянутые системы. В сущности, оконные функции — расширение работы с агрегатами. И, конечно, пользовательские агрегатные функции работают и в оконном контексте.

Расширение заключается вот в чем. И до SQL:2003 агрегатные функции работали в некотором окне, которым выступал либо весь resultset, либо его часть, соответствующая комбинации значений полей из выражения GROUP BY. Теперь у пользователя появляется некоторая свобода в манипулировании этим окном.

Разница в том, что вычисленные с помощью окон значения добавляются к выдаче отдельной колонкой, а не требуют схлопнуть весь поток с помощью агрегатных функций. А значит в одном запросе можно использовать несколько оконных агрегатов каждый в своём контексте (окне). Агрегатных функций и раньше могло быть несколько, но они все работали в одном окне.

Крупными мазками,

  • OVER()
    окном является весь resultset. Допустим, запрос ‘select count(1) from Samples’ возвращает 169. В этом случае, запустив ‘select count(1) over() from Samples’, мы получим колонку, в которой записано 169 раз по 169.
  • OVER(PARTITION BY)
    это аналог GROUP BY, для каждой комбинации значений создается окно, в котором выполняются агрегатные функции. Допустим, в таблице Samples одна целочисленная колонка — val, данные — числа от 1 до 169.
    Тогда запрос ‘select count(1) over(partition by (12+val)/13) from Samples’ вернет колонку, в которой 169 раз записано значение 13.
  • OVER(ORDER BY)
    может сочетаться с PARTITION BY, позволяет динамически менять размер окна в процессе продвижения курсора, в данном случае окно простирается от начала группы до текущего положения курсора. В результате для группы получается не одно и то же значение в агрегатной колонке, а своё собственное. Удобно для вычисления сумм с нарастающим итогом. Результатом запроса
    ‘select sum(val) over(order by val) from Samples’ будет колонка, в которой n-й элемент будет содержать сумму натуральных чисел от 1 до n.
  • OVER(ROWS)
    позволяет определить рамки окна, отталкиваясь либо от позиции курсора, либо от начала/конца диапазона ORDER BY.

    Например, ‘…ROWS 1 PRECEDING…’ означает, что окно состоит из текущей строки и 1 до нее. A ‘…ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING…’ — окно состоит из двух строк непосредственно после курсора.

    CURRENT ROW в этом режиме обозначает текущее положение курсора. Например, ‘ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING’ означает начиная с текущей строки и до конца диапазона.
  • OVER(RANGE)
    отличается от ROWS тем, что CURRENT ROW здесь означает в качестве начала окна — начало диапазона из ORDER BY, а в качестве конца окна — последнюю строку диапазона ORDER BY.

Синтаксис использования оконных функций в разных системах немного отличается.

Если обобщить вышенаписанное, остаётся немного тягостное ощущение от того, что разработчики, проанализировав построение в SQL всевозможных отчетов, выделили наиболее часто встречающиеся случаи и намертво их забетонировали в синтаксисе.

Функции, возвращающие рекордсет


В выдаче агрегатных/оконных функций каждая результирующая строка соответствует некоторому диапазону строк из входящего потока данных. В жизни такое соответствие не всегда существует.

Например, требуется построить ковариационную матрицу 10X10 (вот для этого потребовалось бы 672X672). Это можно сделать и в один проход, для этого выполняем написанную нами агрегатную функцию с 10 числовыми параметрами. Результат её работы — recordset 10 строк по 10 значений, каждый элемент матрицы относится ко всем строкам входного потока (сколько бы их ни было).

Можно сказать — ну и что, в PostgreSQl, вот, например, можно вернуть из функции двумерный массив (Ex: ‘ARRAY[[1,2],[3,4]’). Или просто сериализовать матрицу в строку.

Хорошо, но не всегда удаётся удержать размер результата в приемлемых для такого подхода рамках.

Лирическое отступление
Например, наша задача — генерализовать геометрию.

Размер геометрий нам неизвестен, это может быть и береговая линия Евразии из десятков миллионов точек. Или наоборот, есть очень грубая геометрия, требуется сгладить её сплайнами. Хотелось бы передать параметры в агрегат и получить на выходе поток данных вместо вектора или строки.

Можно, конечно, сказать, что задача надуманная, что так никто не делает, геометрии в СУБД хранят специальным образом, для обработки геометрий существуют специальные программы, …

Вообще-то, геометрии достаточно удобно хранить в обычных таблицах поточечно уже хотя бы потому, что, передвинув одну точку, нет необходимости переписывать весь блоб. До того, как пространственные данные повсеместно просочились в СУБД, так оно и было, например, в ArcSDE.

Как только средний размер блоба геометрии превышает размер страницы, становится выгоднее работать непосредственно с точками. Если бы была физическая возможность оперировать потоками точек, возможно, колесо истории повернулось бы еще раз.

Ковариационная матрица всё же не очень удачный пример рассинхронизации между входным и выходными потоками поскольку весь результат получается одномоментно в конце. Предположим, требуется обработать/сжать поток исходных данных. При этом

  • данных очень много, они лежат в “куче” без индексов, фактически их просто 'по-быстрому' записали на диск
  • требуется разложить их по разным категориям, которых относительно немного
  • внутри категорий усреднить по отрезкам времени, хранить только среднее, число измерений и дисперсию
  • всё это необходимо сделать быстро

Какие существуют варианты?

  1. В рамках SQL потребуется сортировка по интервалу времени/категории, что противоречит последнему пункту.
  2. Если данные уже отсортированы по времени (что вообще-то, не гарантируется), и удастся донести этот факт до SQL-процессора, можно обойтись оконными функциями и одним проходом.
  3. Написать отдельное приложение, которое будет всем этим заниматься. На PL/SQL или, более вероятно, учитывая, что данных много, на С/С++.
  4. Функции, возвращающие рекордсет. Возможно, они смогут нам помочь.

Поподробнее про П.4. Для этого есть два механизма — временные таблицы и конвейерные функции.

  1. Конвейерные функции.
    Этот механизм появился в Oracle (начиная с 9i, 2001) и позволяет функции, вернувшей recordset не накапливать данные, а вычислять их по мере необходимости (по аналогии с синхронизацией stdout и stdin двух связанных через pipe процессов).
    Т.е. результаты работы pipelined функции могут начать обрабатываться до выхода из этой функции. Для этого достаточно сказать в определении функции

     FUNCTION f_trans(p refcur_t) 
       RETURN outrecset PIPELINED IS
     …

    и в теле регистрировать строки результата

    LOOP
        …
        out_rec.var_char1 := in_rec.email;
        out_rec.var_char2 := in_rec.phone_number;
        PIPE ROW(out_rec);
        …
    END LOOP;

    В результате имеем

    SELECT * FROM TABLE(
       refcur_pkg.f_trans(
            CURSOR(SELECT * FROM employees WHERE department_id = 60)));
    

    Пользовательские агрегаты просто не нужны, когда есть конвейерные функции.

    Браво, Oracle!

    Не так давно (2014) конвейерные функции появились и в DB2 (IBM i 7.1 TR9, i 7.2 TR1).
  2. Временные таблицы.
    Начнём с того, что ни в MS SQL ни в PostgreSQL, по видимому, невозможно вернуть курсор из агрегатной функции.

    Хорошо, давайте по аналогии с конвейерными функциями получим курсор параметром, обработаем, сложим во временную таблицу и вернём на неё курсор.

    Однако, в MS SQL нельзя передать курсор параметром в хранимую процедуру, можно лишь в процедуре создать курсор и вернуть через output параметр. То же можно сказать и про PostgreSQL.

    Ну и ладно, просто откроем курсор, вычитаем его, обработаем значения, вычислим результат, сложим во временную таблицу и отдадим курсор.

    Или даже проще, сложим результаты запроса в одну временную таблицу, обработаем и результаты вернём через курсор на другую временную таблицу.

    Что тут скажешь. Во первых, и это главное, чтение данных через курсор медленнее, чем обработка в потоке. Во вторых, а зачем тогда вообще нужен SQL-процессор, давайте читать таблицы курсорами, руками создавать временные таблицы, писать логику join-ов в циклах … Это как ассемблерные вставки в C/C++, изредка можно себя побаловать, но лучше этим не злоупотреблять.

Итак, рассмотрев вопрос с функциями, возвращающими recordset, приходим к выводам:

  • Пользовательские агрегаты нам тут не особо помогут.
  • В любом случае вам потребуется создавать какие-то объекты в БД. Будь то функции или временные таблицы. На это как минимум требуются соответствующие права. А всего-то хочется несколько чисел обработать.
  • Тем не менее, пусть и с описанными ограничениями, иногда не слишком изящно, но этим методом можно решить поставленную задачу.

Чего ж ещё


В самом деле, если у нас уже есть возможность решать задачи, чего еще требуется автору?
Вообще-то, машина Тьюринга тоже может вычислить всё что угодно, просто делает это не очень быстро и не слишком удобно.

Сформулируем требования так:

  1. это должен быть реляционный оператор, который можно использовать наравне с остальными (выборка, проекция, …)
  2. это должен быть оператор, превращающий один поток данных в другой
  3. между входным и выходным потоками нет синхронизации
  4. при объявлении оператора определяется структура выходного потока
  5. оператор имеет возможность динамической инициализации (в виде функции, точнее её тела, заданного непосредственно в определении оператора)
  6. а также деструктор в виде функции (...)
  7. а также функцию (...), которая вызывается каждый раз, когда из входного потока достаётся новая строка
  8. оператор имеет контекст исполнения — задаваемый пользователем набор переменных и/или коллекций, которые нужны для работы
  9. для запуска этого оператора не требуется создавать объекты базы данных, не нужны дополнительные права
  10. всё, что требуется для работы, определяется в одном месте, на одном языке

Когда-то очень давно автор сделал такой оператор, расширяющий самодельный процессор реализованного подмножества TTM/Tutorial D. Сейчас та же идея предлагается для SQL.

Стоит предупредить, здесь SQL заканчивается и начинается импровизация. Синтаксис оставлен таким, каким он был в оригинале, в конце концов синтаксический сахар может быть любым, сути это не меняет.

Итак, оператор chew состоит из

  1. Заголовка, в котором содержится список выходных полей и их типов.
    Каждое выходное (и входное) поле является локальной переменной.
    Ex: «chew {“var1” float, “var2” integer}» означает, что в выходном потоке будет две колонки — с плавающей точкой и целочисленная
  2. Тела — списка callback-ов на события, на данный момент — старт потока, конец потока, строка. По синтаксису функции близки к PL/SQL. Предопределенная функция __interrupt() является аналогом PIPE, она берет значения из переменных, соответствующих выходным колонкам и помещает в выходной поток. Если буфер выходного потока переполнится, работа обработчика приостановится и начнется работа приемной стороны потока.
    Ex: «hook “init” { var1 := 0; var2 := -1; }»

Проще всего показать на примерах.

  • Аналог агрегатной функции SUM.

    -- вместо ‘select sum(val) from samples’
    --
    select *
    from samples
    chew {“sum(val)” float} -- единственная выходная колонка
        hook “init” {
            “sum(val)” := 0;   -- для нас это просто переменная
        } 
        hook “row” {
            if (not isnull("val")) then
                "sum(val)" := "sum(val)" + "val";
            end if;
        }
       hook “finit” {
            call __interrupt(); -- аналог PIPE
       }
    

    Выглядит громоздко, но ведь это просто пример,
    не обязательно писать программу на C, чтобы сложить пару чисел.
  • SUM + AVG

    -- вместо ‘select sum(val), avg(val) from samples’
    --
    select *
    from samples
    chew {
            “sum(val)” float,
            “avg(val)” float    -- а здесь два поля в выдаче
        }
        hook “init” {
            “sum(val)” := 0;   
            “avg(val)” := 0;   
            var num integer;
            num := 0; -- можно и без кавычек, если в имени нет ничего экзотического
        } 
        hook “row” {
            if (not isnull("val")) then
                "sum(val)" := "sum(val)" + "val";
                num := num + 1;
            end if;
        }
       hook “finit” {
            if (num > 0) then
                “avg(val)” := “sum(val)” / num;
            end if;
            call __interrupt();
       }
    

    Здесь обращаем внимание, что суммирование происходит только один раз.
  • SUM + GROUP BY

    -- вместо ‘select sum(val) from samples group by type’
    --
    select *
    from 
        -- надо явно задать сортировку
        ( samples val, type from samples order by type ) 
    chew {
            “sum(val)” float
        }
        hook “init” {
            “sum(val)” := 0;   
            var gtype integer;
            gtype := NULL;
            var num integer; -- внутренняя переменная
            num := 0;
        } 
        hook “row” {
            if (gtype <> “type”) then
                __interrupt();
                “gtype” := type;
                "sum(val)" := 0;
                num := 0;
            end if;
            if (not isnull("val")) then
                "sum(val)" := "sum(val)" + "val";
                num := num + 1;
            end if;
        }
       hook “finit” {
            if (num > 0) then
                call __interrupt();
            end if;
       }
    
  • ROW_NUMBER() OVER()

    -- select row_number() over() as num, * from samples
    -- 
    select * 
    from samples
    chew {
            “num” integer,
            *   -- все поля входного потока регистрируются в выходном
                -- допустимо также ‘* except val1, ...valX’, привет от TTM
        }
        hook “init” {
            num := 0;
        } 
        hook “row” {
            num := num + 1;
            call __interrupt();
        }
    

Можно ли предложить пример, на котором данный подход даёт результаты принципиально недостижимые обычным путём? Их есть у нас.

Иногда так бывает, что данные почти отсортированы. Возможно, они даже совсем отсортированы, но точно это неизвестно.

Пусть в примере выше (сжатие потока данных) данные приходят из разных источников и в силу разных причин могут слегка перемешиваться. Т.е. строка от одного источника с отметкой времени T1 может оказаться в базе после строки из другого источника с временем T2 притом что T1 < T2.

Пусть даже мы гарантируем, что разница между T1 и T2 никогда не превысит некоторую (мизерную) константу, без сортировки здесь (традиционным путём) не обойтись.

Однако, используя предложенный подход, можно буферизовать входной поток и обрабатывать данные текущего интервала времени только после того, как на вход поступили строки с отметкой времени, превышающей как минимум на заданную константу правую границу интервала.

Здесь есть очень важный момент.

Только мы знаем, что данные почти отсортированы.

Только мы знаем величину той константы.

Эта константа характерна только для данной задачи, а может и только для данного эксперимента.
И мы под свою ответственность применяем данный хак, чтобы избежать сортировки.

Стандартного способа сообщить SQL-процессору наше знание о задаче не существует и его трудно себе представить.

А использование lambda-функций даёт универсальный способ заставить SQL-процессор делать именно то, что нам нужно именно там, где нам это нужно.

Заключение


Предложенная конструкция не выглядит очень сложной в реализации.

Во всяком случае при наличии действующего PL/SQL.

Сама идея проста и интуитивно понятна и не добавляет в язык новых сущностей.

Это единый аппарат, который при необходимости заменяет собой агрегатные и оконные функции, GROUP BY.

Механизм, который позволяет обходиться без сортировок там, где с традиционным SQL-процессором без них никак.

Но самое главное, это механизм, который даёт свободу делать с данными всё что заблагорассудится самым что ни на есть императивным образом.

P.S.: спасибо Дорофею Пролесковскому за участие в подготовке статьи.
Теги:
Хабы:
+9
Комментарии 30
Комментарии Комментарии 30

Публикации

Истории

Работа

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн