Pull to refresh

Введение в RxJava: Жизненный цикл подписки

Reading time4 min
Views59K
image

Одна из главных идей, лежащих в основе Rx, заключается в том, что неизвестно когда именно последовательность выдаст новое значение или завершится. Однако, у нас есть возможность управлять временем в которое мы начнем или закончим получать эти значения. К тому же, если наши подписчики используют внешние ресурсы, то мы вероятно захотим освободить их по окончанию некой последовательности.

Содержание:

  • Часть первая – Вступление
    1. Почему Rx?
    2. Ключевые типы
    3. Жизненный цикл подписки
  • Часть вторая – Последовательности
    1. Создание последовательности
    2. Фильтрация последовательности
    3. Исследование
    4. Агрегация
    5. Трансформация
  • Часть третья – Управление последовательностями
  • Часть четвертая – Параллельность

Подписка


Существует несколько перегруженных методов Observable::subscribe, выполняющих одну и ту же функцию

Subscription    subscribe()
Subscription    subscribe(Action1<? super T> onNext)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription    subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super T> observer)
Subscription    subscribe(Subscriber<? super T> subscriber)

subscribe() поглощает события, но сам по себе не выполняет непосредственных действий. Его перегруженные версии, имеющие хотя бы один параметр типа Action, создают объект Subscriber. Если не передать функции для событий onError и onCompleted, они попросту проигнорируются.

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));

Вывод
0
java.lang.Exception: Oops

Если не передать функцию для обработки ошибок, будет выброшено OnErrorNotImplementedException в месте, где на стороне провайдера вызван s.onError. В данном случае, провайдер[1] и потребитель[2] находятся в одном блоке кода, что позволяет использовать традиционный try-catch. Однако в реальности, провайдер и потребитель могут находится в разных местах. В таком случае, если потребитель не предоставит функцию для обработки ошибок, он никогда не узнает когда и по какой причине завершилась последовательность.

Отписка


Вы можете перестать получать данные еще до того как последовательность завершится. Все перегрузки метода subscribe возвращают объект интерфейса Subscribtion, который имеет 2 метода:

boolean isUnsubscribed()
void unsubscribe()

Вызов unsubscribe остановит поступление событий в observer.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);

Вывод
0
1

Отписав одного подписчика, мы никак не повлияем на других подписчиков этого же ovbservable.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
    v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);

Вывод
First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2


onError и onCompleted


onError и onCompleted означают завершение последовательности. Добросовестный observable, который следует контрактам Rx перестанет выдавать значения после наступления одного из этих событий. Это то, что следует всегда помнить при создании собственного Observable.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);

Вывод
First: 0
First: 1
Completed


Освобождение ресурсов


Подписка удерживает в памяти ресурсы, с которыми связана. Эти ресурсы не будут автоматически освобождены при выходе объекта Subscription из области видимости. Если после вызова метода subscribe проигнорировать возвращаемое значение, то существует риск потерять единственную возможность отписаться. Подписка будет существовать далее, в то время как доступ к ней будет потерян, что может привести к утечке памяти и нежелательным действиям.

Существуют исключения. Например, при вызове перегруженных методов subscribe, неявно конструирующих объект Subscriber, будет создан механизм, который автоматически отвяжет подписчиков в момент когда завершится последовательность. Однако, даже в таком случае, следует помнить о бесконечных последовательностях. Вам все равно будет необходим объект Subscription, чтобы в какой-то момент прекратить получать данные от них.

Существует несколько реализаций интерфейса Subscription:
  • BooleanSubscription
  • CompositeSubscription
  • MultipleAssignmentSubscription
  • RefCountSubscription
  • SafeSubscriber
  • Scheduler.Worker
  • SerializedSubscriber
  • SerialSubscription
  • Subscriber
  • TestSubscriber

Мы еще встретимся с ними в будущих статьях. Стоит отметить, что Subscriber тоже реализует Subscription, что означает, что у нас также есть возможность отписаться используя ссылку на объект Subscriber.

С пониманием жизненного цикла подписок, вы получаете контроль над связанными с ними ресурсами. Это позволит вашей программе быть более предсказуемой, легко поддерживаемой и расширяемой и, надеемся, менее подверженной багам.

В следующей части мы научимся создавать и обрабатывать последовательности используя мощный инструментарий библиотеки.


[1] Тот, кто управляет (создает) observable – Примеч.
[2] Тот, кто использует значения, выданные observable – Примеч.
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
Total votes 12: ↑11 and ↓1+10
Comments5

Articles