Привет, Хабр!

У многих из нас, при использовании какого-либо инструмента программирования, возникал вопрос: “Как? Как это работает?”. Часто при возникновении подобных вопросов я обращаюсь к гуглу, который популярным образом рассказывает общие принципы работы того или иного механизма. Но наверняка среди читателей есть те, которые, прочитав несколько статей, подумали: "Да, это интересно и, вроде, понятно. Но, все-таки, как оно работает?".

Так вот, я тоже вхожу в число таких читателей. Поработав с каким-то механизмом, становится интересен принцип его работы. Поняв принцип работы, становится интересна сама реализация механизма. Я попробовал посмотреть Project Reactor изнутри и решил написать статью, в которой и систематизировать полученные знания. Если она окажется еще и интересной и найдет свою аудиторию, возможно, я попробую выйти на целый цикл статей под кодовым названием "внутренний мир".

В данной статье не будет глубокого описания идей реактивного программирования - подобных статей уже написано чуть меньше, чем много, поэтому для читателей, которым это интересно я оставлю ссылки в конце на ресурсы по теме. Однако, некоторые основы я напишу.

Общая информация

Реактивное программирование - это асинхронная обработка потоковых данных. Сама реализация построена на широко известном паттерне "Наблюдатель" (а точнее, его частном случае "Издатель-Подписчик"): есть поток, который выполняет роль издателя, на события которого подписывается множество подписчиков. В Java 9 был добавлен стандарт реактивных стримов (пакет org.reactivestreams), содержащих 4 интерфейса:

  1. Publisher - издатель, имеющий точку соединения между издателем и подписчиком.

    Интерфейс Publisher
    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> var1);
    }

  2. Subscriber - подписчик, имеющий следующие методы обработки реактивного потока:

    • onSubscribe() выполняется при подписке;

    • onNext() выполняется для каждого элемента потока;

    • onError() выполняется в случае возникновения ошибки;

    • onComplete() выполняется при завершении потока.

    Интерфейс Subscriber
    public interface Subscriber<T> {
        void onSubscribe(Subscription var1)
        void onNext(T var1);
        void onError(Throwable var1);
        void onComplete();
    }

  3. Subscription - объект подписки. С его помощью можно запросить следующий объект, или отменить подписку.

    Интерфейс Subscription
    public interface Subscription {
        void request(long var1);
        void cancel();
    }

  4. Processor - обработчик, преобразовывающий объекты в потоке.

    Интерфейс Processor
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

В целом, идея довольно проста: 

  1. Subscriber подписывается на события Publisher.

  2. В методе Subscriber.onSubscribe() посредством Subscription.request() выполняется запрос на события.

  3. События проходят через цепочку Processor'ов и попадают в метод Subscriber.onNext().

  4. По завершению потока событий вызывается метод Subscriber.onComplete().

Стоит отметить одну важную вещь, называемую back-pressure (обратное давление). Можно заметить, что в методе запроса передается некое количество. Это сделано для того, чтобы ограничить передачу данных рамками возможностей обработки. 

Для того, что бы получить данные, необходимо вызвать Subscription.request() в методе Subscriber.onSubscribe(). Если необходимо непрерывное получение данных, можно передать Long.MAX_VALUE. Если же мы используем механизм back-pressure, необходимо вызывать Subscription.request() в методе Subscriber.onNext(), что бы получать данные по мере их обработки.

В целом, это и есть вся "общая" информация о реактивных стримах. Теперь хотелось бы перейти к самому Project Reactor (далее - реактор).

Механизм стримов в реакторе

В реакторе появился новый интерфейс Disposable. Разница между ним и Subscription в том, что Subscription используется для управления подпиской внутри подписчика, а Disposable - извне.

Попробуем разобраться, как работает сам стрим в реакторе. Мы будем смотреть на примере Flux, но все, описанное ниже актуально и для Mono.

Для примера будет проверять числа на простоту. Функция специально сделана максимально примитивно, что бы занимала время.

Предикат проверки на простоту
private final Predicate<Integer> checker = num -> {
    for (int i = 2; i < num; i++) {
        if (num % i == 0) {
            return false;
        }
    }
    return true;
};

Функция генерации массива
private Integer[] generateIntArray(int size) {
    Integer[] array = new Integer[size];
    for (int i = 0; i < size; i++) {
        array[i] = ThreadLocalRandom.current().nextInt(90_000_000, 100_000_000);
    }
    return array;
}

Теперь перейдем к самому стриму:

Первый стрим
@Test
public void reactorStream() {
    Integer[] array = generateIntArray(100);

    Flux.fromArray(array)
            .filter(i -> i % 2 != 0)
            .map(i -> "Number " + i + " is prime: " + checker.test(i))
            .subscribe(System.out::println);
}
Пример вывода
Number 94296669 is prime: false
Number 94305859 is prime: false
Number 92332843 is prime: true
Number 90404043 is prime: false
Number 99827085 is prime: false
Number 90833557 is prime: false

На таком маленьком примере мы начнем изучение внутренней работы реактора. Итак, что тут происходит? Условно, работу стрима можно поделить на 3 части: сборка, подписка и выполнение.

Наш стрим начинается с фабричного метода Flux.fromArray(). Провалившись в него мы увидим, что класс Flux является абстрактным, а возвращается нам его реализация в виде FluxArray. Для себя я назвал такой flux "порождающим", поскольку он всегда стоит в начале цепочки и транслирует дальше элементы для обработки из передаваемого в него ресурса. Такой "порождающий" flux не единственный: например, Flux.fromIterable(), Flux.fromStream() и Flux.range() возвращают FluxIterable, FluxStream и FluxRange соответственно. Каждый "порождающий" flux возвращает собственную реализацию.

Далее мы сразу отсеиваем четные числа, поскольку они нас не интересуют, вызвав метод filter(). Провалившись в него, мы узнаем, что он тоже возвращает собственную реализацию flux - FluxFilter.

Получается, существует не только много "порождающих" версий flux, но и "модифицирующих". Так, вызванный нами метод filter() возвращает уже FluxFilter, ресурсом для которого является вышестоящий flux (в нашем случае - FluxArray). Вызванный нами далее map() в этом плане имеет точно такое же поведение, но возвращает уже FluxMap (а ресурсом для него является FluxFilter).

Для большей наглядности покажу, как могло бы выглядеть все вышеописанное без реактора:

Псевдокод: этап сборки
Flux<Integer> sourceFlux = Flux.fromArray(array);
Flux<Integer> filterFlux = new FluxFilter(sourceFlux, i -> i / 2 != 0);
Flux<Integer> mapFlux = new FluxMap(filterFlux, i -> "Number " + i + " is prime: " + checker.isPrimeNumber(i));

Результирующий flux можно описать таким псевдокодом: FluxMap(FluxFilter(FluxArray(source)))

На вызове метода subscribe() заканчивается этап сборки и начинается этап подписки.

Несколько слов о subscribe()

Прежде чем мы приступим к этапу подписки, хотелось бы написать несколько слов о самом методе subscribe().

Условно реализации метода subscribe() в реакторе можно поделить на 2 части: методы, принимающих лямбды для обработки различных сигналов стрима, и метод, принимающий полноценную реализацию CoreSubscriber

В первом варианте мы можем передать функции для обработки сигналов стрима в различных вариациях: одну функцию (для обработки элементов), две функции (для обработки элементов и ошибок), три функции (для обработки элементов, ошибок и действие при завершении стрима) и так далее. Впоследствии из них собирается LambdaSubscriber.

Во втором варианте мы можем передать полностью свою реализацию CoreSubscriber. Для его реализации рекомендуется использовать абстрактный класс BaseSubscriber, предоставленный реактором.

Отмечу, что метод subscribe() возвращает нам объект Disposable, с помощью которого мы можем отменить подписку на стрим "снаружи".

Метод subscribe() для "модифицирующих" flux реализован в абстрактном классе InternalFluxOperator, который они наследуют. Внутри него происходит новое "оборачивание", уже подписчиков: для каждого "модифицирующего" flux существует своя версия CoreSubscriber (в нашем случае это FilterSubscriber и MapSubscriber). Условным псевдокодом это можно описать так:

Псевдокод: этап подписки
mapFlux.subscribe(Subscriber){
			MapSubscriber mapSubscriber = new MapSubscriber(Subscriber);
			filterFlux.subscribe(mapSubscriber) {
				FilterSubscriber filterSubscriber = new FilterSubscriber(mapSubscriber);
				arrayFlux.subscribe(filterSubscriber) {
					// Реальная подписка и передача элементов
				}
			}
		}

Результирующего Subscriber можно описать таким псевдокодом: FilterSubscriber(MapSubscriber(Subscriber))

На этом заканчивается этап подписки и начинается этап выполнения потока.

"Порождающие" flux не оборачивают Subscriber - они непосредственно вызывают у него метод onSubscribe() и передают в него свою реализацию Subscription. Каждый "порождающий" flux имеет свою реализацию Subscription: ArraySubscription, IterableSubscription, StreamSubscription, RangeSubscription и т.д. Каждый Subscriber в методе onSubscribe() сохраняет у себя ссылку на Subscription и передает ее вышестоящему подписчику, вплоть до конечной реализации, в которой метод onSubscribe() должен содержать логику запроса данных (а именно - вызов метода Subscription.request()) для того, чтобы стрим стартовал. 

LambdaSubscriber как пример

Если мы передали функцию Consumer<Subscription> в методе подписки стрима - вызовется она (subscriptionConsumer в коде), иначе LambdaSubscriber по умолчанию сделает запрос на получение данных по мере их поступления.

  public final void onSubscribe(Subscription s) {
      if (Operators.validate(subscription, s)) {
          this.subscription = s;
          if (subscriptionConsumer != null) {
              try {
                  subscriptionConsumer.accept(s);
              }
              catch (Throwable t) {
                  Exceptions.throwIfFatal(t);
                  s.cancel();
                  onError(t);
              }
          }
          else {
              s.request(Long.MAX_VALUE);
          }
      }
  }

Subscription и является итоговым источником последовательности - она содержит соответствующий ресурс (массив, стрим и т.д.) и при вызове метода request()начинает передачу данных по потоку. Так же Subscription хранит и ссылку на конечного Subscriber (в нашем случае - FilterSubscriber), в которого и начинает транслировать последовательность, вызывая у него метод onNext(). Именно в реализациях подписчиков и выполняется вся логика потока. Для понятности опишу последовательность выполнения нашего стрима:

  1. LambdaSubscriber вызывает ArraySubscription.request(Long.MAX_VALUE).

  2. ArraySubscription вызывает FilterSubscriber.onNext().

  3. FilterSubscriber выполняет логику переданного нами предиката, и затем вызывает MapSubscriber.onNext() для элементов, прошедших проверку.

  4. MapSubscriber выполняет логику переданной нами функции преобразования и вызывает LambdaSubscriber.onNext().

  5. LambdaSubscriber выполняет переданную нами в методе subscribe() логику.

  6. В конце последовательности ArraySubscription вызывает FilterSubscriber.onComplete(), который транслируется по той же цепочке до LambdaSubscriber и выполняется переданная пользователем логика (в нашем случае мы никакую логику не передавали, потому никаких действий не произойдет).

Это примерная логика работы стрима. Внимательный читатель может резонно заметить: а где же асинхронность? И действительно, в вышестоящем примере никакой асинхронности нет, все выполняется в одном потоке. Для того, что бы сделать наш код асинхронным, выполним ряд преобразований.

Управление потоками

Для начала немного изменим логику нашего стрима. С помощью Flux.create() создадим тестовую последовательность, которую будем преобразовывать в массивы чисел. Далее мы фильтруем массив на четные числа, и начинаем проверять оставшиеся на простоту в операторе flatMap(). FluxFlatMap принимает функцию, преобразующую элемент в Publisher - другой стрим - и встраивает его в основную цепочку преобразований. Сам подписчик FlatMapMain отличается от остальных - его реализация на порядок сложнее.

Синхронный стрим
@Test
public void syncStream() {
    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> Arrays.stream(array)
                    .filter(i -> (i % 2 != 0))
                    .collect(Collectors.toList()))
            .flatMap(list -> Flux.fromIterable(list)
                    .map(i -> "Number " + i + " is prime: " + checker.test(i)))
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));
}
Часть вывода
IN thread [main] -> Number 99126449 is prime: false
IN thread [main] -> Number 99453773 is prime: true
IN thread [main] -> Number 98450499 is prime: false
IN thread [main] -> Number 90454073 is prime: false
IN thread [main] -> Number 93393817 is prime: false

После выполнения отметим, что все объекты обрабатываются в main потоке.

Теперь добавим в цепочку оператор subscribeOn() с аргументом Schedulers.boundedElastic(). Schedulers - аналог ExecutorService в реакторе, а сам Schedulers.boundedElastic() - аналог Executors.newCachedThreadPool(). Оператор subscribeOn() позволяет изменить рабочий поток выполнения стрима, начиная с подписки. 

Немного о шедулерах

В реакторе представлены 3 типа шедулеров, по аналогии с ExecutorService:

  • Schedulers.boundedElastic(): аналог Executors.newCachedThreadPool(). Динамически создает рабочие потоки и кэширует пулы потоков выполнения. Максимальное число создаваемых пулов потоков выполнения не ограничивается, поэтому этот планировщик можно использовать для организации выполнения задач, связанных с вводом/выводом.

  • Schedulers.parallel(): аналог Executors.newFixedThreadPool(). Имеет фиксированный размер пула рабочих потоков (по умолчанию количество потоков ограничивается числом ядер процессора). Поддерживает планирование задач по времени.

  • Schedulers.single(): аналог Executors.newSingleThreadExecutor(). Используется для выполнения задач в одном выделенном потоке. Поддерживает планирование задач по времени.

Каждый из них также имеет методы создания с параметрами, а так же имеется возможность создать свой шедулер из Executor или ExecutorService.

Будьте осторожны: методы Schedulers.boundedElastic(), Schedulers.parallel() и Schedulers.single() создают шедулер единожды при первом вызове и кэшируют его. Дальнейшие вызовы этих методов вернут кэшированный экземпляр шедулера, что может быть особенно критично при использовании Schedulers.parallel() и Schedulers.single().

Асинхронный стрим
@Test
public void asyncStream() {
    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> Arrays.stream(array)
                    .filter(i -> (i % 2 != 0))
                    .collect(Collectors.toList()))
            .flatMap(list -> Flux.fromIterable(list)
                    .map(i -> "Number " + i + " is prime: " + checker.test(i)))
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));
}

Запустим наш стрим и... увидим, что ничего не выполнилось. Программа просто завершилась. Это произошло, потому что выполнение стрима перенеслось в другой поток, а поток main попросту завершился вместе с программой. В реальном приложении такое вряд ли произойдет, но для нашего теста добавим замок для основного потока.

Асинхронный стрим 2
@Test
public void asyncStream() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> Arrays.stream(array)
                    .filter(i -> (i % 2 != 0))
                    .collect(Collectors.toList()))
            .flatMap(list -> Flux.fromIterable(list)
                    .map(i -> "Number " + i + " is prime: " + checker.test(i)))
            .subscribeOn(Schedulers.boundedElastic())
            .doFinally(ignore -> cdl.countDown())
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));

    cdl.await();
}
Часть вывода
IN thread [boundedElastic-1] -> Number 90553495 is prime: false
IN thread [boundedElastic-1] -> Number 97965125 is prime: false
IN thread [boundedElastic-1] -> Number 95508257 is prime: false
IN thread [boundedElastic-1] -> Number 92073469 is prime: true
IN thread [boundedElastic-1] -> Number 93561047 is prime: false
IN thread [boundedElastic-1] -> Number 90207993 is prime: false
IN thread [boundedElastic-1] -> Number 90418581 is prime: false

В методе doFinaly() добавим уменьшение счетчика для нашего замка. Отмечу, что если в созданной последовательности не указать s.complete(), данный метод вызван не будет, поскольку сигнала об окончании последовательности не поступит.

Запустим, и увидим, что все действия выполнялись в другом потоке.

Снова изменим наш стрим, заменив subscribeOn() на publishOn()

Оператор publishOn()

Данный оператор позволяет перенести определенный этап выполнения в другой поток.  publishOn() имеет очередь, в которую записываются поступившие элементы, которые впоследствии извлекает и обрабатывает рабочий поток. Как следствие, последовательность разбивается на два независимых этапа обработки данных.

Асинхронный стрим 3
@Test
public void asyncStream() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> {
                System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> filtering array with size: " + array.length);
                return Arrays.stream(array)
                        .filter(i -> (i % 2 != 0))
                        .collect(Collectors.toList());
            })
            .publishOn(Schedulers.boundedElastic())
            .flatMap(list -> Flux.fromIterable(list)
                    .map(i -> "Number " + i + " is prime: " + checker.test(i)))
            .doFinally(ignore -> cdl.countDown())
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));

    cdl.await();
}
Часть вывода
IN thread [main] -> filtering array with size: 150
IN thread [main] -> filtering array with size: 150
IN thread [main] -> filtering array with size: 150
IN thread [main] -> filtering array with size: 150
IN thread [boundedElastic-1] -> Number 97321949 is prime: false
IN thread [boundedElastic-1] -> Number 92914819 is prime: false
IN thread [boundedElastic-1] -> Number 94001827 is prime: false

Запустим, и увидим, что первая часть стрима отрабатывает в main потоке, а оканчивается в другом. Мы разделили выполнение стрима на 2 части - формирование данных и их обработка.

Однако, что, если мы хотим больше? Например, обработать каждый сгенерированный массив в отдельном потоке? 

Асинхронный стрим 4
@Test
public void researchReactor_2_4() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> {
                System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> filtering array with size: " + array.length);
                return Arrays.stream(array)
                        .filter(i -> (i % 2 != 0))
                        .collect(Collectors.toList());
            })
            .flatMap(list -> Flux.fromIterable(list)
                    .publishOn(Schedulers.boundedElastic())
                    .map(i -> "Number " + i + " is prime: " + checker.test(i)))
            .doFinally(ignore -> cdl.countDown())
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));

    cdl.await();
}
Часть вывода
IN thread [main] -> filtering array with size: 150
IN thread [main] -> filtering array with size: 150
IN thread [boundedElastic-2] -> Number 98846721 is prime: false
IN thread [boundedElastic-2] -> Number 92572645 is prime: false
IN thread [boundedElastic-2] -> Number 98842737 is prime: false
IN thread [boundedElastic-12] -> Number 95426589 is prime: false
IN thread [boundedElastic-12] -> Number 91028725 is prime: false
IN thread [boundedElastic-3] -> Number 91457669 is prime: true
IN thread [boundedElastic-3] -> Number 95625013 is prime: false
IN thread [boundedElastic-4] -> Number 91923637 is prime: false

Прекрасно, теперь каждый массив обрабатывается в отдельном потоке. Но что если мы хотим выделить поток для каждой задачи? Например, проверка происходит не в нашем приложении, а производится rest вызов, и мы хотим для каждого элемента сделать асинхронный запрос?

Асинхронный стрим 5
@Test
public void researchReactor_2_5() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux.<Integer>create(s -> {
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(50);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(100);
                s.next(150);
                s.next(150);
                s.next(150);
                s.next(150);
                s.complete();
            }).map(this::generateIntArray)
            .map(array -> {
                System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> filtering array with size: " + array.length);
                return Arrays.stream(array)
                        .filter(i -> (i % 2 != 0))
                        .collect(Collectors.toList());
            })
            .flatMap(list -> Flux.fromIterable(list)
                    .flatMap(i -> Mono.defer(() -> Mono.just("Number " + i + " is prime: " + checker.test(i)))
                            .subscribeOn(Schedulers.boundedElastic()))
            )
            .doFinally(ignore -> cdl.countDown())
            .subscribe(result -> System.out.println("IN thread [" + Thread.currentThread().getName() + "] -> " + result));

    cdl.await();
}
Часть вывода
IN thread [main] -> filtering array with size: 100
IN thread [boundedElastic-30] -> Number 92187347 is prime: false
IN thread [boundedElastic-9] -> Number 96199459 is prime: false
IN thread [boundedElastic-24] -> Number 96286047 is prime: false

И вот мы применили целых 5 стратегий выполнения потока: синхронно полностью в основном потоке, синхронно полностью в выбранном потоке, асинхронно в двух потоках (в одном подготовка, в другом обработка), асинхронно по потоку на массив, асинхронно по потоку на элемент, при этом почти не меняя сам код.

Холодные и горячие потоки

Холодные потоки генерируют всю последовательность заново для каждого нового подписчика. Примеры таких стримов приведены выше.

Горячие потоки генерируют последовательность независимо от наличия подписчиков. При появлении подписчика такой стрим посылает ему только новые данные. Таким образом, подписчик получает только те данные, на которые успел подписаться. Например, если запустить следующий код, первый подписчик начнет обрабатывать элементы, начиная с 3, а второй - с 7.

@Test
public void hotStream() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux<Integer> stream = Flux.range(0, 10)
            .delayElements(Duration.ofMillis(500))
            .doFinally(ignore -> cdl.countDown())
            .subscribeOn(Schedulers.single())
            .share();

    stream.subscribe();

    Thread.sleep(2000);

    stream.subscribe(o -> System.out.println("[" + Thread.currentThread().getName() + "] Subscriber 1 -> " + o));

    Thread.sleep(2000);

    stream.subscribe(o -> System.out.println("[" + Thread.currentThread().getName() + "] Subscriber 2 -> " + o));

    cdl.await();
}
Пример бесконечной горячей последовательности
@Test
public void hotStream() throws InterruptedException {
    var cdl = new CountDownLatch(1);

    Flux<Object> stream = Flux.create(fluxSink -> {
                while (true) {
                    fluxSink.next(System.currentTimeMillis());
                }
            })
            .sample(Duration.ofMillis(500))
            .doFinally(ignore -> cdl.countDown())
            .subscribeOn(Schedulers.single())
            .share();

    stream.subscribe(o -> System.out.println("[" + Thread.currentThread().getName() + "] Subscriber 1 -> " + o));

    Thread.sleep(4000);

    stream.subscribe(o -> System.out.println("[" + Thread.currentThread().getName() + "] Subscriber 2 -> " + o));

    cdl.await();
}

Работа с контекстом

Еще одной интересной особенностью реактора является контекст стрима. В “классическом” программировании (“поток на задачу”) контекст задачи сделать довольно просто, реализовав его ThreadLocal<?> переменной. В реактивных стримах такой возможности нет, т.к. одну задачу могут выполнять разные потоки в разное время. Соответственно, есть необходимость как то хранить контекст для задачи, не привязываясь к потоку-исполнителю. Рассмотрим контекст на простом примере - наш стрим будет проверять, является ли число в контексте делителем числа из потока.

В реакторе есть несколько способов создать контекст потока:

  • передать его в качестве аргумента в методе подписки

    Пример инициализации контекста 1
    @Test
    public void reactorContext_1() {
        String ctxKey = "key";
    
        Flux.fromArray(generateIntArray(10))
                .flatMap(i -> Mono.deferContextual(ctx -> {
                            int value = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                            String result = i % value == 0
                                    ? String.format("Thread [%s] -> %d divisor of the number %d", Thread.currentThread().getName(), value, i)
                                    : String.format("Thread [%s] -> %d NOT divisor of the number %d", Thread.currentThread().getName(), value, i);
    
                            return Mono.just(result);
                        })
                ).subscribe(System.out::println,
                        null,
                        null,
                        Context.of(ctxKey, ThreadLocalRandom.current().nextInt(2, 10)));
    }
    Пример вывода
    Thread [main] -> 6 NOT divisor of the number 96484372
    Thread [main] -> 6 NOT divisor of the number 99751334
    Thread [main] -> 6 NOT divisor of the number 98114603
    Thread [main] -> 6 NOT divisor of the number 94526528
    Thread [main] -> 6 NOT divisor of the number 99601715
    Thread [main] -> 6 NOT divisor of the number 94450652
    Thread [main] -> 6 NOT divisor of the number 96186878
    Thread [main] -> 6 divisor of the number 95334678
    Thread [main] -> 6 NOT divisor of the number 91412254
    Thread [main] -> 6 divisor of the number 97741992

  • передать его в операторе contextWrite()

    Пример инициализации контекста 2
    @Test
    public void reactorContext_2() {
        String ctxKey = "key";
    
        Flux.fromArray(generateIntArray(10))
                .flatMap(i -> Mono.deferContextual(ctx -> {
                            int value = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                            String result = i % value == 0
                                    ? String.format("Thread [%s] -> %d divisor of the number %d", Thread.currentThread().getName(), value, i)
                                    : String.format("Thread [%s] -> %d NOT divisor of the number %d", Thread.currentThread().getName(), value, i);
    
                            return Mono.just(result);
                        })
                ).contextWrite(ctx -> ctx.put(ctxKey, ThreadLocalRandom.current().nextInt(2, 10)))
                .subscribe(System.out::println);
    }
    Пример вывода
    Thread [main] -> 3 divisor of the number 94070187
    Thread [main] -> 3 NOT divisor of the number 96881164
    Thread [main] -> 3 NOT divisor of the number 93117008
    Thread [main] -> 3 NOT divisor of the number 99847222
    Thread [main] -> 3 NOT divisor of the number 99232121
    Thread [main] -> 3 divisor of the number 90207831
    Thread [main] -> 3 divisor of the number 98137233
    Thread [main] -> 3 divisor of the number 93991545
    Thread [main] -> 3 divisor of the number 99188091
    Thread [main] -> 3 divisor of the number 99287157

    Пример инициализации контекста 3
    @Test
    public void reactorContext_2_1() {
        String ctxKey = "key";
    
        Flux.fromArray(generateIntArray(10))
                .flatMap(i -> Mono.deferContextual(ctx -> {
                            int value = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                            String result = i % value == 0
                                    ? String.format("Thread [%s] -> %d divisor of the number %d", Thread.currentThread().getName(), value, i)
                                    : String.format("Thread [%s] -> %d NOT divisor of the number %d", Thread.currentThread().getName(), value, i);
    
                            return Mono.just(result);
                        })
                ).contextWrite(Context.of(ctxKey, ThreadLocalRandom.current().nextInt(2, 10)))
                .subscribe(System.out::println);
    }
    Пример вывода
    Thread [main] -> 4 divisor of the number 94968444
    Thread [main] -> 4 divisor of the number 98424152
    Thread [main] -> 4 NOT divisor of the number 99689442
    Thread [main] -> 4 divisor of the number 97327236
    Thread [main] -> 4 NOT divisor of the number 94170947
    Thread [main] -> 4 divisor of the number 96579680
    Thread [main] -> 4 NOT divisor of the number 91238971
    Thread [main] -> 4 NOT divisor of the number 90252134
    Thread [main] -> 4 divisor of the number 95264784
    Thread [main] -> 4 NOT divisor of the number 99502289

Контекст будет виден во всех операторах, объявленных выше по цепочке. Если мы объявим контекст в начале стрима, то увидим ошибку

Ошибочное обращение к контексту 1
@Test
public void reactorContext_3() {
    String ctxKey = "key";

    Flux.fromArray(generateIntArray(10))
            .contextWrite(ctx -> ctx.put(ctxKey, 4))
            .flatMap(i -> Mono.deferContextual(ctx -> {
                        int value = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                        String result = i % value == 0
                                ? String.format("Thread [%s] -> %d divisor of the number %d", Thread.currentThread().getName(), value, i)
                                : String.format("Thread [%s] -> %d NOT divisor of the number %d", Thread.currentThread().getName(), value, i);

                        return Mono.just(result);
                    })
            ).subscribe(System.out::println);
}
Пример вывода
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Ctx key not found!
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Ctx key not found!
Caused by: java.lang.IllegalArgumentException: Ctx key not found!
	at ru.brutforcer.reactor.aricle.ReactorTests.lambda$reactorContext_3$62(ReactorTests.java:340)
	at java.base/java.util.Optional.orElseThrow(Optional.java:403)

Также мы можем объявить контекст для “внутренней” цепочки. В примере специально в контекст установлено рандомное число: в примерах выше оно генерируется единожды и является единым для всего стрима. В новом примере для каждого элемента последовательности генерируется свое число для проверки.

Пример инициализации контекста 4
@Test
public void reactorContext_4() {
    String ctxKey = "key";

    Flux.fromArray(generateIntArray(10))
            .flatMap(i -> Mono.deferContextual(ctx -> {
                    int value = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                    String result = i % value == 0
                            ? String.format("Thread [%s] -> %d divisor of the number %d", Thread.currentThread().getName(), value, i)
                            : String.format("Thread [%s] -> %d NOT divisor of the number %d", Thread.currentThread().getName(), value, i);

                    return Mono.just(result);
                }).contextWrite(ctx -> ctx.put(ctxKey, ThreadLocalRandom.current().nextInt(2, 10)))
            ).subscribe(System.out::println);
}
Пример вывода
Thread [main] -> 4 NOT divisor of the number 91486019
Thread [main] -> 2 NOT divisor of the number 95953267
Thread [main] -> 5 NOT divisor of the number 95931323
Thread [main] -> 4 NOT divisor of the number 99936074
Thread [main] -> 4 divisor of the number 99891164
Thread [main] -> 3 divisor of the number 96090381
Thread [main] -> 9 NOT divisor of the number 97190858
Thread [main] -> 9 NOT divisor of the number 97639514
Thread [main] -> 4 divisor of the number 90896348
Thread [main] -> 4 NOT divisor of the number 98617771

И важная особенность: контекст, объявленный во “внутренней” цепочке, не будет виден во “внешней”. В примере ниже выйдет ошибка.

Ошибочное обращение к контексту 2
@Test
public void reactorContext_5() {
    String ctxKey = "key";

    Flux.fromArray(generateIntArray(10))
            .flatMap(i -> Mono.<Integer>deferContextual(ctx -> {
                int v = ctx.<Integer>getOrEmpty(ctxKey).orElseThrow(() -> new IllegalArgumentException("Ctx key not found!"));
                return Mono.just(i);
            }))
            .flatMap(i -> Mono.deferContextual(ctx -> Mono.just(i))
                    .contextWrite(ctx -> ctx.put(ctxKey, ThreadLocalRandom.current().nextInt(2, 10))))
            .subscribe(System.out::println);
}

Другие статьи из цикла "Внутренний мир"

Полезные ссылки