Год назад я написал парсер CSV, способный обрабатывать 64 символа за раз. Он создан исключительно в исследовательских целях и в нём не учтены важнейшие этапы продакшен-парсера наподобие валидации. Сегодня я расскажу о базовом алгоритме, использующем SIMD и побитовые операции для групповой фильтрации структурных символов.

Если вы новичок в SIMD, то рекомендую сначала прочитать введение в SIMD McYoung. Вкратце же SIMD можно описать так:

  • Тактовые скорости CPU достигли плато примерно двадцать лет назад. Нельзя дальше разгонять ядра, иначе они расплавятся, поэтому мы не ускоряем обработку одного значения, а обрабатываем несколько значений одновременно (шире).

  • SIMD (single instruction, multiple data) позволяет выполнять одну и ту же операцию с фиксированной группой данных (обычно 16 или 32 байтами или даже 64 байтами) за то же время, которое требуется для обработки одного байта.

  • Код SIMD (или векторизованный код) наиболее эффективен, если в нём отсутствует ветвление, то есть в нём нет конструкций if, циклов и вызовов функций, и выполняются одни и те же операции вне зависимости от входных данных.

  • У каждой архитектуры есть свой набор команд SIMD. Взгляните на модуль Rust std::arch.

Статья о simdjson

По любой теме всегда есть пара выдающихся научных статей, которые стоит изучить для пространства задач. Например, как сказал Джозеф Берил Кошаков, «статьи об Amazon DynamoDB и Google Spanner — одни из самых канонических статей о базах данных, которые должны прочитать все разработчики баз данных».

Что касается SIMD, то обязательно стоит прочитать статью о simdjson. Парсинг JSON — хорошо изученная задача, но simdjson решает её, сканируя и обрабатывая 64 байта одновременно. Если вы предпочитаете, то можете посмотреть доклад соавтора статьи Дэниела Лемайра.

Daniel Lemire
Дэниел Лемайр и все остальные

Ниже я объясню некоторые из описанных в статье техник, которые я использовал для своего парсера CSV. Я пользуюсь командами SIMD ARM NEON, но в x86 есть полные эквиваленты их всех.

CSV и параллельное мышление

Вот, как выглядит типичный CSV:

name

age

location

alice

30

Irvine

bob

25

123 Union Square, New York New York

lonely charlie

28

where she said, "hi"to me

Сырой байтовый поток этого файла, соответствующий спецификации CSV RFC4180, выглядит так:

name,age,location\r\n
alice,30,Irvine\n
bob,25,"123 Union Square, New York New York"\n
lonely charlie,28,"where she said, ""hi""\nto me"

В каждом файловом формате есть структурные символы, придающие форму плоской последовательности байт. В CSV запятыми (,) разделяются столбцы, символами новой строки (\n или \r\n) разделяются строки, а кавычками (") обёртываются поля, сами содержащие структурные символы, например, "123 Union Square, New York New York". Чтобы вставить кавычку в закавыченное поле, нужно её удвоить: "hi" превращается в ""hi"".

Парсинг CSV сводится к трём этапам. Сначала мы классифицируем каждый байт:

Но не всё это настоящие разделители. Запятая внутри "where she said, ""hi""\nto me" — это просто текст, \n — часть значения поля, а "" вокруг hi — это экранирующие последовательности, а не границы. Поэтому мы отфильтровываем их:

Остаются только символы, на самом деле разделяющие столбцы и строки:

Затем мы записываем позиции всех оставшихся структурных символов. Этих смещений нам достаточно для того, чтобы разделить исходный поток байт на строки и поля.

Хитрость в том, что каждый из этих этапов можно выполнить для множества байт за раз.

Шаг 1: классификация структурных символов

При скалярном подходе мы бы классифицировали каждый байт, один за другим сравнивая его с каждым структурным символом.

Джефф Лэнгдейл, соавтор simdjson и создатель Hyperscan, придумал технику векторизованной классификации, которая классифицирует 16/32/64 байта за один проход.

Идея заключается в том, чтобы создать пару таблиц поиска, работающие, как идеальный хэш. Каждый структурный символ соответствует его классу (COMMA = 1, QUOTE = 2, NEWLINE = 3), а всё остальное соответствует 0. Можно построить таблицу поиска из 256 элементов, но обычные команды поиска SIMD наподобие pshufb и vqtbl1q_u8 поддерживают только таблицы из 16 элементов (4-битные индексы).

Так как байт состоит из 8 бит, мы разбиваем каждый байт пополам. Каждая половина называется полубайтом (нибблом), а в шестнадцатеричном виде каждая цифра соответствует ровно одному нибблу.

Символ

Hex

Старший ниббл

Младший ниббл

,

0x2C

0x2

0xC

"

0x22

0x2

0x2

\n

0x0A

0x0

0xA

\r

0x0D

0x0

0xD

Мы создадим одну таблицу, индексированную по старшему нибблу, и вторую — по младшему нибблу. Каждая из них возвращает множество потенциальных классов для этого ниббла. Если выполнить для результатов AND, то останется только подходящий класс.

Например, давайте найдём запятую (0x2C) в обеих таблицах:

Таблица старшего ниббла

Индекс

Возвращает

0x0

NEWLINE

0x2

COMMA | QUOTE

...

0

Таблица младшего ниббла

Индекс

Возвращает

0x2

QUOTE

0xA

NEWLINE

0xC

COMMA

0xD

NEWLINE

...

0

  0x2C  →  старший ниббл 0x2 возвращает COMMA | QUOTE
           младший ниббл 0xC возвращает COMMA
           AND результата:              COMMA ✓

Обратите внимание, что старший ниббл 0x2 общий для , и ", поэтому таблица возвращает оба этих символа. Но младший ниббл 0xC соответствует только COMMA. AND устраняет QUOTE, оставляя COMMA.

При построении этих таблиц поиска требуется аккуратность. Так как AND комбинирует результаты из двух независимых операций поиска, мы, по сути, строим сетку. Классифицироваться будут все байты, старший и младший нибблы которых совпадают.

Допустим, мы хотим сгруппировать в класс 0xA00xB4 и 0xB0. Старшие нибблы {0xA0xB}, а младшие {0x00x4}. Эта сетка имеет 4 пересечения, но мы перечислили только 3 байта. Четвёртый байт 0xA4 будет классифицирован ошибочно.

Быстрая проверка: если количество байт в вашем классе равно произведению количества уникальных старших нибблов на количество уникальных младших, то ложноположительных совпадений не будет.

Давайте рассмотрим ниже скалярную и векторизованную реализации. Вы заметите, что в векторизованной реализации мы по-прежнему циклически обходим весь байтовый поток, но теперь по 16 байт за раз. А во внутреннем теле нет ветвлений, только операции поиска и AND.

// скалярная реализация
enum Class {
    Comma = 1,
    Quote = 2,
    NewLine = 3,
    Other = 0,
}

let bytes: &[u8] = &data_stream;

let mut classified_bytes = Vec::with_capacity(bytes.len());

for b in bytes {
    let class = match b {
        0x2c => Class::Comma,
        0x22 => Class::Quote,
        0x0A | 0x0D => Class::NewLine,
        _ => Class::Other
    };

    classified_bytes.push(class);
}

Показанные ниже функции v* — это интринсики NEON, тонкие обёртки вокруг отдельных команд ARM, позволяющие использовать SIMD в Rust без написания ассемблерного кода.

// Векторизованная реализация
// Пишу на Mac :/
use std::arch::aarch64::*;

const COMMA: u8 = Class::Comma as u8;
const QUOTE: u8 = Class::Quote as u8;
const NEWLINE: u8 = Class::NewLine as u8;

const LO_LOOKUP: [u8; 16] = {
    let mut t = [0u8; 16];
    t[0x2] = QUOTE;
    t[0xA] = NEWLINE;
    t[0xC] = COMMA;
    t[0xD] = NEWLINE;
    t
};

const HI_LOOKUP: [u8; 16] = {
    let mut t = [0u8; 16];
    t[0x0] = NEWLINE;
    t[0x2] = COMMA | QUOTE;
    t
};

let bytes: &[u8] = &data_stream;
let mut classified_bytes = Vec::with_capacity(bytes.len());

unsafe {
    // Загружаем 16 байт в регистр
    let lo_lut = vld1q_u8(LO_LOOKUP.as_ptr());
    let hi_lut = vld1q_u8(HI_LOOKUP.as_ptr());
    // Транслируем 0x0F на все 16 каналов
    let mask = vdupq_n_u8(0x0F);

    let chunks = bytes.chunks_exact(16);
    let remainder = chunks.remainder();

    for chunk in chunks {
        let input = vld1q_u8(chunk.as_ptr());

        // Побитовое AND, изолируем младший ниббл
        let lo_nibbles = vandq_u8(input, mask);
        // Сдвиг вправо на 4, изолируем старший ниббл
        let hi_nibbles = vandq_u8(vshrq_n_u8::<4>(input), mask);

        // Поиск в таблице по младшему нибблу
        let lo_result = vqtbl1q_u8(lo_lut, lo_nibbles);
        // Поиск в таблице по старшему нибблу
        let hi_result = vqtbl1q_u8(hi_lut, hi_nibbles);

        // AND для получения окончательного класса
        let classified = vandq_u8(lo_result, hi_result);

        let mut out = [0u8; 16];
        // Сохраняем 16 байт из регистра
        vst1q_u8(out.as_mut_ptr(), classified);
        classified_bytes.extend_from_slice(&out);
    }

    // Заполняем оставшиеся байты нулями
    if !remainder.is_empty() {
        let mut padded = [0u8; 16];
        padded[..remainder.len()].copy_from_slice(remainder);

        let input = vld1q_u8(padded.as_ptr());

        let lo_nibbles = vandq_u8(input, mask);
        let hi_nibbles = vandq_u8(vshrq_n_u8::<4>(input), mask);

        let lo_result = vqtbl1q_u8(lo_lut, lo_nibbles);
        let hi_result = vqtbl1q_u8(hi_lut, hi_nibbles);

        let classified = vandq_u8(lo_result, hi_result);

        let mut out = [0u8; 16];
        vst1q_u8(out.as_mut_ptr(), classified);
        classified_bytes.extend_from_slice(&out[..remainder.len()]);
    }
}

Шаг 1.5: битовые маски

После классификации всех байтов потока мы сжимаем классифицированные байты в 3 битовые маски, по одной на каждый класс. Каждый бит битовой маски соответствует позиции в байтовом потоке: 1, если класс присутствует, 0, если нет.

Возьмём alice,30,Irvine\n (удобные 16 байт):

Сырые байты:          a  l  i  c  e  ,  3  0  ,  I  r  v  i  n  e  \n
Классифицированные:   0  0  0  0  0  1  0  0  1  0  0  0  0  0  0  3
Маска COMMA:          0  0  0  0  0  1  0  0  1  0  0  0  0  0  0  0
Маска QUOTE:          0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
Маска NEWLINE:        0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  1

Битовая маска каждого класса компактно описывается в Vec<u64>, то есть один u64 описывает 64 байта потока.

Шаг 2: фильтрация «ложных» структурных символов

На этом этапе классификатор не знает различия между настоящим разделителем и находящимся в закавыченном поле. Возьмём третью строку из нашего примера:

Классификатор пометил все структурные символы в третьем поле "where she said, ""hi""\nto me", в том числе запятую, символ новой строки и 6 кавычек. Реальные ограничители — это первая и последняя кавычки, отделяющие внутреннюю часть закавыченного поля от внешней.

Мы можем определять, находится ли позиция внутри или снаружи закавыченного поля, выполняя скользящую проверку чётности по позициям кавычек. Если количество кавычек после позиции чётное, она находится снаружи, если нечётное, то внутри.

Если вернуться к нашему примеру, всё между первой и последней кавычками будет закавыченным полем. Обратите внимание, что экранированные кавычки ("") просто уравновешивают друг друга, поэтому алгоритму не нужно отличать их от реальных границ.

"where she said, ""hi""\nto me"
Разберём пример пошагово

Байт 0 " (количество кавычек = 1, нечётное → внутри)

Байты 1-16 where she said, внутри

Байт 17 " (количество кавычек = 2, чётное → снаружи)

Байт 18 " (количество кавычек = 3, нечётное → внутри)

Байты 19-20 hi внутри

Байт 21 " (количество кавычек = 4, чётное → снаружи)

Байт 22 " (количество кавычек = 5, нечётное → внутри)

Байт 23 \n внутри

Байты 24-28 to me внутри

Байт 29 " (количество кавычек = 6, чётное → снаружи)

Для вычисления всего этого для байтового потока за один проход можно взять префиксный XOR битовой маски кавычек. В каждой позиции результатом будет XOR каждого бита кавычек до этой позиции, включая её.

         "where she said, ""hi""\nto me"
кавычки: 100000000000000001100110000001
префикс: 111111111111111110111011111110

Можно заметить, что экранированные пары кавычек ("") создают чередующиеся 01 в префиксной маске. Это нормально, потому что мы используем префиксную маску только для фильтрации запятых и символов новой строки, а не самих кавычек. Мы обращаем префикс, чтобы получить маску внешнего, и выполняем её AND с битовыми масками запятых и символов новой строки. Благодаря этому запятые и символы новой строки внутри закавыченного поля обнуляются, оставляя только реальные ограничители.

               "where she said, ""hi""\nto me"
префикс:       111111111111111110111011111110
!префикс:      000000000000000001000100000001

запятые:       000000000000000100000000000000
!префикс:      000000000000000001000100000001
& результат:   000000000000000000000000000000

новая строка:  000000000000000000000001000000
!префикс:      000000000000000001000100000001
& результат:   000000000000000000000000000000

Вычисление префиксного XOR для 64 бит можно выполнить цепочкой сдвигов в шести операциях или одной командой vmull_p64 (умножение без переноса) на ARM:

// Цепочка сдвигов XOR: 6 операций
fn prefix_xor(mut x: u64) -> u64 {
    x ^= x >> 1;
    x ^= x >> 2;
    x ^= x >> 4;
    x ^= x >> 8;
    x ^= x >> 16;
    x ^= x >> 32;
    x
}
// Умножение без переноса: 1 команда
use std::arch::aarch64::vmull_p64;

fn prefix_xor(x: u64) -> u64 {
    unsafe { vmull_p64(x, u64::MAX) as u64 }
}

Шаг 3: сбор границ полей и строк

Теперь, когда наши битовые маски запятых и символов новых строк содержат настоящие ограничители, будем обходить их по одному u64 за раз.

Мы переносим между блоками единственное булево значение, отслеживающее, находимся ли мы внутри закавыченного поля. Если в предыдущем u64 было нечётное количество бит кавычек, то мы всё ещё находимся внутри, и префиксный XOR текущего блока нужно обратить.

Из очищенных битовых масок мы извлекаем позиции разделителей, подсчитывая нули в начале (clz) — в большинстве архитектур это однотактовая команда. Каждый подсчёт даёт нам смещение следующего ненулевого бита, который будет следующим разделителем. Если символ новой строки встречается до следующей запятой, она означает конец и текущего поля, и текущей строки.

type FieldRef = Range<usize>;
type RowRef = Vec<FieldRef>;

let mut rows = Vec::new();
let mut current_row = Vec::new();
let mut start = 0;
let mut end = 0;
let mut carry = false;

for i in 0..quote_bitsets.len() {
    let quotes = quote_bitsets[i];
    let outside = !prefix_xor(quotes, carry);

    // Способ переключени переноса без ветвления,
    // когда в блоке содержится нечётное количество кавычек
    carry ^= (quotes.count_ones() & 1) != 0;

    let mut commas = comma_bitsets[i] & outside;
    let mut newlines = newline_bitsets[i] & outside;

    loop {
        let next_comma = commas.leading_zeros() as usize;
        let next_newline = newlines.leading_zeros() as usize;
        let advance = next_comma.min(next_newline);

        // В этом блоке разделителей не найдено
        if advance == 64 {
            end = (i + 1) * 64;
            break;
        }

        end += advance;

        if start < end {
            current_row.push(start..end);

            if next_newline < next_comma {
                rows.push(current_row.clone());
                current_row.clear();
            }
        }

        // Пропускаем разделитель
        end += 1;

        // Сдвигаем биты, которые уже обработали
        commas <<= advance + 1;
        newlines <<= advance + 1;
        start = end;
    }
}