
Cитуация, когда недостаток производительности пытаются покрыть новым железом, не редка. Важно понимать, однако, что железо, которое мы использовали и используем сегодня, содержит в себе множество механизмов, способных актуализировать наш код на годы вперед. В моем понимании программист, умеющий грамотно оперировать этими механизмами(в частности в терминах бизнес процессов, требующих 'Здесь и Сейчас', терминах поиска золотой середины между Скоростью и Дизайном) - профессионал. В этой статье речь пойдет про довольно изъезженную и, казалось бы, понятную тему - тему сортировок, но с одним небольшим дополнением - SIMD. Эту тему я выбрал не случайно: в процессе решения довольно важной для индустрии задачи возникла следующая подзадача: есть входное множество целых чисел. Каждому множеству сопоставлено свое уникальное значение. При этом множества элементов, которые отличаются между собой только порядком следования элементов, а не их значениями, считаются одинаковыми и должны возвращать одно и тоже значение. Одно из решений - посортировать множества, а затем использовать результат как ключ в Хеш Таблице. Одно из важных условий - количество элементов в множестве не превышает 128 элементов. Под катом рассказываю о том, как сортировать такие множества быстро.
Современные процессоры реализуют так называемый SIMD: принцип компьютерных вычислений, позволяющий обеспечить параллелизм на уровне данных. Например, в случае Intel это SSE, AVX расширения, реализуемые набором специальных 128, 256, 512-битных регистров, которые де-факто представляют некоторый 'Массив' значений меньшей размерности(например 64, 32, 16 или 8 бит) и инструкции работы с ними(сложить два таких массива в столбик одной командой, например). Таким образом, одной инструкцией мы получаем результат сразу для нескольких входных значений, что при умелом использовании положительно сказывается на производительности алгоритма. Хорошим кандидатом на использование SIMD выглядит какая-нибудь параллельная сортировка. Одна из самых известных и нам вполне подходящая - Битонная сортировка. И хотя свою долю славы она давно сыскала на различных многоблочных вычислительных устройствах(например GPU), сегодня мы будем рассматривать ее в контексте векторных SIMD инструкций CPU.
Алгоритм Битонной сортировки таков:
На вход подается последовательность чисел, размерность которой - степень двойки.
Первый шаг алгоритма заключается в получении Битонной последовательности из исходной. Битонная последовательность это такая последовательность, которая сначала монотонно не убывает, а затем монотонно не возрастает, либо приводится к такому виду в результате циклического cдвига. Например, последовательность чисел 1, 2, 4, 3 - битонная. Легко заметить, что из определения, любая последовательность, входящая в битонную, любая последовательность состоящая из одного или двух элементов, а также любая монотонная последовательность также является битонной. Каждое множество неотсортированных элементов можно считать множеством битонных последовательностей, состоящих из двух элементов. Сам алгоритм Битонной сортировки предполагает работу с каждой из половин входящего множества элементов.
Чтобы превратить произвольную последовательность в битонную, нужно:
Разделить последовательность пополам.
Первую часть превратить в битонную последовательность и отсортировать по возрастанию.
Вторую часть превратить в битонную последовательность и отсортировать по убыванию.
На втором шаге алгоритма производится слияние полученных половин Битонной последовательности.
Чтобы превратить любую битонную последовательность в отсортированную, нужно:
Разделить последовательность пополам, попарно сравнить элементы x[i] и x[n / 2 + i], меняя их местами, если элемент из первой половины больше(или меньше).
Рекурсивно выполнить сортировку над каждой из получившихся половин.
В коде описанный алгоритм можно представить как:
#include <cstdint>
#include <utility>
template
<bool dir>
void compare_and_swap(uint32_t *input_array, uint32_t i_a, uint32_t i_b)
{
if (dir == (input_array[i_a] > input_array[i_b]))
std::swap(input_array[i_a], input_array[i_b]);
}
template
<bool dir>
void bitonicMerge(uint32_t *input_array, uint32_t input_array_size, uint32_t start_ix)
{
if (input_array_size < 2)
return;
const uint32_t half_size = input_array_size >> 1;
for (uint32_t it_b = start_ix,
it_e = start_ix + half_size; it_b < it_e; ++it_b)
compare_and_swap<dir>
(input_array, it_b, it_b + half_size);
bitonicMerge<dir>
(input_array, half_size, start_ix);
bitonicMerge<dir>
(input_array, half_size, start_ix + half_size);
}
template
<bool dir>
void bitonicSort(uint32_t *input_array, uint32_t input_array_size, uint32_t start_ix)
{
if (input_array_size < 2)
return;
const uint32_t half_size = input_array_size >> 1;
bitonicSort< dir>
(input_array, half_size, start_ix);
bitonicSort<!dir>
(input_array, half_size, start_ix + half_size);
bitonicMerge<dir>
(input_array, input_array_size, start_ix);
}
Ассимптотическая сложность Битонной сортировки для худшего, медианного и лучшего случаев - O(N * log(N) * log(N)).
Таким образом, алгоритм начинает свою основную работу с последовательностей длины два, сортируя и наращивая битонные последовательности большей размерности.
Сам алгоритм можно изобразить графически с помощью сортировочных сетей. Ниже изображена сортировочная сеть для входного набора данных из восьми элементов:

Тем не менее у алгоритма битонной сортировки, описанного выше, есть один существенный для этой статьи недостаток, а именно - компараторы разной направленности(максимум и минимум двух чисел меняются местами в разные стороны). Этот недостаток легко исправляется изменением индексов сравниваемых элементов и добавлением компаратора, меняющего минимум и максимум только в одном направлении. Таким образом, изображение выше можно представить как:

Данное изображение и будет нашей отправной точкой в описании реализации алгоритма Битонной сортировки с использованием SIMD инструкций. Использовать будем - Intel SSE(4.2). Ниже - подробное, пошаговое описание реализации SSE алгоритма Битонной сортировки для последовательностей из 8 элементов, краткое описание масштабирования алгоритма для обработки массивов входных данных большей размерности, код, а также некоторые бенчмарки.
Intel SSE может оперировать 128-ью битными регистрами. Команда _mm_loadu_si128 загружает из памяти 4-е 32-битных целочисленных значения и хранит их в регистре в том же порядке, в котором они идут в памяти. Нам потребуется выполнить две таких команды для загрузки 8-ми 32-ух битных целочисленных значений из памяти прямиком в регистры. Однако порядок загруженных в регистр данных мы будем интерпретировать иначе исходному(линейному). Для понимания причин давайте взглянем на первый шаг алгоритма:

Видим, что для выполнения первой итерации мы должны выполнить 4-е операции сравнения парно идущих двоек элементов исходной последовательности, перемещая минимумы и максимумы на соответствующие шагу 1 индексы. Используя SIMD, мы можем отдельно найти 4-е минимума и 4-е максимума, используя команды _mm_min_epi32 и _mm_max_epi32 соответсвенно. Другими словами, имея два регистра A {a_1, a_2, a_3, a_4} и B {b_1, b_2, b_3, b_4} со значениями(для наглядности) {1, 6, 7, 4} и {8, 3, 5, 2} мы интепретируем данные в них так, как изображено ниже:

Этот важный процесс я также продемонстрировал в анимации ниже, под спойлером:
Иллюстрация процесса инициализации [3МБ]

Таким образом, шаг 1 можно изобразить как:

Здесь, я также выполнил нумерацию Минимальных и Максимальных элементов на выходе сети - это пригодится для понимания запутанного процесса перестановок данных внутри регистров. Таким образом, мы можем двумя SSE командами найти нужные нам минимумы и максимумы и перейти к следующему шагу алгоритма. Вообще говоря, поиск минимумов и максимумов - ключевая и неизменная операция, которая понадобится нам на протяжении работы всех шагов алгоритма.
Второй шаг сортировочной сети выглядит следующим образом(слева - выходные миниммумы и максимумы прошлого шага):

Важно понимать как выглядит текущее состояние регистров и как выглядит состояние необходимое для выполнения текущего шага. Для шага 2 это:

Можно заметить, что требуется некоторая модификация одного из регистров, полученного на прошлом шаге, а именно перестановка местами частей регистра хранящего максимумы: max2 с max1 и max4 с max3, после чего мы снова можем выполнить уже привычный нам compare_and_swap(поиск минимумов и максимумов среди двух пар регистров). Для перестановки данных внутри регистра SSE предоставляет команду _mm_shuffle_epi32, на вход которой мы подаем регистр, внутри которого выполняем перестановку, а также маску, по которой перестановка происходит. Результат описанных действий можно увидеть ниже:

Переходим к шагу 3:

В целом это повторение шага 1, однако перед его выполнением нам снова нужно перемешать данные в регистрах. И если данные внутри одного регистра перемешивались командой _mm_shuffle_epi32, то перемешать и закинуть данные из двух регистров в один можно с помощью команды _mm_shuffle_ps. Особенность этой команды в том, что итоговый регистр смешивания должен в первой половине содержать данные из первого регистра, а во второй половине - второго. Другими словами, во второй половине результата не может быть данных первого регистра, а в первой - данных второго. Здесь и во всех последующих шагах я опущу визуализацию состояния регистров на прошлом шаге и перед текущим compare_and_swap шага: по изображению текущего шага можно легко понять, что именно мы хотим получить. А вот визуализацию шагов связанных с перемешиванием и получением результата я опускать не стану:

Шаг 4 выглядит так:

Опять же, для его выполнения, нам потребуется аналогично шагу 2 проделать перемешивание элементов внутри регистра с использованием _mm_shuffle_epi32:

Шаг 5:

Необходимые операции перемешивания внутри регистров на шаге 5:

Шаг 6:

Необходимые операции перемешивания внутри регистров на шаге 6:

Все шаги проделаны, но взять и сохранить данные регистров в память мы пока что не можем - нужно преобразовать наши регистры так, чтобы при сохранении данных в память они располагались в ней как {min1, max1, min2, max2, min3, max3, min4, max4}. Если бы мы выгружали данные из регистров сразу по окончании шага шесть, то только часть элементов находилась бы на нужном месте в памяти:

Выходит, нужно проделать некоторые дополнительные действия, чтобы получить нужный результат. Во-первых, понадобятся временные регистры, в которые мы сохраним результат перемешивания данных внутри исходных регистров таким образом, чтобы вторая группа данных с изображения выше шла на своих местах. Во-вторых, нам потребуется команда блендинга регистров _mm_blend_epi16. Принцип ее работы таков: по 8-ми битам входной маски создается выходной регистр, в котором на i-ой позиции стоят соответствующие данные из первого регистра, если бит маски установлен в единицу, либо из второго, если соответствующий бит равен нулю. Иллюстрация этого процесса изображена ниже:

Результатом смешивания является искомая последовательность {min1, max1, min2, max2, min3, max3, min4, max4}.
В коде описанный алгоритм можно представить как:
#include <smmintrin.h>
inline void compare_and_swap(__m128i &a, __m128i &b)
{
__m128i c = a;
a = _mm_min_epi32(a, b);
b = _mm_max_epi32(c, b);
}
#define shuffle_two_vecs(a, b, mask) \
reinterpret_cast<__m128i>(_mm_shuffle_ps( \
reinterpret_cast<__m128>(a), reinterpret_cast<__m128>(b), mask));
inline void sort_8(int *arr)
{
__m128i a = _mm_loadu_si128(reinterpret_cast<__m128i *>(arr + 0));
__m128i b = _mm_loadu_si128(reinterpret_cast<__m128i *>(arr + 4));
/* шаг 1 */
compare_and_swap(a, b);
/* шаг 2 */
b = _mm_shuffle_epi32(b, _MM_SHUFFLE(2, 3, 0, 1));
compare_and_swap(a, b);
/* шаг 3 */
__m128i t = a;
a = shuffle_two_vecs(a, b, 0b10001000);
b = shuffle_two_vecs(t, b, 0b11011101);
compare_and_swap(a, b);
/* шаг 4 */
b = _mm_shuffle_epi32(b, _MM_SHUFFLE(0, 1, 2, 3));
compare_and_swap(a, b);
/* шаг 5 */
t = a;
a = shuffle_two_vecs(a, b, 0b01000100);
b = shuffle_two_vecs(t, b, 0b11101110);
compare_and_swap(a, b);
/* шаг 6 */
t = a;
a = shuffle_two_vecs(a, b, 0b10001000);
b = shuffle_two_vecs(t, b, 0b11011101);
compare_and_swap(a, b);
/* перед сохранением в память - меняем порядок */
auto t_1 = _mm_shuffle_epi32(a, _MM_SHUFFLE(2, 3, 0, 1));
auto t_2 = _mm_shuffle_epi32(b, _MM_SHUFFLE(2, 3, 0, 1));
b = _mm_blend_epi16(t_1, b, 0b11001100);
a = _mm_blend_epi16(a, t_2, 0b11001100);
/* сохраняем данные регистров в память */
_mm_storeu_si128(reinterpret_cast<__m128i *>(arr + 0), a);
_mm_storeu_si128(reinterpret_cast<__m128i *>(arr + 4), b);
}
Итак, выше мы получили SSE версию алгоритма Битонной Сортировки для 8-ми элементов. Полученный алгоритм - возрастающая inplace сортировка(не требует дополнительной оперативной памяти) реализованная на регистрах. Чтобы сделать сортировку убывающей, достаточно в функции compare_and_swap в переменной a хранить максимум, а в переменной b - минимум.
Масштабирование алгоритма для большего количества входных элементов:
Существует ряд закономерностей упрощающих масштабировать описанный выше алгоритм на массивы большего размера. На рисунке ниже - сортировочная сеть для 16-ти элементов:

До шага 7 мы применяем к каждой из входных половин алгоритм, описанный выше. Шаг 7 привносит кое-что "новое": нам необходимо попарно сравнивать элементы, стоящие на одних и тех же индексах, с начала и с конца. Чтобы выполнить этот шаг, я сделал следующее: первую половину входных данных отсортировал по возрастанию, а вторую - по убыванию. При этом сортировка тут - это процесс алгоритма сортировки 8-ми элементов до момента перестановки данных внутри регистров для результирующего сохранения в память. Затем выполнил compare_and_swap над полученными половинами(каждая из которых хранится в двух XMM регистрах). Последующие шаги, как можно видеть, не требуют знания обо всех элементах сортируемого множества, и задачу можно свести к обработке массивов меньшей размерности. Шаг 8, шаг 9 и шаг 10 похожи между собой: мы сравниваем пары элементов расстояние между индексами которых 4, 2 и 1 соответственно. Однако основной похожестью этих шагов является то, что их выполнение это повторение одной и той же операции, но с разными входными регистрами: мы выполняем уже известный нам _mm_shuffle_ps и, более того, с одной и той же маской на каждом шаге. После выполнения всех шагов мы должны поменять порядок следования элементов внутри регистров аналогично тому, как мы делали это при сортировке 8-ми элементов. Только теперь подобная операция распространяется на каждую пару соседних регистров. Аналогичным образом выполняется масштабирование алгоритма для сортировки массивов большей длины: сначала сортируем по возрастанию первую половину, по убыванию - вторую, выполняем compare_and_swap. Следящий шаг, требующий попарной сортировки чисел с расстоянием между индексами большей или равной 8 - отличается от того, что мы делали ранее. Здесь нам не требуется выполнять каких-либо перестановок элементов внутри регистров, и мы можем просто выполнить compare_and_swap над соответствующими регистрами. Все последующие шаги, за исключением последних трех - повторяют данный шаг(не забываем только про разные входные регистры). Последние три шага - это всегда перестановка _mm_shuffle_ps с одной и той же маской.
Код и бенчмарки
Исходный код описанной SSE сортировки я выложил на github. Здесь же находится код бенчмарка. В сравненнии я использовал написанные мной SSE версии сортировок для 8, 16, 32, 64 и 128 элементов(bench_sort_bit_xxx), а само сравнение производил с STL sort(bench_sort_stl_xxx), а также с некоторыми из наилучших сортировок из этого бенчмарка и репозитория с сортировками.
Результаты на Ryzen 7 5800H, компилятор GCC 13:
bench_sort_bit_8/min_warmup_time:1.000/iterations:1000000 2.81 ns 2.81 ns 1000000
bench_sort_stl_8/min_warmup_time:1.000/iterations:1000000 48.2 ns 48.2 ns 1000000
bench_sort_pdq_8/min_warmup_time:1.000/iterations:1000000 47.5 ns 47.5 ns 1000000
bench_sort_ska_8/min_warmup_time:1.000/iterations:1000000 47.7 ns 47.7 ns 1000000
bench_sort_sn_8/min_warmup_time:1.000/iterations:1000000 39.6 ns 39.6 ns 1000000
bench_sort_bit_16/min_warmup_time:1.000/iterations:1000000 7.28 ns 7.29 ns 1000000
bench_sort_stl_16/min_warmup_time:1.000/iterations:1000000 119 ns 119 ns 1000000
bench_sort_pdq_16/min_warmup_time:1.000/iterations:1000000 121 ns 121 ns 1000000
bench_sort_ska_16/min_warmup_time:1.000/iterations:1000000 121 ns 121 ns 1000000
bench_sort_sn_16/min_warmup_time:1.000/iterations:1000000 125 ns 125 ns 1000000
bench_sort_bit_32/min_warmup_time:1.000/iterations:1000000 18.1 ns 18.1 ns 1000000
bench_sort_stl_32/min_warmup_time:1.000/iterations:1000000 405 ns 405 ns 1000000
bench_sort_pdq_32/min_warmup_time:1.000/iterations:1000000 295 ns 295 ns 1000000
bench_sort_ska_32/min_warmup_time:1.000/iterations:1000000 295 ns 295 ns 1000000
bench_sort_sn_32/min_warmup_time:1.000/iterations:1000000 379 ns 379 ns 1000000
bench_sort_bit_64/min_warmup_time:1.000/iterations:1000000 45.5 ns 45.5 ns 1000000
bench_sort_stl_64/min_warmup_time:1.000/iterations:1000000 994 ns 994 ns 1000000
bench_sort_pdq_64/min_warmup_time:1.000/iterations:1000000 675 ns 675 ns 1000000
bench_sort_ska_64/min_warmup_time:1.000/iterations:1000000 674 ns 674 ns 1000000
bench_sort_sn_64/min_warmup_time:1.000/iterations:1000000 1245 ns 1245 ns 1000000
bench_sort_bit_128/min_warmup_time:1.000/iterations:1000000 116 ns 116 ns 1000000
bench_sort_stl_128/min_warmup_time:1.000/iterations:1000000 2310 ns 2310 ns 1000000
bench_sort_pdq_128/min_warmup_time:1.000/iterations:1000000 1496 ns 1496 ns 1000000
bench_sort_ska_128/min_warmup_time:1.000/iterations:1000000 1897 ns 1897 ns 1000000
Результаты на Ryzen 7 5800H, компилятор Clang 17:
bench_sort_bit_8/min_warmup_time:1.000/iterations:1000000 2.75 ns 2.75 ns 1000000
bench_sort_stl_8/min_warmup_time:1.000/iterations:1000000 52.8 ns 52.8 ns 1000000
bench_sort_pdq_8/min_warmup_time:1.000/iterations:1000000 38.2 ns 38.2 ns 1000000
bench_sort_ska_8/min_warmup_time:1.000/iterations:1000000 38.8 ns 38.8 ns 1000000
bench_sort_sn_8/min_warmup_time:1.000/iterations:1000000 5.62 ns 5.62 ns 1000000
bench_sort_bit_16/min_warmup_time:1.000/iterations:1000000 7.34 ns 7.34 ns 1000000
bench_sort_stl_16/min_warmup_time:1.000/iterations:1000000 126 ns 126 ns 1000000
bench_sort_pdq_16/min_warmup_time:1.000/iterations:1000000 89.3 ns 89.3 ns 1000000
bench_sort_ska_16/min_warmup_time:1.000/iterations:1000000 88.6 ns 88.6 ns 1000000
bench_sort_sn_16/min_warmup_time:1.000/iterations:1000000 17.0 ns 17.0 ns 1000000
bench_sort_bit_32/min_warmup_time:1.000/iterations:1000000 18.7 ns 18.7 ns 1000000
bench_sort_stl_32/min_warmup_time:1.000/iterations:1000000 419 ns 419 ns 1000000
bench_sort_pdq_32/min_warmup_time:1.000/iterations:1000000 297 ns 297 ns 1000000
bench_sort_ska_32/min_warmup_time:1.000/iterations:1000000 296 ns 296 ns 1000000
bench_sort_sn_32/min_warmup_time:1.000/iterations:1000000 56.6 ns 56.6 ns 1000000
bench_sort_bit_64/min_warmup_time:1.000/iterations:1000000 46.1 ns 46.1 ns 1000000
bench_sort_stl_64/min_warmup_time:1.000/iterations:1000000 1020 ns 1020 ns 1000000
bench_sort_pdq_64/min_warmup_time:1.000/iterations:1000000 691 ns 691 ns 1000000
bench_sort_ska_64/min_warmup_time:1.000/iterations:1000000 689 ns 689 ns 1000000
bench_sort_sn_64/min_warmup_time:1.000/iterations:1000000 225 ns 225 ns 1000000
bench_sort_bit_128/min_warmup_time:1.000/iterations:1000000 128 ns 128 ns 1000000
bench_sort_stl_128/min_warmup_time:1.000/iterations:1000000 2400 ns 2400 ns 1000000
bench_sort_pdq_128/min_warmup_time:1.000/iterations:1000000 1533 ns 1532 ns 1000000
bench_sort_ska_128/min_warmup_time:1.000/iterations:1000000 1967 ns 1966 ns 1000000
По результатам видим, что SIMD версия битонной сортировки минимум в 16 раз обгоняет C++ STL сортировку, для разных размерностей входного множества.
Послесловие
Наверное, SIMD одна из тех технологий, используя которую, можно сказать "Зачем платить больше, если можно платить меньше?". Многие начинающие программисты, часто находят интересное в оптимизации простейших алгоритмов по типу memcpy с использлванием SIMD, но эта технология применима и в более комплексных алгоритмах: сортировки, хширование, численные методы, геометрические задачи, работа со строками и т.д
Работу над этой статьей я начинал, еще работая в Яндексе(вдохновленный @juks) и даже выложил "черновой" вариант на telegra.ph, а также на внутрикорпоративный ресурс Яндекса - Этушку. К моему удивлению, этот вариант статьи смог легко взлететь в местный топ, обгоняя статьи о новостях компании, всяких ништяках и т.п
А еще меня можно почитать в телеграмм.