RxJava — это реализация ReactiveX для Java — библиотеки для асинхронной обработки потоков данных. Паттерн observable на стероидах, как они сами пишут. В интернете, в том числе на Хабре, есть много «введений в RxJava». Я хочу привести несколько примеров реальных задач. Они не очень сложные, но возможно кто-то увидит какие-то сходства со своими и задумается.
Собственно, задачи:
1. Простое клиентское TCP-соединение. Есть протокол поверх TCP/IP, нужно сформировать сообщение, подключиться к удаленному узлу, если еще не подключился, передать сообщение и прочитать ответ. Плюс обработка ошибок, проверка таймаутов, повтор отправки в случае неудачи. Жестких требований к производительности нет, трафик не большой.
2. Есть двигатель и некоторый датчик. Нужно произвести сканирование — пройтись двигателем по заданной траектории: послать двигатель к точке, дождаться, когда он к ней приедет, снять показания датчика, отобразить точку на графике (в GUI потоке), поехать к следующей точке…
3. Полученные после сканирования данные нужно обработать (условно длительный вычислительный процесс) и засунуть в pdf-отчет (условно длительный процесс ввода-вывода) вместе с изображением графика и данными введенными пользователем (GUI поток).
Предположим, есть некоторый протокол обмена сообщениями. Сообщения могут содержать заголовок, контрольную сумму или что-нибудь еще. На каждое сообщение должен быть ответ от сервера. В простейшем виде решение может выглядеть как-то так:
Детали реализации я не описываю, но вкратце: connect() создает java.net.Socket и подключается к серверу, sendBytes() пишет в output-поток сокета, readAnswer() читает из input-потока сокета. Помимо addHeader() могут быть еще методы, добавляющие контрольную сумму, кодирование и прочее.
Проблемы этого кода: блокирующие запись/чтение и неудобная обработка ошибок — не понятно, что делать с исключением: то ли наверх пробрасывать, то ли тут что-то сделать (рекурсивно повторить отправку?). Как раз эти две проблемы и решает RxJava. Перепишем:
Применение:
В общем-то, получилось то же самое, только в виде монады, набора операторов и с рядом нюансов.
Во-первых, метод sendBytes() теперь возвращает boolean. RxJava работает с потоками данных, а если кто-то возвращает void вместо данных, то потока как-бы уже и нет. Поэтому нужно либо добавить возвращаемый результат в метод (хотя бы return true), либо вместо map использовать doOnNext — этот оператор возвращает то же, что и получил.
Во-вторых, метод send() теперь возвращает Observable, а не сам String. Поэтому нужен отдельный обработчик ответа (или лямбда, как в примере). С исключением то же самое. Тут нужно, как говорится, начать мыслить асинхронно. Вместо самого результата, мы получаем объект, который потом когда-нибудь предоставит нам результат, а мы должны предоставить ему того, что этот результат получит. Вот только этот код все еще блокирующий, поэтому это асинхронное мышление не имеет смысла. Можно, правда, сделать обертку для String и вытащить результат из монады через замыкание этой обертки, но это уже грязные хаки, которые нарушают принципы функционального программирования.
Улучшим этот код. Начнем с обработки ошибок. RxJava отлавливает исключения, возникающие в операторах, и передает их подписчику. Второй аргумент метода subscribe() — это функциональный интерфейс Action1 — он как раз и отвечает за обработку исключения. Если какой-то из методов раньше мог кидать IOException или какое-то еще checked исключение, то теперь больше нельзя. Такие исключения нужно ловить руками и что-то с ними делать. Например, оборачивать в RuntimeException, чтобы предоставить дальнейшие решения RxJava. Но Action1 не сильно отличается от обычного try-catch подхода. У RxJava есть операторы для работы с ошибками: doOnError(), onErrorReturn(), onErrorResumeNext() и onExceptionResumeNext(). А еще есть банальный retry(), который тут как раз и нужен. Если возникла какая-то ошибка с подключением, то можно просто повторить отправку n-раз.
Обработчик исключения, передаваемый в subscribe() будет вызван только в том случае, если все повторы закончатся с ошибкой. Для надежности еще вызываем disconnect() перед повторной попыткой, чтобы закрыть и обнулить сокет. Иначе в checkConnection() внутри при вызове isConnected() можем получить ложно положительное срабатывание, и все повторные попытки опять приведут к ошибке. Например, если сервер убил подключение по таймауту, то метод Socket.isConnected() на стороне клиента все еще будет возвращать true — со стороны клиента сокет же подключен, все норм.
Можно еще добавить таймаут на случай, если серверу поплохело, и клиент заблокировался на записи в сокет:
Оператор timeout кидает исключение, если в течение заданного времени от Observable не поступило ни одного элемента. А исключения мы уже умеем обрабатывать.
Теперь вторая проблема — у нас все еще блокирующие операции, поэтому если вызывать send() из потока GUI, то можно получить подвисания интерфейса. Нужно просто сказать RxJava, чтобы все эти действия выполнялись в другом потоке.
Для этого есть операторы observeOn() и subscribeOn(). У многих людей возникают проблемы с пониманием, чем отличаются эти операторы — есть куча статей на эту тему и вопросов на stackoverflow. Давайте вновь поднимем эту тему и вместе подумаем, что нам нужно сейчас использовать. Вот что пишут в официальной документации:
Observable — это тот, кто поставляет данные. Observer — это тот, кто получает данные и что-то с ними делает. Нам нужно, чтобы все выполнялось в другом потоке. Вернее, нам нужно, чтобы наш Observable поставлял данные изначально в другом потоке. А раз данные поставляются в другом потоке, то и все observer'ы будут их обрабатывать в другом потоке. Это по определению subscribeOn() — он определяет планировщика для Observable, которого мы создали в самом начале:
Теперь операторы будут выполняться в потоке, который им предоставит планировщик io. Если несколько раз подряд вызвать send(), не дожидаясь ответа, то могут возникнуть проблемы с синхронизацией. По-хорошему, функции переданные в операторы, должны быть чистыми (без побочных эффектов), но в случае с сокетом это проблематично. Чистые функции вообще не очень дружат с вводом/выводом. Нужно синхронизировать обращения к сокету или реализовать что-то типа ConnectionPool'а — тут нужно исходить из задачи.
Стоит иметь в виду, что тогда и обработка ответа подписчиком (он тоже observer) будет осуществляться в другом потоке, а это не всегда хорошо. Например, если мы хотим отобразить ответ в графическом интерфейсе, то скорее всего получим исключение, что мы это делаем не в главном потоке. Для этого нужно поместить обработчик в очередь событий фреймворка, отвечающего за графический интерфейс. В разных фреймворках это делается по-разному. В JavaFX для этого есть метод Platform.runLater(runnable). Можно вызывать его напрямую в обработчике ответа, а можно написать свой планировщик:
К слову, для Android существует AndroidSchedulers.mainThread() в RxAndroid — дополнении для RxJava. Пример отправки команды тогда имеет вид:
Здесь мы используем уже observeOn() — нам нужно сообщить RxJava, что «следующий observer должен выполняться через такой-то планировщик».
RxJava предоставляет удобное управление конвейером операторов. Рядом с .map(bytes -> sendBytes(bytes)) можно добавить расчет контрольной суммы, потом прогнать байты через кодирование. Можно добавить вначале логгирование исходящей команды, а в конце — полученного ответа. В общем, идею вы поняли.
Есть набор точек — это могут быть углы поворота двигателя в градусах или позиции устройства, которое приводится в движение двигателем. В общем, есть какой-то актуатор. А еще есть внешний датчик, с которого можно получать значения. Нужно проехаться двигателем по набору точек, в каждой точке получить значение с датчика, построить кривую на графике. Повторить процедуру n раз (n кривых на графике). При этом двигатель работает не мгновенно, нужно ждать, когда он выйдет на позицию.
Итак, мы имеем набор точек, для каждой нужно что-то сделать (желательно в другом потоке), а результат обрабатывается в GUI потоке (добавить точку на LineChart, например). Похоже на типичную задачу для RxJava.
Используем Schedulers.io(): управление двигателем и датчиком — это все-таки операции ввода-вывода. moveMotor() посылает команду двигателю (через написанный ранее Connection, например).
blockUntilTargetReached() запрашивает у двигателя его позицию, сравнивает с целевой и усыпляет поток на сколько-то миллисекунд, если двигатель еще не доехал. createResultPoint() запрашивает у датчика значение в возвращает объект класса Point, содержащий пару чисел — целевую позицию и значение с датчика. repeat() работает почти как retry() — он повторяет весь поток с самого начала каждый раз, а retry() только после ошибки.
Исходный Observable будет выдавать точки по одной. Следующую точку он выдаст только когда предыдущая пройдет все операторы вплоть до подписчика. Это соответствует функциональному подходу с его ленивыми вычислениями и потоковой обработкой. Таким же образом работают StreamAPI и LINQ. За счет этого сканирование будет идти по точкам по очереди, а не forEach(this::moveMotor), затем forEach(this::blockUntilTargetReached) и так далее.
Применение:
Проблема в том, что подписчик не отличает на каком именно повторе была получена точка. То есть вместо n кривых, мы получим одну кривую в n раз длиннее. Нужно как-то вручную отслеживать, что началось новое сканирование. Например, считать количество точек и начинать новую кривую, если значение счетчика превысило количество точек в траектории. Или сравнивать пришедшую точку с первой точкой траектории.
В subscribe() появился третий аргумент — это обработчик onComplete(), который вызывается, когда в Observable закончились элементы.
subscribe() возвращает объект, имеющий интерфейс Subscription. Если вызвать у него метод unsubscribe(), то у Observable больше не будет подписчика, принимающего данные, поэтому он просто перестанет их выдавать. Принцип ленивых вычислений — если данные никому не нужны, то не нужно их передавать. Побочных эффектов у операторов все равно не должно быть в соответствии с парадигмой функционального программирования, поэтому просто выполнять операторы без подписчика у Observable смысла нет. С помощью unsubscribe() можно реализовать отмену сканирования. Разве что еще двигателю нужно послать команду на останов движения — за это unsubscribe() не отвечает.
Мы получили после сканирования много полезных данных, теперь их нужно обработать, рассчитать нужные значения и сгенерировать pdf-отчет.
В отчете так же должны быть значения некоторых полей из интерфейса (например, ФИО пользователя) и рисунок полученных графиков. В случае JavaFX рисунок можно получить методом snapshot(), который есть у каждого графического объекта. Так как это действия с объектами JavaFX, выполняться они должны в GUI-потоке. Для этого у нас уже есть FxScheduler.
m_reportInfoProvider — это реализация интерфейса ReportInfoProvider — прослойки между моделью и представлением. По сути это вызов геттеров из TextView, но модели все равно — у нее просто интерфейс.
Для расчетов есть Schedulers.computation().
Теперь мы хотим объединить данные из формы и данные из расчетов и поместить все это в тяжелый pdf-файл. Для этого есть оператор zip() и Schedulers.io():
zip() принимает до девяти разных Observable и соединяет элементы из них в кортежи. Функцию для соединения нужно предоставить самому как и результирующий тип для кортежей. В итоге получение данных из интерфейса (включая изображение графика) и обработка результатов сканирования проходят параллельно. Нужно ли распараллеливание таких действий, зависит от конкретных задач и объемов данных — я привел несколько упрощенный пример.
Стоит иметь в виду, что когда у нас несколько потоков данных, может проявляться backpressure. Это различные проблемы связанные с разной производительностью потоков и разной производительностью Observable и Observer. В общем, это ситуации, когда кто-то простаивает, а у кого-то уже через буфер переливается. Так что нужно быть аккуратным.
Скорее всего для этих задач есть другие решения (и более эффективные) — если кто-то мне их укажет, я с радостью приму это к сведению и учту в работе. На примере этих задач я постарался показать некоторые особенности RxJava: обработка ошибок, отличие subscribeOn() и observeOn(), кастомные планировщики и получение результата в GUI-потоке, принцип ленивых вычислений и его применение для управления внешними устройствами, прерывание работы Observable, параллельная работа нескольких Observable и их объединение. Так что даже если эти задачи не совсем удачные для RxJava, сами рассмотренные принципы могут быть полезны для других.
Собственно, задачи:
1. Простое клиентское TCP-соединение. Есть протокол поверх TCP/IP, нужно сформировать сообщение, подключиться к удаленному узлу, если еще не подключился, передать сообщение и прочитать ответ. Плюс обработка ошибок, проверка таймаутов, повтор отправки в случае неудачи. Жестких требований к производительности нет, трафик не большой.
2. Есть двигатель и некоторый датчик. Нужно произвести сканирование — пройтись двигателем по заданной траектории: послать двигатель к точке, дождаться, когда он к ней приедет, снять показания датчика, отобразить точку на графике (в GUI потоке), поехать к следующей точке…
3. Полученные после сканирования данные нужно обработать (условно длительный вычислительный процесс) и засунуть в pdf-отчет (условно длительный процесс ввода-вывода) вместе с изображением графика и данными введенными пользователем (GUI поток).
1. Простое клиентское TCP-соединение
Предположим, есть некоторый протокол обмена сообщениями. Сообщения могут содержать заголовок, контрольную сумму или что-нибудь еще. На каждое сообщение должен быть ответ от сервера. В простейшем виде решение может выглядеть как-то так:
public String send(String command) { try { if (!isConnected()) { connect(); } byte[] bytes = command.getBytes(); bytes = addHeader(bytes); sendBytes(bytes); return readAnswer(); } catch (IOException e) { // паника } }
Детали реализации я не описываю, но вкратце: connect() создает java.net.Socket и подключается к серверу, sendBytes() пишет в output-поток сокета, readAnswer() читает из input-потока сокета. Помимо addHeader() могут быть еще методы, добавляющие контрольную сумму, кодирование и прочее.
Проблемы этого кода: блокирующие запись/чтение и неудобная обработка ошибок — не понятно, что делать с исключением: то ли наверх пробрасывать, то ли тут что-то сделать (рекурсивно повторить отправку?). Как раз эти две проблемы и решает RxJava. Перепишем:
public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .map(result -> readAnswer()); }
Применение:
connection.send("echo 123") .subscribe( answer -> { /*обработать ответ*/ }, throwable -> { /*обработать ошибку*/ } );
В общем-то, получилось то же самое, только в виде монады, набора операторов и с рядом нюансов.
Во-первых, метод sendBytes() теперь возвращает boolean. RxJava работает с потоками данных, а если кто-то возвращает void вместо данных, то потока как-бы уже и нет. Поэтому нужно либо добавить возвращаемый результат в метод (хотя бы return true), либо вместо map использовать doOnNext — этот оператор возвращает то же, что и получил.
Во-вторых, метод send() теперь возвращает Observable, а не сам String. Поэтому нужен отдельный обработчик ответа (или лямбда, как в примере). С исключением то же самое. Тут нужно, как говорится, начать мыслить асинхронно. Вместо самого результата, мы получаем объект, который потом когда-нибудь предоставит нам результат, а мы должны предоставить ему того, что этот результат получит. Вот только этот код все еще блокирующий, поэтому это асинхронное мышление не имеет смысла. Можно, правда, сделать обертку для String и вытащить результат из монады через замыкание этой обертки, но это уже грязные хаки, которые нарушают принципы функционального программирования.
Улучшим этот код. Начнем с обработки ошибок. RxJava отлавливает исключения, возникающие в операторах, и передает их подписчику. Второй аргумент метода subscribe() — это функциональный интерфейс Action1 — он как раз и отвечает за обработку исключения. Если какой-то из методов раньше мог кидать IOException или какое-то еще checked исключение, то теперь больше нельзя. Такие исключения нужно ловить руками и что-то с ними делать. Например, оборачивать в RuntimeException, чтобы предоставить дальнейшие решения RxJava. Но Action1 не сильно отличается от обычного try-catch подхода. У RxJava есть операторы для работы с ошибками: doOnError(), onErrorReturn(), onErrorResumeNext() и onExceptionResumeNext(). А еще есть банальный retry(), который тут как раз и нужен. Если возникла какая-то ошибка с подключением, то можно просто повторить отправку n-раз.
public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT); }
Обработчик исключения, передаваемый в subscribe() будет вызван только в том случае, если все повторы закончатся с ошибкой. Для надежности еще вызываем disconnect() перед повторной попыткой, чтобы закрыть и обнулить сокет. Иначе в checkConnection() внутри при вызове isConnected() можем получить ложно положительное срабатывание, и все повторные попытки опять приведут к ошибке. Например, если сервер убил подключение по таймауту, то метод Socket.isConnected() на стороне клиента все еще будет возвращать true — со стороны клиента сокет же подключен, все норм.
Можно еще добавить таймаут на случай, если серверу поплохело, и клиент заблокировался на записи в сокет:
public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT); }
Оператор timeout кидает исключение, если в течение заданного времени от Observable не поступило ни одного элемента. А исключения мы уже умеем обрабатывать.
Теперь вторая проблема — у нас все еще блокирующие операции, поэтому если вызывать send() из потока GUI, то можно получить подвисания интерфейса. Нужно просто сказать RxJava, чтобы все эти действия выполнялись в другом потоке.
Для этого есть операторы observeOn() и subscribeOn(). У многих людей возникают проблемы с пониманием, чем отличаются эти операторы — есть куча статей на эту тему и вопросов на stackoverflow. Давайте вновь поднимем эту тему и вместе подумаем, что нам нужно сейчас использовать. Вот что пишут в официальной документации:
SubscribeOn — specify the Scheduler on which an Observable will operate.
ObserveOn — specify the Scheduler on which an observer will observe this Observable.
Observable — это тот, кто поставляет данные. Observer — это тот, кто получает данные и что-то с ними делает. Нам нужно, чтобы все выполнялось в другом потоке. Вернее, нам нужно, чтобы наш Observable поставлял данные изначально в другом потоке. А раз данные поставляются в другом потоке, то и все observer'ы будут их обрабатывать в другом потоке. Это по определению subscribeOn() — он определяет планировщика для Observable, которого мы создали в самом начале:
public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT) .subscribeOn(Schedulers.io()); }
Теперь операторы будут выполняться в потоке, который им предоставит планировщик io. Если несколько раз подряд вызвать send(), не дожидаясь ответа, то могут возникнуть проблемы с синхронизацией. По-хорошему, функции переданные в операторы, должны быть чистыми (без побочных эффектов), но в случае с сокетом это проблематично. Чистые функции вообще не очень дружат с вводом/выводом. Нужно синхронизировать обращения к сокету или реализовать что-то типа ConnectionPool'а — тут нужно исходить из задачи.
Стоит иметь в виду, что тогда и обработка ответа подписчиком (он тоже observer) будет осуществляться в другом потоке, а это не всегда хорошо. Например, если мы хотим отобразить ответ в графическом интерфейсе, то скорее всего получим исключение, что мы это делаем не в главном потоке. Для этого нужно поместить обработчик в очередь событий фреймворка, отвечающего за графический интерфейс. В разных фреймворках это делается по-разному. В JavaFX для этого есть метод Platform.runLater(runnable). Можно вызывать его напрямую в обработчике ответа, а можно написать свой планировщик:
public final class FxScheduler extends Scheduler { private final static FxScheduler m_instance = new FxScheduler(); private FxScheduler() {} public static FxScheduler getInstance() { return m_instance; } @Override public Worker createWorker() { return new Worker() { private final CompositeSubscription m_subscription = new CompositeSubscription(); @Override public Subscription schedule(Action0 action0) { Platform.runLater(action0::call); return m_subscription; } @Override public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { Platform.runLater(action0::call); } }, timeUnit.toMillis(delay)); return m_subscription; } @Override public void unsubscribe() { m_subscription.unsubscribe(); } @Override public boolean isUnsubscribed() { return m_subscription.isUnsubscribed(); } }; } }
К слову, для Android существует AndroidSchedulers.mainThread() в RxAndroid — дополнении для RxJava. Пример отправки команды тогда имеет вид:
send("echo 123") .observeOn(FxScheduler.getInstance()) .subscribe( answer -> { /*обработать ответ*/ }, throwable -> { /*обработать ошибку*/ } );
Здесь мы используем уже observeOn() — нам нужно сообщить RxJava, что «следующий observer должен выполняться через такой-то планировщик».
RxJava предоставляет удобное управление конвейером операторов. Рядом с .map(bytes -> sendBytes(bytes)) можно добавить расчет контрольной суммы, потом прогнать байты через кодирование. Можно добавить вначале логгирование исходящей команды, а в конце — полученного ответа. В общем, идею вы поняли.
2. Сканирование с помощью двигателя и датчика
Есть набор точек — это могут быть углы поворота двигателя в градусах или позиции устройства, которое приводится в движение двигателем. В общем, есть какой-то актуатор. А еще есть внешний датчик, с которого можно получать значения. Нужно проехаться двигателем по набору точек, в каждой точке получить значение с датчика, построить кривую на графике. Повторить процедуру n раз (n кривых на графике). При этом двигатель работает не мгновенно, нужно ждать, когда он выйдет на позицию.
Итак, мы имеем набор точек, для каждой нужно что-то сделать (желательно в другом потоке), а результат обрабатывается в GUI потоке (добавить точку на LineChart, например). Похоже на типичную задачу для RxJava.
public Observable<Point> startScan(List<Double> trajectory, int iterationCount) { return Observable.from(trajectory) .subscribeOn(Schedulers.io()) .doOnNext(this::moveMotor) .doOnNext(this::blockUntilTargetReached) .map(this::createResultPoint) .repeat(iterationCount); }
Используем Schedulers.io(): управление двигателем и датчиком — это все-таки операции ввода-вывода. moveMotor() посылает команду двигателю (через написанный ранее Connection, например).
blockUntilTargetReached() запрашивает у двигателя его позицию, сравнивает с целевой и усыпляет поток на сколько-то миллисекунд, если двигатель еще не доехал. createResultPoint() запрашивает у датчика значение в возвращает объект класса Point, содержащий пару чисел — целевую позицию и значение с датчика. repeat() работает почти как retry() — он повторяет весь поток с самого начала каждый раз, а retry() только после ошибки.
Исходный Observable будет выдавать точки по одной. Следующую точку он выдаст только когда предыдущая пройдет все операторы вплоть до подписчика. Это соответствует функциональному подходу с его ленивыми вычислениями и потоковой обработкой. Таким же образом работают StreamAPI и LINQ. За счет этого сканирование будет идти по точкам по очереди, а не forEach(this::moveMotor), затем forEach(this::blockUntilTargetReached) и так далее.
Применение:
final List<Double> trajectory = ...; final int n = ...; startScan(trajectory, n) .observeOn(FxScheduler.getInstance()) .subscribe( point -> processPoint(point), throwable -> processError(throwable), () -> processData() );
Проблема в том, что подписчик не отличает на каком именно повторе была получена точка. То есть вместо n кривых, мы получим одну кривую в n раз длиннее. Нужно как-то вручную отслеживать, что началось новое сканирование. Например, считать количество точек и начинать новую кривую, если значение счетчика превысило количество точек в траектории. Или сравнивать пришедшую точку с первой точкой траектории.
В subscribe() появился третий аргумент — это обработчик onComplete(), который вызывается, когда в Observable закончились элементы.
subscribe() возвращает объект, имеющий интерфейс Subscription. Если вызвать у него метод unsubscribe(), то у Observable больше не будет подписчика, принимающего данные, поэтому он просто перестанет их выдавать. Принцип ленивых вычислений — если данные никому не нужны, то не нужно их передавать. Побочных эффектов у операторов все равно не должно быть в соответствии с парадигмой функционального программирования, поэтому просто выполнять операторы без подписчика у Observable смысла нет. С помощью unsubscribe() можно реализовать отмену сканирования. Разве что еще двигателю нужно послать команду на останов движения — за это unsubscribe() не отвечает.
3. Обработка данных и отчет
Мы получили после сканирования много полезных данных, теперь их нужно обработать, рассчитать нужные значения и сгенерировать pdf-отчет.
В отчете так же должны быть значения некоторых полей из интерфейса (например, ФИО пользователя) и рисунок полученных графиков. В случае JavaFX рисунок можно получить методом snapshot(), который есть у каждого графического объекта. Так как это действия с объектами JavaFX, выполняться они должны в GUI-потоке. Для этого у нас уже есть FxScheduler.
class ReportMetaInfo { private String fileName; private String name; private WritableImage image; } final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider) .subscribeOn(FxScheduler.getInstance()) .map(provider -> { final ReportMetaInfo info = new ReportMetaInfo(); info.fileName = provider.getFileName(); info.name = provider.getName(); info.image = provider.getChartSnapshot(); return info; });
m_reportInfoProvider — это реализация интерфейса ReportInfoProvider — прослойки между моделью и представлением. По сути это вызов геттеров из TextView, но модели все равно — у нее просто интерфейс.
Для расчетов есть Schedulers.computation().
final Observable<ScanResult> reportComputationalData = Observable.just(scanData) .subscribeOn(Schedulers.computation()) .map(data -> new ResultProcessor(data).calculateAll());
Теперь мы хотим объединить данные из формы и данные из расчетов и поместить все это в тяжелый pdf-файл. Для этого есть оператор zip() и Schedulers.io():
class ReportData { ReportMetaInfo metaInfo; ScanResult result; ReportData(ReportMetaInfo metaInfo, ScanResult result) { this.metaInfo = metaInfo; this.result = result; } } Observable.zip( reportGuiData, reportComputationalData, (reportInfo, scanResult) -> new ReportData(reportInfo, scanResult) ) .observeOn(Schedulers.io()) .map(reportData -> ReportGenerator.createPdf( reportData.metaInfo.fileName, reportData.metaInfo.name, reportData.metaInfo.image, reportData.result )).subscribe( isOk -> { /* здесь, в общем-то, делать нечего */ }, throwable -> { /* что-то пошло не так */ }, () -> { /* здесь мы окажемся, если все прошло успешно */ } );
zip() принимает до девяти разных Observable и соединяет элементы из них в кортежи. Функцию для соединения нужно предоставить самому как и результирующий тип для кортежей. В итоге получение данных из интерфейса (включая изображение графика) и обработка результатов сканирования проходят параллельно. Нужно ли распараллеливание таких действий, зависит от конкретных задач и объемов данных — я привел несколько упрощенный пример.
Стоит иметь в виду, что когда у нас несколько потоков данных, может проявляться backpressure. Это различные проблемы связанные с разной производительностью потоков и разной производительностью Observable и Observer. В общем, это ситуации, когда кто-то простаивает, а у кого-то уже через буфер переливается. Так что нужно быть аккуратным.
Заключение
Скорее всего для этих задач есть другие решения (и более эффективные) — если кто-то мне их укажет, я с радостью приму это к сведению и учту в работе. На примере этих задач я постарался показать некоторые особенности RxJava: обработка ошибок, отличие subscribeOn() и observeOn(), кастомные планировщики и получение результата в GUI-потоке, принцип ленивых вычислений и его применение для управления внешними устройствами, прерывание работы Observable, параллельная работа нескольких Observable и их объединение. Так что даже если эти задачи не совсем удачные для RxJava, сами рассмотренные принципы могут быть полезны для других.