В одной конторе соискателю на позицию Senior C# developer выдали тестовое задание: отсортировать файл со строками определенного формата.
Требования такие:
Формат строки: число, точка, пробел, далее любые символы до конца строки.
Порядок сортировки — сначала сортируем текстовой части строки, потом по числу если текстовые части совпадают.
Кодировка — UTF-8.
Размер файла — 100гб - гарантирова��но больше объема ОП.
Должно отработать за 1 час на машине проверяющего, вряд ли там будет супер-быстрый SSD и огромное количество оперативной памяти.
Как и многие другие программисты, узнав о таком тестовом задании, я возмутился. Внешнюю сортировку слиянием практически всех проходили в ВУЗе, но практически никто никогда не писал её. Задача очень непрактическая и непонятно какие навыки проверяет. Так мне казалось.
Эта задача вызвала бурные обсуждения о способах её решения. Многие программисты, причисляющие себя к рангу senior, предложили использовать базы данных, ибо не барское это дело - вручную писать алгоритмы сортировки. Некоторые даже попытались сделать решение на Apache Spark. Однако никто до конца задачу не решил, ибо мало кому удалось отсортировать в нужном порядке даже 10ГБ файл менее чем за 15 минут без SSD.
Я подумал, что стоит решить задачу до конца с помощью программирования, и тоже причислить себя к рангу senior developer.
В первую очередь я написал генератор тестового файла, который генерирует нужное количество строк из исходного файла. В качестве исходного взял первый том Войны и Мира, так как там есть как русские, так и английские символы.
Код генератора
var source = (from l in File.ReadLines("source.txt") where !string.IsNullOrEmpty(l) from s in l.Split(new[] { '.', '?', '!', '[', ']' }, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) where s.Length > 10 select s).ToList(); Random rand = new(); using (var f = File.CreateText(file)) { f.AutoFlush = false; while(f.BaseStream.Position < maxSize) { var n = rand.Next(); f.Write(n); f.Write(". "); f.WriteLine(source[rand.Next(source.Count)]); } } return 0;
Для начала решил сгенерировать 10ГБ, чтобы не ждать час на каждом тестовом прогоне. Кроме того файл такого размера не помещается в кэши операционной системы и операции чтения-записи доходят до диска, что дает представление о реальном быстродействии на больших объемах.
Самое простое работающее решение
Все началось со статьи на хабре о внешней сортировке. Сразу отбросил идею нескольких прогонов для объединения блоков, так как это привело бы к дополнительным затратам на запись. Весь код разделил на две фазы — разбиение исходного файла на отдельные блоки (чанки, от английского chunk) и сортировка строк в блоках, слияние блоков в один файл.
Код разбиения:
var count = 0; var tempFiles = File.ReadLines(file) .Select(s => new Item(s, s.IndexOf('.'))) .Chunk(chunkSize) .Select(chunk => { Array.Sort(chunk, comparer); var tempFileName = Path.ChangeExtension(file, $".part-{count++}" + Path.GetExtension(file)); File.WriteAllLines(tempFileName, chunk.Select(x => x.Line)); return tempFileName; }).ToList();
Код слияния:
try { var mergedLines = tempFiles .Select(f => File.ReadLines(f).Select(s => new Item(s, s.IndexOf('.')))) .Merge(comparer) // IEnumerable<IEnumerable<T>> -> IEnumerable<T> .Select(x => x.Line); File.WriteAllLines(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)), mergedLines); } finally { tempFiles.ForEach(File.Delete); }
Для того, чтобы удобнее писать код, определил тип, содержащий строку и позицию точки в строке и компаратор для этого типа:
public record struct Item(string Line, int DotPosition); public record Comparer(StringComparison StringComparison) : IComparer<Item> { public int Compare(Item x, Item y) { var spanX = x.Line.AsSpan(); var spanY = y.Line.AsSpan(); var xDot = x.DotPosition; var yDot = y.DotPosition; var cmp = spanX[(xDot + 2)..].CompareTo(spanY[(yDot + 2)..], StringComparison); if (cmp != 0) return cmp; return int.Parse(spanX[..xDot]) - int.Parse(spanY[..yDot]); } }
"Сердце" всего алгоритма внешней сортировки - слияние итераторов:
public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default) { var enumerators = (from source in sources let e = source.GetEnumerator() where e.MoveNext() select e).ToList(); while (enumerators.Count > 0) { var min = enumerators.MinBy(e => e.Current, comparer)!; yield return min.Current; if (!min.MoveNext()) { min.Dispose(); enumerators.Remove(min); } } }
Почему я не использовал async\await? Ведь сейчас все программисты C# втыкают async\await на автомате. Конечно я тоже так сделал сначала, но потом убрал.
Во-первых для асинхронных итераторов сложнее написать Merge. Во-вторых код с async\await медленнее работал. async\await несет дополнительные расходы на переключение контекста, продолжения вызывают всю цепочку асинхронных методов. Это может быть выгодно когда нам надо распараллелить ожидание, но в этом коде никаких параллельных ожиданий нет. Все операции происходят последовательно.
Первый запуск
Запустил сортировку слиянием, размер чанка - 1М строк или около 157Мб, время работы - 15:30, пятнадцать с половиной минут! В час для 100Гб уложиться не выйдет.
Что по вашему тормозило в этом коде больше всего? Напишите свой вариант в комментариях, прежде чем разворачивать спойлер и читать дальше.
Тайминг
SplitSort done in 00:04:59.2942000 Merge done in 00:10:32.1238153
Диспетчер задач показывал, что во время сортировки ресурсы компьютера задействуются очень мало:


Оптимизируем слияние
Дольше всего выполняется не чтение или запись, а поиск минимального элемента во время слияния. Этот код я честно написал сам, не подсматривая в готовые решения. Гораздо эффективнее будет отсортировать итераторы один раз, а далее поддерживать их отсортированность после вызова .MoveNext(), даже на StackOverflow предлагают такой вариант.
Лучше всего подойдет двоичная (она же бинарная) куча. Она имеет минимальный элемент в корне и позволяет восстановить отсортированность за O(logN), где K - количество элементов в куче (у нас равно числу чанков). Естественно это я не сам придумал, а подсмотрел в интернете.
Методы работы с кучей
public static void Heapify<T>(this Span<T> heap, int index, IComparer<T> comparer) { ArgumentNullException.ThrowIfNull(comparer); var min = index; while (true) { var leftChild = 2 * index + 1; var rightChild = 2 * index + 2; var v = heap[index]; if (rightChild < heap.Length && comparer.Compare(v, heap[rightChild]) > 0) { min = rightChild; v = heap[min]; } if (leftChild < heap.Length && comparer.Compare(v, heap[leftChild]) > 0) { min = leftChild; } if (min == index) break; var temp = heap[index]; heap[index] = heap[min]; heap[min] = temp; index = min; } } public static void BuildHeap<T>(this Span<T> heap, IComparer<T> comparer) { ArgumentNullException.ThrowIfNull(comparer); for (int i = heap.Length / 2; i >= 0; i--) { Heapify(heap, i, comparer); } }
Код метода слияния:
public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default) { var heap = (from source in sources let e = source.GetEnumerator() where e.MoveNext() select e).ToArray(); var enumeratorComparer = new EnumeratorComparer<T>(comparer ?? Comparer<T>.Default); heap.AsSpan().BuildHeap(enumeratorComparer); while (true) { var min = heap[0]; yield return min.Current; if (!min.MoveNext()) { min.Dispose(); if (heap.Length == 1) yield break; heap[0] = heap[^1]; Array.Resize(ref heap, heap.Length - 1); } heap.AsSpan().Heapify(0, enumeratorComparer); } } private record EnumeratorComparer<T>(IComparer<T> comparer) : IComparer<IEnumerator<T>> { public int Compare(IEnumerator<T>? x, IEnumerator<T>? y) { return comparer.Compare(x!.Current, y!.Current); } }
Остальной код программы не изменился. Время работы:
SplitSort done in 00:04:27.8391844 Merge done in 00:02:11.4364005
Значительно лучше, но до заветного часа на 100ГБ еще очень далеко. Тут стоит обратить внимание, что из-за кэша файловой системы время работы может варьироваться +\-15%
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/heapsort
Оптимизируем разбиение
Фазы разбиения и слияния выполняют одинаковое количество чтения-записи, создают одинаковое количество объектов типа string, но фаза разбиения использует в 2,5 раз больше памяти и запуск под отладчиком показывает множество сборок мусора.
Все дело во времени жизни объектов. В фазе слияния объект строки живет от чтения из чанка до записи в результирующий файл. Когда считывается следующая строка из чанка предыдущая уже превратилась мусор. Мусор убирается в нулевом поколении сборщика, это происходит быстро и память не растет.
В фазе разбиения объекты строк живут от чтения из исходного файла до записи в чанк. Большинство объектов строк переживает несколько сборок мусора, что создает повышенную активность сборщика и увеличивает потребляемую память.
Мы не можем уменьшить время жизни строк на фазе разбиения. Но их можно вообще не создавать! Можно прочитать из файла блок символов, разделить по символу перевода строки и использовать вместо строк тип ReadOnlyMemory<char>, который предоставляет ту же функциональность. ReadOnlyMemory<char> это структура (не требует аллокаций в управляемой куче), которая представляет из себя ссылку на массив, смещение и длину.
Код разбиения без аллокаций:
List<string> tempFiles = new(); List<Item> chunk = new(); using (var reader = File.OpenText(file)) { var chunkBuffer = new char[chunkSize]; var chunkReadPosition = 0; var eos = reader.EndOfStream; while (!eos) { // Читаем из файла весь буфер var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition)); eos = reader.EndOfStream; var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead); // Заполняем список строк ReadOnlyMemory<char> для сортировки int linePos; while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0)) { var line = linePos >= 0 ? m[..linePos] : m; chunk.Add(new Item(line, line.Span.IndexOf('.'))); m = m[(linePos + Environment.NewLine.Length)..]; } chunk.Sort(comparer); // Записываем строки из отсортированного списка во временный файл var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file)); using (var tempFile = File.CreateText(tempFileName)) { foreach (var (l, _) in chunk) { tempFile.WriteLine(l); } } tempFiles.Add(tempFileName); if (eos) break; chunk.Clear(); //Отсток буфера переносим в начало m.CopyTo(chunkBuffer); chunkReadPosition = m.Length; } }
Можно было бы оставить код в функциональном стиле, но тогда код получился бы более неуклюжим из-за необходимости передачи флага конца файла.
В структурах данных заменил string наReadOnlyMemory<char>и больше ничего не изменилось.
Время работы при размере чанка в 100М символов, 161Мб на диске:
SplitSort done in 00:03:50.6780519 Merge done in 00:02:19.5627238
Удалось выиграть еще 30 сек и сократить расход памяти на фазе разбиения со 600 до 250 мегабайт. Как говорится Allocation is cheap… until it is not (статья от другом, но заголовок подходит).
К сожалению на этом все простые оптимизации кончились, а суммарное время работы все еще не позволит уложиться в час.
Как сравнивать строки
Для многих программистов сравнение строк это все еще посимвольное, а для тех кто пришел из С — побайтное сравнение. Но примерно с 2000 го��а все используют юникод. Юникод это не просто два байта на символ и кодировки переменной длины, вроде UTF8, это еще правила сравнения, нормализации и подсчета символов. Кто еще не в курсе - посмотрите доклад Plain Text Дилана Битти на NDC. Это один из лучших докладов за всю историю конференций.
Сравнение юникодных строк описано в стандарте Unicode Collation Algorithm (UCA). Это очень сложный алгоритм, который опирается на таблицы весов символом для разных культур. Этот алгоритм реализован в операционной системе (CompareStringW, CompareStringEx в Windows и CompareString из libSystem.Globalization.Native.so в Linux).
Конечно можно от этого всего отказаться и сравнивать строки посимвольно, это ускорит сортировку почти на минуту, так как .NET не использует системные API для этого. Достаточно указать StringComparison.Ordinal в Comparer. Кроме того, отказ от UCA позволяет использовать поразрядные (radix) алгоритмы сортировки, которые должны работать быстрее обычных. Но изменит порядок сортировки и фактически является оптимизацией под один частный случай. Не будет простых способов вернуться к UCA без потери быстродействия.
Один из шагов UCA — получение ключа сортировки (sort key) для строк — простого массива байт, который можно использовать для побайтного сравнения. Оказывается в .NET есть функция получения ключа сортировки строк CompareInfo.GetSortKey. То есть мы можем получить эти байты и потом сравнивать их. Если дописать в конец полученного массива байты числа, стоящего в начале, то мы можем всю сортировку свести к сортировке байтовых массивов.
Скоро 15 лет как я программирую на .NET и я узнал о наличии ключей сортировки строк и соответствующих классов только когда решал эту задачу.
Пытаемся оптимизировать сортировку
Для начала добавим получение ключей и сортировку по ним в методы разбиения и слияния:
List<string> tempFiles = new(); List<Item> chunk = new(); using (var reader = File.OpenText(file)) { var keyBuffer = new byte[chunkSize * 2]; //Буфер для ключей var chunkBuffer = new char[chunkSize]; var chunkReadPosition = 0; var eos = reader.EndOfStream; while (!eos) { // Читаем из файла весь буфер var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition)); eos = reader.EndOfStream; var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead); var key = keyBuffer.AsMemory(); // Заполняем список строк ReadOnlyMemory<char> для сортировки int linePos; while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0)) { var line = linePos >= 0 ? m[..linePos] : m; var s = line.Span; var dot = line.Span.IndexOf('.'); int x = int.Parse(s[..dot]); s = s[(dot + 2)..]; var keyLen = culture.CompareInfo.GetSortKey(s, key.Span); // Получаем ключ BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x); // Добписываем число в конец ключа, чтобы старшый байт был с меньшим индексом keyLen += sizeof(int); chunk.Add(new Item(line, key[..keyLen])); m = m[(linePos + Environment.NewLine.Length)..]; key = key[keyLen..]; } chunk.Sort(comparer); // Записываем строки из отсортированного списка во временный файл var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file)); using (var tempFile = File.CreateText(tempFileName)) { foreach (var (l, _) in chunk) { tempFile.WriteLine(l); } } tempFiles.Add(tempFileName); if (eos) break; chunk.Clear(); //Остаток буфера переносим в начало m.CopyTo(chunkBuffer); chunkReadPosition = m.Length; } }
При слиянии нам также надо получать ключи:
try { var mergedLines = tempFiles .Select(f => File.ReadLines(f).Select(s => // Читаем построчно все файлы { var m = s.AsMemory(); var dot = s.IndexOf('.'); // Находим в строках точку int x = int.Parse(s.AsSpan(0, dot)); // Получаем ключ того, что находится после точки с пробелом var key = new byte[s.Length * 2 + sizeof(int)]; var keyLen = culture.CompareInfo.GetSortKey(m[(dot + 2)..].Span, key); // Дописываем число в конец BinaryPrimitives.WriteInt32BigEndian(key.AsSpan(keyLen), x); return new Item(m, key); })) .Merge(comparer); //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T> using var sortedFile = File.CreateText(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file))); foreach (var (l, _) in mergedLines) { sortedFile.WriteLine(l); } } finally { tempFiles.ForEach(File.Delete); }
Компаратор теперь очень простой:
public record struct Item(ReadOnlyMemory<char> Line, ReadOnlyMemory<byte> Key); public class Comparer : IComparer<Item> { public int Compare(Item x, Item y) { return x.Key.Span.SequenceCompareTo(y.Key.Span); } }
Результаты ожидаемо хуже:
SplitSort done in 00:04:09.5091207 Merge done in 00:03:02.5646277
Мы проиграли 40 секунд на слиянии из-за получения ключей и 10 секунд на разбиении и сортировке. Сортировка ключей оказалась эффективнее, чем сортировка строк, но накладные расходы на получение ключей убили весь выигрыш.
Зато теперь можно применить поразрядную (Radix) сортировку ключей. Я написал два варианта поразрядной сортировки - Radix Quick Sort aka Multi-key QuickSort (просто перевел на C# алгоритм описанный в статье) и Counting Radix Sort (в основном скопировал код отсюда). К сожалению оба варианта проиграли стандартному Array.Sort(Код этих сортировок в статье не привожу, чтобы не забивать объем, но вы сможете найти его в исходниках вместе с бенчмарками по ссылке в конце статьи). Скорее всего потому, что сравнение блоков памяти методом SequenceCompareTo оптимизируется с помощью SIMD и работает гораздо быстрее, чем ручной код сравнения по разрядам.
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key
На этом месте я устал и лег спать.
А что если сохранять ключи?
С этой мыслью я проснулся на следующий день.
Во-первых сохраняя ключи во временном файле мы можем не получать ключ сортировки через API в фазе слияния.
Во-вторых нам вообще даже не надо декодировать символы в фазе слияния, мы можем просто сохранять нужное количество байт в выходном файле.
В-третьих, спустившись на уровень файловых потоков (
FileStreamвместоStreamReader) мы сможем эффективнее управлять буферизацией.
Я сделал бенчмарк, где сравнил все способы построчного чтения файлов, где сравнил File.ReadLines, StreamReader, FileStream и различные варианты буферизации, а также модный молодежный PipeReader. Победил, ожидаемо, FileStream, как самый низкоуровневый инструмент. Кроме того если вы будете читать или записывать данные большими блоками, то выгодно отключать встроенную буферизацию .NET, а если маленькими, то указывать большой размер буфера (код бенчмарков по ссылке в конце статьи).
Много кода
Фаза разбиения
public void SplitSort() { using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan); fileSize = stream.Length; List<SortKey> chunk = new(); var keyBuffer = new byte[maxChunkSize]; var readBuffer = new byte[maxChunkSize]; var remainingBytes = 0; var charBuffer = new char[1024]; var eof = false; while (!eof) { var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof); int chunkSize = remainingBytes + bytesRead; if (!eof) { var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine); if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length; remainingBytes = remainingBytes + bytesRead - chunkSize; } chunk.AddRange(ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer)); //Сортируем и записываем чанки на диск chunk.Sort(comparer); WriteChunk(chunk); chunk.Clear(); //Остаток буфера переносим в начало if (remainingBytes > 0) readBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan()); } }
Функция чтения строк и получения ключей сортировки
private IEnumerable<SortKey> ParseChunk(int byteCount, byte[] readBuffer, byte[] keyBuffer, char[] charBuffer) { var readPos = 0; var key = keyBuffer.AsMemory(); while (byteCount > 0) { var linePos = readBuffer.AsSpan(readPos, byteCount).IndexOf(NewLine); if (linePos == -1) linePos = byteCount; if (charBuffer.Length < linePos) charBuffer = new char[linePos]; // Надо обязательно вызывать именно эту перегрузку, потому что остальные аллоцируют память var lineLen = encoding.GetChars(readBuffer, readPos, linePos, charBuffer, 0); var line = charBuffer.AsMemory(0, lineLen); var s = line.Span; var dot = s.IndexOf('.'); var x = int.Parse(s[0..dot]); var keyLen = culture.CompareInfo.GetSortKey(s[(dot + 2)..], key.Span, compareOptions); BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x); keyLen += sizeof(int); var lineSize = linePos + NewLine.Length; yield return new SortKey(readBuffer.AsMemory(readPos, lineSize), key[..keyLen]); key = key[keyLen..]; readPos += lineSize; byteCount -= lineSize; maxLineSize = Math.Max(maxLineSize, lineSize); maxKeyLength = Math.Max(maxKeyLength, keyLen); } }
Функция записи чанка на диск
void WriteChunk(List<SortKey> chunk) { // Записываем строки из отсортированного списка во временный файл var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp"); using var stream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan); Span<byte> buffer = stackalloc byte[sizeof(int)]; foreach (var (line, key) in chunk) { BinaryPrimitives.WriteInt32LittleEndian(buffer, line.Length); stream.Write(buffer); stream.Write(line.Span); BinaryPrimitives.WriteInt32LittleEndian(buffer, key.Length); stream.Write(buffer); stream.Write(key.Span); } tempFiles.Add(tempFileName); }
Фаза слияния
public void Merge() { var mergedLines = tempFiles .Select(ReadTempFile) // Читаем построчно все файлы, находим в строках точку .Merge(comparer); //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T> string sortedFileName = Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)); using var sortedFile = new FileStream(sortedFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan); sortedFile.SetLength(fileSize); foreach (var (l, _) in mergedLines) { sortedFile.Write(l.Span); } }
Чтение временного файла
private IEnumerable<SortKey> ReadTempFile(string file) { using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, BufferSize, FileOptions.SequentialScan); var maxBlockSize = maxLineSize + maxKeyLength + sizeof(int) * 2; var readBuffer = new byte[Math.Max(BufferSize, maxBlockSize)]; var bytesRemaining = 0; var eof = false; while (!eof) { var bytesRead = stream.ReadBlock(readBuffer, bytesRemaining, readBuffer.Length - bytesRemaining, out eof); if (bytesRead == 0) eof = true; var mem = readBuffer.AsMemory(0, bytesRemaining + bytesRead); while (mem.Length > maxBlockSize || (eof && mem.Length > 0)) { var lineSize = BinaryPrimitives.ReadInt32LittleEndian(mem.Span); mem = mem[sizeof(int)..]; var line = mem[..lineSize]; mem = mem[lineSize..]; var keyLen = BinaryPrimitives.ReadInt32LittleEndian(mem.Span); mem = mem[sizeof(int)..]; yield return new SortKey(line, mem[..keyLen]); mem = mem[keyLen..]; } mem.CopyTo(readBuffer); bytesRemaining = mem.Length; } }
Из 25 строк кода в самом начале, написанных даже без классов и метода Main, всё превратилось в 150 строк без учета конструктора и полей класса.
Результаты забега при установке размера чанка в 100М байт. Так как теперь вместе со строками записываются ключи размер одного временного файла на диске составляет 180МБ.
SplitSort done in 00:04:12.8286312 Merge done in 00:03:05.3477665
Результат приблизительно равен предыдущему, но это при учете что теперь мы пишем и читаем не 10Гб временных файлов, а 18гб. В таск менеджере заметно, что быстродействие теперь сильно упирается в диск.
Если быстродействие сильно упирается в диск, то нужно данные сжать. Так мне говорила бабушка прочитал в книге по базам данных. Завернем FileStream в BrotliStream при записи и чтении временных файлов. Brotli — это новый алгоритм сжатия, который пока еще приходит в веб и другие аспекты разработки. Подробнее можно прочитать на википедии.
Результаты забега со сжатием
SplitSort done in 00:04:28.3044728 Merge done in 00:00:36.4300613
В сумме меньше 5 минут. Суммарный объем временных файлов на диске сократился до 970МБ, то есть почти в 20 раз. Это понятно, так как в файлах очень много повторяющихся строк. Возможно на других текстовых файлах результат будет не настолько выдающимся, но все равно написанные человеком или chatGpt тексты будут хороши сжиматься.
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key-with-compression
Быстродействие теперь упирается не в диск, а в процессор. И это хорошо. Диск у нас один, а процессоров зачастую больше.
Распараллеливание
Сейчас программа выполняется последовательно:
Чтение чанка (нагружает диск и не использует процессор)
Парсинг строк и получение ключей (нагружает процессор в ос��овном)
Сортировка (сильно нагружает процессор)
Сжатие данных (сильно нагружает процессор)
Запись (сильно нагружает диск)
Было бы неплохо пункты 1 и 5 выполнять параллельно с 2-4.
Заведем пять отдельных потоков для каждой задачи. Для передачи чанков между потоками воспользуемся библиотекой System.Threading.Channels.
readToParse = Channel.CreateBounded<(byte[], int)>(1); // Буфер и размер parseToSort = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1); // Список ключей, буфер строк и буфер ключей sortToCompress = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1)); // Список ключей, буфер строк и буфер ключей compressToWrite = Channel.CreateBounded<(byte[], int)>(1); // Сжатые данные и размер parserThreads = Enumerable .Range(0, degreeOfParallelism) .Select(_ => Task.Run(ParallelParser)).ToArray(); sorterThreads = Enumerable .Range(0, degreeOfParallelism) .Select(_ => Task.Run(ParallelSorter)).ToArray(); compressThreads = Enumerable .Range(0, degreeOfParallelism) .Select(_ => Task.Run(ParallelCompressor)).ToArray(); writerThread = Task.Run(ParallelWriter);
Нам нужен ограниченный канал с емкостью в одно сообщение. Если сообщение уже есть в очереди, то есть получатели заняты обработкой предыдущего, то отправитель будет висеть в ожидании освобождения канала. Таким образом нагрузка будет автоматически балансироваться.
Метод SplitSort изменим так, чтобы он мог работать как в синхронном режиме, так и в параллельном
using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan); fileSize = stream.Length; List<SortKey>? chunk = null; byte[]? keyBuffer = null; char[]? charBuffer = null; var readBuffer = pool!.Rent(maxChunkSize); var remainingBytes = 0; var eof = false; while (!eof) { var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof); int chunkSize = remainingBytes + bytesRead; if (!eof) { var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine); if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length; remainingBytes = remainingBytes + bytesRead - chunkSize; } var oldBuffer = readBuffer; if (degreeOfParallelism > 0) { await readToParse.Writer.WriteAsync((readBuffer, chunkSize)); readBuffer = pool.Rent(maxChunkSize); } else { chunk ??= new(); chunk.AddRange(ParseChunk(chunkSize, readBuffer, keyBuffer ??= pool.Rent(maxChunkSize), charBuffer ??= new char[1024])); //Сортируем и записываем чанки на диск chunk.Sort(comparer); WriteChunk(chunk); chunk.Clear(); } //Осаток буфера переносим в начало if (remainingBytes > 0) oldBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan()); } if (degreeOfParallelism == 0) { if (readBuffer != null) pool.Return(readBuffer); if (keyBuffer != null) pool.Return(keyBuffer); }
Если параметр degreeOfParallelism равен нулю, то код будет выполнятся последовательно, как и раньше. Если degreeOfParallelism >= 1, то после чтения чанка он отправится в readToParse канал и основной поток сразу же начнет читать второй чанк.
Очевидно в таком случае одним буфером для строк и ключей обойтись не получится, буферы придется каждый раз выделять новые. Чтобы не забить всю память таким образом я сразу применил ArrayPool. Ничего сложного нет: вместо оператора new вызываем метод Rent, а когда перестали пользоваться - вызываем Return.
ParallelParser, ParallelSorter и ParallelWriter выглядят так:
private async Task ParallelParser() { var charBuffer = new char[1024]; await foreach (var (readBuffer, chunkSize) in readToParse.Reader.ReadAllAsync()) { var keyBuffer = pool!.Rent(maxChunkSize); var chunk = ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer).ToList(); await parseToSort.Writer.WriteAsync((chunk, readBuffer, keyBuffer)); } } private async Task ParallelSorter() { await foreach (var item in parseToSort.Reader.ReadAllAsync()) { item.Item1.Sort(comparer); await sortToCompress.Writer.WriteAsync(item); } } private async Task ParallelWriter() { await foreach (var (buffer, bufferLength) in compressToWrite.Reader.ReadAllAsync()) { var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp"); using (var tempFile = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, 0, FileOptions.SequentialScan)) { await tempFile.WriteAsync(buffer.AsMemory(0, bufferLength)); } pool!.Return(buffer); tempFiles.Add(tempFileName); } }
Они построены по простому принципу - читаем сообщения из канала пока они не кончатся, на каждое сообщение выполняем свое действие и отправляем дальше.
ParallelCompressor построен по тому же принципу, но содержит больше кода. Уберу его под спойлер.
Код ParallelCompressor
private async Task ParallelCompressor() { var buffer = new byte[1024]; //Buffer with margin var outputSize = BrotliEncoder.GetMaxCompressedLength(maxChunkSize * 2); await foreach (var (chunk, readBuffer, keyBuffer) in sortToCompress.Reader.ReadAllAsync()) { using var encoder = new BrotliEncoder(4, 22); var output = pool!.Rent(outputSize); var dest = output.AsMemory(); var compressed = 0; foreach (var sk in chunk) { if (sk.Length > buffer.Length) { buffer = new byte[sk.Length]; } sk.Write(buffer, 0); var source = buffer.AsMemory(0, sk.Length); while (true) { var r = encoder.Compress(source.Span, dest.Span, out var bytesConsumed, out var bytesWritten, false); compressed += bytesWritten; if (bytesConsumed > 0) source = source[bytesConsumed..]; if (bytesWritten > 0) dest = dest[bytesWritten..]; if (r == OperationStatus.Done) break; if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData) { throw new InvalidOperationException(); } var old = output; outputSize *= 2; output = pool.Rent(outputSize); old.CopyTo(output, 0); pool.Return(old); dest = output.AsMemory(compressed); } } while (true) { var r = encoder.Flush(dest.Span, out var bytesWritten); compressed += bytesWritten; if (r == OperationStatus.Done) break; if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData) { throw new InvalidOperationException(); } var old = output; outputSize *= 2; output = pool.Rent(outputSize); old.CopyTo(output, 0); pool.Return(old); dest = output.AsMemory(compressed); } outputSize = compressed * 11 / 10; await compressToWrite.Writer.WriteAsync((output, compressed)); pool.Return(readBuffer); pool.Return(keyBuffer); } }
Из отсортированного списка строк и ключей записываем все в энкодер, а он периодически отдает нам блок упакованных данных. В конце надо вызвать Flush. Все осложняется тем, что метод может выполниться частично и сказать, что для продолжения недостаточно места в целевом буфере. Тогда надо выделить буфер побольше и перенести туда данные из старого.
В конце код завершения параллельной обработки: завершаем очереди и ждем завершения потоков.
readToParse.Writer.Complete(); await parserThread; parseToSort.Writer.Complete(); await sorterThread; sortToCompress.Writer.Complete(); await compressThread; compressToWrite.Writer.Complete(); await writerThread;
Запускаем с размером чанка в 200 мегабайт.
SplitSort done in 00:02:21.4203828 Merge done in 00:00:39.0610435
Три минуты в сумме, есть шанс уложиться в час для 100Гб.
Посмотрим в таск менеджер:

Потребление памяти выросло с 400Мб до 5,3Гб, это уже много. Почему так?
Когда код выполнялся последовательно для всех операцию использовался один набор буферов - для чтения данных, для ключей, список для сортировки и буфер для временного файла. Когда мы перешли в параллельный вариант у нас таких наборов как минимум количеству потоков + количеству каналов и свободных мест в них.
Такова, к сожалению, цена параллельности. Очень редко можно распараллелить обработку данных, не повышая размер используемой оперативной памяти.
Нагрузка на диск получилась небольшая, стоит добавить еще потоков для парсинга, сортировки и сжатия данных, то есть увеличить степень параллелизма (dop). Но это увеличит затраты памяти. Можно уменьшать размер чанка при повышении степени параллелизма.
// Значения по умолчанию dop = Environment.ProcessorCount / 4; chunkSize = 200 / int.Max(dop, 1);
Финальный прогон с дефолтными параметрами (dop=4, chunkSize=50)
SplitSort done in 00:00:53.8610345 Merge done in 00:00:39.7727140
Итого 1:40 (не более 1:50 за несколько прогонов).
Код со всеми бенчмарками по ссылке.
Заключение
Я очень сильно ошибся, думая что задача сортировки 100Гб файла простая. Для её решения нужно много знаний алгоритмов, библиотек, навык оптимизации программ и написания параллельного кода. А самое главное эта задача хорошо показывает способен ли программист преодолевать технические трудности и решать задачу до конца, а не пытаться найти короткий путь и опустить руки, если такого пути нет.
PS
❯ .\Sort.exe ..\..\..\..\100gb.txt SplitSort done in 00:11:35.9023876 Merge done in 00:20:16.3989011