Прим. Wunder Fund: ну, вы наверное, и сами догадываетесь, как мы любим быстрые алгоритмы и оптимизации. Если вы тоже такое любите — вы знаете, что делать)
В наши дни сказать, что изобрёл алгоритм сортировки, который на 30% быстрее того, что считают эталонным, это значит — сделать довольно смелое заявление. Я, к сожалению, вынужден сделать ещё более смелое заявление. Дело в том, что я создал алгоритм сортировки, который, для многих вариантов входных данных, вдвое быстрее std::sort
. И, за исключением сортировки специально созданных входных последовательностей, на которых алгоритм упирается в свой худший случай, он всегда быстрее std::sort
. (А когда появляются данные, приводящие к худшему случаю алгоритма, я эту ситуацию детектирую и автоматически перехожу на std::sort
).
Почему я сказал: «…к сожалению, вынужден…»? Вероятно, из-за того, что мне, скорее всего, предстоит нелёгкое дело убеждения читателя в том, что я действительно увеличил скорость сортировки в два раза. Поэтому материал, который я начинаю писать, вполне может получиться достаточно длинным. Но весь мой код открыт — это значит, что вы можете попробовать мои наработки на данных, характерных для вашей сферы деятельности. Поэтому я могу убедить вас в достоинствах моего алгоритма с помощью массы аргументов и результатов измерений. А ещё вы можете просто попробовать алгоритм самостоятельно.
Учитывая то, о чём я писал в моём прошлом материале, это, конечно, вариант поразрядной сортировки (radix sort). То есть — его временная сложность ниже, чем O(n log n). Вот два основных направления, по которым я усовершенствовал базовый алгоритм:
Я автоматизировал внутренний цикл поразрядной сортировки на месте. А именно, я начал с реализации алгоритма сортировки «Американский флаг» из Википедии и внёс в него некоторые неочевидные улучшения. Это позволило сделать поразрядную сортировку гораздо быстрее
std::sort
даже для сравнительно небольших наборов данных (начиная со 128 элементов).Я обобщил алгоритм поразрядной сортировки на месте, что позволило ему работать с сущностями различных типов произвольной длины. Например, это целые числа, числа с плавающей точкой, кортежи, структуры, векторы, массивы, строки. Я могу сортировать всё, к элементам чего можно обратиться в произвольном порядке, пользуясь чем-то вроде
operator[]
илиstd::get
. Если вы работаете с какими-то особыми структурами — вам нужно будет предоставить функцию, которая может извлекать ключ, по которому вы хотите эти структуры сортировать. Это — совершенно обычная функция. Она проще оператора сравнения, который приходится писать дляstd::sort
.
Если вы просто хотите попробовать мой алгоритм — можете заглянуть в раздел этого материала, посвящённый исходному коду и использованию алгоритма.
Алгоритм сортировки с временной сложностью O(n)
Для начала расскажу о том, как создать алгоритм сортировки с временной сложностью O(n). Если вы читали мою предыдущую статью — можете пропустить этот раздел. А если нет — читайте дальше.
Если бы вы сейчас были бы в том же состоянии, в котором я пребывал месяц назад, то вы бы могли с уверенностью заявить следующее: «Доказано, что временной сложностью самого быстрого из возможных алгоритмов сортировки является O(n log n)». Есть математическое доказательство, указывающее на то, что более быстрый алгоритм создать нельзя. Я был в этом уверен до тех пор, пока не посмотрел эту видеолекцию из курса «Introduction to Algorithms» на MIT Open Course Ware. Там было сказано, что временная сложность алгоритма сортировки не будет меньше O(n log n) в том случае, если в рамках этого алгоритма можно лишь сравнивать элементы. Но если позволено выполнять большее количество операций, а не только операции сравнения, можно ускорить алгоритм сортировки.
В качестве примера продемонстрирую алгоритм сортировки подсчётом (counting sort):
template<typename It, typename OutIt, typename ExtractKey>
void counting_sort(It begin, It end, OutIt out_begin, ExtractKey && extract_key)
{
size_t counts[256] = {};
for (It it = begin; it != end; ++it)
{
++counts[extract_key(*it)];
}
size_t total = 0;
for (size_t & count : counts)
{
size_t old_count = count;
count = total;
total += old_count;
}
for (; begin != end; ++begin)
{
std::uint8_t key = extract_key(*begin);
out_begin[counts[key]++] = std::move(*begin);
}
}
Эта реализация алгоритма способна сортировать только значения типа unsigned char
. Или, точнее, только типы, которые могут предоставить ключ сортировки, представленный значением unsigned char
. В противном случае в первом цикле мы будем индексировать элементы, находящиеся за пределами диапазона допустимых значений. Расскажу о том, как работает этот алгоритм.
Имеются три массива и три цикла. Есть входной массив, выходной массив и вспомогательный массив, используемый для подсчёта элементов. В первом цикле мы заполняем вспомогательный массив, перебирая входной массив и считая частоту, с которой в нём появляется каждый из элементов.
Второй цикл строит на основе вспомогательного массива массив префиксных сумм для найденных количеств вхождений элементов. Предположим, во вспомогательном массиве не наберётся 256 элементов. Там их лишь 8. Они представляют собой количества различных элементов во входном массиве:
Индекс | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
Вспомогательный массив | 0 | 2 | 1 | 0 | 5 | 1 | 0 | 0 |
Массив префиксных сумм | 0 | 0 | 2 | 3 | 3 | 8 | 9 | 9 |
В данном случае у нас имеется всего девять элементов. Число 1 встретилось во входном массиве 2 раза. Число 4 встретилось 5 раз. Число 5 — 1 раз. Возможно, входная последовательность выглядела как { 4, 4, 2, 4, 1, 1, 4, 5, 4 }
.
Последний цикл снова перебирает входной массив, используя ключи для того, чтобы обращаться к массиву префиксных сумм. И, как и следовало ожидать, этот массив сообщает нам итоговую позицию, в которой надо сохранить число. То есть, когда мы перебираем входную последовательность, 4 идёт в позицию 3, так как это — значение, которое нам сообщает массив префиксных сумм. Выполнив эту операцию, мы инкрементируем значение в массиве, в результате следующее число 4 идёт в позицию 5. Число 2 идёт в позицию 2, следующая четвёрка — в позицию 5 (так как мы уже дважды инкрементировали значение в массиве префиксных сумм) и так далее. Советую вам самостоятельно, вручную, проработать этот алгоритм. Это позволит вам как следует его прочувствовать. Итоговым результатом сортировки будет последовательность { 1, 1, 2, 4, 4, 4, 4, 4, 5 }
.
Мы получили отсортированный массив, уложившись в линейное время. Массив префиксных сумм сообщил нам о том, какие числа должны попадать в те или иные позиции.
Ещё обратите внимание на то, что этот алгоритм может работать с любыми типами данных, а не только с целыми числами. Для этого достаточно предоставить функцию extract_key()
для нужного типа. В последнем цикле выполняются перемещения значений предоставленного типа, а не ключей, возвращаемых этой функцией. В результате тип может представлять собой любую структуру данных. Например, можно сортировать строки по длине. Достаточно воспользоваться в функции extract_key()
функцией size()
и ограничить длину значением 255. Можно написать модифицированную версию функции counting_sort()
, сообщающую о том, где находится позиция последнего раздела. Это позволит, после основного прохода сортировки, отсортировать все длинные строки с использованием std::sort
. (Такие строки должны составлять небольшую часть всех строк, в результате второй проход сортировки, когда обрабатываются эти строки, должен быть достаточно быстрым).
Сортировка на месте за линейное время
Вышеприведённый алгоритм сохраняет отсортированные элементы в отдельном массиве. Но для сортировки беззнаковых целых чисел несложно будет создать алгоритм, выполняющий сортировку на месте, использующий лишь один массив. Один из подходов к созданию такого алгоритма может заключаться в том, чтобы вместо того, чтобы перемещать элементы, менять их местами.
Наиболее очевидная проблема, с которой мы столкнёмся, попытавшись реализовать эту идею, заключается в том, что когда мы меняем местами первый элемент и ещё какой-нибудь элемент, этот новый элемент, возможно, не предназначен для того места, где был первый элемент. Его итоговым местом может оказаться позиция 10, а не 1. Но проблему эту несложно решить: продолжим менять местами первый элемент с другими элементами до тех пор, пока не найдётся элемент, который должен стоять на первом месте. А переходить ко второму элементу можно лишь тогда, когда мы найдём тот элемент, которому предназначено первое место.
Вторая проблема этого алгоритма заключается в том, что мы найдём в массиве много разделов, которые уже отсортированы. Правда, мы можем не знать о том, что эти части массива уже отсортированы. Представим, что у нас имеется два числа 3, которые стремятся занять позиции 6 и 7. И предположим, что в ходе перемещения в нужную позицию первого элемента мы переместили первое число 3 в позицию 6, а второе — в позицию 7. Теперь эти элементы отсортированы и нам уже не нужно ничего с ними делать. Но когда мы продвигаемся по массиву, начав с первого элемента, мы, в некий момент времени, доберёмся до числа 3, которое находится в позиции 6. И мы переместим это число в позицию 8, так как это — следующая позиция, в которую попадает число 3. Затем мы найдём другое число три и переместим его в позицию 9. Потом мы снова найдём первое число 3 и переместим его в позицию 10 и так далее. Это будет продолжаться до тех пор, пока мы не выйдем за границы массива, после чего программа остановится с ошибкой.
Решение второй проблемы заключается в том, чтобы держать под рукой копию исходного массива префиксных сумм. Это позволит нам узнать о том, закончена ли работа с неким разделом массива. Затем мы, продвигаясь по массиву, можем пропускать те его разделы, работа над которыми завершена.
Эти два изменения позволят нам выйти на алгоритм сортировки целых чисел, работающий на месте. Это — алгоритм сортировки «Американский флаг», описанный в Википедии.
Поразрядная сортировка на месте
Алгоритм поразрядной сортировки берёт вышеописанный алгоритм и обобщает его на целые числа, которые не помещаются в одно значение типа unsigned char
. Версия этого алгоритма, работающая на месте, на самом деле, использует довольно простой приём: за один раз сортируется лишь один байт. Сначала сортируются старшие байты. Это разделит входные данные на 256 частей. Потом рекурсивно сортируют данные в каждом из этих разделов, используя следующий байт. Так продолжают до тех пор, пока не закончатся байты.
Если посчитать количество операций, выполняемых в рамках такой реализации алгоритма, то окажется, что для четырёхбайтного целого числа будет выполнено 2563 рекурсивных вызовов. Мы делим массив на 256 частей, потом рекурсивно их обрабатываем, делим каждую из этих частей ещё на 256 частей, рекурсивно проходимся по ним, делим и их на 256 частей, а потом в последний раз выполняем рекурсивную обработку. Если всё так и будет — получится очень медленный алгоритм. Обойти эту проблему можно, останавливая рекурсию тогда, когда количество элементов в разделе меньше некоего «магического числа», и используя в этом разделе, вместо нашего алгоритма, std::sort
. В моём случае я останавливаю рекурсию тогда, когда в разделе оказывается меньше 128 элементов. Когда я разбил массив на разделы, в которых меньше этого количества элементов, я, для сортировки элементов, находящихся в них, вызываю std::sort
.
Если вам интересно узнать о том, почему я выбрал именно число 128, то знайте, что дело в том, что я разделяю входные данные на 256 частей. Если число частей равняется k, тогда сложность сортировки по одному байту будет O(n+k). Ситуация, когда поразрядная сортировка быстрее, чем std::sort
, возникает тогда, когда цикл, который зависит от n, начинает доминировать над циклом, который зависит от k. В моей реализации эта ситуация возникает примерно при 0,5k. Нелегко сместить эту границу значительно ниже. (У меня есть некоторые идеи на эту тему, но ни одна из них пока не сработала).
Обобщение поразрядной сортировки
Должно быть очевидно то, что вышеописанный алгоритм подходит для сортировки беззнаковых целых чисел любого размера. Но он подходит и для коллекций таких чисел (включая пары чисел и кортежи), и для строк. Нужно всего лишь сначала выполнить сортировку по первому элементу, потом по следующему, и так далее — до тех пор, пока размер одного раздела не станет достаточно маленьким (на самом деле, в документе, который назван в Википедии первоисточником алгоритма сортировки «Американский флаг», говорится, что этот алгоритм предназначен для работы со строками).
Но несложно обобщить этот алгоритм так, чтобы он поддерживал бы целочисленные значения со знаком: достаточно сдвинуть все значения в диапазон беззнаковых целых чисел того же размера. То есть, например, привести значения типа int16_t
к типу uint16_t
и прибавить 32768.
Майкл Херф, кроме того, открыл хороший метод обобщения этого алгоритма на числа с плавающей точкой. А именно, нужно привести (с помощью reinterpret_cast
) значение типа float
к типу uint32
, после чего, если float-значение было отрицательным, обратить каждый его бит. А для положительных float-значений надо обратить лишь знаковый бит. То же самое подходит и для значений типа double
, и для значений типа uint64
. Херф объясняет то, почему этот метод работоспособен, здесь. Если рассказать об этом вкратце, то окажется, что положительные числа с плавающей точкой уже сортируются правильно в том случае, если их преобразовать к типу uint32
. Экспонента идёт до мантиссы, поэтому сначала выполняют сортировку по экспоненте, а потом — по мантиссе. В результате всё работает как надо. Отрицательные числа с плавающей точкой, правда, будут сортироваться неправильно. Обращение всех их битов помогает с этим справиться. Последняя оставшаяся проблема заключается в том, что положительные числа с плавающей точкой должны восприниматься при сортировке как числа, которые больше отрицательных чисел. Легче всего их можно «увеличить», обратив знаковый бит, так как это — самый значащий их бит.
В результате, если говорить о фундаментальных типах, остаются лишь логический тип и различные символьные типы. Символьные типы можно, с помощью reinterpret_cast
, привести к беззнаковым типам того же размера. Логические значения тоже можно превратить в беззнаковые типы, но тут ещё можно использовать особый, более эффективный алгоритм для сортировки логических значений. А именно, вместо обычного алгоритма сортировки достаточно использовать std::partition
. А если понадобится рекурсия, в том случае, когда сортировка проводится по более чем одному ключу, можно рекурсивно обработать каждый из разделов.
Оптимизация внутреннего цикла
Кратко повторим основные положения, касающиеся алгоритма для сортировки по одному байту:
Считаем элементы и создаём массив префиксных сумм, который сообщает нам о том, куда надо поместить элементы.
Меняем местами первый элемент и другие элементы до тех пор, пока не найдём элемент, который должен быть на первом месте (в соответствии с массивом префиксных сумм).
Повторяем шаг 2 для всех позиций.
Я реализовал этот алгоритм сортировки с использованием проекта Sound of Sorting. Вот как выглядит (и звучит) этот алгоритм.
Визуализация алгоритма «Американский флаг»
Как видите — алгоритм тратит большую часть времени на пару первых элементов. Иногда массив оказывается практически полностью отсортированным к тому времени, когда алгоритм продвигается к следующему элементу после обработки первого. На видео, правда, не виден массив префиксных сумм, который строится при работе алгоритма. Визуализация этого массива сделала бы алгоритм более понятным (благодаря этому ясно стало бы то, откуда алгоритм знает о том, где должна находиться конечная позиция элемента, перенося элемент именно в эту позицию, но я эту часть работы алгоритма не визуализировал).
Если нам надо сортировать данные по нескольким байтам — мы рекурсивно обрабатываем каждый из 256 разделов и выполняем сортировку в их пределах по следующему байту. Но не это замедляет работу алгоритма. Замедляет её то, что выполняется в части 2 и 3 плана работы алгоритма, приведённого выше.
Если выполнить профилирование кода реализации алгоритма, то окажется, что всё его время уходит на перестановки элементов. Поначалу я думал, что это было так из-за промахов кеша. Обычно, когда строка ассемблерного кода, выполнение которой занимает много времени, представляет собой разыменование указателя, замедление — это результат промаха кеша. О том, что это, на самом деле, за проблема, я скажу ниже. Но даже хотя интуиция меня и подвела, первая мысль натолкнула меня на идею о способе хорошего ускорения алгоритма. А именно — если при работе с первым элементом имеется промах кеша — почему бы не попробовать переместить на своё место второй элемент в то время, когда мы ждём, из-за промаха кеша, продолжения обработки первого элемента?
Мне уже нужно хранить информацию о том, какой элемент переместился на свою позицию, поэтому это я могу пропустить. В результате я поступил так: обошёл все элементы, которые ещё не заняли свои места, и поставил их туда, где они должны быть. За один проход по массиву это позволит поставить на место, как минимум, половину всех элементов. Для того чтобы разобраться с тем, почему это так, подумаем о том, как это будет выглядеть на списке { 4, 3, 1, 2 }
. Мы смотрим на первый элемент, на число 4, и меняем его местами с последним элементом, с числом 2, что даёт нам список { 2, 3, 1, 4 }
. Затем мы берём второй элемент — число 3, и меняем его местами с числом 1, что даёт список { 2, 1, 3, 4 }
. После этого оказывается, что мы прошлись по половине списка, и что оставшиеся элементы уже отсортированы (это понятно из проверки того, что смещение, хранящееся в массиве префиксных сумм, является тем же самым, что и исходное смещение следующего раздела), то есть — их обрабатывать уже не надо. Но весь список пока ещё не отсортирован. Для того чтобы отсортировать весь список, мы решим, что, когда мы доходим до конца списка, мы просто начинаем работу с его начала, устанавливая на свои места все неотсортированные элементы. В данном случае нам надо лишь поменять местами числа 2 и 1. В результате получится список { 1, 2, 3, 4 }
, после чего мы узнаем о том, что все разделы отсортированы, и что мы можем остановиться.
Вот как это выглядит в исполнении Sound of Sorting:
Визуализация алгоритма «Ska Sort»
Детали реализации
Вот код вышеописанного алгоритма:
struct PartitionInfo
{
PartitionInfo()
: count(0)
{
}
union
{
size_t count;
size_t offset;
};
size_t next_offset;
};
template<typename It, typename ExtractKey>
void ska_byte_sort(It begin, It end, ExtractKey & extract_key)
{
PartitionInfo partitions[256];
for (It it = begin; it != end; ++it)
{
++partitions[extract_key(*it)].count;
}
uint8_t remaining_partitions[256];
size_t total = 0;
int num_partitions = 0;
for (int i = 0; i < 256; ++i)
{
size_t count = partitions[i].count;
if (count)
{
partitions[i].offset = total;
total += count;
remaining_partitions[num_partitions] = i;
++num_partitions;
}
partitions[i].next_offset = total;
}
for (uint8_t * last_remaining = remaining_partitions + num_partitions, * end_partition = remaining_partitions + 1; last_remaining > end_partition;)
{
last_remaining = custom_std_partition(remaining_partitions, last_remaining, [&](uint8_t partition)
{
size_t & begin_offset = partitions[partition].offset;
size_t & end_offset = partitions[partition].next_offset;
if (begin_offset == end_offset)
return false;
unroll_loop_four_times(begin + begin_offset, end_offset - begin_offset, [partitions = partitions, begin, &extract_key, sort_data](It it)
{
uint8_t this_partition = extract_key(*it);
size_t offset = partitions[this_partition].offset++;
std::iter_swap(it, begin + offset);
});
return begin_offset != end_offset;
});
}
}
Вначале работа похожа на работу алгоритма сортировки подсчётом. А именно, выполняется подсчёт того, сколько элементов попадают в каждый из разделов. Но я изменил второй цикл. В этом цикле я создаю массив индексов для всех разделов, в которых имеется хотя бы один элемент. Мне это нужно из-за того, что мне требуется некий способ отслеживания всех разделов, работа с которыми пока не окончена. Кроме того, я храню конечный индекс для каждого раздела в переменной ext_offset
. Это позволяет мне проверять то, окончена ли сортировка раздела.
Мой третий цикл гораздо сложнее, чем цикл в реализации алгоритма сортировки подсчётом. Он представляет собой три вложенных цикла, причём обычным циклом for
является лишь внешний.
Внешний цикл перебирает все оставшиеся неотсортированные разделы. Он останавливается в том случае, когда остаётся лишь один неотсортированный раздел. Последний раздел не нужно сортировать если все остальные разделы уже отсортированы. Это — важная оптимизация, так как весьма распространена ситуация, когда все элементы попадают в один раздел. Если, при сортировке четырёхбайтовых целых чисел, все числа невелики, тогда, при первом вызове этой функции, которая выполняет сортировку по самому старшему байту, все ключи получат одно и то же значение и попадут в один раздел. В данном случае алгоритм немедленно перейдёт к рекурсивной сортировке по следующему байту.
Второй цикл использует std::partition
для того чтобы убирать завершённые разделы из списка оставшихся разделов. Я использую собственную версию std::partition (custom_std_partition)
, так как std::partition
развернёт свой внутренний цикл, а мне это не нужно. Мне нужно, чтобы, вместо этого, был бы развёрнут цикл с максимальной глубиной вложенности. Но поведение custom_std_partition
идентично поведению std::partition
. Поговорим о смысле этого цикла. Пусть есть входная последовательность { 3, 3, 3, 3, 2, 5, 1, 4, 5, 5, 3, 3, 5, 3, 3 }
. Разделы для чисел 3 и 5 больше других разделов. Если элементы попадают в разделы разных размеров, то, в данном случае, работа над разделами для чисел 1, 2 и 4 будет очень быстро завершена, а затем внешнему и внутреннему циклам нужно будет лишь пройтись по разделам для 3 и 5. Можно подумать, что тут я, вместо std::partition
, мог бы использовать std::remove_if
, но мне нужна недеструктивная операция, так как тот же список разделов мне понадобится при выполнении рекурсивных вызовов (они тут не показаны).
А самый глубокий цикл, наконец, меняет элементы местами. Он просто перебирает все оставшиеся неотсортированные элементы в разделе и, меняя местами с другими, ставит на их окончательные места. Это, по большому счёту, обычный цикл for, за исключением того, что мне, ради скорости, надо, чтобы он был бы развёрнут. Поэтому я написал функцию unroll_loop_four_times()
, которая принимает итератор и число итераций цикла, а после этого разворачивает цикл.
Продолжение следует…
О, а приходите к нам работать? ?
Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.
Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.
Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.