Как стать автором
Обновить

Java 8 в параллель. Учимся создавать подзадачи и контролировать их выполнение

Время на прочтение6 мин
Количество просмотров34K
Продолжаем цикл статей, посвященный обработке больших объемов данных в параллель (красивое слово, неправда?).

В предыдущей статье мы познакомились и интересным инструментарием Fork/Join Framework, позволяющим разбить обработку на несколько частей и запустить параллельно выполнение отдельных задач. Что нового в этой статье – спросите Вы? Отвечу – более содержательные примеры и новые механизмы для качественной обработки информации. Параллельно я вам расскажу о ресурсных и прочих особенностях работы в этом режиме.



Всех заинтересованных приглашаю под кат:

Начало


Все, что хорошо делится — делим. Примерно так я писал в предыдущей статье, предлагая разделить обработку на части и максимально загрузить процессоры (если конечно они у вас есть). Как-то грубо прозвучало.

Да, конечно, есть, уже давно даже домашние компьютеры – многоядерные. Здесь и кроется первая особенность работы в этом режиме. Нужно соблюдать паритет между количеством подзадач и количеством ядер. По многочисленным тестам формула запуска приблизительно следующая, количество подзадач должно быть: (Количество ядер +0) или (Количество ядер +1). Эти варианты тестировался на нескольких серьезных серверах и нескольких обычных машинах.

Механизмы ограничения


Под механизмами ограничения я понимаю, всевозможные механизмы («отсечки») максимально быстрой и удобной обработкой и отладкой ошибок. В своих проектах стараюсь создать максимальное количество способов отладки кода, например:

а) Постарайтесь реализовать механизмы однозадачных и многозадачных режимов для ваших вычислений. Зачем? Сейчас объясню. Допустим, что у вас есть успешно переданный проект, и даже, возможно протестированный. В случае нестандартно ситуации, первое, что можно сделать – это для быстрого понимания и исправления ошибки переключиться в однозадачный режим (принудительно) и сразу получаеть ошибку на экране сервера приложений (если такое возможно и есть доступ).

Удобнее выявлять недочеты в однозадачном режиме — если у вас включен параллельный режим, то при остановке первой подзадачи, вы увидите часть выполнения второй (это не всегда удобно для просмотра).

б) Продумывайте механизмы доступа и загрузки постоянно требующихся данных.

Расскажу подробнее. Например, вы хотите перед началом обработки перевести около 100 огромных таблиц в Map<K,V>. Да, это быстро удобно, но есть несколько неприятных моментов.

Допустим, что вы начинаете тестировать большие данные. Присутствует проблема по трем договорам, блокам, клиентам, без разницы, по трем позициям (давайте будем называть «позиции»). Вы разобрались, в чем ошибка, исправили, перезаписали jar, перезапустили и… Ничего! Снова сидим, ждет несколько минут. Ждем расчет.

В этой ситуации нам бы помогли механизмы выборочной загрузки (загрузок) данных.

Например, далее не самый лучший вариант. Фактически, построение Map по всем данным. Хотя иногда и он применяется.

public Map<Long, String> getValueInDao(Date date) {
    Map<Long, String> valueMap = new HashMap<Long, String>();

    HashMap map = new LinkedHashMap();
    map.put("date", date);
    List<Values> ValuesList = this.findWithQuery("select c  from Values c where  c.vf < :date and c.vd >= :date", map);
    if (ValuesList.size() > 0) {
        ListIterator<Values> iterValues = ValuesList.listIterator();
        while (iterValues.hasNext()) {
            Values tmpValues = iterValues.next();
            valueMap.put(tmpValues.getId,tmpValues.getDescr ());
        }
    }
    if (valueMap.size() > 0) {
        return valueMap;
    } return Collections.emptyMap();
}

Следующий вариант предпочтительнее (имхо). В нем мы ограничиваемся входящими данными, так как получаем только то, что нам действительно нужно.

public Map<Long,String> getValue(List<Long> idList,Date date) {

    Map<Long,String> valueMap = new HashMap<Long, String>();
    HashMap map = new LinkedHashMap();

    List<Value> list = new ArrayList<>();
    if (idList.size() > 1000) {
        int j = 0;
        for (int i = 0; i < idList.size() / 999; i++) {
            map.put("idList", idList.subList(j, j + 999));
            map.put("date", date);
            list.addAll(this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map));
            map.clear();
            j += 999;
        }
        if (j <= idList.size()-1) {
            map.put("idList", idList.subList(j, idList.size()));
            map.put("date", date);
            list.addAll(this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map));
        }
    }
    else {
        map.put("idList", idList);
        map.put("date", date);
        list = this.findWithQuery("select c from Value c where  c.id in (" + ":idList" + ")  and c.val < :date and c.val2 >= :date", map);
    }

    Iterator<Value> iter = list.iterator();
    while(iter.hasNext()) {
        Value tmpValue = iter.next();
        valueMap.put(tmpValue.getId, tmpValue.getValue());
    }

    if (valueMap.size() > 0) {
        return valueMap;
    } return Collections.emptyMap();
}

в) Сразу создавайте механизмы записи и вывода ошибок в таблицы (файлы) и другие источники. Если ваши алгоритмы четко выстроены, то ничто вам не мешает создать класс, который будет отрабатывать по ключу. Например, перед загрузкой есть «флаг», который позволяет записывать данные в таблицу с ошибками, тем самым вы точно знаете область оной.

if (varKeyError == 1) {
-- создаем экземпляр класса, например, по таблице для err сообщений и производим запись требуемых данных
}


Быстрее…


Немного поговорив про возможные особенности/ошибки, перейдем к нашей непосредственной цели, а именно к обработке в параллельном режиме больших объемов информации и немного расширим существующие пример (нагло заберем их из предыдущей статьи).

В ней создано несколько классов, отвечающих за входящие данные и обработку. Тогда мы основывались на классе RecursiveAction. Напомню, еще раз, что было сделано в примере. В классе StreamSettings мы разбиваем полученные данные на части, пока не достигнем порогового значения, выставленного в countValue = 500. Как я объяснял ранее, механизм ограничения вы можете создать любой. Например, вариант (valueId.size() / Runtime.getRuntime().availableProcessors()*2) тоже работоспособен и может применяться для нахождения некого оптимального значения.

public class StreamSettings extends RecursiveAction {

    final int countValue = 500;
    final int countProcessors = Runtime.getRuntime().availableProcessors();
    List<ValueValue> data;
    int  start, end;

    StreamSettings(List<ValueValue> valueId,int startNumber,int endNumber) {
        data   = valueId;
        start  = startNumber;
        end    = endNumber;
    }
    protected void compute() {
        if (end - start < countValue || countProcessors < 2)   {
            for(int i = start; i < end; i++) {
                ValueValue   value   = data.get(i);
                try {
                    new CalcGo().calcGo(value);
                } catch (Exception e) {
                    raiseApplicationError("Ошибка вызова"  + e.getMessage(), e);
                }
            }
        } else {
            int middle = (start + end)/ 2;
            invokeAll(new CalcGo(data, start, middle),
                    new CalcGo(data,middle,end));
        }
    }
}

Продолжим наши изыскания. Попробуем посмотреть новые варианты обработки, остановимся на классе RecursiveTask. Основное отличие будет в том, что метод compute() будет возвращать результат (а это требуется ну уж очень часто). Фактически мы можем дождаться выполнения нескольких подзадач и произвести вычисления. Далее показаны примеры, на которых мы остановимся подробнее.
Класс Stream отвечает за разбиение на подзадачи. В примере находим среднее значение и создаем экземпляр класса (Stream goVar1 = new Stream(forSplit,start, middle)) от 0 до «середины» и в (Stream goVar2 = new Stream(forSplit,middle,end)) передаем от «середины» до конечного элемента.

Отличие от предыдущего варианта, класса StreamSettings, не используется invokeAll, а будут вызываться методы fork() и join() соответственно.

public class Stream extends RecursiveTask<Long>{

    final int countProcessors = Runtime.getRuntime().availableProcessors();
    final int countLimit = 1000;
    int start;
    int end;
    int forSplit;

    Stream(int componentValue,int startNumber, int endNumber) {
        forSplit = componentValue;
        start = startNumber;
        end = endNumber;
    }
    protected Long compute() {
        Long countSum = 0L;
        if ( countProcessors == 1 || end - start <= countLimit) {
            System.out.println("=run=");
            System.out.println("=start="+start);
            System.out.println("=end="+end);
            for(int i = start; i <= end; i++) {
                countSum += 1 ;
            }
        } else {
            int middle = (start + end)/ 2;
           /* invokeAll(new Stream(forSplit, start, middle),
                    new Stream(forSplit, middle+1, end));*/
            Stream goVar1 = new Stream(forSplit,start, middle);
            Stream goVar2 = new Stream(forSplit,middle,end);
            goVar1.fork();
            goVar2.fork();
            countSum = goVar1.join() + goVar2.join();
        }
        return countSum;
    }
}

import java.util.concurrent.ForkJoinPool;

public class Start {
    public static void main(String[] args) {

        final int componentValue = 2000;
        Long beginT = System.nanoTime();
        ForkJoinPool fjp = new ForkJoinPool();
        Stream test = new Stream(componentValue,0,componentValue);
        Long countSum = fjp.invoke(test);
        Long endT = System.nanoTime();
        Long timebetweenStartEnd = endT - beginT;
        System.out.println("=====time========" +timebetweenStartEnd);
        System.out.println("=====countSum========" +countSum);
    }
}

В результате корректного выстраивания запусков и ожиданий запусков, вы можете создать удобную систему асинхронного выполнения вычислений.

Особенности обработки информации


Затронутая тема обширна и позволяет производить существенные усовершенствования. Выигрыш по времени составляет приблизительно в 1,7 раза по сравнению с последовательным запуском. Вы можете использовать доступные ресурсы эффективнее и перевести множество расчетов в параллельный режим.

Всем удачи. Вопросы и пожелания оставляйте в комментариях. Следующая статья будет посвящена такому интересному инструменту, как Heartbeat. Нет? Не знаете? Тогда до встречи.
Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Вы использовали параллельную обработку?
59.62% Да62
27.88% Нет29
20.19% А зачем она?21
Проголосовали 104 пользователя. Воздержался 41 пользователь.
Теги:
Хабы:
+7
Комментарии10

Публикации

Изменить настройки темы

Истории

Работа

Java разработчик
356 вакансий

Ближайшие события

Weekend Offer в AliExpress
Дата20 – 21 апреля
Время10:00 – 20:00
Место
Онлайн
Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн