В первой части статьи мы исследовали скорость различных вариантов обмена информацией между JavaScript и WASM-кодом. В этом продолжении - наконец-то займемся написанием прикладного кода нашего парсера.
Мы ведь теперь пишем "прямо на ассемблере" - значит, все будет супербыстро! Правда ведь?
Портируем алгоритм наивно
Начнем с самых простых действий - опишем, где-какие данные будут лежать в разделяемой памяти.
JavaScript
первые 12 * 8 байт
[0x0000..0x005F]
используем для возврата 64bit-значений каждого из 12 потенциально возможных ключейданные в эти поля будет записывать импортируемая
parseBuffersWASM
в последней 32bit-ячейке первой страницы памяти
[0xFFFC..0xFFFF]
будем сохранять текущее положение "курсора" парсера[0x10000..]
содержит разбираемый контент
// ...
const data = readFileSync('buffers.txt');
memory.grow((data.byteLength >> 16) + 1);
// формируем проекции
const prj32 = new Uint32Array(memory.buffer);
const prj8 = new Uint8Array(memory.buffer);
// единственный раз передадим исходное смещение через переменную
const off = new WebAssembly.Global({value : 'i32', mutable : true}, 1 << 16);
// передаем содержимое файла, начиная с нужного смещения
data.copy(prj8, off.value);
// дописываем отсутствующий '\n' у последней строки
prj8[off.value + data.length] = 0x0A;
const importObject = {
js : {
memory
, off
}
};
const instance = await WebAssembly.instantiate(module, importObject);
const parseBuffersWASM = instance.exports.parseBuffers;
// набор наших ключей
const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
const lnBK = buffersKeys.length;
const parseBuffers = () => {
// зануляем все значения перед каждым вызовом
prj32.fill(0x0, 0, lnBK << 1); // 32bit-ячеек в 2 раза больше, чем 64bit-значений
parseBuffersWASM();
let rv = {};
// заполняем ненулевые значения ключей
for (let i = 0, off = 0; i < lnBK; i++) {
let val = prj32[off++] + prj32[off++] * 0x100000000;
if (val) {
rv[buffersKeys[i]] = val;
}
}
return rv;
};
// ...
// текущее смещение "курсора" вычитываем из памяти 0xFFFC
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
// ...
WASM
Переносим наш JS-код конечного автомата из начала первой части "как есть":
все переменные, с которыми работают несколько функций - глобальные
повторяющиеся конструкции типа
off += off_v
вынесены в несколько вспомогательных функций
WASM-HSM
(module
(import "js" "memory" (memory 1))
;; текущее смещение "курсора"
(global $off (import "js" "off") (mut i32))
;; текущее начитываемое число
(global $val (mut i64)
(i64.const 0)
)
;; состояние HSM
(global $state (mut i32)
(i32.const 0)
)
;; позиция атрибута в массиве
(global $key (mut i32)
(i32.const 0)
)
;; обновляем значение в сегменте результатов
(func $setData
;; buf[key << 3] = val
(i64.store
(i32.shl
(global.get $key)
(i32.const 3)
)
(global.get $val)
)
)
;; $off += $off_v
(func $step (param $off_v i32)
(global.set $off
(i32.add
(global.get $off)
(local.get $off_v)
)
)
)
;; разбор числа - посимвольно
;; ' ', ',' и '\n' - стоп-символы
(func $parseNumber
(local $ch i32)
(global.set $val
(i64.const 0)
)
(block
(loop
(local.set $ch
(i32.load8_u
(global.get $off)
)
)
;; ch == ' '
(if
(i32.eq
(local.get $ch)
(i32.const 0x20)
)
(then
;; off++
(call $step (i32.const 1))
(return)
)
)
;; ch == ','
(if
(i32.eq
(local.get $ch)
(i32.const 0x2C)
)
(then
;; off += 2
(call $step (i32.const 2))
(return)
)
)
;; ch == '\n'
(if
(i32.eq
(local.get $ch)
(i32.const 0x0A)
)
(then
;; off++
(call $step (i32.const 1))
(return)
)
)
;; val = val * 10 + (ch - 0x30)
(global.set $val
(i64.add
(i64.mul
(global.get $val)
(i64.const 10)
)
(i64.extend_i32_u
(i32.sub
(local.get $ch)
(i32.const 0x30) ;; '0'
)
)
)
)
;; off++
(call $step (i32.const 1))
br 0 ;; do loop
)
)
)
;; [state, off] = [state_v, off + off_v]
(func $iterate0 (param $state_v i32) (param $off_v i32)
;; state = state_v
(global.set $state
(local.get $state_v)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
;; [key, off] = [state + state_v, off + off_v]
(func $iterate1 (param $state_v i32) (param $off_v i32)
;; key = state + state_v
(global.set $key
(i32.add
(global.get $state)
(local.get $state_v)
)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
;; аналог String.charCodeAt
(func $charAtEq (param $char i32) (result i32)
(i32.eq
(i32.load8_u
(global.get $off)
) ;; AL = line[$off] as unsigned
(local.get $char)
)
)
(func (export "parseBuffers")
;; off += 'Buffers: '.length
(global.set $off
(i32.add
(global.get $off)
(i32.const 9)
)
)
;; for (...)
(block
(loop
;; case 's' // shared
(if
(call $charAtEq (i32.const 0x73))
(then
;; $state = 0
;; $off += 7
(call $iterate0
(i32.const 0)
(i32.const 7)
)
br 1 ;; уходим на начало текущего цикла
)
)
;; case 'l' // local
(if
(call $charAtEq (i32.const 0x6c))
(then
;; $state = 4
;; $off += 6
(call $iterate0
(i32.const 4)
(i32.const 6)
)
br 1
)
)
;; case 't' // temp
(if
(call $charAtEq (i32.const 0x74))
(then
;; $state = 8
;; $off += 5
(call $iterate0
(i32.const 8)
(i32.const 5)
)
br 1
)
)
;; case 'h' // hit
(if
(call $charAtEq (i32.const 0x68))
(then
;; key = state + 0;
;; $off += 4
(call $iterate1
(i32.const 0)
(i32.const 4)
)
;;
call $parseNumber
call $setData
br 1
)
)
;; case 'r' // read
(if
(call $charAtEq (i32.const 0x72))
(then
;; key = state + 1;
;; $off += 5
(call $iterate1
(i32.const 1)
(i32.const 5)
)
;;
call $parseNumber
call $setData
br 1
)
)
;; case 'd' // dirtied
(if
(call $charAtEq (i32.const 0x64))
(then
;; key = state + 2;
;; $off += 8
(call $iterate1
(i32.const 2)
(i32.const 8)
)
;;
call $parseNumber
call $setData
br 1
)
)
;; case 'w' // written
(if
(call $charAtEq (i32.const 0x77))
(then
;; key = state + 3;
;; $off += 8
(call $iterate1
(i32.const 3)
(i32.const 8)
)
;;
call $parseNumber
call $setData
br 1
)
)
br 1
)
)
;; сохраняем текущее смещение
(i32.store
(i32.const 0xFFFC)
(global.get $off)
)
)
)
Запускаем…
76ms - ой, и это вместо 48ms исходного JS… Давайте разбираться.
Прокачиваем производительность
Маска возвращаемых данных
А ведь мы каждый раз все 96 байт, куда записываются возвращаемые результаты, затираем нулями - причем неважно, одно, два или 10 значений мы реально записывали. Да еще и читаем их все!
А ведь если вернуть информацию о том, какие именно ключи были записаны, то и от зануления можно отказаться, и от чтения лишнего. Сделать это весьма просто - достаточно вернуть из WASM-кода значение с битовой маской записанных ключей:
const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
const parseBuffers = () => {
let mask = parseBuffersWASM();
let rv = {};
// читаем только ячейки, соответствующие установленным битам маски
for (let i = 0; mask; mask >>= 1, i++) {
if (mask & 1) {
let off = i << 1;
rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * 0x100000000;
}
}
return rv;
};
;; ...
(global $mask (mut i32)
(i32.const 0)
)
;; ...
;; обновляем маску атрибутов и записываемое значение
(func $setData
;; ...
;; mask |= 1 << key
(global.set $mask
(i32.or
(global.get $mask)
(i32.shl
(i32.const 1)
(global.get $key)
)
)
)
)
;; ...
;; теперь наша функция возвращает значение маски
(func (export "parseBuffers") (result i32)
;; mask = 0
(global.set $mask
(i32.const 0)
)
;; ...
;; возвращаем маску
global.get $mask
)
Запускаем - уже 54ms, практически вплотную приблизились к исходному времени!
Используем статистические знания о данных
Если внимательно посмотреть на исходные данные, то станет очевидно, что большинство строк имеют либо вид 'shared hit=XXX'
, либо 'shared read=YYY'
, либо 'shared hit=XXX read=YYY'
, и существенно реже что-то другое - что вполне логично, ведь почти всегда узел плана или читает что-то из памяти, или с диска, или получает вверх по дереву накопленную сумму этих же показателей.
В нашем случае это означает значение маски 1, 2 или 3
, обработку которых мы можем пустить по сокращенному пути:
const m32to64 = 0x100000000;
const parseBuffers = () => {
let mask = parseBuffersWASM();
switch (mask) {
case 1:
return {'shared-hit' : prj32[0] + prj32[1] * m32to64};
case 2:
return {'shared-read' : prj32[2] + prj32[3] * m32to64};
case 3:
return {
'shared-hit' : prj32[0] + prj32[1] * m32to64
, 'shared-read' : prj32[2] + prj32[3] * m32to64
};
default:
let rv = {};
for (let i = 0; mask; mask >>= 1, i++) {
if (mask & 1) {
let off = i << 1;
rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
}
}
return rv;
}
};
Результат - 44ms, ура! Мы уже опередили исходный вариант. А еще быстрее - можно?
Условная адресация функций
Заметим, что в нашем WASM-коде есть несколько весьма похожих кусков типа такого:
;; case 'w' // written
(if
(call $charAtEq (i32.const 0x77))
(then
;; ...
Причем, если нам приходится проверять совпадение с 'w'
, то, значит, уже 6 неудачных сравнений до этого мы произвели - с вариантами 's', 'l', 't', 'h', 'r', 'd'
.
Чтобы избежать необходимости подобного перебора, в WASM используется инструкция call_indirect
, реализующая вызов функции в соответствии с заранее сформированной таблицей.
Собственно, для этого нам и придется преобразовать каждое такое условие в отдельную функцию, возвращающую 1
. 0
будет возвращать функция-заглушка, означающая возникновение неопознанного символа. В нашем случае это как раз будет '\n'
, сигнализирующая окончание строки:
;; ...
;; таблица косвенной адресации функций
(table 128 anyfunc)
(elem (i32.const 0)
$null ;; 00
$null ;; 1
;; ...
$step_d ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$step_h ;; 8
$null ;; 9
$null ;; A
$null ;; B
$step_l ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 70
$null ;; 1
$step_r ;; 2
$step_s ;; 3
$step_t ;; 4
$null ;; 5
$null ;; 6
$step_w ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
)
;; ...
(func $step_w (result i32) ;; written
;; key = state + 3;
;; $off += 8
(call $iterate1
(i32.const 3)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
;; ...
(func (export "parseBuffers") (result i32)
;; ...
;; for (...)
(block
(loop
;; if (table[line[off]]()) continue;
(br_if 0
(call_indirect (result i32)
(i32.load8_u
(global.get $off)
)
)
)
;; break, end loop
)
)
;; ...
Полный код таблицы и всех 7 функций
;; таблица косвенной адресации функций
(table 128 anyfunc)
(elem (i32.const 0)
$null ;; 00
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 10
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 20
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 30
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 40
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 50
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 60
$null ;; 1
$null ;; 2
$null ;; 3
$step_d ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$step_h ;; 8
$null ;; 9
$null ;; A
$null ;; B
$step_l ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 70
$null ;; 1
$step_r ;; 2
$step_s ;; 3
$step_t ;; 4
$null ;; 5
$null ;; 6
$step_w ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
)
;; ...
(func $null (result i32) ;; заглушка
i32.const 0
)
(func $step_s (result i32) ;; shared
;; $state = 0
;; $off += 7
(call $iterate0
(i32.const 0)
(i32.const 7)
)
i32.const 1
)
(func $step_l (result i32) ;; local
;; $state = 4
;; $off += 6
(call $iterate0
(i32.const 4)
(i32.const 6)
)
i32.const 1
)
(func $step_t (result i32) ;; temp
;; $state = 8
;; $off += 5
(call $iterate0
(i32.const 8)
(i32.const 5)
)
i32.const 1
)
(func $step_h (result i32) ;; hit
;; key = state + 0;
;; $off += 4
(call $iterate1
(i32.const 0)
(i32.const 4)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_r (result i32) ;; read
;; key = state + 1;
;; $off += 5
(call $iterate1
(i32.const 1)
(i32.const 5)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_d (result i32) ;; dirtied
;; key = state + 2;
;; $off += 8
(call $iterate1
(i32.const 2)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_w (result i32) ;; written
;; key = state + 3;
;; $off += 8
(call $iterate1
(i32.const 3)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func (export "parseBuffers") (result i32)
;; mask = 0
(global.set $mask
(i32.const 0)
)
;; off += 'Buffers: '.length
(global.set $off
(i32.add
(global.get $off)
(i32.const 9)
)
)
;; for (...)
(block
(loop
;; if (table[line[off]]()) continue;
(br_if 0
(call_indirect (result i32)
(i32.load8_u
(global.get $off)
)
)
)
;; break, end loop
)
)
;; сохраняем текущее смещение
(i32.store
(i32.const 0xFFFC) ;; 16383 * 4
(global.get $off)
)
;; возвращаем маску
global.get $mask
)
Такой вариант вызова позволяет отыграть еще немного и добиться результата в 40ms - почти 20% выигрыша!
Объединяем условия
Выше у нас приведен код, в котором стоп-символ сравнивается в худшем случае трижды - давайте объединим эти условия в одно:
(if
(i64.eq ;; n == 8
(local.get $n)
(i64.const 8)
)
(then
;; off += 8
(call $step
(i32.const 8)
)
)
(else
;; шагаем на n символов
;; ch == '\n' || ch == ' ' => +1
;; ch == ',' => +2
(call $step
(i32.add
(i32.wrap_i64
(local.get $n)
)
(i32.add
(i32.const 1)
(i64.eq
(local.get $ch)
(i64.const 0x2C)
)
)
)
)
(return)
)
)
Поскольку CPU достаточно сильно не любит ветвления, этот способ позволяет отыграть еще 2мс.
Прокачиваем алгоритм (SWAR)
Вспомним, что мы считываем число "посимвольно", а все развитие процессорной отрасли последние пару десятилетий говорит о том, что обрабатывать надо как можно больше данных за единственную операцию - то есть SIMD-подход в чистом виде.
Воспользуемся техникой, описанной в статье про парсинг double - зачем перебирать "поцифирно", если можно обрабатывать сразу 8 последовательных символов, считав их в регистр.
Основных сложностей тут две:
надо четко понимать, где/чем число заканчивается - пробелом (
0x20
), запятой (0x2C
) или концом строки (0x0A
), поскольку от этого зависит, через сколько символов дальше надо "перешагнуть"число может не умещаться в 8 байт
Узнаем длину числа
В нашем случае число '12345'
с пробелом на конце будет считано в регистр как 0x..203534333231 ...
. - и тут можно легко заметить, что любой из стоп-символов не превосходит 0x2F
, в то время как цифры - заведомо не меньше 0x30
.
Поэтому определить количество цифр в числе можно достаточно легко:
все цифры при наложении маски
& 0x10
дают ненулевой результат, а стоп-символы - нулевойнаходим первый с конца ненулевой байт с помощью инструкции
.ctz
, определяющей количество нулевых бит "в хвосте" числанезначимое для нас битовое смещение внутри байта (
ctz = 44
) усечется при делении на 8 с помощью битового сдвига
#5 #4 #3 #2 #1 #0 - старший <- младший
0x20 0x35 0x34 0x33 0x32 0x31 = qword:i64
0010 0000 0011 0101 0011 0100 0011 0011 0011 0010 0011 0001
1101 1111 1100 1010 1100 0100 1100 1100 1100 1101 1100 1110 = not(qword) = qword ^ -1
0001 0000 0001 0000 0001 0000 0001 0000 0001 0000 0001 0000 = mask:i64
0001 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 = not(qword) & mask
^ = .ctz >> 3 = 5 byte
;; позиция "нецифрового" символа
;; n = ctz >> 3
(local.set $n
(i64.shr_u
(i64.ctz
(i64.and ;; digit mask == not(d) & 0x10
(i64.xor ;; not(d) = d ^ -1
(local.get $qword)
(i64.const 0xFFFFFFFFFFFFFFFF)
)
(i64.const 0x1010101010101010) ;; digit mask
)
)
(i64.const 3)
)
)
Определяем стоп-символ
Чтобы получить значение первого нецифрового символа, сдвинем наше число на n << 3
бит вправо - самый младший байт и будет искомым:
;; nb = n << 3 // num bits
(local.set $nb
(i64.shl
(local.get $n)
(i64.const 3)
)
)
;; ch = line[off + n]
(local.set $ch
(i32.wrap_i64
(i64.and
(i64.shr_u
(local.get $qword)
(local.get $nb)
)
(i64.const 0xFF) ;; младший байт
)
)
)
Выравниваем число
Теперь, чтобы SWAR-хинт отработал как надо, необходимо обеспечить выравнивание числа в максимальную позицию:
;; top-align
;; qword <<= 64 - nb
(local.set $qword
(i64.shl
(local.get $qword)
(i64.sub
(i64.const 64)
(local.get $nb)
)
)
)
SWAR-конвертация
;; SWAR
;; https://habr.com/ru/company/ruvds/blog/542640/
;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0F0F0F0F0F0F0F0F)
)
(i64.const 2561)
)
(i64.const 8)
)
)
;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x00FF00FF00FF00FF)
)
(i64.const 6553601)
)
(i64.const 16)
)
)
;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0000FFFF0000FFFF)
)
(i64.const 42949672960001)
)
(i64.const 32)
)
)
Решение для 64bit-чисел
В подавляющем большинстве ситуаций, числа будут умещаться в 7 знаков, а из 8-го мы извлечем стоп-символ. Но значения длиной 8 и более знаков тоже встречаются - для этого, вместо организации цикла, просто повторим SWAR для следующего 8-символьного блока.
Только надо не забыть, что второй блок может содержать 0 цифр, в отличие от первого.
В результате, мы получим в val
число из первых 8 символов, а в qword
- из следующего блока. Чтобы получить итоговый результат, надо val
домножить на соответствующую количеству цифр во втором блоке количество цифр:
'123456789' = '12345678' | '9'
val = 12345678, qword = 9, n = 1
val = val * 10 ^ n + qword
Чтобы не вычислять коэффициенты домножения каждый раз, занесем их сразу в память по смещению 128 (32 ячейки по 4 байта):
// 10^N для домножения первого 32bit-сегмента
let pow10 = 1;
for (let i = 0; i < 8; i++) {
prj32[i + 32] = pow10;
pow10 *= 10;
}
;; val = val * memory[128 + (n << 2)] + qword
(global.set $val
(i64.add
(i64.mul
(global.get $val)
(i64.extend_i32_u
(i32.load
(i32.add
(i32.shl
(i32.wrap_i64
(local.get $n)
)
(i32.const 2)
)
(i32.const 128)
)
)
)
)
(local.get $qword)
)
)
Полный код buffers-test.js
const { readFileSync } = require('fs');
const fn = 'buffers-test';
const run = async () => {
const buffer = readFileSync(`${fn}.wasm`);
const module = await WebAssembly.compile(buffer);
const memory = new WebAssembly.Memory({initial : 1});
const data = readFileSync('buffers.txt');
memory.grow((data.byteLength >> 16) + 1);
const prj32 = new Uint32Array(memory.buffer);
const prj8 = new Uint8Array(memory.buffer);
// единственный раз передадим исходное смещение через переменную
const off = new WebAssembly.Global({value : 'i32', mutable : true}, 1 << 16);
data.copy(prj8, off.value);
// дописываем отсутствующий '\n' у последней строки
prj8[off.value + data.length] = 0x0A;
const importObject = {
js : {
memory
, off
}
};
// предзаписываем 10^N в виде i64-ячеек для домножения первого блока
let pow10 = 1;
for (let i = 0; i < 8; i++) {
prj32[(i << 1) + 32] = pow10;
pow10 *= 10;
}
const instance = await WebAssembly.instantiate(module, importObject);
const parseBuffersWASM = instance.exports.parseBuffers;
const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
const m32to64 = 0x100000000;
const parseBuffers = () => {
let mask = parseBuffersWASM();
switch (mask) {
case 1:
return {'shared-hit' : prj32[0] + prj32[1] * m32to64};
case 2:
return {'shared-read' : prj32[2] + prj32[3] * m32to64};
case 3:
return {
'shared-hit' : prj32[0] + prj32[1] * m32to64
, 'shared-read' : prj32[2] + prj32[3] * m32to64
};
default:
let rv = {};
for (let i = 0; mask; mask >>= 1, i++) {
if (mask & 1) {
let off = i << 1;
rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
}
}
return rv;
}
};
const hrb = process.hrtime();
// -- 8< --
// в дальнейшем текущее смещение "курсора" вычитываем из памяти
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
// -- 8< --
const hre = process.hrtime(hrb);
const usec = hre[0] * 1e+9 + hre[1];
console.log(usec);
};
run();
Полный код buffers-test.wat
(module
(import "js" "memory" (memory 1))
;; текущее смещение "курсора"
(global $off (import "js" "off") (mut i32))
;; текущее возвращаемое значение
(global $val (mut i64)
(i64.const 0)
)
(global $mask (mut i32)
(i32.const 0)
)
(global $state (mut i32)
(i32.const 0)
)
(global $key (mut i32)
(i32.const 0)
)
;; таблица косвенной адресации функций
(table 128 anyfunc)
(elem (i32.const 0)
$null ;; 00
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 10
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 20
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 30
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 40
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 50
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 60
$null ;; 1
$null ;; 2
$null ;; 3
$step_d ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$step_h ;; 8
$null ;; 9
$null ;; A
$null ;; B
$step_l ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 70
$null ;; 1
$step_r ;; 2
$step_s ;; 3
$step_t ;; 4
$null ;; 5
$null ;; 6
$step_w ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
)
;; обновляем маску атрибутов и значение
(func $setData
;; buf[key << 3] = val
(i64.store
(i32.shl
(global.get $key)
(i32.const 3)
)
(global.get $val)
)
;; mask |= 1 << key
(global.set $mask
(i32.or
(global.get $mask)
(i32.shl
(i32.const 1)
(global.get $key)
)
)
)
)
(func $step (param $off_v i32)
;; $off += $off_v
(global.set $off
(i32.add
(global.get $off)
(local.get $off_v)
)
)
)
;; разбор числа - SWAR
(func $parseNumber
(local $qword i64)
(local $n i64)
(local $nb i64)
(local $ch i64)
(global.set $val
(i64.const 0)
)
;; qword = line[off..+7]
(local.set $qword
(i64.load align=4
(global.get $off)
)
)
;; позиция "нецифрового" символа
;; n = num digits
(local.set $n
(i64.shr_u
(i64.ctz
(i64.and ;; digit mask == not(d) & 0x10
(i64.xor
(local.get $qword)
(i64.const 0xFFFFFFFFFFFFFFFF)
)
(i64.const 0x1010101010101010) ;; digit mask
)
)
(i64.const 3)
)
)
;; nb = n << 3 // num bits
(local.set $nb
(i64.shl
(local.get $n)
(i64.const 3)
)
)
;; ch = line[off + n]
(local.set $ch
(i64.and
(i64.shr_u
(local.get $qword)
(local.get $nb)
)
(i64.const 0xFF)
)
)
;; top-align
;; qword <<= 64 - nb
(local.set $qword
(i64.shl
(local.get $qword)
(i64.sub
(i64.const 64)
(local.get $nb)
)
)
)
;; SWAR
;; https://habr.com/ru/company/ruvds/blog/542640/
;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0F0F0F0F0F0F0F0F)
)
(i64.const 2561)
)
(i64.const 8)
)
)
;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x00FF00FF00FF00FF)
)
(i64.const 6553601)
)
(i64.const 16)
)
)
;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0000FFFF0000FFFF)
)
(i64.const 42949672960001)
)
(i64.const 32)
)
)
(global.set $val
(local.get $qword)
)
(if
(i64.eq ;; n == 8
(local.get $n)
(i64.const 8)
)
(then
;; off += 8
(call $step
(i32.const 8)
)
)
(else
;; шагаем на n символов
;; ch == '\n' || ch == ' ' => +1
;; ch == ',' => +2
(call $step
(i32.add
(i32.wrap_i64
(local.get $n)
)
(i32.add
(i32.const 1)
(i64.eq
(local.get $ch)
(i64.const 0x2C)
)
)
)
)
(return)
)
)
;; qword = line[off..+7]
(local.set $qword
(i64.load
(global.get $off)
)
)
;; n = (ctz - 4) / 8 == ctz >> 3 // num digits
(local.set $n
(i64.shr_u
(i64.ctz
(i64.and
(i64.xor ;; digit mask == not(d) & 0x10
(local.get $qword)
(i64.const 0xFFFFFFFFFFFFFFFF)
)
(i64.const 0x1010101010101010) ;; digit mask
)
)
(i64.const 3)
)
)
;; nb = n << 3 // num bits
(local.set $nb
(i64.shl
(local.get $n)
(i64.const 3)
)
)
;; ch = line[off + n]
(local.set $ch
(i64.and
(i64.shr_u
(local.get $qword)
(local.get $nb)
)
(i64.const 0xFF)
)
)
;; if (n) ...
(if
(i32.wrap_i64
(local.get $n)
)
(then
;; top-align
;; qword <<= 64 - nb
(local.set $qword
(i64.shl
(local.get $qword)
(i64.sub
(i64.const 64)
(local.get $nb)
)
)
)
;; SWAR
;; https://habr.com/ru/company/ruvds/blog/542640/
;; v = ((v & 0x0F0F0F0F0F0F0F0F) * 2561) >> 8
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0F0F0F0F0F0F0F0F)
)
(i64.const 2561)
)
(i64.const 8)
)
)
;; v = ((v & 0x00FF00FF00FF00FF) * 6553601) >> 16
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x00FF00FF00FF00FF)
)
(i64.const 6553601)
)
(i64.const 16)
)
)
;; v = ((v & 0x0000FFFF0000FFFF) * 42949672960001) >> 32
(local.set $qword
(i64.shr_u
(i64.mul
(i64.and
(local.get $qword)
(i64.const 0x0000FFFF0000FFFF)
)
(i64.const 42949672960001)
)
(i64.const 32)
)
)
;; val = val * 10 ^ memory[128 + (n << 3)]:i32 + qword
(global.set $val
(i64.add
(i64.mul
(global.get $val)
(i64.load align=8
(i32.wrap_i64
(i64.add
(i64.shl
(local.get $n)
(i64.const 3)
)
(i64.const 128)
)
)
)
)
(local.get $qword)
)
)
)
)
;; шагаем на n символов
;; ch == '\n' || ch == ' ' => +1
;; ch == ',' => +2
(call $step
(i32.add
(i32.wrap_i64
(local.get $n)
)
(i32.add
(i32.const 1)
(i64.eq
(local.get $ch)
(i64.const 0x2C)
)
)
)
)
)
;; [state, off] = [state_v, off + off_v]
(func $iterate0 (param $state_v i32) (param $off_v i32)
;; state = state_v
(global.set $state
(local.get $state_v)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
;; [key, off] = [state + state_v, off + off_v]
(func $iterate1 (param $state_v i32) (param $off_v i32)
;; key = state + state_v
(global.set $key
(i32.add
(global.get $state)
(local.get $state_v)
)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
(func $null (result i32)
i32.const 0
)
(func $step_s (result i32) ;; shared
;; $state = 0
;; $off += 7
(call $iterate0
(i32.const 0)
(i32.const 7)
)
i32.const 1
)
(func $step_l (result i32) ;; local
;; $state = 4
;; $off += 6
(call $iterate0
(i32.const 4)
(i32.const 6)
)
i32.const 1
)
(func $step_t (result i32) ;; temp
;; $state = 8
;; $off += 5
(call $iterate0
(i32.const 8)
(i32.const 5)
)
i32.const 1
)
(func $step_h (result i32) ;; hit
;; key = state + 0;
;; $off += 4
(call $iterate1
(i32.const 0)
(i32.const 4)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_r (result i32) ;; read
;; key = state + 1;
;; $off += 5
(call $iterate1
(i32.const 1)
(i32.const 5)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_d (result i32) ;; dirtied
;; key = state + 2;
;; $off += 8
(call $iterate1
(i32.const 2)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_w (result i32) ;; written
;; key = state + 3;
;; $off += 8
(call $iterate1
(i32.const 3)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func (export "parseBuffers") (result i32)
;; mask = 0
(global.set $mask
(i32.const 0)
)
;; off += 'Buffers: '.length
(global.set $off
(i32.add
(global.get $off)
(i32.const 9)
)
)
;; for (...)
(block
(loop
;; if (table[line[off]]()) continue;
(br_if 0
(call_indirect (result i32)
(i32.load8_u
(global.get $off)
)
)
)
;; break, end loop
)
)
;; сохраняем текущее смещение
(i32.store
(i32.const 0xFFFC) ;; 16383 * 4
(global.get $off)
)
;; возвращаем маску
global.get $mask
)
)
Проверяем - 32ms!
Влияние оптимизирующего компилятора
Поскольку наш код должен работать "на потоке", давайте посмотрим, какое время продемонстрирует он при повторном прогоне:
// первый прогон
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
// сбрасываем смещения в начало файла
off.value = 1 << 16;
prj32[16383] = off.value;
const hrb = process.hrtime();
// -- 8< --
// замеряемый второй прогон
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
// -- 8< --
Хм… почему-то стало 26ms. Приятно, что меньше, но - почему? Для ответа на этот вопрос посмотрим генерируемый байткод:
node --print-wasm-code buffers-test.js >_bytecode
В теле сформировавшегося файла поищем уникальную для нашего WASM-кода сигнатуру - например, "маску цифр" 1010101010101010
.
Что интересно, она найдется дважды:
--- WebAssembly code ---
index: 2
kind: wasm function
compiler: Liftoff
...
75 REX.W movq rdx,[rax+rcx*1] ; rdx = qword
79 REX.W movq rax,rdx ; rax = rdx
7c REX.W xorq rax,0xff ; rax = rax ^ 0xFF
80 REX.W movq rcx,1010101010101010 ; rcx = digit mask
8a REX.W andq rax,rcx ; rax = rax & rcx
8d REX.W bsfq rax,rax ; rax = ctz(rax)
--- WebAssembly code ---
index: 2
kind: wasm function
compiler: TurboFan
...
42 REX.W movq rbx,[rbx+r9*1] ; rbx = qword
46 REX.W movq rdx,rbx ; rdx = rbx
49 REX.W notq rdx ; rdx = not(rdx) <--
4c REX.W movq r9,1010101010101010 ; r9 = digit mask
56 REX.W andq rdx,r9 ; rdx = rdx & r9
59 REX.W bsfq rdx,rdx ; rdx = ctz(rdx)
Мы видим два примера ассемблерного кода, полученного из одного и того же WASM-кода. Например, среди WASM-инструкций банально отсутствует i32.not(v)
, которую вполне официально предлагается реализовывать через i32.xor(v, -1)
.
Собственно, именно это мы и видим в нашем участке кода. Компилятор Liftoff, хоть и считается более продвинутым, превратил такой хинт "по написанному" в xorq rax,0xff
, а TurboFan использовал более эффективный вариант notq rdx
.
А это только одна из множества мелких деталей, поэтому хорошая производительность результирующего ассемблерного кода в V8 - результат не только правильного алгоритма, но и все-таки определенной доли везения.
В следующей части посмотрим, как можно использовать вместо SWAR-хинта нативные SIMD-инструкции в WebAssembly, и сильно ли это поможет конкретно нам.
Материалы: