Pull to refresh

PLINQ: распределение данных между рабочими потоками

Reading time2 min
Views2.3K
Пусть имеется некоторая последовательность элементов, которую мы хотим обработать при помощи PLINQ-запроса. При этом есть некоторое количество физических ядер CPU, готовых выполнять рабочие потоки. Как распределить элементы входной коллекции между потоками?

Представим, что входная коллекция остаётся монолитной, и рабочие потоки по одному начинают выбирать элементы. Т.о. выборка будет сводиться к выполнению следующих действий:
  • Установка блокировки
  • Выборка элемента
  • Удаление блокировки

Очевидно, это даст большие накладные расходы на установку/снятие блокировки. Особенно они будут заметны в случае быстрой обработки элемента рабочим потоком. Избавиться от этого можно путём разбиения исходной коллекции на части.

Каким образом поделить входную последовательность элементов так, чтобы максимально распараллелить обработку, наиболее эффективно используя возможности аппаратуры? В общем случае точно известно только количество рабочих потоков, которые будут обрабатывать LINQ-запрос. Мы не можем заранее предугадать время обработки каждого элемента. Кроме того, неизвестна длина коллекции (конечно, число элементов можно предварительно подсчитать, но это действие также потенциально длительное и ресурсоёмкое). То есть, не получится «справедливо» распределить элементы между потоками выполнения. Оптимальным решением будет не делить всю исходную последовательность сразу, а выдавать данные в виде порций разного размера. Parallel Extensions делает именно так, и работает следующим образом:
  1. каждый рабочий поток имеет свою очередь элементов для обработки;
  2. начальный размер любой очереди равен единице, т.е. из исходной коллекции выбирается по одному элементу на поток;
  3. размер очереди растёт: при повторном обращении к коллекции он будет увеличен вдвое. Длина очереди для каждого потока вычисляется отдельно, и повышается до определённой величины, после чего рост прекращается;

Вот как это выглядит. В момент времени 0 каждый рабочий поток получает по 1 элементу из коллекции:

image

В следующий момент 1 рабочие потоки 1 и 2 закончат обработку своих элементов и выберут уже по два элемента из входной коллекции. При этом поток 3 всё ещё обрабатывает первый выбранный элемент:

image

Проходит ещё сколько-нибудь времени, потоки 1 и 2 заканчивают обработку полученных на предыдущем шаге элементов и получают теперь уже по 4 элемента. Поток 3 также завершает обработку и запрашивает 2 новых:

image

Ну и так далее. Здесь соблюдается баланс нагрузки: если один рабочий поток обрабатывает какой-то элемент слишком долго, остальные потоки будут брать из коллекции всё больше, таким образом, увеличивая общую эффективность. Очередь потока растёт до тех пор, пока её размер не достигнет 512 байт. То есть максимальное число элементов, например, типа Int32, равно 128.

Разумеется, рассмотренный способ не является универсальным, и в некоторых случаях будет не самым лучшим. Можно предложить ряд дополнений к используемому решению. Например, измерять среднее время обработки каждого элемента, и менять размер части с учётом этого параметра. Но разработчики Parallel Extensions решили остановиться на описанном выше подходе. Он хорошо зарекомендовал себя в большинстве сценариев, потому и был реализован.

Однако, есть возможность реализации собственного принципа распределения элементов исходной коллекции. Имеется комплект примеров приложений, использующих Paralell Extensions, среди которых есть и реализация иного способа разбиения входной коллекции.
Tags:
Hubs:
Total votes 34: ↑24 and ↓10+14
Comments5

Articles