Как стать автором
Обновить

Вилкой в глаз, или ForkJoinPool в Java

Время на прочтение8 мин
Количество просмотров61K

Всем привет. Сегодня я хотел бы поговорить о многопоточности. Вернее, не о многопоточности вообще, а о таком её механизме как ForkJoinPool. Нельзя сказать, что данная технология является новой (она появилась ещё в Java 7), или что в сети нельзя найти материалы по данной теме. Информации хватает. Например, для глубокого погружения могу порекомендовать лекцию блистательного Алексея Шипилёва, которую можно без труда найти на YouTube. Но лично мне большинство этих материалов показались либо слишком сложными, либо наоборот – поверхностными. Так же некоторые из них содержат явные ошибки, что вносит ещё большую неразбериху в данную тему. Судя по тому, что в комментариях под одной из этих статей я нашёл вот такую картинку, подобные проблемы были не только у меня.

Если вдруг и вы перелопатили всё, что нашли в сети по поводу ForkJoinPool, а просветления так и не достигли, добро пожаловать под кат. Попробуем на максимально простом примере разобраться в данной теме, с котиками, картинками, всё как вы любите.

Для понимания всего, что будет изложено ниже, крайне желательно быть знакомым с основами многопоточности (Thread, Runnable, Callable, Future и т.д.). 

Если вы уже что-то копали по теме ForkJoin, то должны знать, что в основе данной технологии лежит старый как мир принцип «разделяй и властвуй». Легко нагуглить, что если у нас есть какая-то задача, с помощью ForkJoinPool мы сначала делим её на подзадачи, выполняем их, потом объединяем результаты и делаем это всё рекурсивно…

Ладно, ладно, теперь помедленнее и по порядку. Допустим, у нас есть некий массив большого размера, заполненный числами, и наша задача - найти сумму всех этих чисел.

public static void main(String[] args) throws InterruptedException {
    int[] array = getInitArray(10000);
    System.out.println(new Date());
    long sum = 0;
    for (int i = 0; i < array.length; i++) {
        Thread.sleep(1);
        sum += array[i];
    }
    System.out.println(sum);
    System.out.println(new Date());
}

public static int[] getInitArray(int capacity) {
    int[] array = new int[capacity];
    for (int i = 0; i < capacity; i++) {
        array[i] = 1;
    }
    return array;
}

Заполнить массив можно любыми числами (в данном случае это не важно). Скорее всего System.out.println(new Date()) – не самый оптимальный способ измерить скорость выполнения кода, но весьма простой и для нашего примера сгодится. Thread.sleep(1) добавлен для того, чтобы сымитировать задачу, которая при работе в одном потоке вызывает значительную загрузку процессора. У меня на выполнение данного кода ушло 17 секунд. Таким образом, мы имеем некую большую задачу, существенно замедляющую работу нашей программы. Очевидно, что запуск её в параллельном потоке проблемы не решит. Что же делать? Конечно же, разбить эту задачу на подзадачи. Допустим, мы разделим наш массив пополам, суммирование первой части массива запустим в одном потоке, суммирование второй части массива – в другом, а потом сложим получившиеся результаты. Проблема в том, что если задача достаточно большая, то обе её половинки также могут получиться достаточно большого размера, что не лучшим образом скажется на производительности. Следовательно, возможно и их нужно будет поделить на части и продолжать данную операцию до достижения некоего оптимального размера. Когда условие будет достигнуто, каждый из этих кусочков мы отдадим отдельному потоку, а потом соберём получившиеся результаты воедино. Чувствуете? В воздухе отчётливо запахло рекурсией, и мы всё ближе приближаемся к ForkJoinPool.

Допустим, что в деле изучения ForkJoinPool вы уже миновали стадию гнева и находитесь на стадии отрицания, тогда у вас может возникнуть вполне резонный вопрос: «Ну, и зачем нам нужен этот ForkJoin, да ещё и с какой-то рекурсией? Разве нельзя всё сделать проще?» В каком-то смысле можно. Напомню, что у нас есть интерфейс Callable, метод которого call() возвращает некое значение и запускается асинхронно в отдельном потоке. Ничто не мешает нам создать класс, имплементирующий данный интерфейс и содержащий в качестве поля числовой массив. Мы можем поделить наш огромный массив на 100500 маленьких массивов, создать 100500 экземпляров такого класса, создать 100500 отдельных потоков, собрать их в одну коллекцию, запустить их в цикле, потом ещё в одном цикле получить из них значения. Но вы уверены, что хотите построить ещё один велосипед из костылей, а не воспользоваться уже готовым решением, пусть и несколько сложным? Кроме того, описанное решение, обладает ещё одним существенным недостатком. Создание отдельных потоков – операция весьма тяжеловесная и ресурсозатратная. Рассчитывая получить прирост в производительности, и создавая 100500 потоков, мы рискуем получить прямо противоположный результат. Именно по этой причине и был придуман пул потоков, одним из видов которого является ForkJoinPool.

Итак, в основе своей ForkJoinPool – это пул потоков, преимущество которого состоит в том, что он работает на основе принципа WorkStealing, что дословно можно перевести как «кража работы». Когда один из потоков ForkJoinPool заканчивает свою работу, он не идёт пить кофе или чилить в ютубчике, он проявляет «сознательность» и берёт из общей очереди работ новую задачу. Это продолжается до тех пор, пока задачи не кончатся.

Ещё одной особенностью ForkJoinPool является то, что в него нельзя подать Callable или Runnable задачу. У него есть своя иерархия задач, наследуемая от абстрактного класса ForkJoinTask. Основные реализации – RecursiveTask и RecursiveAction. У каждого из них есть абстрактный метод compute(), который и надо реализовывать при наследование. RecursiveTask. compute() возвращает некое значение, RecursiveAction. compute() возвращает void.

Не знаю, как у вас, но у меня при первом знакомстве с данными классами по спине пробежали мурашки. «Раз они recursive, значит в них обязательно надо применить чёрную магию рекурсии…» Как я понял, на самом деле не обязательно (если я не прав, напишите в комментариях).  Такой код вполне легален и будет работать.

public class SimpleClass extends RecursiveTask<String> {

    @Override
    protected String compute() {
         return "I am just innocent simple class";
    }
}

Если мы передадим экземпляр такого класса на выполнение в ForkJoinPool, то получим обычную строку без всякой рекурсии. Судя по всему, (возможно, я ошибаюсь), создатели данных классов добавили в название слово Recursive в качестве некой рекомендации, а не обязательного требования.

Следующий не вполне очевидный вопрос: как запустить задачу в ForkJoinPool на исполнение? Для этого есть метод T invoke(ForkJoinTask<T> task)

public static void main(String[] args) {
    SimpleClass simpleClass = new SimpleClass();
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    System.out.println(forkJoinPool.invoke(simpleClass));
}

Данный код ожидаемо выведет на консоль «I am just innocent simple class». «И в чём тут сложность?» - спросите вы? А вот в чём. RecursiveTask содержит свои собственные методы для запуска задачи и получения результата – fork() и join(). Следующий код вернёт точно такой же результат, что и предыдущий.

public static void main(String[] args) {
    SimpleClass simpleClass = new SimpleClass();
    simpleClass.fork();
    System.out.println(simpleClass.join());
}

Здесь мы вообще обошлись без всякого ForkJoinPool. Задача выполнила сама себя! Это никуда не годится. Для того, чтобы понять, какой именно метод нужно использовать и в чём разница между ними, обратимся к официальной документации. С методом invoke() всё ясно, он «выполняет данную задачу, возвращая результат по завершении». А вот метод fork() работает немного сложнее, он «организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача». Для полного духовного просветления изменим наш класс.

public class SimpleClass extends RecursiveTask<String> {
    @Override
    protected String compute() {
        System.out.println("I am work in thread: " + Thread.currentThread().getName());
        return "I am just innocent simple class";
    }
}

Запускаем

public static void main(String[] args) {
    SimpleClass simpleClass = new SimpleClass();
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    System.out.println(forkJoinPool.invoke(simpleClass));
}

и видим в консоле: «I am work in thread: ForkJoinPool-1-worker-1» и «I am just innocent simple class».

Запускаем

public static void main(String[] args) {
    SimpleClass simpleClass = new SimpleClass();
    simpleClass.fork();
    System.out.println(simpleClass.join());
}

и видим: «I am work in thread: main» и «I am just innocent simple class». То есть при вызове метода fork() задача не «выполнила сама себя» магическим образом, а была выполнена в том же потоке из которого и был вызван данный метод. Вызов метода ForkJoinPool.invoke() передал задачу на выполнение в один из потоков данного пула. Важно отметить, что метод fork() отправляет задачу в какой-либо поток, но при этом не запускает её выполнения. Для получения результата служит метод join().

Разобравшись с основными классами и методами, вернёмся к нашему массиву и попробуем применить на практике, полученный духовный опыт.

Сначала мы будем рекурсивно делить наш массив на всё более мелкие части, пока не получим массивы, состоящие всего из 2 элементов. Почему именно из 2? Потому что наш условный «слабенький» процессор может условно «быстро» выполнить именно такую условно «маленькую» задачу. Почему рекурсивно? Просто потому, что применение рекурсии в данном случае действительно удобно. Это позволяет сначала выполнить некую работу по подготовке, а потом получить результат. Если вам не нравится рекурсия, то, наверное, можно попробовать найти какой-то другой способ, её применение, судя по всему, не является обязательным.

После того, как мы получим «100500» маленьких массивов, состоящих всего из 2 элементов, мы запустим «100500» маленьких задач на выполнение и суммируем их результаты. И для этого нам не придётся создавать 100500 отдельных нитей выполнения.

public class ValueSumCounter extends RecursiveTask<Integer> {

    private int[] array;

    public ValueSumCounter(int[] array) {
        this.array = array;
    }

    @SneakyThrows
    @Override
    protected Integer compute() {
        if(array.length <= 2) {
            Thread.sleep(1);
            return Arrays.stream(array).sum();
        }
        ValueSumCounter firstHalfArrayValueSumCounter = new ValueSumCounter(Arrays.copyOfRange(array, 0, array.length/2));
        ValueSumCounter secondHalfArrayValueSumCounter = new ValueSumCounter(Arrays.copyOfRange(array, array.length/2, array.length));
        firstHalfArrayValueSumCounter.fork();
        secondHalfArrayValueSumCounter.fork();
        return firstHalfArrayValueSumCounter.join() + secondHalfArrayValueSumCounter.join();
    }
}

public class Main {

    public static void main(String[] args) {
        int[] array = getInitArray(10000);
        ValueSumCounter counter = new ValueSumCounter(array);
        System.out.println(new Date());
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println(forkJoinPool.invoke(counter));
        System.out.println(new Date());
    }

    public static int[] getInitArray(int capacity) {
        int[] array = new int[capacity];
        for (int i = 0; i < capacity; i++) {
            array[i] = 3;
        }
        return array;
    }
}

При создании экземпляра класса ValueSumCounter мы передаём в него массив. В методе compute() сначала проверяется длинна массива, и если он «слишком большой», то разбивается пополам на 2 части, на основе каждой из которых в свою очередь создаётся своя задача и отправляется на выполнение путём вызова метода fork(). Когда разбивка будет закончена, наступает время «собирать камни», метод join() запускает каждую задачу на выполнение и возвращает полученный результат. Выполнение данной задачи с помощью ForkJoinPool заняло у меня на компьютере 3 секунды. Напомню, что эта же задача, выполненная с помощью цикла в одном потоке, ранее заняла 17 секунд.

Нам осталось убедиться, что при использовании ForkJoinPool не создаётся «100500» отдельных потоков. Для этого добавим в метод compute() всего одну строку

protected Integer compute() {
    if(array.length <= 2) {
        System.out.printf("Task %s execute in thread %s%n", this, Thread.currentThread().getName());
        Thread.sleep(1);
        return Arrays.stream(array).sum();
    }

Запустив код на выполнение, мы увидим, что для выполнения большого количества задач используется несколько одних и тех же потоков (в моём случае 4).

Вот, пожалуй, и всё, что хотелось бы рассказать о данной технологии. Если вдруг в коде или описании обнаружатся ошибки, просьба не кидать в автора тапками, а написать в комментариях об этом. Надеюсь, данный материал поможет духовному просветлению юных падаванов, ищущих знаний. Да пребудет с вами сила Java.

Теги:
Хабы:
Всего голосов 9: ↑7 и ↓2+8
Комментарии11

Публикации

Истории

Работа

Java разработчик
357 вакансий

Ближайшие события

15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань