Привет, Хабр. Представляю вашему вниманию перевод статьи Understanding Publish, Connect, RefCount and Share in RxSwift.
В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.
Так же рекомендую почитать про share(), share Replay(), shareReplayLatestWhileConnected() в RxSwift.
В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как
Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.
Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.
Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый
И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.
Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.
Взгляните на следующий кусочек кода:
Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.

Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор
Как и ожидалось, выполнился только один HTTP-запрос.

По сути, оператор
Стоп-стоп-стоп! Что еще за
Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
В приведенном выше примере, Observer'ы подписываются на
Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.
Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.
Теперь давайте сравним это с
Разница между
Вы можете воспринимать оператор
Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.
Чувствуете разницу теперь?
Когда Вы используете
С другой стороны,
Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.
В оригинале статьи используется Swift второй версии и соответствующая версия RxSwift. Я имел смелость переписать приведенные ниже куски кода под Swift 3.
Так же хочется отметить, что такие понятия, как Observable и Sequence, можно считать одним и тем же. То же касается Observer и Subscriber.
Так же рекомендую почитать про share(), share Replay(), shareReplayLatestWhileConnected() в RxSwift.
В этой статье я постараюсь объяснить такие операторы для работы с Connectable Observable в RxSwift, как
publish, connect, refCount и share. Они используются вместе в различных комбинациях. Очень важно понимать разницу между:publish().connect()- и
publish().refcount()(или простоshare())
Активные и пассивные Observables
Прежде чем перейти к сути, мне хотелось бы сказать пару слов о hot и cold Observables. Как по мне, так понятия горячих и холодных Observables немного размыты.
Давайте горячий Observable мы будем называть Active Sequence, а холодный Passive Sequence.
- Active Sequence эмитит элементы постоянно, независимо от того, подписан на нее кто-нибудь или нет
- Passive Sequence начинает эмитить элементы по запросу
Примером Passive Sequence может служить запрос в сеть, который начинается только тогда, когда мы подписались на последовательность. Примерами Active Sequence могут служить web-socket соединение, события таймера или текст, производимый
UITextField'ом.И это все. Думайте об активных и пассивных последовательностях. Понятия горячих/холодных/теплых/прохладных Observables слишком запутанны и могут сбить с толку.
Несколько подписок на один Observable
Если Вы когда-нибудь подписывались дважды (или больше) на один и тот же Observable, Вы могли бы быть удивлены результатами.
Взгляните на следующий кусочек кода:
let url = URL(string: "https://habrahabr.ru/")! let requestObservable = URLSession.shared .rx.data(request: URLRequest(url: url)) requestObservable.subscribe(onNext: { print($0) }) requestObservable.subscribe(onNext: { print($0) })
Взглянув в консоль, мы увидим два HTTP респонса. Observable выполнил запрос дважды, хоть это противоречит нашим ожиданиям.

share() как спасение
Очевидно, что это не то, чего мы хотим от обычного HTTP-реквеста. Но мы можем изменить такое поведение и выполнить всего один запрос. Надо просто применить оператор
share() к нашему Observable.let url = URL(string: "https://habrahabr.ru/")! let requestObservable = URLSession.shared .rx.data(request: URLRequest(url: url)) .share() requestObservable.subscribe(onNext: { print($0) }) requestObservable.subscribe(onNext: { print($0) })
Как и ожидалось, выполнился только один HTTP-запрос.

По сути, оператор
share() — это просто обертка над publish().refcount().Стоп-стоп-стоп! Что еще за
publish(), что за refcount()?publish() и его друг connect()
Тогда, когда применен оператор publish(), то Observable трансформируется в Connectable Observable. В документации ReactiveX говорится:
Connectable Observable похож на обычный Observable за исключением одного момента. Он начинает производить элементы не тогда, когда на него подписываются, а только тогда, когда на нем вызван оператор connect().let myObservable = Observable.just(1).publish() print("Subscribing") myObservable.subscribe(onNext: { print("first = \($0)") }) myObservable.subscribe(onNext: { print("second = \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Calling connect after 3 seconds") myObservable.connect() } /* Output: Subscribing Calling connect after 3 seconds first = 1 second = 1 */
В приведенном выше примере, Observer'ы подписываются на
myObservable сразу после того, как он был создан. Но срабатывают они только через 3 секунды, когда был вызван оператор connect(). Проще говоря, connect() активирует Connectable Observable и включает подписчиков.Интересная штука в том, как происходит очистка ресурсов. Посмотрите на этот код.
let myObservable = Observable<Int> .interval(1, scheduler: MainScheduler.instance) .publish() myObservable.connect() print("Starting at 0 seconds") let mySubscription = myObservable.subscribe(onNext: { print("Next: \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Disposing at 3 seconds") mySubscription.dispose() } DispatchQueue.main.asyncAfter(deadline: .now() + 6) { print("Subscribing again at 6 seconds") myObservable.subscribe(onNext: { print("Next: \($0)") }) } // Output: /* Starting at 0 seconds Next: 0 Next: 1 Next: 2 Disposing at 3 seconds Subscribing again at 6 seconds Next: 6 Next: 7 Next: 8 Next: 9 ... */
Даже если все подписчики отписались от нашего Observable, то последний все еще живет и продолжает производить события под капотом.
Примечание переводчика
Метод
connect() возвращает Disposable. Таким образом, остановить продюсинг элементов можно, вызвав метод dispose() у данного Disposable, либо предоставить эту возможность DisposeBag'у.Теперь давайте сравним это с
publish().refcount().Разница между publish().connect() и publish().refcount()
Вы можете воспринимать оператор
refcount() как магию, которая за Вас обрабатывают отписку Observer'ов. refcount() вызывает connect() автоматически, когда подписывается первый Observer, так что нет нужды делать это самостоятельно.let myObservable = Observable<Int> .interval(1, scheduler: MainScheduler.instance) .publish() .refCount() print("Starting at 0 seconds") let mySubscription = myObservable.subscribe(onNext: { print("Next: \($0)") }) DispatchQueue.main.asyncAfter(deadline: .now() + 3) { print("Disposing at 3 seconds") mySubscription.dispose() } DispatchQueue.main.asyncAfter(deadline: .now() + 6) { print("Subscribing again at 6 seconds") myObservable.subscribe(onNext: { print("Next: \($0)") }) } // Output: /* Starting at 0 seconds Next: 0 Next: 1 Next: 2 Disposing at 3 seconds Subscribing again at 6 seconds Next: 0 Next: 1 Next: 2 Next: 3 ... */
Обратите внимание вот на что. Когда мы подписались заново, Observable начал эмитить элементы с начала.
Заключение
Чувствуете разницу теперь?
publish().connect() и publish().refcount() (или share()) управляют механизмом отписки от Obervable'ов по-разному.Когда Вы используете
publish().connect(), Вам необходимо вручную управлять механизмом очистки ресурсов ва��его Observable (об этом говорилось в примечании под спойлером). Ваша последовательность ведет себя как активная и производит элементы все время, независимо от подписок.С другой стороны,
publish().refcount()/share() следит за том, как много Observer'ов подписано на Observable и не отключает первых от последнего до тех пор, пока существует хотя бы один подписчик. Другими словами, когда счетчик подписчиков падает до нуля, Observable «умирает» и перестает производить какие-либо элементы.Если что-то осталось не до конца ясным, пожалуйста, дайте знать об этом в комментариях. Спасибо.