company_banner

Справочник по источникам событий в Rx

  • Tutorial


RxJava используется в большом количестве android-приложений, но при этом многие не знают других источников событий, кроме Observable и, может быть, Flowable. Они забывают о специализированных классах Single, Maybe и Completable, которые зачастую способны добавить больше ясности в код.

Под катом вас ждёт шпаргалка по источникам событий, которые существуют в RxJava.

Completable фактически является Rx-аналогом Runnable. Он представляет собой операцию, которая может быть выполнена или нет. Если проводить аналогию с Kotlin, то это fun completable() из мира Rx. Соответственно, для подписки на него необходимо реализовать onComplete и onError. Он не может быть создан из значения (Observable#just, ...), потому что не рассчитан на это.

Single — реактивный Callable, потому что тут появляется возможность вернуть результат операции. Продолжая сравнение с Kotlin, можно сказать, что Single — это fun single(): T { }. Таким образом, чтобы подписаться на него, необходимо реализовать onSuccess(T) и onError.

Maybe — нечто среднее между Single и Completable, потому что поддерживает одно значение, отсутствие значений и ошибку. Тут сложнее провести однозначную параллель с методами, но я думаю, что Maybe — это fun maybe(): T? { }, которая возвращает null, когда результата нет. Несложно догадаться, что для подписки потребуется определить onSuccess(T), onComplete и onError.
Тут важно обратить внимание, что onSuccess(T) и onComplete — взаимоисключающие. Т.е. в случае вызова первого можно не ждать второго.
Observable — наиболее часто встречающийся источник, что обусловлено его универсальностью. Он умеет как не производить события вообще, так и генерировать множество таковых, поэтому его можно использовать всегда, когда не подходят остальные варианты. Несмотря на это, у Observable есть недостаток — он совершенно не умеет обрабатывать backpressure. Для подписки на него нужны onNext(T), onError и onComplete.

Backpressure — ситуация, когда новые события поступают существенно быстрее, чем успевают обрабатываться, и начинают скапливаться в буфере, переполняя его. Это может привести к неприятностям вроде OutOfMemoryError. Подробнее можно посмотреть тут.

ConnectableObservable — прогретый вариант Observable. Все источники данных начинают выдавать свой поток событий в момент подписки. Но только не этот парень. Для этого ConnectableObservable ждёт вызова connect. Сделано это для того, чтобы несколько наблюдателей могли обозревать один поток событий, не перезапуская его при каждой подписке. Для иллюстрации приведу следующий сниппет:

val observable = Observable.fromCallable {
   Log.d("RxLogs", "observable fromCallable executed")
   Thread.sleep(1000)
}.subscribeOn(Schedulers.computation())

observable.subscribe()
observable.subscribe()
observable.subscribe()
observable.subscribe()
В консоли будет:
observable fromCallable executed
observable fromCallable executed
observable fromCallable executed
observable fromCallable executed

val connectedObservable = Observable.fromCallable {
   Log.d("RxLogs", "connectedObservable fromCallable executed")
   Thread.sleep(1000)
}.subscribeOn(Schedulers.computation())
   .publish()

connectedObservable.subscribe()
connectedObservable.subscribe()
connectedObservable.subscribe()
connectedObservable.subscribe()

connectedObservable.connect()

А в этом случае: observable fromCallable executed

Flowable — источник, предоставляющий дополнительные операторы для обработки backpressure. Когда требуется обрабатывать более 10000 событий, происходящих быстро одно за другим, рекомендуется использовать его вместо Observable.

Последний может создавать ConnectableFlowable, открывающий те же возможности, что и ConnectableObservable.

Говоря о генераторах событий, нельзя не упомянуть Subject и Processor.

Subject — класс, который может быть и источником, и обозревателем. Это позволяет использовать его, например, в разного рода контроллерах, которые будут отдавать его наружу в виде Observable и внутри оповещать как Observer. Далее пройдёмся по разным реализациям этого класса.

AsyncSubject/AsyncProcessor держит последнее событие до корректного завершения потока, после чего отдаёт его подписчикам. При возникновении ошибки никакие события проброшены не будут.

image

PublishSubject/PublishProcessor пробрасывает приходящие в него события дальше, пока не поступит терминальный сигнал. После конца потока или ошибки он возвращает соответствующие события.

image

BehaviorSubject/BehaviorProcessor работает аналогично PublishSubject/PublishProcessor, но при подписке возвращает последнее событие, если оно есть и если Subject не перешёл в терминальное состояние.

image

ReplaySubject/ReplayProcessor — BehaviourSubject/BehaviorProcessor на стероидах. Возвращает не одно последнее событие, а сколько душе угодно. Если подписаться на завершённый ReplaySubject или ReplayProcessor, то будут получены все накопленные данные.

image
Таким образом, ReplaySubject.createWithSize(1) и BehaviourSubject.create() после перехода в терминальное состояние работают по-разному. Во время подписки первый вернёт последнее событие, а второй нет. Оно же верно и для ReplayProcessor.
CompletableSubject, MaybeSubject и SingleSubject работают аналогично PublishSubject, только рассчитаны на использование с Completable, Maybe и Single соответственно.

UnicastSubject/UnicastProcessor — это фактически ReplaySubject, который следит, чтобы у него был только один подписчик. Он выбрасывает IllegalStateException при попытке повторной подписки.

image

Т.е. следующий сниппет

val subject = UnicastSubject.create<String>(3)
repeat(3) {
   subject.onNext(it.toString())
}

subject.onComplete()

subject.subscribe({
   Log.d("RxLogs", it)
}, {
}, {
   Log.d("RxLogs", "complete")
})

выведет в лог
0
1
2
complete

MulticastProcessor работает по аналогии с PublishProcessor, за исключением одной небольшой особенности. Он умеет обрабатывать backpressure для входящего в него потока. MulticastProcessor позволяет задать размер буфера, в котором он будет предзапрашивать элементы из upstream для будущих подписчиков.

На схеме ниже создаётся процессор с хранилищем на 2 элемента, которые он сразу запрашивает у своего источника. Поэтому, когда на него подписывается первый наблюдатель, он тут же выдаёт содержимое буфера, который моментально заполняется новыми событиями. После терминального события MulticastProcessor очищает своё хранилище и новые подписчики сразу получают завершение потока.

image
  • +39
  • 7,3k
  • 4
FunCorp
202,34
Разработка развлекательных сервисов
Поделиться публикацией

Комментарии 4

    +1
    Я почему-то думал, что для подписки требуется subscribe(), а onNext, onSuccess, onComplete и onError реализуются при необходимости их обработки. Так ведь?
      0
      Это верно. Я имел в виду доступные для получения коллбеки внутри subscribe()
      0
      возможно опечатка
      > Если проводить аналогию с Kotlin, то это fun completable() из мира Rx Kotlin.
        0
        Не, тут все верно. Тут это (Completable) из мира Rx, а не функция

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое