Возможности языка Q и KDB+ на примере сервиса реального времени

    О том, что такое база KDB+, язык программирования Q, какие у них есть сильные и слабые стороны, можно прочитать в моей предыдущей статье и кратко во введении. В статье же мы реализуем на Q сервис, который будет обрабатывать входящий поток данных и высчитывать поминутно различные агрегирующие функции в режиме “реального времени” (т.е. будет успевать все посчитать до следующей порции данных). Главная особенность Q состоит в том, что это векторный язык, позволяющий оперировать не единичными объектами, а их массивами, массивами массивов и другими сложносоставными объектами. Такие языки как Q и родственные ему K, J, APL знамениты своей краткостью. Нередко программу, занимающую несколько экранов кода на привычном языке типа Java, можно записать на них в несколько строк. Именно это я и хочу продемонстрировать в этой статье.



    Введение


    KDB+ — это колоночная база данных, ориентированная на очень большие объемы данных, упорядоченные определенным образом (в первую очередь по времени). Используется она, в первую очередь, в финансовых организациях – банках, инвестиционных фондах, страховых компаниях. Язык Q – это внутренний язык KDB+, позволяющий эффективно работать с этими данными. Идеология Q – это краткость и эффективность, понятность при этом приносится в жертву. Обосновывается это тем, что векторный язык в любом случае будет сложен для восприятия, а краткость и насыщенность записи позволяет увидеть на одном экране гораздо большую часть программы, что в итоге облегчает ее понимание.

    В статье мы реализуем полноценную программу на Q и вам, возможно, захочется попробовать ее в деле. Для этого вам понадобится собственно Q. Скачать бесплатную 32-битную версию можно на сайте компании kx – www.kx.com. Там же, если вам интересно, вы найдете справочную информацию по Q, книгу Q For Mortals и разнообразные статьи на эту тему.

    Постановка задачи


    Есть источник, который присылает каждые 25 миллисекунд таблицу с данными. Поскольку KDB+ применяется в первую очередь в финансах, будем считать, что это таблица сделок (trades), в которой есть следующие колонки: time (время в миллисекундах), sym (обозначение компании на бирже – IBM, AAPL,…), price (цена, по которой куплены акции), size (размер сделки). Интервал 25 миллисекунд выбран произвольно, он не слишком маленький и не слишком большой. Его наличие означает, что данные приходят в сервис уже буферизованные. Можно было бы легко реализовать буферизацию на стороне сервиса, в том числе динамическую, зависящую от текущей нагрузки, но для простоты остановимся на фиксированном интервале.

    Сервис должен считать поминутно для каждого входящего символа из колонки sym набор агрегирующих функций – max price, avg price, sum size и т.п. полезную информацию. Для простоты мы положим, что все функции можно вычислять инкрементально, т.е. для получения нового значения достаточно знать два числа – старое и входящее значения. Например, функции max, average, sum обладают этим свойством, а функция медиана нет.

    Также мы предположим, что входящий поток данных упорядочен по времени. Это даст нам возможность работать только с последней минутой. На практике достаточно уметь работать с текущей и предыдущей минутами на случай, если какие-то апдейты запоздали. Для простоты мы не будем рассматривать этот случай.

    Агрегирующие функции


    Ниже перечислены необходимые агрегирующие функции. Я взял их как можно больше, чтобы увеличить нагрузку на сервис:

    • high – max price – максимальная цена за минуту.
    • low – min price – минимальная цена за минуту.
    • firstPrice – first price – первая цена за минуту.
    • lastPrice – last price – последняя цена за минуту.
    • firstSize – first size – первый размер сделки за минуту.
    • lastSize – last size — последний размер сделки за минуту.
    • numTrades – count i – число сделок за минуту.
    • volume – sum size – сумма размеров сделок за минуту.
    • pvolume – sum price – сумма цен за минуту, необходимо для avgPrice.
    • turnover – sum price*size – суммарный объем сделок за минуту.
    • avgPrice – pvolume%numTrades – средняя цена за минуту.
    • avgSize – volume%numTrades – средний размер сделки за минуту.
    • vwap – turnover%volume – взвешенная по размеру сделки средняя цена за минуту.
    • cumVolume – sum volume – накопленный размер сделок за все время.

    Сразу обсудим один неочевидный момент – как инициализировать эти колонки в первый раз и для каждой следующей минуты. Некоторые колонки типа firstPrice каждый раз нужно инициализировать значением null, их значение не определено. Другие типа volume нужно устанавливать всегда в 0. Еще есть колонки, которые требуют комбинированного подхода – например, cumVolume необходимо копировать из предыдущей минуты, а для первой установить в 0. Зададим все эти параметры используя тип данных словарь (аналог записи):

    // list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
    initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
    aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
    

    Я добавил sym и time в словарь для удобства, теперь initWith – это готовая строчка из финальной агрегированной таблицы, где осталось задать правильные sym и time. Можно использовать ее для добавления новых строк в таблицу.

    aggCols нам понадобится при создании агрегирующей функции. Список необходимо инвертировать из-за особенностей порядка вычислений выражений в Q (справа налево). Цель – обеспечить вычисление в направлении от high к cumVolume, поскольку некоторые колонки зависят от предыдущих.

    Колонки, которые нужно скопировать в новую минуту из предыдущей, колонка sym добавлена для удобства:

    rollColumns:`sym`cumVolume;
    

    Теперь разделим колонки на группы согласно тому, как их следует обновлять. Можно выделить три типа:

    1. Аккумуляторы (volume, turnover,..) – мы должны прибавить входящее значение к предыдущему.
    2. С особой точкой (high, low, ..) – первое значение в минуте берется из входящих данных, остальные считаются с помощью функции.
    3. Остальные. Всегда считаются с помощью функции.

    Определим переменные для этих классов:

    accumulatorCols:`numTrades`volume`pvolume`turnover;
    specialCols:`high`low`firstPrice`firstSize;
    

    Порядок вычислений


    Обновлять агрегированную таблицу мы будем в два этапа. Для эффективности мы сначала ужмем входящую таблицу так, чтобы там осталась одна строка для каждого символа и минуты. То, что все наши функции инкрементальные и ассоциативные, гарантирует нам, что результат от этого дополнительного шага не изменится. Ужать таблицу можно было бы с помощью селекта:

    select high:max price, low:min price … by sym,time.minute from table
    

    У этого способа есть минус – набор вычисляемых колонок задан заранее. К счастью, в Q селект реализован и как функция, куда можно подставить динамически созданные аргументы:

    ?[table;whereClause;byClause;selectClause]
    

    Я не буду подробно описывать формат аргументов, в нашем случае нетривиальными будут только by и select выражения и они должны быть словарями вида columns!expressions. Таким образом, ужимающую функцию можно задать так:

    selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
    preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
    

    Для понятности я использовал функцию parse, которая превращает строку с Q выражением в значение, которое может быть передано в функцию eval и которое требуется в функциональном селекте. Также отметим, что preprocess задана как проекция (т.е. функция с частично определенными аргументами) функции селект, один аргумент (таблица) отсутствует. Если мы применим preprocess к таблице, то получим ужатую таблицу.

    Второй этап – это обновление агрегированной таблицы. Напишем сначала алгоритм в псевдокоде:

    for each sym in inputTable
      idx: row index in agg table for sym+currentTime;
      aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
      aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
      …
    

    В Q вместо циклов принято использовать функции map/reduce. Но поскольку Q – векторный язык и все операции мы можем спокойно применять ко всем символам сразу, то в первом приближении мы можем обойтись вообще без цикла, проделывая операции со всеми символами сразу:

    idx:calcIdx inputTable;
    row:aggTable idx;
    aggTable[idx;`high]: row[`high] | inputTable`high;
    aggTable[idx;`volume]: row[`volume] + inputTable`volume;
    …
    

    Но мы можем пойти и дальше, в Q есть уникальный и исключительно мощный оператор – оператор обобщенного присваивания. Он позволяет изменить набор значений в сложной структуре данных используя список индексов, функций и аргументов. В нашем случае он выглядит так:

    idx:calcIdx inputTable;
    rows:aggTable idx;
    // .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
    .[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
    

    К сожалению, для присвоения в таблицу нужен список строк, а не колонок, и приходится транспонировать матрицу (список колонок в список строк) с помощью функции flip. Для большой таблицы это накладно, поэтому вместо этого применим обобщенное присваивание к каждой колонке отдельно, используя функцию map (которая выглядит как апостроф):

    .[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
    

    Мы снова используем проекцию функции. Также заметьте, что в Q создание списка – это тоже функция и мы можем вызвать ее с помощью функции each(map), чтобы получить список списков.

    Чтобы набор вычисляемых колонок не был фиксирован, создадим выражение выше динамически. Определим сначала функции для вычисления каждой колонки, используя переменные row и inp для ссылки на агрегированные и входные данные:

    aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
     ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
    

    Некоторые колонки особые, их первое значение не должно вычисляться функцией. Мы можем определить, что оно первое по колонке row[`numTrades] – если в ней 0, то значение первое. В Q есть функция выбора — ?[Boolean list;list1;list2] – которая выбирает значение из списка 1 или 2 в зависимости от условия в первом аргументе:

    // high -> ?[isFirst;inp`high;row[`high]|inp`high]
    // @ - тоже обобщенное присваивание для случая когда индекс неглубокий
    @[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
    

    Тут я вызвал обобщенное присваивание с моей функцией (выражение в фигурных скобках). В нее передается текущее значение (первый аргумент) и дополнительный аргумент, который я передаю в 4-м параметре.

    Отдельно добавим аккумуляторные колонки, поскольку для них функция одна и та же:

    // volume -> row[`volume]+inp`volume
    aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
    

    Это обычное по меркам Q присваивание, только присваиваю я сразу список значений. Наконец, создадим главную функцию:

    // ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
    // string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
    // ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
    updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
    

    Этим выражением я динамически создаю функцию из строки, которая содержит выражение, которое я приводил выше. Результат будет выглядеть так:

    {[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
    

    Порядок вычисления колонок инвертирован, поскольку в Q порядок вычисления справа налево.

    Теперь у нас есть две основные функции, необходимые для вычислений, осталось добавить немного инфраструктуры и сервис готов.

    Финальные шаги


    У нас есть функции preprocess и updateAgg, которые делают всю работу. Но необходимо еще обеспечить правильный переход через минуты и вычислить индексы для агрегации. В первую очередь определим функцию init:

    init:{
      tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
      currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
      currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
      offset::0; // индекс в tradeAgg, где начинается текущая минута 
      rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
     }
    

    Также определим функцию roll, которая будет менять текущую минуту:

    roll:{[tm]
      if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
      rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
      offset::count tradeAgg;
      currSyms::`u#`$();
     }
    

    Нам понадобится функция для добавления новых символов:

    addSyms:{[syms]
      currSyms,::syms; // добавим в список известных
      // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
      // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
      `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
     }
    

    И, наконец, функция upd (традиционное название этой функции для Q сервисов), которая вызывается клиентом, для добавления данных:

    upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
      tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
      updMinute[data] each tm; // добавим данные для каждой минуты
    };
    updMinute:{[data;tm]
      if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
      data:select from data where time=tm; // фильтрация
      if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
      updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
     };
    

    Вот и все. Вот полный код нашего сервиса, как и обещалось, всего несколько строк:

    initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
    aggCols:reverse key[initWith] except `sym`time;
    rollColumns:`sym`cumVolume;
    
    accumulatorCols:`numTrades`volume`pvolume`turnover;
    specialCols:`high`low`firstPrice`firstSize;
    
    selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
    preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
    
    aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
    @[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
    aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
    updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
    
    init:{
      tradeAgg::0#enlist[initWith];
      currTime::00:00;
      currSyms::`u#`symbol$();
      offset::0;
      rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
     };
    roll:{[tm]
      if[currTime>tm; :init[]];
      rollCache,::offset _ rollColumns#tradeAgg;
      offset::count tradeAgg;
      currSyms::`u#`$();
     };
    addSyms:{[syms]
      currSyms,::syms;
      `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
     };
    
    upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
    updMinute:{[data;tm]
      if[tm<>currTime; roll tm; currTime::tm];
      data:select from data where time=tm;
      if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
      updateAgg[`tradeAgg;offset+currSyms?syms;data];
     };
    

    Тестирование


    Проверим производительность сервиса. Для этого запустим его в отдельном процессе (поместите код в файл service.q) и вызовите функцию init:

    q service.q –p 5566
    
    q)init[]
    

    В другой консоли запустите второй Q процесс и подсоединитесь к первому:

    h:hopen `:host:5566
    h:hopen 5566 // если оба на одном хосте
    

    Сначала создадим список символов – 10000 штук и добавим функцию для создания случайной таблицы. Во второй консоли:

    syms:`IBM`AAPL`GOOG,-9997?`8
    rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
    

    Я добавил в список символов три настоящих, чтобы было удобнее искать их в таблице. Функция rnd создает случайную таблицу с n строками, где время меняется от t до t+25 миллисекунд.

    Теперь можно попробовать послать данные в сервис (добавим первые десять часов):

    {h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
    

    Можно проверить в сервисе, что таблица обновилась:

    \c 25 200
    select from tradeAgg where sym=`AAPL
    -20#select from tradeAgg where sym=`AAPL
    

    Результат:

    sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
    --|--|--|--|--|--------------------------------
    AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
    AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
    AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
    AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
    AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

    Проведем теперь нагрузочное тестирование, чтобы выяснить сколько данных сервис может обрабатывать в минуту. Напомню, что мы установили интервал для апдейтов в 25 миллисекунд. Соответственно, сервис должен (в среднем) укладываться хотя бы в 20 миллисекунд на апдейт, чтобы дать время пользователям запросить данные. Введите следующее во втором процессе:

    tm:10:00:00.000
    stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
    start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
    

    4800 – это две минуты. Можно попробовать запустить сначала для 1000 строк каждые 25 миллисекунд:

    start 1000
    

    В моем случае результат получается в районе пары миллисекунд на апдейт. Так что я сразу увеличу количество строк до 10.000:

    start 10000
    

    Результат:

    min| 00:00:00.004
    avg| 9.191458
    med| 9f
    max| 00:00:00.030
    

    Снова ничего особенного, а ведь это 24 миллиона строк в минуту, 400 тысяч в секунду. Больше 25 миллисекунд апдейт тормозил только 5 раз, видимо при смене минуты. Увеличим до 100.000:

    start 100000
    

    Результат:

    min| 00:00:00.013
    avg| 25.11083
    med| 24f
    max| 00:00:00.108
    q)sum times
    00:02:00.532
    

    Как видим, сервис едва справляется, но тем не менее ему удается удержаться на плаву. Такой объем данных (240 миллионов строк в минуту) чрезвычайно велик, в таких случаях принято запускать несколько клонов (или даже десятков клонов) сервиса, каждый из которых обрабатывает только часть символов. Тем не менее, результат впечатляющий для интерпретируемого языка, который ориентирован в первую очередь на хранение данных.

    Может возникнуть вопрос, почему время растет нелинейно вместе с размером каждого апдейта. Причина в том, что ужимающая функция – это фактически С функция, которая работает гораздо эффективнее updateAgg. Начиная с какого-то размера апдейта (в районе 10.000), updateAgg достигает своего потолка и дальше ее время выполнения не зависит от размера апдейта. Именно за счет предварительного шага Q сервис в состоянии переваривать такие объемы данных. Это подчеркивает, насколько важно, работая с большими данными, выбирать правильный алгоритм. Еще один момент – правильное хранение данных в памяти. Если бы данные хранились не по-колоночно или не были упорядочены по времени, то мы бы познакомились с такой вещью, как TLB cache miss – отсутствие адреса страницы памяти в кэше адресов процессора. Поиск адреса занимает где-то в 30 раз больше времени в случае неудачи и в случае рассеянных данных может замедлить сервис в несколько раз.

    Заключение


    В этой статье я показал, что база KDB+ и Q пригодны не только для хранения больших данных и простого доступа к ним через селект, но и для создания сервисов обработки данных, которые способны переваривать сотни миллионов строк/гигабайты данных даже в одном отдельно взятом Q процессе. Сам язык Q позволяет исключительно кратко и эффективно реализовывать алгоритмы, связанные с обработкой данных за счет своей векторной природы, встроенного интерпретатора диалекта SQL и очень удачного набора библиотечных функций.

    Я замечу, что изложенное выше, это лишь часть возможностей Q, у него есть и другие уникальные особенности. Например, чрезвычайно простой IPC протокол, который стирает границу между отдельными Q процессами и позволяет объединять сотни этих процессов в единую сеть, которая может располагаться на десятках серверов в разных концах света.
    Технологический Центр Дойче Банка
    Компания

    Комментарии 1

      –2

      Q/KDB — откровенное говно, которое к тому-же ещё и афигенных денег стоит.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое