Прочитав эту статью, я вспомнил, как писал внешнюю сортировку, которая использовала O(1) внешней памяти. Функция получала бинарый файл и максимальный размер памяти, которую она могла выделить под массив:
Я использовал алгоритм из Effective Performance of External Sorting with No Additional Disk Space:
Преимущество такого алгоритма, кроме отсутствия буфера на диске, это то, что с диска мы читаем данные относительно большими порциями, что ускоряет алгоритм.
Реализуем алгоритм на C++.
Для начала определим количество блоков и размер блока в байтах и в элементах и выделим память:
Теперь пункты 2-3 — сортируем каждый бло��:
Саму сортировку мы напишем чуть позднее.
Приступим к слияниям. Нижняя половина:
И аналогично верхняя:
Перераспределяем блоки, завершаем цикл и не забываем освободить память:
Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.
Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:
Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:
Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:
К сожалению, на больших объемах алгоритм замедляется, что ожидаемо, ведь мы не используем никакого буфера вообще. Тем не менее, скорость алгоритма достаточно адекватная, и, я уверен, может быть улучшена.
Все исходники лежат здесь.
void ext_sort(const std::string filename, const size_t memory)
Я использовал алгоритм из Effective Performance of External Sorting with No Additional Disk Space:
- Разделим файл на блоки, которые помещаются в доступную память. Обозначим эти блоки Block_1, Block_2, …, Block_(S-1), Block_S. Установим P = 1.
- Читаем Block_P в память.
- Отсортируем данные в памяти и запишем назад в Block_P. Установим P = P + 1, и если P ≤ S, то читаем Block_P в память и повторяем этот шаг. Другими словами, отсортируем каждый блок файла.
- Разделим каждый блок на меньшие блоки B_1 и B_2. Каждый из таких блоков занимает половину доступной памяти.
- Читаем блок B_1 блока Block_1 в первую половину доступной памяти. Установим Q = 2.
- Читаем блок B_1 блока Block_Q во вторую половину доступной памяти.
- Объеденим массивы в памяти с помощью in-place слияния, запишем вторую половину памяти в блок B_1 блока Block_Q и установим Q = Q + 1, если Q ≤ S, читаем блок B_1 блока Block_Q во вторую половину доступной памяти и повторяем этот шаг.
- Записываем первую половину доступной памяти в блок B_1 блока Block_1. Так как мы всегда оставляли в памяти меньшую половину элементов и провели слияние со всеми блоками, то в этой части памяти хранятся M минимальных элементы всего файла.
- Читаем блок B_2 блока Block_S во вторую половину доступной памяти. Установим Q = S −1.
- Читаем блок B_2 блока Block_Q в первую половину доступной памяти.
- Объеденим массивы в памяти с помощью in-place слияния, запишем первую половину доступной памяти в блок B_2 блока Block_Q и установим Q = Q −1. Если Q ≥ 1 читаем блок B_2 блока Block_Q в первую половину доступной памяти и повторяем этот шаг.
- Записываем вторую половину доступной памяти в блок B_2 блока Block_S. Аналогично шагу 8, тут хранятся максимальные элементы всего файла.
- Начиная от блока B_2 блока Block_1 и до блока B_1 блока Block_S, определим новые блоки в файле и снова пронумеруем их Block_1 to Block_S. Разделим каждый блок на блоки B_1 и B_2. Установим P = 1.
- Читаем B_1 и B_2 блока Block_P в память. Объеденим массивы в памяти. запишем отсортированный массив назад в Block_P и установим P = P +1. Если P ≤ S, повторяем этот шаг.
- Если S > 1, возвращаемся к шагу 5. Каждый раз мы выделяем M минимальных и максимальных элементов, записываем их в начало и конец файла соответственно, а потом делаем то же самое с оставшимися элементами, пока не дойдем до середины файла.
Преимущество такого алгоритма, кроме отсутствия буфера на диске, это то, что с диска мы читаем данные относительно большими порциями, что ускоряет алгоритм.
Реализуем алгоритм на C++.
Для начала определим количество блоков и размер блока в байтах и в элементах и выделим память:
const size_t type_size = sizeof(T); const uint64_t filesize = file_size(filename); std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary); const uint64_t chunk_number = filesize / memory; const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size = chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2; std::vector<T> *chunk = new std::vector<T>(chunk_size);
Теперь пункты 2-3 — сортируем каждый бло��:
for (uint64_t i = 0; i < chunk_number; ++i) { data.seekg(chunk_byte_size * i); data.read((char *) chunk->data(), chunk_byte_size); flat_quick_sort(chunk->begin(), chunk->end()); data.seekp(chunk_byte_size * i); data.write((char *) chunk->data(), chunk_byte_size); }
Саму сортировку мы напишем чуть позднее.
Приступим к слияниям. Нижняя половина:
int64_t s = chunk_number, start = 0; while (s > 0) { data.seekp(half_chunk_byte_size * start); data.read((char *) chunk->data(), half_chunk_byte_size); for (int64_t q = 1; q < s; ++q) { data.seekg(half_chunk_byte_size * start + chunk_byte_size * q); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * q); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); } data.seekp(half_chunk_byte_size * start); data.write((char *) chunk->data(), half_chunk_byte_size);
И аналогично верхняя:
data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); for (int64_t q = s - 2; q >= 0; --q) { data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.read((char *) chunk->data(), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.write((char *) chunk->data(), half_chunk_byte_size); } data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
Перераспределяем блоки, завершаем цикл и не забываем освободить память:
--s; ++start; for (int64_t p = 0; p < s; ++p) { data.seekp(half_chunk_byte_size * start + chunk_byte_size * p); data.read((char *) chunk->data(), chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekg(half_chunk_byte_size * start + chunk_byte_size * p); data.write((char *) chunk->data(), chunk_byte_size); } } delete chunk; }
Функция полностью
template<typename T> void ext_sort(const std::string filename, const size_t memory) { const size_t type_size = sizeof(T); const uint64_t filesize = file_size(filename); std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary); const uint64_t chunk_number = filesize / memory; const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size = chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2; std::vector<T> *chunk = new std::vector<T>(chunk_size); for (uint64_t i = 0; i < chunk_number; ++i) { data.seekg(chunk_byte_size * i); data.read((char *) chunk->data(), chunk_byte_size); flat_quick_sort(chunk->begin(), chunk->end()); data.seekp(chunk_byte_size * i); data.write((char *) chunk->data(), chunk_byte_size); } int64_t s = chunk_number, start = 0; while (s > 0) { data.seekp(half_chunk_byte_size * start); data.read((char *) chunk->data(), half_chunk_byte_size); for (int64_t q = 1; q < s; ++q) { data.seekg(half_chunk_byte_size * start + chunk_byte_size * q); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * q); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); } data.seekp(half_chunk_byte_size * start); data.write((char *) chunk->data(), half_chunk_byte_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); for (int64_t q = s - 2; q >= 0; --q) { data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.read((char *) chunk->data(), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.write((char *) chunk->data(), half_chunk_byte_size); } data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); --s; ++start; for (int64_t p = 0; p < s; ++p) { data.seekp(half_chunk_byte_size * start + chunk_byte_size * p); data.read((char *) chunk->data(), chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekg(half_chunk_byte_size * start + chunk_byte_size * p); data.write((char *) chunk->data(), chunk_byte_size); } } delete chunk; }
Осталось реализовать функции flat_quick_sort и in_place_merge. Идею (и большую часть реализации) flat quick sort я взял в статье хабраюзера ripatti. Я только заменил median of medians (посчитал это оверкиллом в среднем случае) на median-of-nine и добавил сортировку вставками для маленьких частей массива.
flat_quicksort.h
#ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H #define EXTERNAL_SORT_FLAT_QUICKSORT_H template<class ForwIt> void insertion_sort(ForwIt be, ForwIt en) { for (ForwIt ii = be + 1; ii != en; ii++) { ForwIt jj = ii - 1; auto val = *ii; while (jj >= be and *jj > val) { *(jj + 1) = *jj; --jj; } *(jj + 1) = val; } } template<class ForwIt> ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) { return (*it1 > *it2) ? (*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 : (*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1; } template<class ForwIt> ForwIt choose_pivot(ForwIt be, ForwIt en) { ptrdiff_t s = (en - be) / 8; ForwIt mid = be + (en - be) / 2; ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3( en - 2 * s, en - s, en); return median_of_3(best1, best2, best3); } // search for the end of the current block template<class ForwIt> ForwIt block_range(ForwIt be, ForwIt en) { ForwIt it = be; while (it != en) { if (*be < *it) return it; ++it; } return it; } // warning: after the partition outer pivot may point to random element template<class ForwIt> std::pair<ForwIt, ForwIt> partition_range(ForwIt be, ForwIt en, ForwIt pivot) { std::pair<ForwIt, ForwIt> re; ForwIt j = be; for (ForwIt i = be; i != en; ++i) if (*i < *pivot) { if (pivot == i) pivot = j; else if (pivot == j) pivot = i; std::swap(*j, *i); ++j; } re.first = j; for (ForwIt i = j; i != en; ++i) if (*pivot == *i) { if (pivot == i) pivot = j; else if (pivot == j) pivot = i; std::swap(*j, *i); ++j; } re.second = j; return re; } // makes largest element the first template<class ForwIt> void blockify(ForwIt be, ForwIt en) { for (ForwIt it = be; it != en; ++it) if (*be < *it) std::swap(*be, *it); } template<class ForwIt> void flat_quick_sort(ForwIt be, ForwIt en) { ForwIt tmp = en; // the current end of block while (be != en) { if (std::is_sorted(be, tmp)) { be = tmp; tmp = block_range(be, en); continue; } if (tmp - be < 32) insertion_sort(be, tmp); else { ForwIt pivot = choose_pivot(be, tmp); std::pair<ForwIt, ForwIt> range = partition_range(be, tmp, pivot); blockify(range.second, tmp); tmp = range.first; } } } #endif //EXTERNAL_SORT_FLAT_QUICKSORT_H
Со слиянием было сложнее. Сначала я использовал заглушку, использующую O(n) памяти:
template<typename T> void merge(std::vector<T> &chunk, size_t s, size_t q, size_t r) { std::vector<T> *chunk2 = new std::vector<T>(q - s + 1); auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s; std::copy(it, it1, it2); while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) { if (*it1 > *it2) { *it = *it2; ++it2; } else { *it = *it1; ++it1; } ++it; } if (it1 == chunk.begin() + r + 1) std::copy(it2, chunk2->end(), it); else std::copy(it1, chunk.begin() + r + 1, it); delete chunk2; }
Когда я захотел заменить заглушку in-place версией, оказалось, что быстрые алгоритмы in-place слияния в большинстве своем достаточно запутанные (посмотрите, например, On optimal and efficient in place merging). Мне надо было что-то попроще, и я выбрал алгоритм из статьи A simple algorithm for in-place merging:
in-place merge
template<typename Iter> void merge_B_and_Y(Iter z, Iter y, Iter yn) { for (; z < y && y < yn; ++z) { Iter j = std::min_element(z, y); if (*j <= *y) std::swap(*z, *j); else { std::swap(*z, *y); ++y; } } if (z < y) flat_quick_sort(z, yn); } template<typename Iter> Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1, Iter b2, auto max) { auto min1 = max, min2 = max; Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0; if (m <= z) m += k; for (auto i = m; i + k <= y; i += k) { if (i != b1 && i != b2) { Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1; if (*i <= min1 && *j <= min2) { x = i; min1 = *i; min2 = *j; } } } return x; } template<typename Iter> void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) { if (k == -1) k = (int64_t) sqrt(yn - x0); size_t f = (y0 - x0) % k; Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f; auto t = *x, max = *std::max_element(x0, yn); *x = *x0; Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k; int i = 0; while (y - z > 2 * k) { ++i; if (*x <= *y || y >= yn) { *z = *x; *x = *b1; ++x; if ((x - x0) % k == f) if (z < x - k) b2 = x - k; x = find_next_X_block(x0, z, y, k, f, b1, b2, max); } else { *z = *y; *y = *b1; ++y; if ((y - y0) % k == 0) b2 = y - k; } ++z; *b1 = *z; if (z == x) x = b1; if (z == b2) b2 = yn + 1; ++b1; if ((b1 - x0) % k == f) b1 = (b2 == yn + 1) ? b1 - k : b2; } *z = t; if (rec) merge_B_and_Y(z, y, yn); else { flat_quick_sort(z, y); in_place_merge(z,y,yn,(int64_t)sqrt(k),true); } }
Но на моем компьютере замена merge на in-place merge замедляла алгоритм почти на порядок. Возможно я ошибся в реализации или просто выбрал медленный алгоритм в погоне за простотой. Времени разбираться, как всегда, не было, к тому же gprof почему-то падал. И тут меня осенило. Если мы выделям M байт динамической памяти, то не важно, как мы её используем, мы все равно получаем O(1). Тогда просто выделим 2/3 под данные, а треть — под буфер слияния. Замедление будет гораздо меньше. И правда:
| Алгоритм | Время (75MB int64 в 7,5MB памяти) | Скорость (75MB int64 в 7,5MB памяти) | Время (7,5MB int64 в 75KB памяти) | Скорость (7,5MB int64 в 75KB памяти) | Время (750MB int64 в 75MB памяти) | Скорость (750MB int64 в 75MB памяти) |
|---|---|---|---|---|---|---|
| In-place merge | 6.04329 s | 1 241 045 B/s | 24.2993 s | 3 086 508 B/s | - | - |
| Merge | 0.932663 s | 8 041 489 B/s | 2.73895 s | 27 382 756 B/s | 47.7946 s | 15 691 689 B/s |
| Алгоритм SLY_G | 1.79629 s | 4175272 B/s | 3.84775 s | 19 491 910 B/s | 39.77 s | 18 858 436 B/s |
Все исходники лежат здесь.
