Введение
Обычно, при оптимизации программы для многоядерных компьютеров первым шагом является выяснение возможности разделения алгоритма на части, выполняющиеся параллельно. Если для решения задачи необходимо параллельно обрабатывать отдельные элементы из большого набора данных, то первыми кандидатами станут новые возможности параллельности в .NET Framework 4: Parallel.ForEach и Parallel LINQ (PLINQ)
Parallel.ForEach
Класс Parallel содержит метод ForEach, представляющий собой многопоточную версию обычного цикла foreach в C#. Подобно обычному foreach, Parallel.ForEach выполняет итерации над перечислимыми данными (enumerable), но с использованием множества потоков. Одна из более часто используемых перегрузок Parallel.ForEach выглядит следующим образом:
public static ParallelLoopResult ForEach<TSource>(
IEnumerable<TSource> source,
Action<TSource> body)
Ienumerable указывает на последовательность, по которой нужно выполнить итерации, а Action body задает делегат, вызываемый для каждого элемента. Полный список перегрузок Parallel.ForEach можно найти здесь.
PLINQ
Родственный с Parallel.ForEach PLINQ представляет собой модель программирования для паралелльных операций над данными. Пользователь определяет операцию из стандартного набора операторов, включающих в себя проекции, фильтры, агрегирование и т.д. Подобно Parallel.ForEach PLINQ достигает параллельности, разбивая входную последовательность на части и обрабатывая элементы в разных потоках.
В статье выделяются различия между этими двумя подходами к параллельности. Разбираются сценарии использования, в которых лучше всего использовать Parallel.ForEach вместо PLINQ и наоборот.
Выполнение независимых операций
Если необходимо выполнить длительные вычисления над элементами последовательности и полученные результаты независимы, то предпочтительнее использовать Parallel.ForEach. PLinq в свою очередь будет слишком тяжеловесным для таких операций. Кроме того, для Parallel.ForEach указывается максимальное число потоков, то есть если у ThreadPool мало ресурсов и доступно потоков меньше, чем задано в ParallelOptions.MaxDegreeOfParallelism, будет использовано оптимальное число потоков, которое может быть увеличено по мере выполнения. Для PLINQ число выполняемых потоков задается строго.
Параллельные операции с сохранением порядка данных
PLINQ для сохранения порядка
Если ваши преобразования требуют сохранения порядка входных данных, то Вы, скорее всего, обнаружите, что проще использовать PLINQ, чем Parallel.ForEach. Например, если мы хотим сконвертировать цветные RGB-кадры видео в черно-белые, на выходе, порядок кадров, естественно, должен сохраниться. В этом случае лучше воспользоваться PLINQ и функцией AsOrdered() , которая в недрах PLINQ разбивает (partitioning) входную последовательность, выполняет преобразования, а затем располагает результат в корректном порядке.
public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
var ProcessedMovie =
Movie
.AsParallel()
.AsOrdered()
.Select(frame => ConvertToGrayscale(frame));
foreach (var grayscaleFrame in ProcessedMovie)
{
// Movie frames will be evaluated lazily
}
}
Почему бы не использовать здесь Parallel.ForEach?
За исключением тривиальных случаев, реализация параллельных операций над последовательными данными с помощью Parallel.ForEach требует значительного объема кода. В нашем случае мы можем использовать перегрузку функции Foreach, чтобы повторить эффект оператора AsOrdered():
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState,Int64>body)
В перегруженной версии Foreach к делегату действия над данными добавился параметр индекса текущего элемента. Теперь мы можем записывать результат в выходную коллекцию по тому же самому индексу, производить затратные вычисления параллельно и в итоге получать выходную последовательность в правильном порядке. Следующий пример иллюстрирует один из способов сохранения порядка с помощью Parallel.ForEach:
public static double [] PairwiseMultiply( double[] v1, double[] v2)
{
var length = Math.Min(v1.Length, v2.Lenth);
double[] result = new double[length];
Parallel.ForEach(v1,
(element, loopstate, elementIndex) =>
result[elementIndex] = element * v2[elementIndex]);
return result;
}
Однако незамедлительно обнаруживаются недостатки этого подхода. Если входная последовательность будет типом IEnumerable, а не массивом, то есть 4 способа реализации сохранения порядка:
- Первый вариант — вызов IEnumerable.Count(), который обойдется в O(n). Если число элементов известно, можно создать выходной массив для сохранения результатов по заданному индексу
- Второй вариант — материализовать коллекцию, (превратив её, например, в массив). Если данных много, то такой способ не очень подходит.
- Третий вариант — хорошенько подумать над выходной коллекцией. Выходная коллекция может быть хешем, тогда количество памяти необходимой памяти для хранения выходных значении будет, по меньшей мере, в 2 раза больше входной памяти для того, чтобы избежать коллизий при хешировании; если данных много, то структура данных для хеша будет непозволительно большой, к тому же можно получить падение производительности из-за false sharing и сборщика мусора.
- И последний вариант — сохранять результаты с их оригинальными индексами, а затем применить собственный алгоритм сортировки выходной коллекции.
В PLINQ пользователь просто запрашивает сохранение порядка, а движок запросов управляет всеми рутинными деталями обеспечения корректного порядка результатов. Инфраструктура PLINQ позволяет оператору AsOrdered() обрабатывать потоковые данные, иными словами, PLINQ поддерживают ленивую материализацию. В PLINQ материализация всей последовательности — худшее решение, Вы можете легко избежать вышеуказанных проблем и выполнять параллельные операции над данными просто используя оператор AsOrdered().
Параллельная обработка потоковых данных
Использование PLINQ для обработки потока
PLINQ предлагает возможность обработки запроса как запроса над потоком. Эта возможность крайне ценна по следующим причинам:
- 1. Результаты не материализуются в массиве, таким образом, нет избыточности хранения данных в памяти.
- 2. Вы можете получать (enumerate) результаты в одиночный поток вычислений по мере получения новых данных.
Продолжая с примером анализа ценных бумаг, представим, чтобы Вы хотите вычислить риск каждой бумаги из портфеля бумаг, выдавая только бумаги, соответствующие критерию анализа рисков, а затем выполнить некоторые вычисления на отфильтрованных результатах. В PLINQ код будет выглядеть примерно так:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
var StockRiskPortfolio =
Stocks
.AsParallel()
.AsOrdered()
.Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
.Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));
foreach (var stockRisk in StockRiskPortfolio)
{
SomeStockComputation(stockRisk.Risk);
// StockRiskPortfolio will be a stream of results
}
}
В этом примере элементы распределяются на части (partitions), обрабатываются несколькими потоками, затем переупорядочиваются; это важно для понимания того, что эти шаги выполняются параллельно, по мере появления результатов фильтрации однопоточный потребитель в цикле foreach может выполнять вычисления. PLINQ оптимизирован на производительность, а не задержку обработку(latency) и внутри себя использует буферы; может случиться так, что хотя частичный результат уже получен, он будет находиться в выходном буфере до тех пор, пока выходной буфер полностью насыщен и не позволяет производить дальнейшую обработку. Ситуацию можно исправить использованием расширяющего метода PLINQ WithMergeOptions, который позволяет задать выходную буферизацию. Метод WithMergeOptions принимает в качестве параметра перечисление ParallelMergeOptions, можно указать, как запрос выдает итоговый результат, который будет использоваться одиночным потоком. Предлагаются следующие варианты:
- ParallelMergeOptions.NotBuffered — указывает, что каждый обработанный элемент возвращается из каждого потока, как только он обработан
- ParallelMergeOptions.AutoBuffered — указывает, что элементы собираются в буфер, буфера периодически возвращается потоку-потребителю
- ParallelMergeOptions.FullyBuffered — указывает, что выходная последовательность полностью буферизуется, это позволяет получить результаты быстрее, чем при использовании других вариантов, однако тогда потоку-потребителю придется долго дожидаться получения первого элемента для обработки.
Пример использования WithMergeOptions доступен на MSDN
Почему не Parallel.ForEach?
Отбросим в сторону недостатки Parallel.ForEach для сохранения порядка последовательности. Для неупорядоченных вычислений над потоком с помощью Parallel.ForEach код будет выглядеть следующим образом:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
Parallel.ForEach(Stocks,
stock => {
var risk = ComputeRisk(stock);
if(ExpensiveRiskAnalysis(risk)
{
// stream processing
lock(myLock) { SomeStockComputation(risk) };
// store results
}
}
Этот код практически идентичен примеру с PLINQ, за исключением явной блокировки и менее элегантного кода. Заметьте, что в данной ситуации Parallel.ForeEach подразумевает сохранение результатов в потоко-безопасном стиле, тогда как PLINQ делает это за вас.
Для сохранения результатов у нас есть 3 способа: первый — сохранять значения в в потоко-небезопасной коллекции и требовать блокировку при каждой записи. Второй — сохранять в потокобезопасную коллекцию, благо, .NET Framework 4 предоставляет набор таких коллекций в пространстве имен System.Collections.Concurrent и реализовывать её самим не придется. Третий способ — использовать Parallel.ForEach с thread-local хранилищем, о чем будет рассказано далее. Каждый из этих способов требует явного управления сторонними эффектами записи в коллекцию, тогда как PLINQ позволяет нам абстрагироваться от этих операций.
Операции над двумя коллекциями
Использование PLINQ для операций над двумя коллекциями
Оператор PLINQ ZIP специальным образом выполняет параллельные вычисления над двумя различными коллекциями. Так как его можно компоновать с другими запросами, Вы можете параллельно выполнить сложные операции над каждой коллекций до объединения двух коллекций. Например:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
return
a
.AsParallel()
.AsOrdered()
.Select(element => ExpensiveComputation(element))
.Zip(
b
.AsParallel()
.AsOrdered()
.Select(element => DifferentExpensiveComputation(element)),
(a_element, b_element) => Combine(a_element,b_element));
}
Пример выше демонстрирует, как каждый источник данных обрабатывается параллельно различными операциями, затем результаты из обоих источников объединяются оператором Zip.
Почему не Parallel.ForEach?
Подобная операция может быть выполнена с перегрузкой Parallel.ForEach, использующей индексы, например:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
var numElements = Math.Min(a.Count(), b.Count());
var result = new T[numElements];
Parallel.ForEach(a,
(element, loopstate, index) =>
{
var a_element = ExpensiveComputation(element);
var b_element = DifferentExpensiveComputation(b.ElementAt(index));
result[index] = Combine(a_element, b_element);
});
return result;
}
Однако здесь присутствуют потенциальные ловушки и недостатки, описанные в применении Parallel.ForEach с сохранением порядка данных, один из недостатков включает просмотр всей коллекции до конца и явное управление индексами.
Локальное состояние потока (Thread-Local State)
Использование Parallel.ForEach для доступа к локальному состоянию потока
Хотя PLINQ предоставляет более лаконичные средства для параллельных операций над данными, некоторые сценарии обработки лучше подходят под применение Parallel.ForEach, например операции, поддерживающие локальное состояние потока. Сигнатура соответствующего метода Parallel.ForEach выглядит так:
public static ParallelLoopResult ForEach<TSource,TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
Следует заметить, что существует перегрузка оператора Aggregate, которая позволяет осуществлять доступ к локальному состоянию потока и может быть использована, если шаблон обработки данных может быть описан как снижение размерности. Следующий пример иллюстрирует, как исключить числа, не являющиеся простыми из последовательности:
public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
var results = new List<R>();
using (SemaphoreSlim sem = new SemaphoreSlim(1))
{
Parallel.ForEach(source, () => new List<R>(),
(element, loopstate, localStorage) =>
{
bool filter = filterFunction(element);
if (filter)
localStorage.Add(element);
return localStorage;
},
(finalStorage) =>
{
lock(myLock)
{
results.AddRange(finalStorage)
};
});
}
return results;
}
Подобная функциональность могла быть достигнута гораздо легче с PLINQ, цель примера — показать, что использование Parallel.ForEach и локального состояния потока может очень сильно снизить расходы на синхронизацию. Однако в других сценариях локальные состояния потоков становятся абсолютно необходимыми, следующий пример демонстрирует такой сценарий.
Представьте себе, что Вы как блестящий ученый-информатик и математик разработали статистическую модель для анализа рисков ценных бумаг; эта модель по-вашему разобьет все другие модели рисков в пух и прах. Для того, чтобы доказать это, Вам нужны данные с сайтов с информацией о фондовых рынках. Но загрузка данных последовательность будет очень долгой и является бутылочным горлышком для восьмиядерного компьютера. Хотя использование Parallel.ForEach является легким способом для параллельной загрузки данных с помощью WebClient, каждый поток будет блокироваться при каждой закачке, что может быть улучшено применение асинхронного ввода-вывода; больше информации доступно здесь. По причинам производительности Вы решили использовать Parallel.ForEach для итерации по коллекции URL адресов и закачивать данные параллельно. Код выглядит примерно так:
public static void UnsafeDownloadUrls ()
{
WebClient webclient = new WebClient();
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
webclient.DownloadFile(url, filenames[index] + ".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
К удивлению, мы получим во время исполнения исключение: «System.NotSupportedException -> WebClient does not support concurrent I/O operations.» Поняв, что множество потоков не могут производить доступ к одному WebClient в одно и то же время, Вы решаете создавать WebClient для каждой закачки.
public static void BAD_DownloadUrls ()
{
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
WebClient webclient = new WebClient();
webclient.DownloadFile(url, filenames[index] + ".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
Такой код позволяет создать программе более сотни веб-клиентов, программа выдаст исключение о тайм-ауте в WebClient. Вы поймете, что на компьютере запущена не серверная операционная система, поэтому максимальное число подключений ограничено. Затем можно догадаться, что использование Parallel.ForEach с локальным состоянием потока позволит решить проблему:
public static void downloadUrlsSafe()
{
Parallel.ForEach(urls,
() => new WebClient(),
(url, loopstate, index, webclient) =>
{
webclient.DownloadFile(url, filenames[index]+".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
return webclient;
},
(webclient) => { });
}
}
В этой реализации каждая операция доступа к данным не зависит от другой. В то же время, точка доступа ни независима, ни является потоко-безопасной. Использование локального хранилища потока позволяет нам быть уверенным в том, что количество созданных экземпляров WebClient столько, сколько требовалось, и каждый экземпляр WebClient принадлежит потоку, создавшего его.
Чем здесь плох PLINQ?
Если реализовать предыдущий пример с использованием объектов ThreadLocal и PLINQ код будет следующим:
public static void downloadUrl()
{
var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
var res =
urls
.AsParallel()
.ForAll(
url =>
{
webclient.Value.DownloadFile(url, host[url] +".dat"));
Console.WriteLine("{0}:{1}",
Thread.CurrentThread.ManagedThreadId, url);
});
}
В то время как реализация достигает тех же целей, важно понимать, что в любом сценарии, использование ThreadLocal<> существенно дороже, чем соответствующая перегрузка Parallel.ForEach. Заметим, что в этом сценарии, стоимость создания экземплеров ThreadLocal<> незначительно по сравнению со временем загрузки файла из интернета.
Выход из операций
Использование Parallel.ForEach для выхода из операций
В ситуации, когда контроль над выполнением операций существенен, важно понимать, что выход из цикла Parallel.ForEach позволяет достигнуть такого же эффекта что и проверка условия необходимости продолжения вычислений внутри тела цикла. Одна из перегрузок Parallel.ForEach, позволяющих отслеживать ParallelLoopState, выглядит так:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState> body)
ParallelLoopState обеспечивает поддерживать для прерывания выполнения цикла двумя различными методами, описанными далее.
ParallelLoopState.Stop()
Stop() информирует цикл о необходимости прекращения выполнения итераций; свойство ParallelLoopState.IsStopped позволяет каждой итерации определить, вызвала ли какая-либо другая итерация метод Stop(). Метод Stop() обычно полезен в случае, если цикл выполняет неупорядоченный поиск и следует выйти, как только найден искомый элемент. Например, если мы хотим узнать, присутствует ли объект в коллекции, код будет примерно таким:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
var matchFound = false;
Parallel.ForEach(TSpace,
(curValue, loopstate) =>
{
if (curValue.Equals(match) )
{
matchFound = true;
loopstate.Stop();
}
});
return matchFound;
}
Функциональность может быть достигнута и с помощью PLINQ, этот пример демонстрирует, как использовать ParallelLoopState.Stop() для контроля поток выполнения.
ParallelLoopState.Break()
Break() информирует цикл о том, что элементы, предшествующие текущему элементу должны быть обработаны, но для последующих элементов итерации нужно прекратить. Значение нижней итерации может быть получено из свойства ParallelLoopState.LowestBreakIteration. Break() обычно полезен, если выполняется поиск по упорядоченным данным. Другими словами, есть некий критерий необходимости обработки данных. Например, для последовательности, содержащей неуникальные элементы, в которой необходимо найти нижний индекс совпадающего объекта, код будет выглядеть так:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where
T: IEqualityComparer<T>
{
var loopResult = Parallel.ForEach(source,
(curValue, loopState, curIndex) =>
{
if (curValue.Equals(match))
{
loopState.Break();
}
});
var matchedIndex = loopResult.LowestBreakIteration;
return matchedIndex.HasValue ? matchedIndex : -1;
}
В этом примере цикл выполняется до тех пор, пока не найден объект, сигнал Break() означает, что только элементы с меньшим, чем у найденного объекта индексом должны быть обработаны; если будет найден еще один совпадающий экземпляр, снова будет получен сигнал Break(), так повторяется до тех пор, пока есть элементы, если объект был найден, поле LowestBreakIteration указывает на первый индекс совпадающего объекта.
Почему не PLINQ?
Хотя PLINQ обеспечивает поддержку выхода из выполнения запроса, различия в механизмах выхода у PLINQ и Parallel.ForEach существенны. Для того чтобы выйти из запроса PLINQ, запрос должен быть снабжен маркером отмены (cancellation token), как описано здесь. C Parallel.ForEach флаги выхода опрашиваются на каждой итерации. В случае с PLINQ Вы не можете полагаться на то, что отмененный запрос остановится быстро.
Заключение
Parallel.ForEach и PLINQ — мощные инструменты для быстрого внедрения параллелизма в ваших приложениях без необходимости глубокого погружения в механизмы их работы. Однако для выбора правильного инструмента решения конкретной задачи помните об отличиях и советах, описанных в этой статье.
Полезные ссылки:
Threading in C#
RSDN: Работа с потоками в C#. Параллельное программирование
Microsoft Samples for Parallel Programming with the .NET Framework