В предыдущей статье мы остановились на варианте, который с помощью SWAR-хинта превращает 8 последовательных цифр в одно числовое 32bit-значение. Но что если мы предположим, что все значения у нас, в основном, невелики - до 3 цифр? Тогда нам вполне достаточно использовать всего лишь 32bit-инструкции, а SWAR будет выполнен за 2 операции вместо 3 - сплошной выигрыш!
Давайте перепишем наш код так, чтобы первый блок из 4 символов обрабатывался 32bit-инструкциями, а если понадобится второй блок из 8 символов - уже 64bit-инструкциями.
И... вместо 26ms получаем 28ms! Значит, наше предположение относительно длины чисел не оправдалось, и в первом блоке выгоднее обрабатывать сразу побольше символов.

То есть больше размерность регистра - лучше? И такие регистры есть - это 128-битные SSE-регистры XMM - в WebAssembly они доступны нам как переменные с типом v128 и суперскалярные операции над ними.
Обратите внимание, что пока не все эти операции реализованы в разных JavaScript-движках.
В зависимости от используемой инструкции v128-регистр интерпретируется либо как единый блок (например, для битовых логических операций and, or, xor, not), либо как набор блоков меньшей размерности:

Как включить SIMD в Node.js
Для начала нам придется немного доработать наш "компилятор" compile-test.js: и включить дополнительные расширения:
const wasmModule = wabt.parseWat( inputWat , readFileSync(inputWat, 'utf8') , {simd : true} // включаем поддержку SIMD-feature );
Посмотреть, какие существуют другие дополнительные "фичи" компиляции в WASM можно на демо-странице wat2wasm.
Поскольку в Node.js пока поддержка SIMD проходит в качестве экспериментальной, запускать наш парсер придется с включенными соответствующими V8-опциями:
--experimental-wasm-simd (enable prototype SIMD opcodes for wasm) type: bool default: false --wasm-simd-post-mvp (allow experimental SIMD operations for prototyping that are not included in the current proposal) type: bool default: false
Теперь команда на запуск выглядит вот так:
node --experimental-wasm-simd --wasm-simd-post-mvp --expose-gc buffers-test.js
SIMD-парсинг натуральных чисел на WASM
Собственно, нам необходимо решить те же задачи, что и для SWAR-варианта, но уже с учетом возможностей v128-инструкций. За основу возьмем алгоритм, приведенный на StackOverflow.
Узнаем длину числа
вычитаем "нулевой" символ
0x30из каждого, с переходом через 0 - это означает, что наши стоп-символы0x0A, 0x20 и 0x2Cпревратятся в0xDA, 0xF0 и 0xFCсоответственновычисляем для каждого байта, что он
<= 0x09- то есть исходно находился в диапазоне0x30..0x39, что соответствует символам'0'..'9'находим первый
false-байт с помощьюctz(not(bitmask))- это и есть позиция нашего стоп-символа
Для реализации этого в коде нам понадобятся две глобальные переменные-"константы": "все нули" и "все девятки":
(global $all_zero v128 (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30) ) (global $all_nine v128 (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09) )
;; разбор числа - SIMD (func $parseNumber (local $ch i32) (local $xword v128) (local $xn i32) ;; xword = line[off..+15] (local.set $xword (v128.load align=4 (global.get $off) ) ) ;; xn = num digits (local.set $xn (i32.ctz ;; * = 5 (i32.xor ;; 0b... 1 0 0 0 0 0 (i8x16.bitmask ;; 0b... 0 1 1 1 1 1 (i8x16.le_u ;; 0x... 00 FF FF FF FF FF (i8x16.sub ;; 0x... F0 05 04 03 02 01 (local.get $xword) ;; 0x... 20 35 34 33 32 31 (global.get $all_zero) ;; 0x... 30 30 30 30 30 30 ) (global.get $all_nine) ;; 0x... 09 09 09 09 09 09 ) ) (i32.const 0xFFFFFFFF) ;; 0b... 1 1 1 1 1 1 ) ) )
Определяем стоп-символ

Просто так взять xn-й байт из v128-значения нельзя - нет соответствующей инструкции. Зато есть i8x16.extract_lane, которая позволяет взять заранее указанный байт (нас вполне устроит #0).
Но чтобы в нулевом байте оказался xn-й, нужно "переложить" байты в соответствии с маской с помощью swizzle.
А соответствующую маску, состоящую из единственного повторяющегося значения xn, нам позволит сделать инструкция splat.
;; ch = xword[xn] (local.set $ch ;; ch = 0x20 (i8x16.extract_lane_u ;; xmm[0] 0 ;; константа (i8x16.swizzle ;; 0x... 20 20 20 20 20 20 (local.get $xword) ;; 0x... 20 35 34 33 32 31 (i8x16.splat ;; 0x... 05 05 05 05 05 05 (local.get $xn) ;; xn = 5 ) ) ) )
Нормализуем число
Аналогичным образом, с помощью swizzle "развернем" число, чтобы младший разряд оказался в младшем байте, и сразу зачистим от не-цифр за счет свойства этой операции "занулять" байты, которым соответствуют индексы больше 0x0F.
(global $shuf_reverse v128 (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00) ) (global $all_digit v128 (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F) )
;; clean & reverse (local.set $xword (v128.and ;; 0x... 00 01 02 03 04 05 (i8x16.swizzle ;; 0x... 00 31 32 33 34 35 (local.get $xword) ;; 0x... 20 35 34 33 32 31 (i8x16.sub ;; 0x... FF 00 01 02 03 04 (global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F (i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B (i32.sub ;; = 0x0B (i32.const 16) (local.get $xn) ;; xn = 5 ) ) ) ) (global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F ) )
SIMD-конвертация
Итак, мы получили состояние переменной (читай, регистра), когда в каждом из 16 байтов находятся значения цифр, позиционно соответствующие десятичным разрядам числа, которое мы хотим получить - в младшем байте младший разряд:
P O N M L K J I H G F E D C B A == A * 10^0 + B * 10^1 + C * 10^2 + D * 10^3 + ...
В идеале, для реализации алгоритма нам подошли бы инструкции расширенного умножения типа i16x8.extmul_[low|high]_i8x16_u и расширенного попарного сложения i16x8.extadd_pairwise_i8x16_u.
Первая из этих инструкций умножает нижние/верхние половины пары XMM-регистров, воспринимая их как набор 8bit-чисел на входе, расширяя выход до 16bit-чисел:
0x... FC FD FE FF x x x x 0x... 04 03 02 01 = = = = 0x... 03F0 02F7 01FC 00FF = i16x8.extmul_low_i8x16_u
Вторая попарно складывает соседние "числа" в регистре:
0x... FC + FD FE + FF 0x... 01F9 01FD = i16x8.extadd_pairwise_i8x16_u
Поскольку они пока не поддерживаются, нам придется их эмулировать с помощью доступных инструкций.
Итак, сначала мы выделим из нашего регистра с помощью i16x8.widen_low_i8x16_u нижнюю половину, расширив 8bit-числа до 16bit и перемножим с маской степеней 10:
(global $stepA_pow10 v128 (v128.const i16x8 1 10 100 1000 1 10 100 1000) ) ;; ... ;; i16x8.extmul_low_i8x16_u - not implemented (local.set $xtempL (i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A (i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A (local.get $xword) ;; ... H G F E D C B A ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) )
Степени выбраны с таким умыслом, чтобы ни в одной из 16bit-ячеек результат не мог превзойти 2^16.
Затем эмулируем extadd_pairwise с помощью битового сдвига, сложения и, когда необходимо, наложения битовой маски:
;; i32x4.extadd_pairwise_i16x8_u - not implemented (local.set $xtempL (i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA (i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0 (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A (i32.const 16) ) (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A ) )
То же самое проделаем со старшими байтами регистра:
(local.set $xtempH (i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I (i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I (local.get $xword) ;; P O N M L K J I ... ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) ) (local.set $xtempH (i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI (i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0 (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I (i32.const 16) ) (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I ) )
Упакуем оба полученных значения обратно в один регистр, отбросив лишние блоки с помощью shuffle:
;; shuffle (local.set $xword (i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA 0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D (local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA (local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI ) )
Повторим сложение-умножение соседних блоков еще дважды - для i16x8 и i32x4, уже с наложением маски:
(global $stepB_pow10 v128 (v128.const i32x4 1 10000 100000000 0) ) (global $stepB_mask v128 (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000) ) ;; ... (local.set $xword (i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00 (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA (i32.const 16) ) (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA ) ) (local.set $xword (v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x... ) ) (local.set $xword (i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA (local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (global.get $stepB_pow10) ;; 0 100000000 10000 1 ) ) (local.set $xword (i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (i64x2.shr_u ;; 0 0 0 HGFE0000 (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA (i32.const 32) ) (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA ) ) (local.set $xword (v128.and ;; 0 LKJI00000000 0 HGFEDCBA (local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x... ) )
Заметим, что самый старший блок цифр 'PONM' мы потеряли при умножениях, поскольку он не влез в размерность i32. Если все-таки предполагаются значения такой длины, данный сегмент можно вытащить отдельно через extract_lane.
Полный код SIMD-версии
(module (import "js" "memory" (memory 1)) ;; текущее смещение "курсора" (global $off (import "js" "off") (mut i32)) ;; текущее значение (global $val (mut i64) (i64.const 0) ) (global $mask (mut i32) (i32.const 0) ) (global $state (mut i32) (i32.const 0) ) (global $key (mut i32) (i32.const 0) ) (global $stepA_pow10 v128 (v128.const i16x8 1 10 100 1000 1 10 100 1000) ) (global $stepB_pow10 v128 (v128.const i32x4 1 10000 100000000 0) ) (global $stepA_mask v128 (v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000) ) (global $stepB_mask v128 (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000) ) (global $all_zero v128 (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30) ) (global $all_nine v128 (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09) ) (global $shuf_reverse v128 (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00) ) (global $all_digit v128 (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F) ) ;; таблица косвенной адресации функций (table 128 anyfunc) (elem (i32.const 0) $null ;; 00 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 10 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 20 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 30 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 40 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 50 $null ;; 1 $null ;; 2 $null ;; 3 $null ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F $null ;; 60 $null ;; 1 $null ;; 2 $null ;; 3 $step_d ;; 4 $null ;; 5 $null ;; 6 $null ;; 7 $step_h ;; 8 $null ;; 9 $null ;; A $null ;; B $step_l ;; C $null ;; D $null ;; E $null ;; F $null ;; 70 $null ;; 1 $step_r ;; 2 $step_s ;; 3 $step_t ;; 4 $null ;; 5 $null ;; 6 $step_w ;; 7 $null ;; 8 $null ;; 9 $null ;; A $null ;; B $null ;; C $null ;; D $null ;; E $null ;; F ) ;; обновляем маску атрибутов и значение (func $setData ;; buf[key << 3] = val (i64.store (i32.shl (global.get $key) (i32.const 3) ) (global.get $val) ) ;; mask |= 1 << key (global.set $mask (i32.or (global.get $mask) (i32.shl (i32.const 1) (global.get $key) ) ) ) ) (func $step (param $off_v i32) ;; $off += $off_v (global.set $off (i32.add (global.get $off) (local.get $off_v) ) ) ) ;; разбор числа - SIMD (func $parseNumber (local $ch i32) (local $xword v128) (local $xn i32) (local $xtempL v128) (local $xtempH v128) ;; xword = line[off..+15] (local.set $xword (v128.load align=4 (global.get $off) ) ) ;; xn = num digits (local.set $xn (i32.ctz (i32.xor (i8x16.bitmask (i8x16.le_u (i8x16.sub (local.get $xword) (global.get $all_zero) ) (global.get $all_nine) ) ) (i32.const 0xFFFFFFFF) ) ) ) ;; ch = xword[xn] (local.set $ch ;; ch = 0x20 (i8x16.extract_lane_u ;; xmm[0] 0 ;; константа (i8x16.swizzle ;; 0x... 20 20 20 20 20 20 (local.get $xword) ;; 0x... 20 35 34 33 32 31 (i8x16.splat ;; 0x... 05 05 05 05 05 05 (local.get $xn) ;; xn = 5 ) ) ) ) ;; clean & reverse (local.set $xword (v128.and ;; 0x... 00 01 02 03 04 05 (i8x16.swizzle ;; 0x... 00 31 32 33 34 35 (local.get $xword) ;; 0x... 20 35 34 33 32 31 (i8x16.sub ;; 0x... FF 00 01 02 03 04 (global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F (i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B (i32.sub ;; = 0x0B (i32.const 16) (local.get $xn) ;; xn = 5 ) ) ) ) (global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F ) ) ;; i16x8.extmul_low_i8x16_u - not implemented (local.set $xtempL (i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A (i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A (local.get $xword) ;; ... H G F E D C B A ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) ) ;; i32x4.extadd_pairwise_i16x8_u - not implemented (local.set $xtempL (i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA (i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0 (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A (i32.const 16) ) (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A ) ) (local.set $xtempH (i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I (i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I (local.get $xword) ;; P O N M L K J I ... ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) ) (local.set $xtempH (i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI (i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0 (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I (i32.const 16) ) (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I ) ) ;; shuffle (local.set $xword (i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA 0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D (local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA (local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI ) ) (local.set $xword (i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00 (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA (i32.const 16) ) (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA ) ) (local.set $xword (v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x... ) ) (local.set $xword (i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA (local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (global.get $stepB_pow10) ;; 0 100000000 10000 1 ) ) (local.set $xword (i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (i64x2.shr_u ;; 0 0 0 HGFE0000 (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA (i32.const 32) ) (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA ) ) (local.set $xword (v128.and ;; 0 LKJI00000000 0 HGFEDCBA (local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x... ) ) ;; return (global.set $val (i64.add (i64x2.extract_lane 0 (local.get $xword) ) (i64x2.extract_lane 1 (local.get $xword) ) ) ) ;; шагаем на n символов ;; ch == '\n' || ch == ' ' => +1 ;; ch == ',' => +2 (call $step (i32.add (local.get $xn) (i32.add (i32.const 1) (i32.eq (local.get $ch) (i32.const 0x2C) ) ) ) ) ) ;; [state, off] = [state_v, off + off_v] (func $iterate0 (param $state_v i32) (param $off_v i32) ;; state = state_v (global.set $state (local.get $state_v) ) ;; off += off_v (call $step (local.get $off_v) ) ) ;; [key, off] = [state + state_v, off + off_v] (func $iterate1 (param $state_v i32) (param $off_v i32) ;; key = state + state_v (global.set $key (i32.add (global.get $state) (local.get $state_v) ) ) ;; off += off_v (call $step (local.get $off_v) ) ) (func $null (result i32) i32.const 0 ) (func $step_s (result i32) ;; shared ;; $state = 0 ;; $off += 7 (call $iterate0 (i32.const 0) (i32.const 7) ) i32.const 1 ) (func $step_l (result i32) ;; local ;; $state = 4 ;; $off += 6 (call $iterate0 (i32.const 4) (i32.const 6) ) i32.const 1 ) (func $step_t (result i32) ;; temp ;; $state = 8 ;; $off += 5 (call $iterate0 (i32.const 8) (i32.const 5) ) i32.const 1 ) (func $step_h (result i32) ;; hit ;; key = state + 0; ;; $off += 4 (call $iterate1 (i32.const 0) (i32.const 4) ) ;; call $parseNumber call $setData i32.const 1 ) (func $step_r (result i32) ;; read ;; key = state + 1; ;; $off += 5 (call $iterate1 (i32.const 1) (i32.const 5) ) ;; call $parseNumber call $setData i32.const 1 ) (func $step_d (result i32) ;; dirtied ;; key = state + 2; ;; $off += 8 (call $iterate1 (i32.const 2) (i32.const 8) ) ;; call $parseNumber call $setData i32.const 1 ) (func $step_w (result i32) ;; written ;; key = state + 3; ;; $off += 8 (call $iterate1 (i32.const 3) (i32.const 8) ) ;; call $parseNumber call $setData i32.const 1 ) (func (export "parseBuffers") (result i32) ;; mask = 0 (global.set $mask (i32.const 0) ) ;; off += 'Buffers: '.length (global.set $off (i32.add (global.get $off) (i32.const 9) ) ) ;; for (...) (block (loop ;; if (table[line[off]]()) continue; (br_if 0 (call_indirect (result i32) (i32.load8_u (global.get $off) ) ) ) ;; break, end loop ) ) ;; сохраняем текущее смещение (i32.store (i32.const 0xFFFC) ;; 16383 * 4 (global.get $off) ) ;; возвращаем маску global.get $mask ) )
Запускаем! Увы, наши ожидания не оправдались - время ухудшилось с 26ms до 30ms. Неужели мы написали такую плохую реализацию конвертации чисел?
Давайте выключим все эти вычисления, оставив лишь вычисление количества цифр xn и стоп-символа ch - и все равно уже 27ms!
Так что основные потери у нас возникают уже на чтении слишком больших блоков, когда нам это вовсе не требуется. Представьте - мы прочитали 16 байт, обнаружили там пусть даже 7 цифр плюс стоп-символ, но остальные-то 8 байт попросту выкинули и перечитаем их заново.
Так что SIMD - полезно, но не всегда!
Заметки на полях
Можно ли ускорить приведенный выше алгоритм? Конечно!
Число можно парсить в
v128, даже если оно находится не в самом начале, если использоватьi8x16.add_sat_uдля добавления смещения.Вычленить ключевые символы (
s, l, t, h, r, d, w) можно прямо вxword, если сначала получить битовую маску "послепробельных" символов, а затем конвертировать ее в позиционное представление нужныхi8-блоков.Вместо таблицы функций можно использовать таблицу соответствий "ключевой символ" =>
{тип, длина, значение}.
Полный код buffers-test.js
const { readFileSync } = require('fs'); const fn = 'buffers-test'; const run = async () => { const buffer = readFileSync(`${fn}.wasm`); const module = await WebAssembly.compile(buffer); const memory = new WebAssembly.Memory({initial : 1}); const data = readFileSync('buffers.txt'); memory.grow((data.byteLength >> 16) + 8); const prj32 = new Uint32Array(memory.buffer); const prj8 = new Uint8Array(memory.buffer); // единственный раз передадим исходное смещение через переменную const off = new WebAssembly.Global({value : 'i32', mutable : true}, 0x80000); data.copy(prj8, off.value); // дописываем отсутствующий '\n' у последней строки prj8[off.value + data.length] = 0x0A; const importObject = { js : { memory , off } }; // 10^N для домножения первого сегмента let pow10 = 1; for (let i = 0; i < 8; i++) { prj32[(i << 1) + 32] = pow10; pow10 *= 10; } // таблица соответствий ключевых символов let keys = { 's' : 0x010700 // 'shared ' , 'l' : 0x010604 // 'local ' , 't' : 0x010508 // 'temp ' , 'h' : 0x000400 // 'hit=' , 'r' : 0x000501 // 'read=' , 'd' : 0x000802 // 'dirtied=' , 'w' : 0x000803 // 'written=' }; Object.entries(keys).forEach(([key, val]) => prj32[48 + key.charCodeAt()] = val); // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/clz32 const clz = Math.clz32; const ctz = integer => { integer |= integer << 16; integer |= integer << 8; integer |= integer << 4; integer |= integer << 2; integer |= integer << 1; return 32 - clz(~integer) | 0; }; // https://stackoverflow.com/questions/43122082/efficiently-count-the-number-of-bits-in-an-integer-in-javascript/43122214 const popcnt = n => { n = n - ((n >> 1) & 0x55555555) n = (n & 0x33333333) + ((n >> 2) & 0x33333333) return ((n + (n >> 4) & 0xF0F0F0F) * 0x1010101) >> 24 }; // обратная битовая маска for (let i = 0; i < (1 << 16); i++) { if (popcnt(i) < 5) { let r = 0xFFFFFFFF; for (let v = i; v; ) { let bp = 31 - clz(v); r <<= 8; r |= bp; v ^= 1 << bp; } prj32[16384 + i] = r; } } const instance = await WebAssembly.instantiate(module, importObject); const parseBuffersWASM = instance.exports.parseBuffers; const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written']; const m32to64 = 0x100000000; const parseBuffers = () => { let mask = parseBuffersWASM(); switch (mask) { case 1: return {'shared-hit' : prj32[0] + prj32[1] * m32to64}; case 2: return {'shared-read' : prj32[2] + prj32[3] * m32to64}; case 3: return { 'shared-hit' : prj32[0] + prj32[1] * m32to64 , 'shared-read' : prj32[2] + prj32[3] * m32to64 }; default: let rv = {}; for (let i = 0; mask; mask >>= 1, i++) { if (mask & 1) { let off = i << 1; rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64; } } return rv; } }; for (let lim = data.length + off.value; prj32[16383] < lim; ) { let obj = parseBuffers(); } off.value = 0x80000; prj32[16383] = off.value; const hrb = process.hrtime(); // -- 8< -- for (let lim = data.length + off.value; prj32[16383] < lim; ) { let obj = parseBuffers(); } // -- 8< -- const hre = process.hrtime(hrb); const usec = hre[0] * 1e+9 + hre[1]; console.log(usec); }; run();
Полный код buffers-test.wat
(module (import "js" "func" (func $js_func (param i32))) (import "js" "memory" (memory 1)) ;; текущее смещение "курсора" (global $off (import "js" "off") (mut i32)) ;; текущее значение (global $val (mut i64) (i64.const 0) ) (global $stepA_pow10 v128 (v128.const i16x8 1 10 100 1000 1 10 100 1000) ) (global $stepB_pow10 v128 (v128.const i32x4 1 10000 100000000 0) ) (global $stepA_mask v128 (v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000) ) (global $stepB_mask v128 (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000) ) (global $all_zero v128 (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30) ) (global $all_nine v128 (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09) ) (global $shuf_straight v128 (v128.const i8x16 0x00 0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08 0x09 0x0A 0x0B 0x0C 0x0D 0x0E 0x0F) ) (global $shuf_reverse v128 (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00) ) (global $all_digit v128 (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F) ) (global $all_space v128 (v128.const i8x16 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20) ) (global $all_0A v128 (v128.const i8x16 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A) ) ;; обновляем маску атрибутов и значение в памяти (func $setData (param $key i32) (param $mask i32) (result i32) ;; buf[key << 3] = val (i64.store (i32.shl (local.get $key) (i32.const 3) ) (global.get $val) ) ;; mask |= 1 << key (i32.or (local.get $mask) (i32.shl (i32.const 1) (local.get $key) ) ) ) ;; разбор числа - SIMD (func $parseNumber (param $xword v128) (param $off i32) (result i32) (local $xn i32) (local $xtempL v128) (local $xtempH v128) ;; xn = num digits (local.set $xn (i32.ctz ;; ^ = 5 (i32.xor ;; 0b... 1 0 0 0 0 0 (i32.shr_u ;; 0b... 0 1 1 1 1 1 (i8x16.bitmask ;; 0b... 0 1 1 1 1 1 0 (i8x16.le_u ;; 0x... 00 FF FF FF FF FF 00 (i8x16.sub ;; 0x... F0 05 04 03 02 01 0D (local.get $xword) ;; 0x... 20 35 34 33 32 31 3D (global.get $all_zero) ;; 0x... 30 30 30 30 30 30 30 ) (global.get $all_nine) ;; 0x... 09 09 09 09 09 09 09 ) ) (local.get $off) ;; off = 1 ) (i32.const 0xFFFFFFFF) ;; 0b... 1 1 1 1 1 1 1 ) ) ) ;; clean & reverse (local.set $xword (v128.and ;; 0x... 00 00 01 02 03 04 05 (i8x16.swizzle ;; 0x... 00 00 31 32 33 34 35 (local.get $xword) ;; 0x... 20 35 34 33 32 31 3D (i8x16.add_sat_u ;; 0x... FF FF 01 02 03 04 05 (i8x16.sub ;; 0x... FF FF 00 01 02 03 04 (global.get $shuf_reverse) ;; 0x... 09 0A 0B 0C 0D 0E 0F (i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B 0B (i32.sub ;; = 0x0B (i32.const 16) (local.get $xn) ;; xn = 5 ) ) ) (i8x16.splat ;; 0x... 01 01 01 01 01 01 01 (local.get $off) ;; off = 1 ) ) ) (global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F 0F ) ) ;; i16x8.extmul_low_i8x16_u - not implemented (local.set $xtempL (i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A (i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A (local.get $xword) ;; ... H G F E D C B A ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) ) ;; i32x4.extadd_pairwise_i16x8_u - not implemented (local.set $xtempL (i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA (i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0 (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A (i32.const 16) ) (local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A ) ) (local.set $xtempH (i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I (i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I (local.get $xword) ;; P O N M L K J I ... ) (global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1 ) ) (local.set $xtempH (i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI (i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0 (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I (i32.const 16) ) (local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I ) ) ;; shuffle (local.set $xword (i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA 0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D (local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA (local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI ) ) (local.set $xword (i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00 (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA (i32.const 16) ) (local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA ) ) (local.set $xword (v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA (global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x... ) ) (local.set $xword (i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA (local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA (global.get $stepB_pow10) ;; 0 100000000 10000 1 ) ) (local.set $xword (i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (i64x2.shr_u ;; 0 0 0 HGFE0000 (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA (i32.const 32) ) (local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA ) ) (local.set $xword (v128.and ;; 0 LKJI00000000 0 HGFEDCBA (local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA (global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x... ) ) (global.set $val (i64.add (i64x2.extract_lane 0 (local.get $xword) ) (i64x2.extract_lane 1 (local.get $xword) ) ) ) ;; return (local.get $xn) ) (func (export "parseBuffers") (result i32) (local $ch i32) (local $chp i32) (local $chv i32) (local $chm i32) (local $chi i32) (local $ln i32) (local $chpos i32) (local $xword v128) (local $xn i32) (local $xbp i32) (local $eol i32) (local $esp i32) (local $vtemp i64) (local $mask i32) (local $state i32) (local $key i32) ;; mask = 0 (local.set $mask (i32.const 0) ) ;; val = 0 (global.set $val (i64.const 0) ) ;; off += 'Buffers: '.length (global.set $off (i32.add (global.get $off) (i32.const 9) ) ) ;; первый символ всегда "послепробельный" (local.set $esp (i32.const 1) ) (local.set $xbp (i32.const -1) ) ;; for (...) (block (loop ;; xword = line[off..+15] (local.set $xword (v128.load align=4 (global.get $off) ) ) ;; xbp == -1 -> не парсим сейчас число ;; xbp >= 0 -> начиная отсюда и парсим (if (i32.ne (local.get $xbp) (i32.const -1) ) (then ;; store temp (local.set $vtemp (global.get $val) ) ;; xn = num digits (local.set $xn (call $parseNumber (local.get $xword) (local.get $xbp) ) ) (if (i32.eqz (local.get $xbp) ) (then ;; val = val * 10 ^ memory[128 + (n << 3)]:i32 + vtemp (global.set $val (i64.add (i64.mul (local.get $vtemp) (i64.load align=4 (i32.add (i32.shl (local.get $xn) (i32.const 3) ) (i32.const 128) ) ) ) (global.get $val) ) ) ) ) ;; store value (local.set $mask (call $setData (local.get $key) (local.get $mask) ) ) ;; xbp = -1 (local.set $xbp (i32.const -1) ) ) ) ;; off += 16 (global.set $off (i32.add (global.get $off) (i32.const 16) ) ) ;; формируем битовую маску "послепробельных" символов (local.set $chm (i32.and (i32.add (i32.shl ;; space mask (i8x16.bitmask (i8x16.eq (local.get $xword) (global.get $all_space) ) ) (i32.const 1) ) ;; выставляем для первого символа, что он "послепробельный", ;; если пробел был последним символом предыдущего блока (local.get $esp) ) (i32.const 0xFFFF) ) ) ;; по таблице превращаем битовую маску в позиции битов ;; shared hit=1 read ;; 1000000100000100 => 0x FF 0D 07 00 ;; chp = memory[0x10000 + (chm << 2)]:i32 (local.set $chp (i32.load align=4 (i32.add (i32.const 0x10000) (i32.shl (local.get $chm) (i32.const 2) ) ) ) ) ;; превращаем позиции символов в их значения (local.set $chv (i32x4.extract_lane 0 (i8x16.swizzle (local.get $xword) (i32x4.replace_lane 0 (i8x16.splat (i32.const 0xFF) ) (local.get $chp) ) ) ) ) ;; перебираем символы согласно выставленным битам маски, пока она не обнулится (block (loop ;; if (chv == 0) break (br_if 1 (i32.eqz (local.get $chv) ) ) (local.set $chpos (i32.and (local.get $chp) (i32.const 0xFF) ) ) ;; ch = xword[chpos] (local.set $ch (i32.and (local.get $chv) (i32.const 0xFF) ) ) ;; ch = memory[0xC0 + ch] (local.set $ch (i32.load align=4 (i32.add (i32.shl (local.get $ch) (i32.const 2) ) (i32.const 0xC0) ) ) ) ;; ln = ch >> 8 (local.set $ln (i32.shr_u (local.get $ch) (i32.const 8) ) ) ;; позиция начала числа, смещение хранится в старшем байте ch (local.set $xbp (i32.and (i32.add (local.get $chpos) (local.get $ln) ) (i32.const 0xFF) ) ) ;; ch & 0x10000 (if (i32.eqz (i32.and (local.get $ch) (i32.const 0x10000) ) ) ;; key (then (local.set $key (i32.and (i32.add (local.get $state) (local.get $ch) ) (i32.const 0x0F) ) ) ;; если число начинается в следующем блоке - переходим к нему (if (i32.ge_u (local.get $xbp) (i32.const 0x10) ) (then ;; val = 0 (global.set $val (i64.const 0) ) ;; xbp -= 16 (local.set $xbp (i32.and (local.get $xbp) (i32.const 0x0F) ) ) (br 1) ) ) ;; xn = num digits (local.set $xn (call $parseNumber (local.get $xword) (local.get $xbp) ) ) ;; xbp = xbp + xn == 0x10 ? 0 : -1 (local.set $xbp (if (result i32) (i32.eq (i32.add (local.get $xbp) (local.get $xn) ) (i32.const 0x10) ) (then (i32.const 0) ) (else ;; store value (local.set $mask (call $setData (local.get $key) (local.get $mask) ) ) (i32.const -1) ) ) ) ) ;; state (else (local.set $state (i32.and (local.get $ch) (i32.const 0x0F) ) ) ) ) ;; chv >>= 8 (local.set $chv (i32.shr_u (local.get $chv) (i32.const 8) ) ) ;; chp >>= 8 (local.set $chp (i32.shr_u (local.get $chp) (i32.const 8) ) ) ;; do loop (br 0) ) ) ;; esp = xword[15] == 0x20 (local.set $esp (i32.eq (i8x16.extract_lane_u 15 (local.get $xword) ) (i32.const 0x20) ) ) ;; check EOL (local.set $eol (i8x16.bitmask (i8x16.eq (local.get $xword) (global.get $all_0A) ) ) ) ;; if (!eol) break (br_if 0 (i32.eqz (local.get $eol) ) ) ;; break, end loop ;; store value (local.set $mask (call $setData (local.get $key) (local.get $mask) ) ) ;; определяем позицию перевода строки (local.set $eol (i32.ctz (local.get $eol) ) ) ;; встаем на начало следующей строки ;; off += 15 - eol (global.set $off (i32.sub (i32.add (global.get $off) (local.get $eol) ) (i32.const 15) ) ) ;; break (br 1) ) ) ;; сохраняем текущее смещение (i32.store (i32.const 0xFFFC) ;; 16383 * 4 (global.get $off) ) ;; возвращаем маску (local.get $mask) ) )
Увы, все эти оптимизации не позволяют обогнать SWAR-код из прошлой части, а лишь добиться паритета с ним.
В целом, для нашей конкретной задачи использование SIMD оказывается малоэффективным, поскольку расстояния в памяти между значимой информацией достаточно велики, а сама она мала. Например, для полного декодирования такой строки длиной 47 символов достаточно прочитать всего 11 из них:
Buffers: shared hit=123 read=45, temp written=6 ^ ^ *** ^ ** ^ ^ *
Материалы:
