Как стать автором
Обновить

Возможности языка Q и KDB+ на примере сервиса реального времени

Время на прочтение13 мин
Количество просмотров6K
О том, что такое база KDB+, язык программирования Q, какие у них есть сильные и слабые стороны, можно прочитать в моей предыдущей статье и кратко во введении. В статье же мы реализуем на Q сервис, который будет обрабатывать входящий поток данных и высчитывать поминутно различные агрегирующие функции в режиме “реального времени” (т.е. будет успевать все посчитать до следующей порции данных). Главная особенность Q состоит в том, что это векторный язык, позволяющий оперировать не единичными объектами, а их массивами, массивами массивов и другими сложносоставными объектами. Такие языки как Q и родственные ему K, J, APL знамениты своей краткостью. Нередко программу, занимающую несколько экранов кода на привычном языке типа Java, можно записать на них в несколько строк. Именно это я и хочу продемонстрировать в этой статье.



Введение


KDB+ — это колоночная база данных, ориентированная на очень большие объемы данных, упорядоченные определенным образом (в первую очередь по времени). Используется она, в первую очередь, в финансовых организациях – банках, инвестиционных фондах, страховых компаниях. Язык Q – это внутренний язык KDB+, позволяющий эффективно работать с этими данными. Идеология Q – это краткость и эффективность, понятность при этом приносится в жертву. Обосновывается это тем, что векторный язык в любом случае будет сложен для восприятия, а краткость и насыщенность записи позволяет увидеть на одном экране гораздо большую часть программы, что в итоге облегчает ее понимание.

В статье мы реализуем полноценную программу на Q и вам, возможно, захочется попробовать ее в деле. Для этого вам понадобится собственно Q. Скачать бесплатную 32-битную версию можно на сайте компании kx – www.kx.com. Там же, если вам интересно, вы найдете справочную информацию по Q, книгу Q For Mortals и разнообразные статьи на эту тему.

Постановка задачи


Есть источник, который присылает каждые 25 миллисекунд таблицу с данными. Поскольку KDB+ применяется в первую очередь в финансах, будем считать, что это таблица сделок (trades), в которой есть следующие колонки: time (время в миллисекундах), sym (обозначение компании на бирже – IBM, AAPL,…), price (цена, по которой куплены акции), size (размер сделки). Интервал 25 миллисекунд выбран произвольно, он не слишком маленький и не слишком большой. Его наличие означает, что данные приходят в сервис уже буферизованные. Можно было бы легко реализовать буферизацию на стороне сервиса, в том числе динамическую, зависящую от текущей нагрузки, но для простоты остановимся на фиксированном интервале.

Сервис должен считать поминутно для каждого входящего символа из колонки sym набор агрегирующих функций – max price, avg price, sum size и т.п. полезную информацию. Для простоты мы положим, что все функции можно вычислять инкрементально, т.е. для получения нового значения достаточно знать два числа – старое и входящее значения. Например, функции max, average, sum обладают этим свойством, а функция медиана нет.

Также мы предположим, что входящий поток данных упорядочен по времени. Это даст нам возможность работать только с последней минутой. На практике достаточно уметь работать с текущей и предыдущей минутами на случай, если какие-то апдейты запоздали. Для простоты мы не будем рассматривать этот случай.

Агрегирующие функции


Ниже перечислены необходимые агрегирующие функции. Я взял их как можно больше, чтобы увеличить нагрузку на сервис:

  • high – max price – максимальная цена за минуту.
  • low – min price – минимальная цена за минуту.
  • firstPrice – first price – первая цена за минуту.
  • lastPrice – last price – последняя цена за минуту.
  • firstSize – first size – первый размер сделки за минуту.
  • lastSize – last size — последний размер сделки за минуту.
  • numTrades – count i – число сделок за минуту.
  • volume – sum size – сумма размеров сделок за минуту.
  • pvolume – sum price – сумма цен за минуту, необходимо для avgPrice.
  • turnover – sum price*size – суммарный объем сделок за минуту.
  • avgPrice – pvolume%numTrades – средняя цена за минуту.
  • avgSize – volume%numTrades – средний размер сделки за минуту.
  • vwap – turnover%volume – взвешенная по размеру сделки средняя цена за минуту.
  • cumVolume – sum volume – накопленный размер сделок за все время.

Сразу обсудим один неочевидный момент – как инициализировать эти колонки в первый раз и для каждой следующей минуты. Некоторые колонки типа firstPrice каждый раз нужно инициализировать значением null, их значение не определено. Другие типа volume нужно устанавливать всегда в 0. Еще есть колонки, которые требуют комбинированного подхода – например, cumVolume необходимо копировать из предыдущей минуты, а для первой установить в 0. Зададим все эти параметры используя тип данных словарь (аналог записи):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Я добавил sym и time в словарь для удобства, теперь initWith – это готовая строчка из финальной агрегированной таблицы, где осталось задать правильные sym и time. Можно использовать ее для добавления новых строк в таблицу.

aggCols нам понадобится при создании агрегирующей функции. Список необходимо инвертировать из-за особенностей порядка вычислений выражений в Q (справа налево). Цель – обеспечить вычисление в направлении от high к cumVolume, поскольку некоторые колонки зависят от предыдущих.

Колонки, которые нужно скопировать в новую минуту из предыдущей, колонка sym добавлена для удобства:

rollColumns:`sym`cumVolume;

Теперь разделим колонки на группы согласно тому, как их следует обновлять. Можно выделить три типа:

  1. Аккумуляторы (volume, turnover,..) – мы должны прибавить входящее значение к предыдущему.
  2. С особой точкой (high, low, ..) – первое значение в минуте берется из входящих данных, остальные считаются с помощью функции.
  3. Остальные. Всегда считаются с помощью функции.

Определим переменные для этих классов:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Порядок вычислений


Обновлять агрегированную таблицу мы будем в два этапа. Для эффективности мы сначала ужмем входящую таблицу так, чтобы там осталась одна строка для каждого символа и минуты. То, что все наши функции инкрементальные и ассоциативные, гарантирует нам, что результат от этого дополнительного шага не изменится. Ужать таблицу можно было бы с помощью селекта:

select high:max price, low:min price … by sym,time.minute from table

У этого способа есть минус – набор вычисляемых колонок задан заранее. К счастью, в Q селект реализован и как функция, куда можно подставить динамически созданные аргументы:

?[table;whereClause;byClause;selectClause]

Я не буду подробно описывать формат аргументов, в нашем случае нетривиальными будут только by и select выражения и они должны быть словарями вида columns!expressions. Таким образом, ужимающую функцию можно задать так:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Для понятности я использовал функцию parse, которая превращает строку с Q выражением в значение, которое может быть передано в функцию eval и которое требуется в функциональном селекте. Также отметим, что preprocess задана как проекция (т.е. функция с частично определенными аргументами) функции селект, один аргумент (таблица) отсутствует. Если мы применим preprocess к таблице, то получим ужатую таблицу.

Второй этап – это обновление агрегированной таблицы. Напишем сначала алгоритм в псевдокоде:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

В Q вместо циклов принято использовать функции map/reduce. Но поскольку Q – векторный язык и все операции мы можем спокойно применять ко всем символам сразу, то в первом приближении мы можем обойтись вообще без цикла, проделывая операции со всеми символами сразу:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Но мы можем пойти и дальше, в Q есть уникальный и исключительно мощный оператор – оператор обобщенного присваивания. Он позволяет изменить набор значений в сложной структуре данных используя список индексов, функций и аргументов. В нашем случае он выглядит так:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

К сожалению, для присвоения в таблицу нужен список строк, а не колонок, и приходится транспонировать матрицу (список колонок в список строк) с помощью функции flip. Для большой таблицы это накладно, поэтому вместо этого применим обобщенное присваивание к каждой колонке отдельно, используя функцию map (которая выглядит как апостроф):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Мы снова используем проекцию функции. Также заметьте, что в Q создание списка – это тоже функция и мы можем вызвать ее с помощью функции each(map), чтобы получить список списков.

Чтобы набор вычисляемых колонок не был фиксирован, создадим выражение выше динамически. Определим сначала функции для вычисления каждой колонки, используя переменные row и inp для ссылки на агрегированные и входные данные:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Некоторые колонки особые, их первое значение не должно вычисляться функцией. Мы можем определить, что оно первое по колонке row[`numTrades] – если в ней 0, то значение первое. В Q есть функция выбора — ?[Boolean list;list1;list2] – которая выбирает значение из списка 1 или 2 в зависимости от условия в первом аргументе:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Тут я вызвал обобщенное присваивание с моей функцией (выражение в фигурных скобках). В нее передается текущее значение (первый аргумент) и дополнительный аргумент, который я передаю в 4-м параметре.

Отдельно добавим аккумуляторные колонки, поскольку для них функция одна и та же:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Это обычное по меркам Q присваивание, только присваиваю я сразу список значений. Наконец, создадим главную функцию:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Этим выражением я динамически создаю функцию из строки, которая содержит выражение, которое я приводил выше. Результат будет выглядеть так:

{[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Порядок вычисления колонок инвертирован, поскольку в Q порядок вычисления справа налево.

Теперь у нас есть две основные функции, необходимые для вычислений, осталось добавить немного инфраструктуры и сервис готов.

Финальные шаги


У нас есть функции preprocess и updateAgg, которые делают всю работу. Но необходимо еще обеспечить правильный переход через минуты и вычислить индексы для агрегации. В первую очередь определим функцию init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Также определим функцию roll, которая будет менять текущую минуту:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Нам понадобится функция для добавления новых символов:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

И, наконец, функция upd (традиционное название этой функции для Q сервисов), которая вызывается клиентом, для добавления данных:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Вот и все. Вот полный код нашего сервиса, как и обещалось, всего несколько строк:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Тестирование


Проверим производительность сервиса. Для этого запустим его в отдельном процессе (поместите код в файл service.q) и вызовите функцию init:

q service.q –p 5566

q)init[]

В другой консоли запустите второй Q процесс и подсоединитесь к первому:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Сначала создадим список символов – 10000 штук и добавим функцию для создания случайной таблицы. Во второй консоли:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Я добавил в список символов три настоящих, чтобы было удобнее искать их в таблице. Функция rnd создает случайную таблицу с n строками, где время меняется от t до t+25 миллисекунд.

Теперь можно попробовать послать данные в сервис (добавим первые десять часов):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Можно проверить в сервисе, что таблица обновилась:

\c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Результат:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Проведем теперь нагрузочное тестирование, чтобы выяснить сколько данных сервис может обрабатывать в минуту. Напомню, что мы установили интервал для апдейтов в 25 миллисекунд. Соответственно, сервис должен (в среднем) укладываться хотя бы в 20 миллисекунд на апдейт, чтобы дать время пользователям запросить данные. Введите следующее во втором процессе:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 – это две минуты. Можно попробовать запустить сначала для 1000 строк каждые 25 миллисекунд:

start 1000

В моем случае результат получается в районе пары миллисекунд на апдейт. Так что я сразу увеличу количество строк до 10.000:

start 10000

Результат:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Снова ничего особенного, а ведь это 24 миллиона строк в минуту, 400 тысяч в секунду. Больше 25 миллисекунд апдейт тормозил только 5 раз, видимо при смене минуты. Увеличим до 100.000:

start 100000

Результат:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Как видим, сервис едва справляется, но тем не менее ему удается удержаться на плаву. Такой объем данных (240 миллионов строк в минуту) чрезвычайно велик, в таких случаях принято запускать несколько клонов (или даже десятков клонов) сервиса, каждый из которых обрабатывает только часть символов. Тем не менее, результат впечатляющий для интерпретируемого языка, который ориентирован в первую очередь на хранение данных.

Может возникнуть вопрос, почему время растет нелинейно вместе с размером каждого апдейта. Причина в том, что ужимающая функция – это фактически С функция, которая работает гораздо эффективнее updateAgg. Начиная с какого-то размера апдейта (в районе 10.000), updateAgg достигает своего потолка и дальше ее время выполнения не зависит от размера апдейта. Именно за счет предварительного шага Q сервис в состоянии переваривать такие объемы данных. Это подчеркивает, насколько важно, работая с большими данными, выбирать правильный алгоритм. Еще один момент – правильное хранение данных в памяти. Если бы данные хранились не по-колоночно или не были упорядочены по времени, то мы бы познакомились с такой вещью, как TLB cache miss – отсутствие адреса страницы памяти в кэше адресов процессора. Поиск адреса занимает где-то в 30 раз больше времени в случае неудачи и в случае рассеянных данных может замедлить сервис в несколько раз.

Заключение


В этой статье я показал, что база KDB+ и Q пригодны не только для хранения больших данных и простого доступа к ним через селект, но и для создания сервисов обработки данных, которые способны переваривать сотни миллионов строк/гигабайты данных даже в одном отдельно взятом Q процессе. Сам язык Q позволяет исключительно кратко и эффективно реализовывать алгоритмы, связанные с обработкой данных за счет своей векторной природы, встроенного интерпретатора диалекта SQL и очень удачного набора библиотечных функций.

Я замечу, что изложенное выше, это лишь часть возможностей Q, у него есть и другие уникальные особенности. Например, чрезвычайно простой IPC протокол, который стирает границу между отдельными Q процессами и позволяет объединять сотни этих процессов в единую сеть, которая может располагаться на десятках серверов в разных концах света.
Теги:
Хабы:
Всего голосов 20: ↑19 и ↓1+18
Комментарии1

Публикации

Информация

Сайт
dbtc-career.ru
Дата регистрации
Дата основания
2001
Численность
1 001–5 000 человек
Местоположение
Россия

Истории