Pull to refresh

Использование TPL Dataflow для многопоточной компрессии файлов

Reading time4 min
Views20K
На небольшом примере я расскажу как используя библиотеку TPL Dataflow можно решить довольно не тривиальную задачу многопоточной компрессии файлов в течении 15 минут.

Задача


Необходимо реализовать эффективную компрессию файлов используя класс GZipStream находящийся в пространстве имён System.IO.Compression. Предполагается, что сжимать мы будем файлы большие, и их нельзя уместить целиком в оперативной памяти.

TPL Dataflow


TPL Dataflow (TDF) построена поверх вошедшей в .NET 4 библиотеки TPL (The Task Parallel Library) и дополняет ее набором примитивов, для решения более сложных задач чем исходная библиотека. TPL Dataflow использует задачи, потоково-безопасные коллекции и другие возможности, представленные в .NET 4 для добавления поддержки параллельной обработки потоков данных. Суть библиотеки сводится к тому, чтобы стыкуя различные блоки, организовывать различные цепи обработки данных. При этом обработка данных может происходить как синхронно так и асинхронно. Библиотека войдет в грядущий .NET 4.5.

Решение


Для решения этой задачи понадобится всего 3 блока:
  1. Буфер для данных считанных из источника данных:
    var buffer = new BufferBlock<byte[]>();
    

  2. Блок сжатия данных:
    var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes));
    

    Функция сжатия:
    private static byte[] Compress(byte[] bytes)
    {
        using (var resultStream = new MemoryStream())
        {
            using (var zipStream = new GZipStream(resultStream, CompressionMode.Compress))
            {
                using (var writer = new BinaryWriter(zipStream))
                {
                    writer.Write(bytes);
                    return resultStream.ToArray();
                }
            }
        }
    }
    

  3. Блок записи сжатых данных:
    var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length));
    


Соединим наши блоки:
buffer.LinkTo(compressor);
compressor.LinkTo(writer);

Так же мы будем сообщать нашим блокам когда данные для них закончились и они могут завершить свою работу. Сделать это можно вызвав метод Complete блока:
buffer.Completion.ContinueWith(task => compressor.Complete());
compressor.Completion.ContinueWith(task => writer.Complete());

По мере считывания файла, мы будем предлагать данные нашему буферу. Делается это вызовом метода Post блока:
while (!buffer.Post(bytes))
{
}

Такая конструкция нам нужна для того, чтобы учесть ситуацию, когда блок полон и не принимает больше данные.

По завершению считывания уведомим наш блок о том, что данные у нас кончились:
buffer.Complete();

Теперь нам осталось дождаться только окончания работы нашего блока, отвечающего за запись сжатых данных в поток:
writer.Completion.Wait();


Получившийся метод:
public static void Compress(Stream inputStream, Stream outputStream)
{
    var buffer = new BufferBlock<byte[]>();
    var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes));
    var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length));
 
    buffer.LinkTo(compressor);
    buffer.Completion.ContinueWith(task => compressor.Complete());
    compressor.LinkTo(writer);
    compressor.Completion.ContinueWith(task => writer.Complete());
 
    var readBuffer = new byte[BufferSize];
    while (true)
    {
        int readCount = inputStream.Read(readBuffer, 0, BufferSize);
        if (readCount > 0)
        {
            var bytes = new byte[readCount];
            Buffer.BlockCopy(readBuffer, 0, bytes, 0, readCount);
            while (!buffer.Post(bytes))
            {
            }
        }
        if (readCount != BufferSize)
        {
            buffer.Complete();
            break;
        }
    }
 
    writer.Completion.Wait();
}

Можно было бы закончить на этом, если бы не одно «но»: данный код по скорости работы не отличается от абсолютно синхронного. Для того, чтобы он стал работать быстрее, нам необходимо указать, что нашу операцию сжатия необходимо делать асинхронно. Сделать это можно добавив необходимые настройки в наш блок:
var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);

Так же нам необходимо предусмотреть ситуацию, когда данные считываются быстрее чем сжимаются или записываются медленне чем сжимаются. Сделать это можно изменив свойство BoundedCapacity наших блоков:
var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions { BoundedCapacity = 100 });
var compressorOptions = new ExecutionDataflowBlockOptions
                            {
                                MaxDegreeOfParallelism = 4, 
                                BoundedCapacity = 100
                            };
var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);
var writerOptions = new ExecutionDataflowBlockOptions
                        {
                            BoundedCapacity = 100, 
                            SingleProducerConstrained = true
                        };
var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);

Итоговый метод выглядит вот так:
public static void Compress(Stream inputStream, Stream outputStream)
{
    var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions {BoundedCapacity = 100});
    var compressorOptions = new ExecutionDataflowBlockOptions
                                {
                                    MaxDegreeOfParallelism = 4,
                                    BoundedCapacity = 100
                                };
    var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);
    var writerOptions = new ExecutionDataflowBlockOptions
                            {
                                BoundedCapacity = 100,
                                SingleProducerConstrained = true
                            };
    var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);

    buffer.LinkTo(compressor);
    buffer.Completion.ContinueWith(task => compressor.Complete());
    compressor.LinkTo(writer);
    compressor.Completion.ContinueWith(task => writer.Complete());

    var readBuffer = new byte[BufferSize];
    while (true)
    {
        int readCount = inputStream.Read(readBuffer, 0, BufferSize);
        if (readCount > 0)
        {
            var postData = new byte[readCount];
            Buffer.BlockCopy(readBuffer, 0, postData, 0, readCount);
            while (!buffer.Post(postData))
            {
            }
        }
        if (readCount != BufferSize)
        {
            buffer.Complete();
            break;
        }
    }

    writer.Completion.Wait();
}

Вызвать мы его можем например из такого консольного приложения:
private const int BufferSize = 16384;

static void Main(string[] args)
{
    var stopwatch = Stopwatch.StartNew();
    using (var inputStream = File.OpenRead(@"C:\file.bak"))
    {
        using (var outputStream = File.Create(@"E:\file.gz"))
        {
            Compress(inputStream, outputStream);
        }
    }
    stopwatch.Stop();

    Console.WriteLine();
    Console.WriteLine(string.Format("Time elapsed: {0}s", stopwatch.Elapsed.TotalSeconds));
    Console.ReadKey();
}


Заключение


Как Вы можете видеть использование TPL Dataflow может серьезно упростить решение задач многопоточной обработки данных. На проведенных мной тестах, время, необходимое для компрессии, сократилось почти в 3 раза.
Скачать данную библиотеку и почитать о ней можно на официальной странице.
Tags:
Hubs:
Total votes 28: ↑28 and ↓0+28
Comments5

Articles