Весной 2020 года вышла свежая версия фреймворка RxJava – RxJava 3. Давайте рассмотрим, в чем заключаются основные изменения, как можно перейти с RxJava 2 на новую версию и стоит ли вообще мигрировать.
Отметим, что в новой версии нет глобальных изменений, но появилась поддержка Java 8, а также библиотекой стало удобнее пользоваться.
Дисклеймер: эта статья основана на обзоре GitHub, кроме того, мы делимся впечатлениями наших мобильных разработчиков от RxJava 3, но не претендуем на исчерпывающее исследование – потому что новая версия вышла недавно и практического опыта пока немного. Если у вас есть дополнения, напишите в комментариях, будем рады обсудить)
Итак, что мы видим в RxJava 3:
— Streams
— Stream Collectors
— Optional
— CompletableFeature
*Для использования Java8 API нужно поднять minSDK до 24 версии.
В свою очередь, разработчики убрали поддержку таких фич, как:
Также изменилась структура пакетов:
Представленные изменения можно разделить на 2 группы:
Ранее одна из проблем RxJava 2 заключалась в том, что в некоторых кейсах ошибки могли потеряться и не быть обработаны. Теперь в RxJava 3 отписка от источника, который может выбросить ошибку, инициирует прокидывание этой ошибки в общий обработчик ошибок через RxJavaPlugins.onError()
В RxJava 2 существовала проблема с горячими источниками: при получении ConnectableObservable терминального события новые подключения игнорировали все элементы и получали только событие terminate.
В RxJava 3 появилась функция сброса состояния ConnectableObservable с помощью функции reset(), чтобы дать возможность вновь подключенным подписчикам обрабатывать данные.
В RxJava 3 после получения определенного количества значений Flowable.publish ставится на паузу, при этом оставшиеся элементы доступны для следующих подписчиков.
Если вы попытаетесь вызвать PublishProcessor.offer(), BehaviourProcessor.offer() или MulticastProcessor.offer() и передать null, происходит выброс NullPointerException вместо передачи ошибки в onError-обработчик, при этом инициируется терминальное состояние.
Ранее в RxJava 2 метод getCause() иногда значительно нагружал память, вызывая метод initCause на каждое исключение, работал нестабильно, не всегда генерировал цепочку исключений.
В новой версии данный метод изменен изнутри и теперь генерирует цепочку ошибок в виде стектрейса – простым форматированием строк.
При невалидном параметре некоторые операторы теперь будут бросать IllegalArgumentException вместо IndexOutOfBoundsException:
— skip
— skipLast
— takeLast
— takeLastTimed
Библиотеки fromAction() и fromRunnable() стали консистентными с остальными fromX()-вызовами. Колбеки fromRunnable() и fromAction() теперь будут закрыты моментально при соответствующем вызове. В RxJava 2 закрытие этих операторов происходило после окончания выполнения тела лямбды, переданной в параметры.
У оператора using() есть параметр, отвечающий за то, когда будет очищен используемый ресурс (true – до завершения, false – после). В ранней версии библиотеки этот параметр игнорировался, и ресурс всегда очищался до получения терминального состояния, но в RxJava 3 все работает корректно.
В RxJava 3 в связи с расширением исключений функциональных интерфейсов с Exception до Throwable введен новый тип Supplier – аналог Callable, но с throws Throwable
В связи с тем, что RxJava3 будет поддерживать Java8 API, на смену отдельным классам-фабрикам пришло новое решение: методы этих фабрик стали статическими методами интерфейса самого Disposable.
Как было ранее:
Стало:
Flowable promotions
dematerialize(Function)
Observable promotions
dematerialize(Function)
Maybe promotions
doOnTerminate(Action)
materialize()
Single promotions
dematerialize(Function)
materialize()
Complectable promotions
delaySubscription(long, TimeUnit)
delaySubscription(long, TimeUnit, Scheduler)
materialize()
Flowable
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Observable
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Maybe
startWith(Publisher)
startWith(ObservableSource)
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Single
startWith(Publisher)
startWith(ObservableSource)
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Complectable
startWith(MaybeSource)
startWith(SingleSource)
Добавлен новый оператор fromOptional() для Flowable, Observable и Maybe
Добавлен новый оператор fromStream() для Flowable и Observable
Добавлен новый оператор fromCompletionStage() для всех пяти типов источников данных
Добавлен новый оператор mapOptional() для Flowable, Observable, Maybe и Single. Он пропускает только non-empty значения
Добавлен новый оператор blockingStream() для Flowable и Observable. Оператор представляет поток данных в виде stream, при этом рекомендуется закрывать stream по завершению работы с ним, чтобы не допустить всевозможных утечек
Добавлены новые операторы flatMapStream() и concatMapStream() для Flowable и Observable – позволяют преобразовать каждый item в отдельный stream
Добавлены новые операторы flattenStreamAsFlowable() и flattenStreamAsObservable() для Maybe и Single – эквивалентные операторы flatMapStream() для Observable и Flowable
Maybe.toSingle(), замена – Maybe.defaultIfEmpty()
subscribe(…, …, …, ), замена – doOnSubscribe()
Single.toCompletable(), замена – Single.ignoreElement()
Completable.blockingGet(), замена – Completable.blockingAwait()
replay(Scheduler), замена – observeOn(Scheduler)
dematerialize(), замена – deserialize(Function)
onExceptionResumeNext(), замена – onErrorResumeNext(Function)
combineLatest() (с vararg-параметром), замена – combineLatestArray()
fromFuture() (с Scheduler-параметром), замена – subscribeOn()
concatMapIterable() (с buffer-параметром), замена – concatMapIterable(Function)
Также из TestSubscriber и TestObserver удалены следующие методы:
Многие разработчики отмечают, что миграция с RxJava 2 на RxJava 3 — достаточно трудоемкий процесс. Для осуществления перехода есть два варианта:
Итак, как мигрировать:
Мы рассмотрели, что нового появилось в RxJava 3. А теперь попробуем ответить на главный вопрос — стоит ли вообще мигрировать?
RxJava 3 практически представляет собой улучшение API. В связи с тем, что кардинальных изменений не было, в целом нет необходимости прямо сейчас мигрировать на последнюю версию. Сам по себе переход требует усилий, при этом многие сторонние библиотеки все еще связаны с RxJava 2, для работы с Java8 дополнительно нужно будет поднять minSDK до 24 версии. В некоторых командах есть и альтернативные решения — например, использовать Kotlin Coroutines.
Однако, стоит отметить, что RxJava 2 теперь переходит в режим “обслуживания”, и это означает, что из обновлений будут только багфиксы. Ожидается, что поддержка второй версии закончится 28 февраля 2021 года.
Спасибо за внимание! Надеемся, что этот материал был вам полезен. Авторские материалы для разработчиков мы также публикуем в наших соцсетях – ВКонтакте и Telegram.
Отметим, что в новой версии нет глобальных изменений, но появилась поддержка Java 8, а также библиотекой стало удобнее пользоваться.
Дисклеймер: эта статья основана на обзоре GitHub, кроме того, мы делимся впечатлениями наших мобильных разработчиков от RxJava 3, но не претендуем на исчерпывающее исследование – потому что новая версия вышла недавно и практического опыта пока немного. Если у вас есть дополнения, напишите в комментариях, будем рады обсудить)
Ключевые изменения в RxJava 3
Итак, что мы видим в RxJava 3:
- Базовая версия Java теперь увеличена до 8*;
- Появилась поддержка таких фич языка, как:
— Streams
— Stream Collectors
— Optional
— CompletableFeature
*Для использования Java8 API нужно поднять minSDK до 24 версии.
В свою очередь, разработчики убрали поддержку таких фич, как:
- java.time.Duration – порождает большое количество перегрузок, всегда может быть заменено на time + unit;
- java.util.function – не может кидать исключения, при этом перегрузки способны создать лишнюю “двусмысленность”.
Также изменилась структура пакетов:
Представленные изменения можно разделить на 2 группы:
- Поведенческие изменения
- Изменения API
Поведенческие изменения
- Все ошибки будут обработаны
Ранее одна из проблем RxJava 2 заключалась в том, что в некоторых кейсах ошибки могли потеряться и не быть обработаны. Теперь в RxJava 3 отписка от источника, который может выбросить ошибку, инициирует прокидывание этой ошибки в общий обработчик ошибок через RxJavaPlugins.onError()
- Connectable.reset()
В RxJava 2 существовала проблема с горячими источниками: при получении ConnectableObservable терминального события новые подключения игнорировали все элементы и получали только событие terminate.
В RxJava 3 появилась функция сброса состояния ConnectableObservable с помощью функции reset(), чтобы дать возможность вновь подключенным подписчикам обрабатывать данные.
- Возможность поставить Flowable.publish на паузу
В RxJava 3 после получения определенного количества значений Flowable.publish ставится на паузу, при этом оставшиеся элементы доступны для следующих подписчиков.
- Processor.offer() null-параметр
Если вы попытаетесь вызвать PublishProcessor.offer(), BehaviourProcessor.offer() или MulticastProcessor.offer() и передать null, происходит выброс NullPointerException вместо передачи ошибки в onError-обработчик, при этом инициируется терминальное состояние.
- CompositeException.getCause()
Ранее в RxJava 2 метод getCause() иногда значительно нагружал память, вызывая метод initCause на каждое исключение, работал нестабильно, не всегда генерировал цепочку исключений.
В новой версии данный метод изменен изнутри и теперь генерирует цепочку ошибок в виде стектрейса – простым форматированием строк.
- Изменены исключения при валидации параметра
При невалидном параметре некоторые операторы теперь будут бросать IllegalArgumentException вместо IndexOutOfBoundsException:
— skip
— skipLast
— takeLast
— takeLastTimed
- Предварительное закрытие источников для fromX()
Библиотеки fromAction() и fromRunnable() стали консистентными с остальными fromX()-вызовами. Колбеки fromRunnable() и fromAction() теперь будут закрыты моментально при соответствующем вызове. В RxJava 2 закрытие этих операторов происходило после окончания выполнения тела лямбды, переданной в параметры.
- Порядок очищения ресурса при использовании using()
У оператора using() есть параметр, отвечающий за то, когда будет очищен используемый ресурс (true – до завершения, false – после). В ранней версии библиотеки этот параметр игнорировался, и ресурс всегда очищался до получения терминального состояния, но в RxJava 3 все работает корректно.
Изменения API
- Обработка исключений функциональных интерфейсов новой версии библиотеки расширена с Exception до Throwable
- Новые типы: Supplier
В RxJava 3 в связи с расширением исключений функциональных интерфейсов с Exception до Throwable введен новый тип Supplier – аналог Callable, но с throws Throwable
- В RxJava 3 операторы конвертации одного источника данных в другой были изменены на конкретные конвертеры
- Перемещенные компоненты
В связи с тем, что RxJava3 будет поддерживать Java8 API, на смену отдельным классам-фабрикам пришло новое решение: методы этих фабрик стали статическими методами интерфейса самого Disposable.
Как было ранее:
Стало:
- С целью сокращения warnings класс DisposableContainer, который использовался внутри CompositeDisposable, сделан частью публичного API
- Часть операторов в RxJava 2 находились на экспериментальной стадии, а в третьей версии они стали стандартными операторами:
Flowable promotions
dematerialize(Function)
Observable promotions
dematerialize(Function)
Maybe promotions
doOnTerminate(Action)
materialize()
Single promotions
dematerialize(Function)
materialize()
Complectable promotions
delaySubscription(long, TimeUnit)
delaySubscription(long, TimeUnit, Scheduler)
materialize()
- В новой версии библиотеки оператор concatMap() в целях гибкости управления потоками был перегружен Scheduler’ом
- Добавлена перегрузка оператора blockingForEach() с возможностью указать размер буфера
- Оператор blockingSubscribe() теперь доступен для использования в Maybe, Single и Completable
- Оператор onErrorComplete() теперь доступен во всех пяти типах источников данных
- Оператор onErrorResumeWith() теперь доступен для использования в Completable
- Оператор retryUntil() теперь доступен для использования в Single и Completable
- Оператор switchOnNext() и switchOnNextDelayError() теперь доступны для использования в Maybe, Single и Completable
- Оператор dematerialize() теперь доступен для использования в Maybe
- Произошел апдейт различных типов источников данных fromX()-операторами
- Оператор timeInterval() теперь доступен для использования в Maybe и Single
- Оператор toFuture() теперь доступен для использования в Maybe и Completable
- Оператор ofType() теперь доступен для использования в Maybe и Single
- Оператор doOnLifecycle() теперь доступен для использования в Maybe, Single и Completable
- Оператор concatMapX() (X – другой тип источника данных) теперь доступен для использования в Maybe и Single
- Оператор concatDelayError() теперь доступен для использования в Maybe, Single и Completable
- Оператор mergeArray() теперь доступен для использования в Single
- Оператор startWith() теперь доступен для использования во всех пяти типах источников данных
Flowable
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Observable
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Maybe
startWith(Publisher)
startWith(ObservableSource)
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Single
startWith(Publisher)
startWith(ObservableSource)
startWith(MaybeSource)
startWith(SingleSource)
startWith(ComplectableSource)
Complectable
startWith(MaybeSource)
startWith(SingleSource)
- Оператор onErrorReturn() теперь доступен для использования в Completable
- Оператор safeSubscribe() теперь доступен для использования в Maybe, Single и Completable
- Оператор flatMap() теперь доступен для использования в Single
- Оператор concatEager() и concatEagerDelayError() теперь доступны для использования в Flowable, Observable, Maybe и Single
- Добавлен оператор fromSupplier() во все пять типов источников данных
Новое с Java8
Добавлен новый оператор fromOptional() для Flowable, Observable и Maybe
Добавлен новый оператор fromStream() для Flowable и Observable
Добавлен новый оператор fromCompletionStage() для всех пяти типов источников данных
Добавлен новый оператор mapOptional() для Flowable, Observable, Maybe и Single. Он пропускает только non-empty значения
Добавлен новый оператор blockingStream() для Flowable и Observable. Оператор представляет поток данных в виде stream, при этом рекомендуется закрывать stream по завершению работы с ним, чтобы не допустить всевозможных утечек
Добавлены новые операторы flatMapStream() и concatMapStream() для Flowable и Observable – позволяют преобразовать каждый item в отдельный stream
Добавлены новые операторы flattenStreamAsFlowable() и flattenStreamAsObservable() для Maybe и Single – эквивалентные операторы flatMapStream() для Observable и Flowable
Что переименовано
- Вместо startWith() – startWithArray(), startWithIterable(), startWithItem()
- Вместо onErrorResumeNext() – onErrorResumeWith()
- Вместо zipIterable() – zip()
- Вместо combineLatest() (с аргументом-массивом) – combineLatestArray(), combineLatestArrayDelayError()
- Вместо Single.equals() – sequenceEqual(SingleSource, SingleSource)
- Вместо Maybe.flatMapSingleElement() – Maybe.flatMapSingle()
- Вместо Callable – Supplier (при использовании лямбд требуется только рекомпиляция)
Что удалено из API
Maybe.toSingle(), замена – Maybe.defaultIfEmpty()
subscribe(…, …, …, ), замена – doOnSubscribe()
Single.toCompletable(), замена – Single.ignoreElement()
Completable.blockingGet(), замена – Completable.blockingAwait()
replay(Scheduler), замена – observeOn(Scheduler)
dematerialize(), замена – deserialize(Function)
onExceptionResumeNext(), замена – onErrorResumeNext(Function)
combineLatest() (с vararg-параметром), замена – combineLatestArray()
fromFuture() (с Scheduler-параметром), замена – subscribeOn()
concatMapIterable() (с buffer-параметром), замена – concatMapIterable(Function)
Также из TestSubscriber и TestObserver удалены следующие методы:
Как мигрировать
Многие разработчики отмечают, что миграция с RxJava 2 на RxJava 3 — достаточно трудоемкий процесс. Для осуществления перехода есть два варианта:
- иметь у себя на проекте обе версии библиотеки;
- осуществить полную миграцию на RxJava 3, при этом можно использовать специальную библиотеку.
Итак, как мигрировать:
- Актуализировать работу с API — использовать аналоги тех методов RxJava 2, которые были подвержены изменению.
- Важно учитывать, что часть сторонних библиотек все еще “сидит” на RxJava 2. Для упрощения перехода можно взять RxJavaBridge, который прячет большую часть миграции под капот.
- При использовании в проекте Retrofit нужно подтянуть RxJava3CallAdapterFactory или сделать свою фабрику для корректного маппинга ответа.
Заключение
Мы рассмотрели, что нового появилось в RxJava 3. А теперь попробуем ответить на главный вопрос — стоит ли вообще мигрировать?
RxJava 3 практически представляет собой улучшение API. В связи с тем, что кардинальных изменений не было, в целом нет необходимости прямо сейчас мигрировать на последнюю версию. Сам по себе переход требует усилий, при этом многие сторонние библиотеки все еще связаны с RxJava 2, для работы с Java8 дополнительно нужно будет поднять minSDK до 24 версии. В некоторых командах есть и альтернативные решения — например, использовать Kotlin Coroutines.
Однако, стоит отметить, что RxJava 2 теперь переходит в режим “обслуживания”, и это означает, что из обновлений будут только багфиксы. Ожидается, что поддержка второй версии закончится 28 февраля 2021 года.
Спасибо за внимание! Надеемся, что этот материал был вам полезен. Авторские материалы для разработчиков мы также публикуем в наших соцсетях – ВКонтакте и Telegram.