Pull to refresh

Распараллеливание задач. Случай «идеальной параллельности». Часть 2

.NET *

Предлагаемые решения в .NET 4


Это вторая часть статьи, посвященной распараллеливанию идеальных циклов. В первой части были рассмотрены проблемы, возникающие при этом, и общие подходы к их решению. В этой мы поговорим о конкретных библиотечных компонентах, предоставляемых .NET 4.0 для поддержки этих задач.

Для распараллеливания «идеальных» циклов предоставляются следующие варианты:
  • класс System.Threading.Tasks.Parallel с методами For (), ForEach ()
  • Parallel LINQ с методом расширения AsParallel ().




Методы Parallel.For и Parallel.ForEach


Начнём с рассмотрения класса Parallel и его методов организации циклов.

Первый рассматриваемый нами метод имеет сигнатуру (это базовая, одна из многих перегрузок):

  1. public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);


Она во многом аналогична сигнатурам наших экспериментальных методов из первой части статьи, кроме типа возвращаемого значения.

Второй «в базовой комплектации» выглядит как:

  1. public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body);


Рассмотрим, что умеют методы Parallel.For и Parallel.ForEach.

Обработка исключений. При возникновении исключения в одной из итераций, новые итерации больше не стартуют. Уже стартовавшие, тем не менее, могут закончить свою работу, и уже по их завершении все исключения (изначальное и возможные последующие исключения в дорабатывавших итерациях) аггрегируются в одно исключение типа AggregateException и оно выкидывается.

Раннее прерывание цикла. Соответствующие перегрузки метода For предоставляют возможность прерывать выполнение, вызывая у контекстного объекта (передаваемого в каждую итерацию) методы Stop или Break.

Отличие Stop от Break в том, что Stop сигнализирует о необходимости прекратить запускать новые итерации вообще, а Break — прекратить запускать новые итерации, следующие (по порядку) за той, в которой вызван Break. То есть, при вызове Break на 5-й итерации гарантируется, несмотря на параллельность, что итерации с 1-й по 4-ю всё же будут выполнены. А при вызове Stop на 5 итерации, если итерация 4 ещё не стартовала, то она и не будет стартовать.

Текущие итерации, которые уже были запущены на момент вызова Stop или Break, могут проверять статус прерывания цикла, и завершиться раньше времени, если узнают о прерывании всего цикла. Для этого они могут у контекстного объекта проверить соответствующие свойства: IsStopped и LowestBreakIteration.

Поддержка хранения данных на уровне потока. Некоторые перегрузки метода позволяют сохранять промежуточные результаты вычислений в хранилище данных, локальном для каждого потока. Это позволяет, например, при агрегации результата, сперва агрегировать результаты работы отдельного потока, и только потом уже агрегировать их между всеми работавшими потоками. Это избавляет от излишних затрат на синхронизацию в процессе вычислений.

Возможность конфигурирования уровня параллелизма. Можно указать максимальное количество потоков, используемое для выполнения.

Поддержка вложенности вызовов. Благодаря использованию пула потоков, чрезмерной многопоточности не возникает как при вложенных вызовах методов For и ForEach, так и при параллельном их выполнении.

Динамическое изменение количества потоков. Parallel.For был спроектирован из расчёта изменяющейся во времени загрузки, и с учётом того, что некоторые элементы работы могут требовать большего количества вычислений, чем другие. Поэтому количество используемых потоков может изменяться в процессе работы метода.

Сложное управление загрузкой потоков. В методе была реализована логика балансировки загрузки потоков, которая учитывает большое количество разных факторов. Также, размер блока («порции данных» для обработки) увеличивается в процессе работы, что позволяет получить более качественную балансировку загрузки для небольшого количества итераций (предполагается, что более «тяжелых» при этом), и меньшие затраты на синхронизацию «раздачи заданий» при большом количестве итераций.

Поддержка отмены выполнения цикла извне его. Для этого используется класс CancellationTokenSource. При запуске цикла необходимо передать в него свойство Token, и тогда для запроса отмены цикла извне необходимо просто вызвать у объекта CancellationTokenSource метод Cancel (), что предотвратит запуск новых итераций цикла, и по завершению всех текущих сгенерирует исключение OperationCanceledException. Текущие итерации, кстати, могут проверять статус отмены, чтобы «добровольно» завершиться раньше времени, если они узнают, что весь цикл отменён.

Parallel LINQ



Доступный ещё в .NET 3.5 в качестве расширения, Parallel LINQ доступен в .NET 4.0 сразу в System.Core. Идея его использования очень проста: мы добавляем в цепочку LINQ-запросов вызов .AsParallel (), и все последующие вызовы в цепочке выполняются распаралеленно. Например:

  1. var doubled = new [] {1, 2, 3, 4}.AsParallel().Select(i => i*2);


Может оказаться неочевидным, что порядок следования элементов в результирующей коллекции может быть произвольным, так они обрабатываются параллельно. Чтобы сохранить порядок следования элементов в коллекции, мы можем модифицировать наш первый фрагмент вот так:

  1. var doubled = new [] {1, 2, 3, 4}.AsParallel().AsOrdered().Select(i => i*2);


Обработка всё равно пройдёт параллельно, но этап агрегации (сборки результатов от каждого потока) может длиться дольше, так как нужно будет дождаться сперва потоков, считающих «первые» элементы, взять их результат, и только потом взять результат от потоков, считавших «последние» элементы.

Важно осознавать при работе с PLINQ, что есть дополнительные накладные расходы на 
  1. распределение поступающих элементов исходного перечисления по потокам
  2. агрегацию вычисленных элементов в общую коллекцию.

Причём от распределения работы по потокам никуда не деться, а вот агрегацию можно попробовать отсрочить, а то и вообще обойтись без неё, если стоит задача не получить новую коллекцию, а просто обработать элементы существующей.

Метод AsParallel () возвращает не IEnumerable, а ParallelEnumerable, после чего все другие LINQ-методы (Select, Where и т.п.) тоже возвращают этот же тип. До тех пор, пока в цепочке передаётся ParallelEnumarable, результаты вычисления каждого из потоков без необходимости не агрегируются, и строится конвейер.

Важно понимать, что когда конвейер рвётся, параллелизм прекращается и данные агрегируются в единую результирующую коллекцию. Например, вот этот код работает не так, как можно было бы ошибочно предположить:

  1. List<InputData> inputData = ...;
  2. foreach (var o in inputData.AsParallel().Select(i => new OutputData(i)))
  3. {
  4.         ProcessOutput(o);
  5. }


Параллельно здесь выполнится только создание объектов OutputData, после чего у всех потоков будут собраны эти объекты и из них будет образована результирующая коллекция, которая последовательно будет обходиться с вызовом ProcessOutput () для каждого элемента.

Чтобы избежать ненужного здесь этапа агрегации, можно воспользоваться методом ParallelEnumarable.ForAll ():

  1. List<InputData> inputData = ...;
  2. inputData.AsParallel().Select(i => new OutputData(i)).ForAll(o =>
  3. {
  4.         ProcessOutput(o);
  5. });


В этом случае после этапа «new OutputData (i)» этап «ProcessOutput (o)» тоже будет выполняться параллельно, причём без этапа агрегации между ними.

Надо отметить, что вызов Parallel.ForEach () для «inputData.AsParallel ().Select (i => new OutputData (i))» будет иметь тот же недостаток, что и первый пример с обычным foreach: в Parallel.ForEach () передаётся IEnumerable, а не ParallelEnumerable — поэтому перед передачей коллекции в Parallel.ForEach () произойдёт её агрегация. Именно для избежания этого существует метод ParallelEnumerable.ForAll (), который и следует использовать в этом случае.

Типичные проблемы и ошибки


Рассмотрим возможные проблемы при работе с этими компонентами.

Потокобезопасность собственного кода

Прежде всего, надо понимать, что использование Parallel.ForEach само по себе не сделает ваш код потокобезопасным — необходимо самому следить, что итерации независимы друг от друга, или, если они зависимы, чётко предусмотреть потокобезопасную работу с разделяемыми ресурсами.

Также, при работе с Parallel.For непосредственно не поддерживаются нисходящие циклы и циклы с нестандартным (не равным единице) приращением счётчика. Если ваши изначальные алгоритмы написаны таким образом, необходимо внимательно проанализировать их, так как часто нестандартные циклы пишут как раз из-за наличия зависимостей между итерациями (обращение к вычисленному на предыдущем этапе предыдущему элементу массива и т.п.).

Размеры тела цикла

Использование класса Parallel предполагает накладные расходы, как минимум,
  • на вызов делегата для выполнения тела цикла, и 
  • на синхронизацию между потоками при раздаче заданий.


Если тело цикла достаточно длительное, эти дополнительные расходы играют небольшую роль. Однако, если мы распараллеливаем что-то очень простое вроде «i = i*i», накладные расходы в этом случае превышают полезную работу. Чтобы избавиться от этого недостатка, необходимо «укрупнить» тело цикла. Это очень просто сделать, включив в него не одну, а много итераций.

Вручную это можно сделать, явно разбив входную последовательность на набор блоков, и запустить параллельный цикл по этому набору, а в теле этого параллельного цикла уже последовательно обрабатывать каждый блок. Однако, тут придётся принимать решение о количестве этих блоков. Его можно возложить на библиотеку и воспользоваться классом, специально созданным для создания поднаборов из входной последовательности: System.Concurrent.Collections.Partitioner.

С его использованием цикл

  1. for (int i = 0; i < length; i++)
  2.         result[i] = i*i;


вместо наивной версии:

  1. Parallel.For(from, to, i =>
  2. {
  3.         result[i] = i*i;
  4. });


можно эффективно распараллелить вот так:

  1. Parallel.ForEach(Partitioner.Create(from, to), range =>
  2. {
  3.         for (int i = range.Item1; i < range.Item2; i++)
  4.         {
  5.                 result[i] = i*i;
  6.         }
  7. });


Partitioner.Create (from, to) создаёт тот самый набор блоков, по которому мы проходим параллельно, внутри тела цикла последовательно обрабатывая каждый блок. Тем самым мы обеспечиваем параллельному циклу длительное тело, и распределяем накладные расходы на большее количество полезной работы.

Обработка вложенных циклов

При обработке вложенных циклов возникает вопрос, насколько глубоко стоит проводить распараллеливание. Возьмём пример по обработке прямоугольного изображения:

  1. for(int y = 0; y < screenHeight; y++)
  2. {
  3.         int stride = y * screenWidth;
  4.         for (int x = 0; x < screenWidth; x++)
  5.         {
  6.                 rgb[x + stride] = calcColor(x, y); // вычисляем цвет
  7.         }
  8. };


Можно заменить только внешний цикл на вызов Paralllel.For, а можно и оба. Если сделать оба цикла параллельными, не окажется ли тело внутреннего слишком мало? Если оставить внутрений последовательным, точно ли мы загрузим работой все наши ядра?

Ответ на эти вопросы может дать только тестирование производительности. Вероятно, если вычисления цвета не слишком сложны, стоит оставить внутренний цикл последовательным. Но если мы работаем с широким, но невысоким изображением, вполне возможно, что одно только распараллеливание внешнего цикла загрузит не все ядра процессора.

Альтернативным путём может быть разворачивание двух вложенных циклов в один, и распараллеливание уже его:

  1. int totalPixels = screenHeight * screenWidth;
  2. Parallel.For(0, totalPixels, i =>
  3. {
  4.         int y = i / screenWidth, x = i % screenWidth;
  5.         rgb[i] = calcColor(x, y);
  6. });


Если calcColor () слишком проста, то чтобы укрупнить тело цикла, можно воспользоваться классом Partitioner, как в предыдущем примере:

  1. int totalPixels = screenHeight * screenWidth;
  2. Parallel.ForEach(Partitioner.Create(0, totalPixels), range =>
  3. {
  4.         for (int i = range.Item1; i < range.Item2; i++)
  5.         {
  6.                 int y = i / screenWidth, x = i % screenWidth;
  7.                 rgb[i] = calcColor(x, y);
  8.         }
  9. });


Непотокобезопасные реализации IList

Оба механизма — и Parallel.ForEach, и PLINQ принимают на входе IEnumerable, но при этом пытаются для переданной им коллекции найти наиболее быстрый интерфейс по работе с ней. В частности, для распараллеливания интерфейс IList подходит лучше, чем просто IEnumerable так как в нём есть индексер для произвольного доступа к любому элементу. Поэтому, если для переданной коллекции определён IList, то работа с ней происходит именно через этот интерфейс. Это понижает затраты на синхронизацию, но при этом библиотечный код полагается на потокобезопасную реализацию индексера.

Если же используемая коллекция не предоставляет потокобезопасный индексер, что часто может быть в случае, если элементы хранятся в сложном для индексации виде, или вообще подгружаются лениво, то необходимо явным образом указать системе, что для неё не нужно использовать IList, а ограничиться IEnumerable.

Чтобы это сделать, подходят два варианта. Первый — создать для коллекции System.Collections.Concurrent.Partitioner:

  1. // Здесь может быть использован IList<T>, если он поддерживается коллекцией
  2. IEnumerable<T> source = ...;
  3. Parallel.ForEach(source, item => { /*...*/ });
  4.  
  5. // А здесь гарантированно будет использоваться IEnumerable<T>
  6. IEnumerable<T> source = ...;
  7. Parallel.ForEach(Partitioner.Create(source), item => { /*...*/ });


Второй способ очевиден, и знаком по обычному LINQ: просто вызвать для коллекции «.Select (item => item)». Он подходит как для Parallel.For, так и для PLINQ:

  1. // Здесь тоже гарантированно будет использоваться IEnumerable<T>
  2. source.Select(i => i).AsParallel().Select(i => { /*...*/ });


Наличие сродства к потоку (thread affinity) у исходной коллекции

При работе Parallel.ForEach и PLINQ каждый из рабочих потоков сам вызывает у исходной коллекции MoveNext (). Если коллекция такова, что доступ к ней возможен лишь из одного определённого потока (обладает «сродством к потоку»), как, например, бывает при работе с UI-компонентами в Windows Forms или WPF, то использовать эти механизмы непосредственно для неё нельзя.

Чтобы обеспечить параллельную обработку такой коллекции, нужно воспользоваться шаблоном Источник-Потребитель (Producer-Consumer), основной «кирпичик» для которого в .NET 4.0 — это класс BlockingCollection, о котором напишу позже отдельно. Подробнее пока о нём можно почитать на английском в разделе «Producer-Consumer» на 53 странице оригинального документа.

P.S. Данная статья написана под впечатлением книги «PATTERNS OF PARALLEL PROGRAMMING: UNDERSTANDING AND APPLYING PARALLEL PATTERNS WITH THE .NET FRAMEWORK 4 AND VISUAL C#» и может считаться её вольным переводом с переработкой.

Заключение


Надеюсь, этот экскурс поможет совершить меньше собственных ошибок при распараллеливании вашего кода. Но всё равно, удачной отладки, и с праздником:)

На будущее


Могу продолжить тему параллельности в 4 framework, если будет интерес. Это может быть про:
  • класс Task и инфраструктуру для параллельных вычислений с зависимостями,
  • новые потокобезопасные коллекции,
  • ленивость и параллелизм,
  • разделяемые данные и синхронизацию.

Пишите, что из этого в какой степени интересно:)
Tags:
Hubs:
Total votes 35: ↑30 and ↓5 +25
Views 14K
Comments Comments 8