Pull to refresh

Распараллеливание задач. Случай «идеальной параллельности». Часть 1

.NET *

Распараллеливание кода без зависимостей


Введение

В первой части данной статьи мы поговорим о подходах к параллельной обработке циклов в тех удачных случаях, когда между отдельными итерациями нет зависимостей, и они могут корректно выполняться параллельно. Во второй части — рассмотрим появившиеся в .NET 4.0 механизмы для управления таким распараллеливанием, и выявим тонкости работы этих механизмов.



Рассмотрим некоторый цикл, в котором обрабатываются данные:
  1. for (int i = 0; i < upperBound; i++)
  2. {
  3.         // ... тело цикла
  4. }

Если логика работы тела цикла такова, что результат его вычислений на конкретной итерации никак не зависит от результата вычислений какой-либо другой итерации, то данный цикл относится к «идеально-параллельным», так как все его итерации могут быть выполнены параллельно, буде достанет для этого ядер на процессоре.

Такое же определение можно отнести и к аналогичному foreach-циклу; из него же вытекает, что порядок вычисления итераций неважен.

Подходы к реализации параллельного цикла

Попробуем реализовать сами метод для выполнения цикла параллельно, и посмотрим, с какими трудностями мы при этом столкнёмся.

Итак, мы собираемся создать метод наподобие такого:

  1. public static void ParallelFor(int from, int to, Action<int> body);


Он должен будет выполнить body для всех значений в интервале от from до to (не включая последнее), при этом распараллелив их выполнение на несколько потоков, чтобы добиться максимальной производительности. Насколько же возможно это распараллелить?

Определение степени параллелизма

Есть здравое суждение, что логично использовать столько потоков, сколько есть ядер у процессора на этой машине. При таком решении, каждое из ядер будет полностью загружено. При большем количестве потоков мы будем получать рост накладных затрат на переключение между ними, при меньшем — какие-то из ядер будут простаивать.

Такой подход — по потоку на ядро во многих случаях верен, хотя и базируется на идеалистичной модели вычислений, когда все потоки занимаются только вычислениями на процессоре. Однако, иногда может быть полезно иметь и больше потоков. Например, в случае, если потоки в своей работе половину времени проводят в ожидании ресурсов, то процессор всё равно простаивает в это время, и увеличение количества рабочих потоков может привести к улучшению производительности.

Таким образом, выходит что стоит начинать с простого варианта «один поток на ядро», но быть готовым к рассмотрению и других стратегий.

Для того, чтобы получить количество доступных ядер, мы можем воспользоваться свойством System.Environment.ProcessorCount. Надо отметить, что оно возвращает количество ядер с учётом Hyper-threading, то есть, в случае его наличия, будет возвращено «логическое», уже удвоенное количество ядер.

С учётом вышесказанного, вот логичная (хотя несколько наивная) реализация:
  1. public static void ParallelFor(int from, int to, Action<int> body)
  2. {
  3.         // определяемся с количество потоков и размером блока данных для каждого потока
  4.         int size = to - from;
  5.         int numProcs = Environment.ProcessorCount;
  6.         int range = size / numProcs;
  7.        
  8.         // разбиваем данные, запускаем все потоки и ждём завершения
  9.         var threads = new List<Thread>(numProcs);
  10.         for (int p = 0; p < numProcs; p++)
  11.         {
  12.                 int start = p * range + from;
  13.                 int end = (p == numProcs - 1) ?
  14.                 to : start + range;
  15.                 threads.Add(new Thread(() => {
  16.                         for (int i = start; i < end; i++) body(i);
  17.                 }));
  18.         }      
  19.         foreach (var thread in threads) thread.Start();
  20.         foreach (var thread in threads) thread.Join();
  21. }


Важным недостатком такой версии является создание/завершение новых потоков каждый раз, так как это довольно затратная операция (в частности, каждый поток резервирует 1М памяти в стеке, даже если в данный момент не выполняется ни одной функции на нём).

Другая проблема тоже вытекает из создания новых потоков. Предположим, что в body нам передали некий код, который тоже содержит вызов ParallelFor. В таком случае, в процессе выполнения будет создано уже не numProcs потоков, а в два раза больше, и может возникнуть ситуация, когда затраты на переключение между потоками будут слишком велики («чрезмерная многопоточность»).

Статическое распредение итераций

Поэтому вместо создания потоков вручную мы предпочтём воспользоваться пулом потоков:
  1. public static void ParallelFor(int from, int to, Action<int> body)
  2. {
  3.         int size = to - from;
  4.         int numProcs = Environment.ProcessorCount;
  5.         int range = size / numProcs;
  6.  
  7.         int remaining = numProcs;
  8.  
  9.         // объект синхронизации, для определения завершения работы
  10.         using (ManualResetEvent mre = new ManualResetEvent(false))
  11.         {
  12.                 // создаём все задания
  13.                 for (int p = 0; p < numProcs; p++)
  14.                 {
  15.                         int start = p * range + from;
  16.                         int end = (p == numProcs - 1) ? to : start + range;
  17.  
  18.                         ThreadPool.QueueUserWorkItem(delegate {
  19.                                 for (int i = start; i < end; i++)
  20.                                         body(i);
  21.                                 // проверяем, не последнее ли это задание выполнилось
  22.                                 if (Interlocked.Decrement(ref remaining) == 0)
  23.                                         mre.Set();
  24.                         });
  25.                 }
  26.                 // Ждём, пока все задания выполнятся
  27.                 mre.WaitOne();
  28.         }
  29. }

Такое решение уже лишено проблем с чрезмерной многопоточностью и с затратами на создание потоков. Что ещё может помешать в данной ситуации максимально быстрой отработке данного цикла?

В случае, если все итерации небольшие по времени и приблизительно одинаково затратные, приведённый подход близок к оптимальному.

А если мы представим себе, что некоторые итерации завершаются быстро, а некоторые — во много раз дольше, то в таком случае тот поток из пула, которому не повезло быть исполнителем большего количества «долгих» итераций, будет работать в несколько раз дольше остальных. При этом, так как время работы всей параллельной операции определяется временем работы самой медленной её составляющей, то в итоге остальные потоки будут простаивать, сделав всю «свою» работу, в то время как один «невезучий» будет задерживать всех.

Чтобы в такой ситуации обеспечить минимальное время работы, нужно 
  • либо знать природу итераций, чтобы всем потокам поровну раздать «долгие» итерации,
  • либо изменить вообще подход к разделению задач между потоками.

Динамическое распределение итераций

От статического разделения можно перейти к динамическому, то есть, выдавать каждому потоку некоторую «порцию» работы, сделав которую, он получит следующую порцию, если несделанная работа ещё останется к тому времени.

Такой подход можно проиллюстрировать этим кодом:

  1. public static void ParallelFor(int from, int to, Action<int> body)
  2. {
  3.         int numProcs = Environment.ProcessorCount;
  4.         // количество оставшихся
  5.         int remainingWorkItems = numProcs;
  6.         int nextIteration = from;
  7.        
  8.         using (ManualResetEvent mre = new ManualResetEvent(false))
  9.         {
  10.                 // создаём задания
  11.                 for (int p = 0; p < numProcs; p++)
  12.                 {
  13.                         ThreadPool.QueueUserWorkItem(delegate
  14.                         {
  15.                                 int index;
  16.                                 // отбираем по одному элементу на выполнение
  17.                                 while ((index = Interlocked.Increment(ref nextIteration) - 1) < to)
  18.                                 {
  19.                                         body(index);
  20.                                 }
  21.                                 if (Interlocked.Decrement(ref remainingWorkItems) == 0)
  22.                                         mre.Set();
  23.                         });
  24.                 }
  25.                 // ждём, пока отработают все задания
  26.                 mre.WaitOne();
  27.         }
  28. }


Данный код хорош для случая долгих итераций с непредсказуемым временем работы, но для быстрых итераций в нём слишком много затрат на синхронизацию.

Сбалансированный подход

Таким образом, мы видим, что в зависимости от природы распараллеливаемого цикла, может быть выигрышной как стратегия статического разделения заданий между потоками, так и динамического. Логично предположить, что удачной для промежуточных случаев может быть и некоторая сбалансированная стратегия.

Вот вариант её кода. Идея в том, что «порцию» мы делаем несколько больше, чем просто один элемент, но меньше, чем в случае статического распределения.

  1. public static void ParallelFor(int from, int to, Action<int> body)
  2. {
  3.         int numProcs = Environment.ProcessorCount;
  4.         int remainingWorkItems = numProcs;
  5.         int nextIteration = from;
  6.         // размер "порции" данных
  7.         const int batchSize = 3;
  8.         using (ManualResetEvent mre = new ManualResetEvent(false))
  9.         {
  10.                 for (int p = 0; p < numProcs; p++)
  11.                 {
  12.                         ThreadPool.QueueUserWorkItem(delegate {
  13.                                 int index;
  14.                                 while ((index = Interlocked.Add(ref nextIteration, batchSize) - batchSize) < to)
  15.                                 {
  16.                                         int end = index + batchSize;
  17.                                         if (end >= to)  
  18.                                                 end = to;
  19.                                         for (int i = index; i < end; i++)
  20.                                                 body(i);
  21.                                 }
  22.                                 if (Interlocked.Decrement(ref remainingWorkItems) == 0)
  23.                                         mre.Set();
  24.                         });
  25.                 }
  26.                 mre.WaitOne();
  27.         }
  28. }


Здесь размер порции данных задан константой, и, меняя её, мы выбираем нужный нам уровень между статическим и динамическим распределением задач.

Заключение

В любом случае, оптимальные настройки для конкретного случая определяются природой вычислений. Но компромиссный вариант с динамической раздачей более-менее крупных «порций» работы может оказаться вполне приемлемым для большинства ситуаций. Именно так реализованы библиотечные методы параллельного выполнения циклов в .NET 4, о которых пойдёт речь во второй части статьи.

P.S. Данная статья написана под впечатлением книги «PATTERNS OF PARALLEL PROGRAMMING: UNDERSTANDING AND APPLYING PARALLEL PATTERNS WITH THE .NET FRAMEWORK 4 AND VISUAL C#» и может считаться её вольным переводом с переработкой.

______________________
Текст подготовлен в Редакторе Блогов от © SoftCoder.ru


UPD: Вторая часть — практическая: habrahabr.ru/blogs/net/104103
Tags:
Hubs:
Total votes 44: ↑35 and ↓9 +26
Views 22K
Comments 29
Comments Comments 29

Stories