Как стать автором
Обновить

Стримы в Java: Как перестать бояться и полюбить filter-map-reduce (Часть 2)

Уровень сложностиПростой
Время на прочтение6 мин
Количество просмотров4K

В первой части мы поговорили про основы Stream API и убедились, как элегантно можно обрабатывать данные, используя цепочки операций. Но возможности на этом не ограничиваются. Пришло время подняться на новый уровень и посмотреть, что случается, когда добавляется параллельность. Как работают параллельные стримы под капотом? Чем они отличаются от ручного управления потоками? А как же всемогущий Spring и необычная RxJava? Именно об этом и пойдёт речь во второй части.

Многопоточность: параллельные стримы и атака клонов

Когда я чуточку освоился со стримами, на горизонте замаячила новая угроза (или возможность?) — параллельные стримы. Но интерес победил страх, и я решил рискнуть. Взял свой стрим, добавил .parallel() в цепочку и запустил. Вдруг компьютер загудел, свет померк... Ладно, шучу, всё отработало как обычно, просто немного быстрее (на большом наборе данных). Под капотом Java разбивает вашу коллекцию на части и обрабатывает их на разных ядрах процессора, используя ForkJoinPool. Вам же ничего этого не видно — вы просто получаете результат.

Пример использования параллельного стрима:

List<String> words = Arrays.asList("один", "два", "три", "четыре", "пять");
List<String> uppercased = words.parallelStream()
    .map(String::toUpperCase)
    .toList(); // собираем в список

Здесь .parallelStream() сразу берёт параллельный стрим от коллекции. Можно было сделать и words.stream().parallel(), эффект тот же. Каждый элемент списка может быть обработан в отдельном потоке: все строчки параллельно превратятся в верхний регистр. Для пяти слов, конечно, выигрыш по времени сомнителен (создание потоков может занять больше, чем сама работа), но представьте, если слов миллион и операция тяжёлая — параллельный стрим реально сэкономит время.

Я, вдохновившись, давай параллелить всё подряд. И, конечно, тут настал неожиданный поворот: результаты стали не всегда предсказуемы. Нет, данные-то обрабатывались правильно, но, например, порядок иногда путался. Оказалось, параллельный стрим по умолчанию не гарантирует сохранение исходного порядка элементов в результате, если не использовать специально forEachOrdered или не собирать результат в упорядоченную структуру.

Кроме того, выяснилось несколько вещей:

  • Не каждая задача хорошо параллелится. Если данные плохо делятся на части или сама операция крошечная, накладные расходы на переключение потоков могут съесть всю выгоду. Например, параллелить суммирование 100 чисел — идея так себе: быстрее обычным циклом.

  • Нужно остерегаться состояния и синхронизации. Если внутри лямбды вы лезете к разделяемым переменным, может начаться знакомый ад с гонками данных. Например, делать myList.parallelStream().forEach(list2::add) (добавлять элементы в общий список) — очень плохая затея, кончится непредсказуемым результатом или исключением. Стримы любят чистые функции без сайд-эффектов, особенно в параллельном режиме.

  • Параллельные стримы используют общий пул потоков ForkJoin (если явно не указать свой). В серверном приложении, где и так есть управление потоками, бездумное параллеление может мешать другим задачам. Не удивляйтесь потом, почему на сервере возросло потребление CPU после запуска параллельного стрима на огромной коллекции.

Однако когда задача действительно подходящая — например, обработка больших массивов данных, вычисления, преобразование изображений — параллельные стримы ощущаются как магия. Ты просто добавил слово, а программа бац и заработала в разы быстрее (иногда). И главное — тебе не пришлось вручную плодить потоки, писать Thread или ExecutorService, думать о блокировках. Всё сделал Stream API.

Stream API и Spring: друзья или враги?

Настало время внести в нашу историю крупного игрока — фреймворк Spring. Большинство Java-разработчиков рано или поздно сталкиваются с ним, как герой фильма сталкивается с обязательной романтической линией. Вопрос: как уживаются стримы с Spring?

Представьте сценку: Spring и Stream заходят в бар. Stream сразу пытается всё упорядочить по-своему: фильтрует орешки к пиву (оставляя только самые крупные), мапит меню на два языка, а потом просит счёт, суммируя стоимость. А Spring смотрит на это дело и говорит: "Ладно, парень, делай что хочешь, только не лезь в мои бины". На самом деле Spring и Stream API довольно нейтральны друг к другу: стрим — это просто инструмент, который можно использовать внутри спринговых компонентов.

Например, в Spring MVC-контроллере можно использовать стримы для преобразования данных перед выдачей клиенту:

@GetMapping("/names")
public List<String> getUserNames() {
    List<User> users = userService.findAllUsers(); // допустим, возвращает List
    return users.stream()
            .filter(User::isActive)
            .map(User::getName)
            .collect(Collectors.toList());
}

Здесь мы берём всех пользователей, фильтруем только активных и возвращаем клиенту список их имён. Всё это внутри контроллера, в привычном для Spring императивном коде, но сила стримов делает код чище. Spring'у всё равно, стрим у вас там или цикл, главное — результат верните.

Интереснее дело обстоит с Spring Data JPA. Там есть поддержка возвращения данных прямо стримом из репозитория! То есть можно написать метод репозитория, который возвращает Stream<Entity> вместо List<Entity>. Это позволяет лениво читать данные из базы и стримить их, не загружая сразу весь список в память. Правда, нельзя забывать закрыть такой стрим (например, в try-with-resources), потому что под капотом он держит соединение с базой данных.

Есть и ещё один друг — Spring WebFlux (реактивный стек Spring). Но там уже не Stream<T>, а Flux<T> и Mono<T> — другие виды "потоков", из мира реактивного программирования. Тем не менее концептуально похожи: Flux — это поток (в том числе асинхронный) из многих элементов, а Mono — поток из одного элемента (или пустого результата). Они тоже поддерживают всякие знакомые операции map, filter и т.д., но работают по-другому (реактивщина — своя философия, заслуживающая отдельной истории). Однако, если вы поняли идею Stream API, то, глядя на цепочки методов WebFlux, почувствуете дежавю.

В общем, со Spring стримы уживаются отлично. Использовать их внутри сервисов, контроллеров — пожалуйста, это часть обычного Java-кода, Spring этому никак не мешает. А интеграция через Spring Data JPA — вообще мощный приём, чтобы обрабатывать большие выборки, не вываливая всё разом в оперативку.

RxJava: стримы на стероидах (реактивный поворот)

Когда я уже совсем освоился со стримами и чувствовал себя гуру filter-map-reduce, пришёл другой коллега и сказал: "Если ты такой фанат потоков, попробуй RxJava". Я глянул, а там Observable, Flowable — какие-то новые звери. Методы map и filter у них тоже есть, но ещё куча всего: flatMap, subscribe, повсюду какие-то schedulers и subscriptions... В общем, сначала я подумал, что это очередной уровень безумия, и попытался сбежать. Но потом стало любопытно: RxJava позиционировали как реактивные стримы, способные работать асинхронно.

Если обычный Stream API — это спокойная река в вашем коде, которую вы сами прошли от истока до устья, то RxJava — это горная река, которая сама несёт вас на плоту. В RxJava основной тип — Observable<T> (или Flowable<T> для более "тяжёлых" случаев с поддержкой backpressure). Он тоже позволяет делать привычные filter, map, reduce (иногда методы могут называться чуть иначе, например, reduce в RxJava может называться fold). Разница в том, что Observable сам генерирует элементы со временем, и вы подписываетесь (subscribe) на его поток данных. Это уже не просто обработка коллекции — это реактивное программирование, где источником могут быть, например, события UI или сетевые сообщения.

Простой пример на RxJava (Observable), аналогичный нашему с суммой чётных чисел:

Observable<Integer> source = Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5));
source
    .filter(x -> x % 2 == 0)
    .map(x -> x * 2)
    .reduce(0, (a, b) -> a + b)
    .subscribe(sum -> System.out.println("Сумма: " + sum));

Вместо прямого возвращения результата у нас subscribe, потому что всё асинхронно: Observable начнёт эмитить (выдавать) элементы, наша цепочка их обработает, а итог (24) попадёт в subscribe. Тут важно: .reduce в RxJava вернёт тоже Observable (в данном случае Observable одного значения — суммы), поэтому мы на него подписываемся. В Stream API же reduce возвращает уже готовое значение, потому что стрим был синхронный и сразу всё вычислил.

RxJava часто используется в Android-разработке и вообще там, где нужно реагировать на события. Он мощный, но иногда чересчур: можно легко запутаться, на каком потоке (Scheduler) что выполнять, как правильно обрабатывать ошибки... Впрочем, философия "filter-map" у него та же. И, зная про Stream API, я смог разобраться в RxJava гораздо быстрее: "ага, вот тут он фильтрует данные, тут мапит — всё понятно, только ещё может делать это асинхронно и по частям".

Для нашего повествования важно, что познакомившись с RxJava, я осознал: filter-map-reduce — это вовсе не страшно. Хоть в каком виде — поток коллекции, поток событий — концепция та же, и она очень, очень полезна. Моя изначальная фобия ушла окончательно, потому что я взглянул страху в глаза, а за ним оказалась целая вселенная реактивных потоков, куда можно соваться, если хочется острых ощущений.

Эпилог: и вот я полюбил filter-map-reduce

Неожиданным поворотом для меня стало то, что, выучив стримы, я начал лучше понимать и другие концепции программирования: лямбды больше не пугали, иммутабельность данных заиграла новыми красками, а слово "реактивный" перестало ассоциироваться только с химией.

Теперь, когда кто-то из знакомых разработчиков признаётся, что боится или недолюбливает Stream API, я драматично встаю, кладу руку ему на плечо и произношу: "Не бойся, то, что сперва кажется монстром, может оказаться твоим лучшим другом. Просто дай стримам шанс." Возможно, это звучит кбанально, но что поделать — я действительно проникся.

В заключение хочу сказать: filter-map-reduce — не враги, а союзники. Они не отнимают у нас контроль, а освобождают от рутины, позволяя сконцентрироваться на сути задачи.

Так что если вы всё ещё настороженно смотрите на строчки numbers.stream().filter(...).map(...).reduce(...) — вдохните, выдохните и попробуйте сами. Возможно, вскоре вы тоже перестанете бояться и полюбите Stream API, как это сделал я. А уж шутить над своим прошлым страхом — это святое, и код тому подтверждение.

Теги:
Хабы:
Всего голосов 6: ↑5 и ↓1+4
Комментарии2

Публикации

Работа

Java разработчик
198 вакансий

Ближайшие события