Параллельные стримы в Java 8 выглядят как бесплатное ускорение: добавил .parallel() — и все ядра побежали работать. Но по факту всё сложнее, так как распараллеливание имеет накладные расходы, зависит от того, насколько хорошо делится источник данных, сколько стоит объединение результатов, и даже от локальности данных в памяти.

В новом переводе от команды Spring АйО на примерах и бенчмарках разберем, почему простая свёртка на маленьких диапазонах может стать медленнее, почему ArrayList часто выигрывает у LinkedList в параллели, как reduce может неожиданно сломаться из-за неверного identity, и что вообще происходит внутри common ForkJoinPool.

Обзор

В Java 8 появился Stream API, который упрощает обход коллекций как потоков данных. Также очень просто создавать стримы, которые выполняются параллельно и задействуют несколько процессорных ядер.

Можно подумать, что всегда быстрее распределить работу по большему числу ядер. Но на практике это часто не так.

Стримы в Java

Стрим в Java — это всего лишь обёртка над источником данных, позволяющая удобно выполнять над данными пакетные операции.

Он не хранит данные и не вносит изменений в исходный источник данных. Вместо этого он добавляет поддержку функционального стиля для операций в конвейерах обработки данных.

Последовательные стримы

По умолчанию любая операция со стримом в Java выполняется последовательно, если явно не указано параллельное выполнение.

Последовательные стримы используют один поток для обработки конвейера:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

Вывод такого последовательного стрима предсказуем. Элементы списка всегда будут напечатаны в упорядоченной последовательности:

1 main
2 main
3 main
4 main

Параллельные стримы

Любой стрим в Java легко преобразовать из последовательного в параллельный.

Это можно сделать, добавив метод parallel к последовательному стриму, или создав стрим через метод parallelStream у коллекции:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

Параллельные стримы позволяют выполнять код параллельно на разных ядрах. Итоговый результат — это комбинация отдельных результатов.

Однако порядок выполнения мы не контролируем. Он может меняться при каждом запуске программы:

4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main

Fork-Join Framework

Параллельные стримы используют fork-join framework и его общий пул рабочих потоков.

Fork-join framework был добавлен в java.util.concurrent в Java 7 для управления задачами между несколькими потоками.

Разбиение источника

Fork-join framework отвечает за разбиение исходных данных между рабочими потоками и за обработку обратных вызовов при завершении задач.

Рассмотрим пример параллельного вычисления суммы целых чисел.

Мы воспользуемся методом reduce и добавим пять к стартовой сумме вместо того, чтобы начинать с нуля:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);

В последовательном стриме результат этой операции был бы равен 15.

Но поскольку операция reduce выполняется параллельно, число пять фактически прибавляется в каждом рабочем потоке:

Фактический результат может отличаться в зависимости от количества потоков, используемых в общем fork-join пуле.

Чтобы исправить эту проблему, число пять нужно прибавлять вне параллельного стрима:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);

Следовательно, нужно внимательно относиться к тому, какие операции можно запускать параллельно.

Общий пул потоков

Количество потоков в общем пуле равно (число процессорных ядер - 1).

Однако API позволяет указать, сколько потоков он будет использовать, передав параметр JVM:

-D java.util.concurrent.ForkJoinPool.common.parallelism=4

Важно помнить, что это глобальная нас��ройка, и она повлияет на все параллельные стримы и любые другие fork-join задачи, использующие общий пул. Мы настоятельно рекомендуем не менять этот параметр, если только для этого нет действительно веской причины.

Пользовательский пул потоков

Помимо стандартного общего пула потоков, параллельный стрим можно запускать и в пользовательском пуле:

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
    () -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);

Обратите внимание: Oracle рекомендует использовать общий пул потоков. Следовательно, для запуска параллельных стримов в пользовательских пулах должны быть очень веские основания.

Последствия для производительности

Параллельная обработка может быть полезной, чтобы задействовать несколько ядер по максимуму. Но нужно также учитывать накладные расходы на управление потоками, локальность данных в памяти, разбиение источника и объединение результатов.

Накладные расходы

Рассмотрим пример со стримом целых чисел. Запустим бенчмарк для последовательной и параллельной операции свёртки:

IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);

Для такой простой операции суммирования перевод последовательного стрима в параллельный дал худшую производительность:

Benchmark Mode Cnt Score Error Units
SplittingCosts.sourceSplittingIntStreamParallel avgt 25 35476,283 ± 204,446 ns/op
SplittingCosts.sourceSplittingIntStreamSequential avgt 25 68,274 ± 0,963 ns/op

Причина в том, что иногда накладные расходы на управление потоками, источниками и результатами обходятся дороже, чем сама работа.

Стоимость разбиения

Равномерное разбиение источника данных и есть цена параллельного выполнения, однако одни источники делятся лучше других.

Рассмотрим это на примере ArrayList и LinkedList:

private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
        arrayListOfNumbers.add(i);
        linkedListOfNumbers.add(i);
    });
}

Запустим бенчмарк для последовательной и параллельной операции свёртки на двух типах списков:

arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);

Результаты показывают, что перевод последовательного стрима в параллельный даёт выигры�� по производительности только для ArrayList:

Benchmark Mode Cnt Score Error Units
DifferentSourceSplitting.differentSourceArrayListParallel avgt 25 2004849,711 ± 5289,437 ns/op
DifferentSourceSplitting.differentSourceArrayListSequential avgt 25 5437923,224 ± 37398,940 ns/op
DifferentSourceSplitting.differentSourceLinkedListParallel avgt 25 13561609,611 ± 275658,633 ns/op
DifferentSourceSplitting.differentSourceLinkedListSequential avgt 25 10664918,132 ± 254251,184 ns/op

А все дело в том, что массивы делятся дёшево и равномерно, тогда как у LinkedList нет ни одного из этих свойств. TreeMap и HashSet делятся лучше, чем LinkedList, но не все равно не так хорошо, как массивы.

Стоимость объединения результатов

Каждый раз, когда мы делим источник для параллельных вычислений, нужно также убедиться, что результаты в конце будут объединены.

Запустим бенчмарк для последовательного и параллельного стрима, сравнив суммирование и группировку как разные операции объединения:

arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
arrayListOfNumbers.stream().collect(Collectors.toSet());
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet())

Результаты показывают, что перевод последовательного стрима в параллельный даёт выигрыш по производительности только для операции суммирования:

Benchmark Mode Cnt Score Error Units
MergingCosts.mergingCostsGroupingParallel avgt 25 135093312,675 ± 4195024,803 ns/op
MergingCosts.mergingCostsGroupingSequential avgt 25 70631711,489 ± 1517217,320 ns/op
MergingCosts.mergingCostsSumParallel avgt 25 2074483,821 ± 7520,402 ns/op
MergingCosts.mergingCostsSumSequential avgt 25 5509573,621 ± 60249,942 ns/op

Операция объединения очень дешева для некоторых случаев. Например, для свёртки и сложения. Но объединение результатов при группировке в множества может быть весьма затратным.

Локальность данных в памяти

Современные компьютеры используют сложную многоуровневую систему кэшей, чтобы держать часто используемые данные ближе к процессору. Когда обнаруживается линейный шаблон доступа к памяти, аппаратная часть заранее подгружает следующую строку данных, исходя из предположения, что она, вероятно, скоро понадобится.

Параллелизм даёт выигрыш по производительности, когда мы можем удерживать ядра процессора занятыми полезной работой. Поскольку ожидание промахов кэша полезной работой не является, нужно учитывать пропускную способность памяти как ограничивающий фактор.

Рассмотрим это на примере двух массивов: один использует примитивный тип, другой — ссылочный тип данных:

private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
        intArray[i-1] = i;
        integerArray[i-1] = i;
    });
}

Запустим бенчмарк для последовательной и параллельной операции свёртки на двух массивах:

Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);

Результаты показывают, что перевод последовательного стрима в параллельный даёт немного больший выигрыш по производительности при использовании массива примитивов:

Benchmark                                                     Mode  Cnt        Score        Error  Units
MemoryLocalityCosts.localityIntArrayParallel                sequential stream  avgt   25     116247,787 ±     283,150  ns/op
MemoryLocalityCosts.localityIntArraySequential                avgt   25     293142,385 ±    2526,892  ns/op
MemoryLocalityCosts.localityIntegerArrayParallel              avgt   25    2153732,607 ±   16956,463  ns/op
MemoryLocalityCosts.localityIntegerArraySequential            avgt   25    5134866,640 ±  148283,942  ns/op

Массив примитивов обеспечивает наилучшую возможную локальность в Java. В целом, чем больше указателей содержит наша структура данных, тем сильнее мы нагружаем память необходимостью подгружать объекты по ссылкам. Это может негативно влиять на распараллеливание, поскольку несколько ядер одновременно извлекают данные из памяти.

Модель NQ

Oracle представила простую модель, которая помогает определить, может ли параллелизм дать прирост производительности. В модели NQ N означает число элементов исходных данных, а Q — объём вычислений, выполняемых на один элемент.

Чем больше произведение N*Q, тем выше вероятность получить выигрыш от распараллеливания. Для задач с тривиально малым Q, например для суммирования чисел, эмпирическое правило такое: N должно быть больше 10 000. По мере роста числа вычислений уменьшается размер данных, необходимый для выигрыша от параллелизма.

Стоимость поиска по файлам

Поиск файлов с использованием параллельных стримов работает лучше по сравнению с последовательными. Запустим бенчмарк для последовательного и параллельного стрима при поиске по 1500 текстовым файлам:

Files.walk(Paths.get("src/main/resources/")).map(Path::normalize).filter(Files::isRegularFile)
      .filter(path -> path.getFileName().toString().endsWith(".txt")).collect(Collectors.toList());
Files.walk(Paths.get("src/main/resources/")).parallel().map(Path::normalize).filter(Files::
      isRegularFile).filter(path -> path.getFileName().toString().endsWith(".txt")).
      collect(Collectors.toList());

Результаты показывают, что перевод последовательного стрима в параллельный даёт немного больший выигрыш по производительности при поиске по большему количеству файлов:

Benchmark                                Mode  Cnt     Score         Error    Units
FileSearchCost.textFileSearchParallel    avgt   25  10808832.831 ± 446934.773  ns/op
FileSearchCost.textFileSearchSequential  avgt   25  13271799.599 ± 245112.749  ns/op

Когда использовать параллельные стримы

Как мы увидели, к использованию параллельных стримов нужно подходить очень аккуратно.

Параллелизм может дать выигрыш по производительности в определённых сценариях. Но параллельные стримы нельзя считать "магическим" ускорителем. Поэтому в процессе разработки последовательные стримы по умолчанию всё равно должны оставаться основным выбором.

Последовательный стрим стоит преобразовывать в параллельный, когда есть реальные требования к производительности. Имея такие требования, сначала следует провести измерения и рассматривать параллелизм как одну из стратегий оптимизации.

Большие объёмы данных и значительное число вычислений на элемент указывают на то, что параллелизм может быть хорошим вариантом.

С другой стороны, небольшой объём данных, неравномерно делящиеся источники, дорогие операции объединения и плохая локальность данных в памяти указывают на потенциальные проблемы для параллельного выполнения.

Заключение

В этой статье мы разобрали различия между последовательными и параллельными стримами в Java. Мы узнали, что параллельные стримы используют стандартный fork-join пул и его рабочие потоки.

Затем мы увидели, что параллельные стримы не всегда дают выигрыш по производительности. Постарались учесть накладные расходы на управление несколькими потоками, локальность данных в памяти, разбиение источника и объединение результатов. Мы также увидели, что массивы — это отличный источник данных для параллельного выполнения, поскольку обеспечивают максимально возможную локальность и могут делиться дёшево и равномерно.

Наконец, мы рассмотрели модель NQ и рекомендовали использовать параллельные стримы только тогда, когда действительно есть требования к производительности.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано.