Java 8 в параллель. Учимся создавать подзадачи и контролировать их выполнение

    Продолжаем цикл статей, посвященный обработке больших объемов данных в параллель (красивое слово, неправда?).

    В предыдущей статье мы познакомились и интересным инструментарием Fork/Join Framework, позволяющим разбить обработку на несколько частей и запустить параллельно выполнение отдельных задач. Что нового в этой статье – спросите Вы? Отвечу – более содержательные примеры и новые механизмы для качественной обработки информации. Параллельно я вам расскажу о ресурсных и прочих особенностях работы в этом режиме.



    Всех заинтересованных приглашаю под кат:

    Начало


    Все, что хорошо делится — делим. Примерно так я писал в предыдущей статье, предлагая разделить обработку на части и максимально загрузить процессоры (если конечно они у вас есть). Как-то грубо прозвучало.

    Да, конечно, есть, уже давно даже домашние компьютеры – многоядерные. Здесь и кроется первая особенность работы в этом режиме. Нужно соблюдать паритет между количеством подзадач и количеством ядер. По многочисленным тестам формула запуска приблизительно следующая, количество подзадач должно быть: (Количество ядер +0) или (Количество ядер +1). Эти варианты тестировался на нескольких серьезных серверах и нескольких обычных машинах.

    Механизмы ограничения


    Под механизмами ограничения я понимаю, всевозможные механизмы («отсечки») максимально быстрой и удобной обработкой и отладкой ошибок. В своих проектах стараюсь создать максимальное количество способов отладки кода, например:

    а) Постарайтесь реализовать механизмы однозадачных и многозадачных режимов для ваших вычислений. Зачем? Сейчас объясню. Допустим, что у вас есть успешно переданный проект, и даже, возможно протестированный. В случае нестандартно ситуации, первое, что можно сделать – это для быстрого понимания и исправления ошибки переключиться в однозадачный режим (принудительно) и сразу получаеть ошибку на экране сервера приложений (если такое возможно и есть доступ).

    Удобнее выявлять недочеты в однозадачном режиме — если у вас включен параллельный режим, то при остановке первой подзадачи, вы увидите часть выполнения второй (это не всегда удобно для просмотра).

    б) Продумывайте механизмы доступа и загрузки постоянно требующихся данных.

    Расскажу подробнее. Например, вы хотите перед началом обработки перевести около 100 огромных таблиц в Map<K,V>. Да, это быстро удобно, но есть несколько неприятных моментов.

    Допустим, что вы начинаете тестировать большие данные. Присутствует проблема по трем договорам, блокам, клиентам, без разницы, по трем позициям (давайте будем называть «позиции»). Вы разобрались, в чем ошибка, исправили, перезаписали jar, перезапустили и… Ничего! Снова сидим, ждет несколько минут. Ждем расчет.

    В этой ситуации нам бы помогли механизмы выборочной загрузки (загрузок) данных.

    Например, далее не самый лучший вариант. Фактически, построение Map по всем данным. Хотя иногда и он применяется.

    public Map<Long, String> getValueInDao(Date date) {
        Map<Long, String> valueMap = new HashMap<Long, String>();
    
        HashMap map = new LinkedHashMap();
        map.put("date", date);
        List<Values> ValuesList = this.findWithQuery("select c  from Values c where  c.vf < :date and c.vd >= :date", map);
        if (ValuesList.size() > 0) {
            ListIterator<Values> iterValues = ValuesList.listIterator();
            while (iterValues.hasNext()) {
                Values tmpValues = iterValues.next();
                valueMap.put(tmpValues.getId,tmpValues.getDescr ());
            }
        }
        if (valueMap.size() > 0) {
            return valueMap;
        } return Collections.emptyMap();
    }

    Следующий вариант предпочтительнее (имхо). В нем мы ограничиваемся входящими данными, так как получаем только то, что нам действительно нужно.

    public Map<Long,String> getValue(List<Long> idList,Date date) {
    
        Map<Long,String> valueMap = new HashMap<Long, String>();
        HashMap map = new LinkedHashMap();
    
        List<Value> list = new ArrayList<>();
        if (idList.size() > 1000) {
            int j = 0;
            for (int i = 0; i < idList.size() / 999; i++) {
                map.put("idList", idList.subList(j, j + 999));
                map.put("date", date);
                list.addAll(this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map));
                map.clear();
                j += 999;
            }
            if (j <= idList.size()-1) {
                map.put("idList", idList.subList(j, idList.size()));
                map.put("date", date);
                list.addAll(this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map));
            }
        }
        else {
            map.put("idList", idList);
            map.put("date", date);
            list = this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map);
        }
    
        Iterator<Value> iter = list.iterator();
        while(iter.hasNext()) {
            Value tmpValue = iter.next();
            valueMap.put(tmpValue.getId, tmpValue.getValue());
        }
    
        if (valueMap.size() > 0) {
            return valueMap;
        } return Collections.emptyMap();
    }

    в) Сразу создавайте механизмы записи и вывода ошибок в таблицы (файлы) и другие источники. Если ваши алгоритмы четко выстроены, то ничто вам не мешает создать класс, который будет отрабатывать по ключу. Например, перед загрузкой есть «флаг», который позволяет записывать данные в таблицу с ошибками, тем самым вы точно знаете область оной.

    if (varKeyError == 1) {
    -- создаем экземпляр класса, например, по таблице для err сообщений и производим запись требуемых данных
    }


    Быстрее…


    Немного поговорив про возможные особенности/ошибки, перейдем к нашей непосредственной цели, а именно к обработке в параллельном режиме больших объемов информации и немного расширим существующие пример (нагло заберем их из предыдущей статьи).

    В ней создано несколько классов, отвечающих за входящие данные и обработку. Тогда мы основывались на классе RecursiveAction. Напомню, еще раз, что было сделано в примере. В классе StreamSettings мы разбиваем полученные данные на части, пока не достигнем порогового значения, выставленного в countValue = 500. Как я объяснял ранее, механизм ограничения вы можете создать любой. Например, вариант (valueId.size() / Runtime.getRuntime().availableProcessors()*2) тоже работоспособен и может применяться для нахождения некого оптимального значения.

    public class StreamSettings extends RecursiveAction {
    
        final int countValue = 500;
        final int countProcessors = Runtime.getRuntime().availableProcessors();
        List<ValueValue> data;
        int  start, end;
    
        StreamSettings(List<ValueValue> valueId,int startNumber,int endNumber) {
            data   = valueId;
            start  = startNumber;
            end    = endNumber;
        }
        protected void compute() {
            if (end - start < countValue || countProcessors < 2)   {
                for(int i = start; i < end; i++) {
                    ValueValue   value   = data.get(i);
                    try {
                        new CalcGo().calcGo(value);
                    } catch (Exception e) {
                        raiseApplicationError("Ошибка вызова"  + e.getMessage(), e);
                    }
                }
            } else {
                int middle = (start + end)/ 2;
                invokeAll(new CalcGo(data, start, middle),
                        new CalcGo(data,middle,end));
            }
        }
    }

    Продолжим наши изыскания. Попробуем посмотреть новые варианты обработки, остановимся на классе RecursiveTask. Основное отличие будет в том, что метод compute() будет возвращать результат (а это требуется ну уж очень часто). Фактически мы можем дождаться выполнения нескольких подзадач и произвести вычисления. Далее показаны примеры, на которых мы остановимся подробнее.
    Класс Stream отвечает за разбиение на подзадачи. В примере находим среднее значение и создаем экземпляр класса (Stream goVar1 = new Stream(forSplit,start, middle)) от 0 до «середины» и в (Stream goVar2 = new Stream(forSplit,middle,end)) передаем от «середины» до конечного элемента.

    Отличие от предыдущего варианта, класса StreamSettings, не используется invokeAll, а будут вызываться методы fork() и join() соответственно.

    public class Stream extends RecursiveTask<Long>{
    
        final int countProcessors = Runtime.getRuntime().availableProcessors();
        final int countLimit = 1000;
        int start;
        int end;
        int forSplit;
    
        Stream(int componentValue,int startNumber, int endNumber) {
            forSplit = componentValue;
            start = startNumber;
            end = endNumber;
        }
        protected Long compute() {
            Long countSum = 0L;
            if ( countProcessors == 1 || end - start <= countLimit) {
                System.out.println("=run=");
                System.out.println("=start="+start);
                System.out.println("=end="+end);
                for(int i = start; i <= end; i++) {
                    countSum += 1 ;
                }
            } else {
                int middle = (start + end)/ 2;
               /* invokeAll(new Stream(forSplit, start, middle),
                        new Stream(forSplit, middle+1, end));*/
                Stream goVar1 = new Stream(forSplit,start, middle);
                Stream goVar2 = new Stream(forSplit,middle,end);
                goVar1.fork();
                goVar2.fork();
                countSum = goVar1.join() + goVar2.join();
            }
            return countSum;
        }
    }
    
    import java.util.concurrent.ForkJoinPool;
    
    public class Start {
        public static void main(String[] args) {
    
            final int componentValue = 2000;
            Long beginT = System.nanoTime();
            ForkJoinPool fjp = new ForkJoinPool();
            Stream test = new Stream(componentValue,0,componentValue);
            Long countSum = fjp.invoke(test);
            Long endT = System.nanoTime();
            Long timebetweenStartEnd = endT - beginT;
            System.out.println("=====time========" +timebetweenStartEnd);
            System.out.println("=====countSum========" +countSum);
        }
    }
    

    В результате корректного выстраивания запусков и ожиданий запусков, вы можете создать удобную систему асинхронного выполнения вычислений.

    Особенности обработки информации


    Затронутая тема обширна и позволяет производить существенные усовершенствования. Выигрыш по времени составляет приблизительно в 1,7 раза по сравнению с последовательным запуском. Вы можете использовать доступные ресурсы эффективнее и перевести множество расчетов в параллельный режим.

    Всем удачи. Вопросы и пожелания оставляйте в комментариях. Следующая статья будет посвящена такому интересному инструменту, как Heartbeat. Нет? Не знаете? Тогда до встречи.

    Only registered users can participate in poll. Log in, please.

    Вы использовали параллельную обработку?

    • 59.4%Да60
    • 27.7%Нет28
    • 20.8%А зачем она?21

    Similar posts

    Ads
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More

    Comments 10

      +2
      А можете дать примеры в каком случае нужна ручная работа с Fork/Join пулами, а в каком случае вполне хватит Stream Api? Учитывая что Stream Api во много лучше ручной работы с Fork/Join пулами и можно даже задавать свой собственный пул для долгий запросов. Так все-таки когда описанное в статье может реально быть необходимым по сравнению с Stream Api? Заранее спасибо
        0
        Сейчас в Fork/Join можно с Pool обходится как угодно. Создавать самому и/или будет использован общий ForkJoinPool.
        Я определил для себя использование Fork/join как некий каркас в некоторых задачах, который отвечает за уровень параллелизма(в зависимости от нагрузки на процессоры, количества обрабатываемой информации и т.п.).
        Stream Api использую для фильтрации и/или нахождения определенных элементов.
          0
          Завтра смогу изучить ссылку, которую вы указали, и напишу по этому поводу. Спасибо.
            0
            Касательно вашей ссылки. Внимательно изучил материал и вот, что мне кажется в переписке. Курсивом буду выделять материал из ссылки.
            «Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.» — вопрос по созданию пула.
            Далее приводится пример, где в раздельных потоках происходит запуск, ну и соответственно описывают проблему.
            Далее нам пишут о том, что мы можем создать свой собственный пул и фактически работать с ним. Вот эта часть: «What do you mean by custom thread pool? There is a single common ForkJoinPool but you can always create your own ForkJoinPool and submit requests to it».
            Вот тут выскажу свое мнение.
            Я определил, что сейчас есть возможность использовать 3 варианта
            а) создавать свой
            б) поставить по умолчанию (будет зависеть от количества процессоров)
            в) или будет использоваться общий пул(с jdk8)
            дополнительно можно настроить уровни параллелизма(однозначно их не стоит ставить более количества процессоров, я бы сказал ядер)
            Вернемся к Вашей ссылке.
            далее приводится ссылка на проблему с lock-ами(нужно исследовать).
            Далее еще ответы:
            «There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.» и далее пример по созданию
            Далее просят link на то, что «But is it also specified that streams use the ForkJoinPool».
            Смотрим ссылку и в ней находим следующее:
            But it is mentioned in the ForkJoin documentation at the bottom of docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
            Читаем вот эту часть(документация):
            Besides using the fork/join framework to implement custom algorithms for tasks to be performed concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), there are some generally useful features in Java SE which are already implemented using the fork/join framework. One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems.
            Делаю вывод, что parallelSort() как раз использует «вилку».
            Возможно, что Вы добавите дополнительную информацию или поправите меня. Спасибо.
              +2
              Кстати, с помощью моей няшной либы StreamEx это ещё проще:

              ForkJoinPool myPool = new ForkJoinPool(10);
              StreamEx.of(input).parallel(myPool). // и дальше что угодно
                0
                Я правильно понимаю, что этот функционал (StreamEx) в одной из Ваших статей описан?
                  0
                  Конкретно этой функции ещё не было, когда статью писал. С тех пор библиотека очень сильно выросла.
                    0
                    Спасибо. Буду смотреть.
              0
              По многочисленным тестам формула запуска приблизительно следующая, количество подзадач должно быть: (Количество ядер +0) или (Количество ядер +1). Эти варианты тестировался на нескольких серьезных серверах и нескольких обычных машинах.
              Неудачная формулировка. Слишком общая, а речь явно идёт об cpu-bound задачах.
                0
                Вы правы. Речь действительно идет о cpu-bound задачах. Сейчас делаю «динамический» настройщик определения приоритетных ограничений для своих проектов, но он будет готов после нового года.

              Only users with full accounts can post comments. Log in, please.