RxSwift шпаргалка по операторам (+ PDF)



    Заинтересовавшись темой функционального программирования я встал на распутье, — какой фреймворк выбрать для ознакомления. ReactiveCocoa — ветеран в iOS кругах, по нему вдоволь информации. Но он вырос с Objective-C, и хотя это не является проблемой, но все же в данный момент я в основном пишу именно на Swift, — хотелось бы взять решение изначально спроектированное с учетом всех плюшек языка. RxSwift же порт Reactive Extensions, имеющего долгую историю, но сам порт свежий и написанный именно под Swift. На нем я и решил остановиться.
    Но специфика документации по RxSwift в том, что описание всех команд ведет на reactivex.io, а там в основном дается общая информация, руки у разработчиков не дошли еще сделать документацию именно для RxSwift, что не всегда удобно. Некоторые команды имеют тонкости в реализации, есть те, о которых в общей документации нет ничего кроме упоминания.
    Прочитав все главы вики с RxSwift гитхаба, я сразу решил поразбираться с официальными примерами, тут то и стало ясно, что с RX такое не пройдет, нужно хорошо понимать основы, иначе будешь как мартышка с копипастом гранатой. Я начал разбирать самые сложные для понимания команды, потом те, что вроде понятны, но задав себе вопросы по ним я понял, что лишь догадываюсь на то как верно ответить, но не уверен.
    В общем ничтоже сумняшеся я решил проработать все операторы RxSwift. Лучший способ что то понять в программировании — запустить код и посмотреть как он отработает. Затем учитывая специфику реактивного программирования — очень полезны схемы, ну и краткое описание на русском. Закончив сегодня работу, я подумал, что грех не поделиться результатами с тем, кто лишь присматривается к теме реактивного программирования.
    Много картинок и текста под катом, очень много!


    Предварительно я рекомендую просмотреть официальную документацию, у меня передана основная суть и специфика RxSwift команд, а не основы.
    Так же можно «поиграться» с шариками в схемах, так называемые RxMarbles, есть бесплатная версия под iPhone/iPad

    Итак, в этой статье я рассмотрю все (ну или почти все) команды RxSwift, по каждой я дам краткое описание, схему(если это имеет смысл), код, результат выполнения, при необходимости сделаю комментарии по выводу в лог результатов выполнения кода.
    В статье заголовок каждой команды — ссылка на на официальную документацию, т.к. я не ставил перед собой цели перевести все нюансы по командам.

    Вот ссылка на гитхаб, куда я склонировал официальный репозиторий RxSwift, и добавил свою песочницу (DescribeOperators.playground), где и будет практически тот же код, что и в статье.
    А вот и ссылка конкретно на PDF где в виде mindMap собраны все команды, что позволяет быстро просмотреть их все. Кусочки кода в PDF приложены для того чтобы увидеть как и с каким параметрами нужно работать с командой. Изначально ради этого PDF я все и затеял — иметь под рукой документ в котором наглядно видны все команды с их схемами. PDF получился огромным (в плане рабочего пространства, а не веса), но я проверял, даже на iPad 2 все нормально просматривается.

    Обо всех ошибках просьба писать в личку, объем работ оказался слегка великоват, после четвертой вычитки текста мои глаза меня прокляли.
    Что ж, надеюсь моя работа кому то пригодится. Приступим.

    Содержание


    Заметки

    Создание Observable


    asObservable
    create
    deferred
    empty
    error
    interval
    just
    never
    of
    range
    repeatElement
    timer

    Комбинирование Observable


    amb
    combineLatest
    concat
    merge
    startWith
    switchLatest
    withLatestFrom
    zip

    Фильтрация


    distinctUntilChanged
    elementAt
    filter
    ignoreElements
    sample
    single
    skip
    skip (duration)
    skipUntil
    skipWhile
    skipWhileWithIndex
    take
    take (duration)
    takeLast
    takeUntil
    takeWhile
    takeWhileWithIndex
    throttle

    Трансформация


    buffer
    flatMap
    flatMapFirst
    flatMapLatest
    flatMapWithIndex
    map
    mapWithIndex
    window

    Операторы математические и агрегирования


    reduce
    scan
    toArray

    Работа с ошибками


    catchError
    catchErrorJustReturn
    retry
    retryWhen

    Операторы для работы с Connectable Observable


    multicast
    publish
    refCount
    reply
    replayAll

    Вспомогательные методы


    debug
    doOn / doOnNext
    delaySubscription
    observeOn
    subscribe
    subscribeOn
    timeout
    using



    В схемах я буду использовать обозначение Source/SO в качестве Source Observable, RO/Result в качестве Result Observable.

    В качестве вспомогательного кода я буду пользоваться функцией createSequenceWithWait, она создает Observable из массива элементов с указанным интервалом между элементами.

    public enum ResultType {
        case Infinite
        case Completed
        case Error
    }
    
    public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> {
        return Observable<U>.create{ observer  in
            for (idx, letter) in array.enumerate() {
                let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC))
                dispatch_after(time, dispatch_get_main_queue()) {
                    if let describer = describer {
                        let value = describer(value: letter)
                        observer.on(.Next(value))
                    } else {
                        observer.on(.Next(letter as! U))
                    }
                    
                }
            }
            
            if resultType != .Infinite {
                let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC))
                dispatch_after(allTime, dispatch_get_main_queue()) {
                    switch resultType {
                    case .Completed:
                        observer.onCompleted()
                    case .Error:
                        observer.onError(RxError.Unknown)
                    default:
                        break
                    }
                    
                }
            }
            
            return NopDisposable.instance
        }
    }
    

    Функция example — просто позволяет отделять вывод в консоли, её код следующий (взят из RxSwift)
    public func example(description: String, action: () -> ()) {
        print("\n--- \(description) example ---")
        action()
    }
    

    Во всех примерах, где необходимо работать с временными задержками, если этот код будет запускаться в песочнице — необходимо прописать
    import XCPlayground
    XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
    


    Так же подразумевается, что читатель имеет общее представление о том, что такое реактивное программирование в общем и о RxSwift в частности. Не знаю есть ли смысл городить очередную вводную.

    Создание Observable





    asObservable


    Этот метод реализован в классах RxSwift, если они поддерживают конвертацию в Observable. Например: ControlEvent, ControlProperty, Variable, Driver

    example("as Observable") {
        let variable = Variable<Int>(0)
        
        variable.asObservable().subscribe { e in
            print(e)
        }
        variable.value = 1
    }
    


    Консоль:
    --- as Observable example ---
    Next(0)
    Next(1)
    Completed
    


    В данном примере мы Variable преобразовали в Observable и подписались на его события




    create



    Этот метод позволяет создавать Observable с нуля, полностью контролируя какие элементы и когда он будет генерировать.

    example("create") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        
        let multipleSequence = Observable<Observable<AnyObject>>.create { observer in
            observer.on(.Next(firstSequence))
            observer.on(.Next(secondSequence))
            return NopDisposable.instance
        }
        let concatSequence = multipleSequence.concat()
        concatSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- create example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    


    В данном примере мы создали Observable вручную, он сгенерирует два Observable, элементы которых мы затем объединим командой concat, на получившийся Observable мы и подпишемся




    deferred


    Этот оператор позволяет отложить создание Observable, до момента подписки с помощью subscribe

    example("without deferred") {
        var i = 1
        let justObservable = Observable.just(i)
        i = 2
        _ = justObservable.subscribeNext{ print ("i = \($0)") }
    }
    
    example("with deferred") {
        var i = 1
        let deferredJustObservable = Observable.deferred{
            Observable.just(i)
        }
        i = 2
        _ = deferredJustObservable.subscribeNext{ print ("i = \($0)") }
    }
    


    Консоль:
    --- without deferred example ---
    i = 1
    
    --- with deferred example ---
    i = 2
    


    В первом случае Observable создается сразу, с помощью Observable.just(i), и изменение значение i уже не влияет на генерируемый элемент это последовательностью. Во втором же случае мы создаем с помощью deferred, и мы можем поменять значение i перед subscribe




    empty


    Пустая последовательность, заканчивающаяся Completed



    example("empty") {
        let sequence = Observable<Int>.empty()
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- empty example ---
    Completed
    





    error


    Создаст последовательность, которая состоит из одного события — Error



    example("error") {
        let sequence = Observable<Int>
            .error(RxError.Unknown)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- error example ---
    Error(Unknown error occured.)
    





    interval


    Создает бесконечную последовательность, возрастающую с 0 с шагом 1, с указанной периодичностью



    example("interval") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- interval example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    ....
    





    just


    Создает последовательность из любого значения, которая завершается Completed



    example("just") {
        let sequence = Observable.just(100)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- just example ---
    Next(100)
    Completed
    





    never


    Пустая последовательность, чьи observer’ы никогда не вызываются, т.е. не будет сгенерировано ни одно событие



    example("never") {
        let sequence = Observable<Int>.never()
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- never example ---
    





    of


    Последовательность из variadic переменной, после всех элементов генерируется Completed



    example("simple of") {
        let sequence = Observable.of(1, 2)
    
        sequence.subscribe { e in
            print(e)
        }
    }
    
    example("of for Observables") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        
        mergedSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- simple of example ---
    Next(1)
    Next(2)
    Completed
    
    --- of for Observables example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed
    


    В первом случае мы создали последовательность из двух чисел. Во втором из двух Observable, а затем их объединили между собой с помощью оператора merge




    range


    Создает последовательность с конечным числом элементов, возрастающую с шагом 1 от указанного значения указанное число раз, после всех элементов генерируется Completed



    example("range") {
        let sequence = Observable.range(start: 5, count: 3)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- range example ---
    Next(5)
    Next(6)
    Next(7)
    Completed
    

    Сгенерировались элементы начиная с 5, 3 раза с шагом 1




    repeatElement


    Бесконечно создавать указанный элемент, без задержек. Никогда не будет сгенерированы события Completed или Error



    example("repeatElement") {
        let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- repeatElement example ---
    Next(1)
    Next(2)
    Next(3)
    .....
    





    timer


    Бесконечная последовательность, возрастающая с 0 с шагом 1, с указанной периодичностью и возможность задержки при старте. Никогда не будет сгенерированы события Completed или Error



    example("timer") {
        let sequence = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- timer example ---
    Next(0)
    Next(1)
    Next(2)
    


    В данном примере последовательность начнет генерировать элементы с задержкой в 2 секунды, каждые 3 секунды

    Комбинирование Observable






    amb



    SO = [Observable<T>] или SO1, SO2 = Observable<T>
    RO = Observable<T>
    


    Из всех Observable SO выбирается тот, который первым начинает генерировать элементы, его элементы и дублируются в RO, остальные SO игнорируются



    example("amb") {
        let subjectA = PublishSubject<Int>()
        let subjectB = PublishSubject<Int>()
        let subjectC = PublishSubject<Int>()
        let subjectD = PublishSubject<Int>()
        
        let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb()
        ambSequence.subscribe { e in
            print(e)
        }
        
        subjectC.onNext(0)
        subjectA.onNext(3)
        subjectB.onNext(102)
        subjectC.onNext(1)
        subjectD.onNext(45)
    }
    


    Консоль:
    --- amb example ---
    Next(0)
    Next(1)
    


    Т.к. первым сгенерировал элемент subjectC, — лишь его элементы дублируются в RO, остальные игнорируются




    combineLatest



    SO = SO1, SO2,... SON = Observable<T>
    RO = Observable<f(T,T)> 
    


    Как только все Observable сгенерировали хотя бы по одному элементу — эти элементы используются в качестве параметров в переданную функцию, и результат этой функции генерируется RO в качестве элемента. В дальнейшем при генерации любым Observable элемента — генерируется новый результат функции с последними элементами из всех комбинируемых Observable



    example("combineLatest") {
        let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in
            "\(element)"
        }.debug("firstSequence")
        let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in
            "\(element)"
            }
            .delaySubscription(3, scheduler: MainScheduler.instance)
            .debug("secondSequence")
        
        let concatSequence = Observable.combineLatest(firstSequence, secondSequence) {
            first, second -> String in
            "\(first) - \(second)"
        }
        concatSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- combineLatest example ---
    2016-04-12 16:59:35.421: firstSequence -> subscribed
    2016-04-12 16:59:35.422: secondSequence -> subscribed
    2016-04-12 16:59:35.434: firstSequence -> Event Next(1)
    2016-04-12 16:59:37.423: firstSequence -> Event Next(2)
    2016-04-12 16:59:38.423: secondSequence -> Event Next(A)
    Next(2 - A)
    2016-04-12 16:59:39.423: firstSequence -> Event Next(3)
    Next(3 - A)
    2016-04-12 16:59:39.522: secondSequence -> Event Next(B)
    Next(3 - B)
    2016-04-12 16:59:40.622: secondSequence -> Event Next(C)
    Next(3 - C)
    2016-04-12 16:59:41.722: firstSequence -> Event Completed
    2016-04-12 16:59:41.722: firstSequence -> disposed
    2016-04-12 16:59:41.722: secondSequence -> Event Completed
    2016-04-12 16:59:41.722: secondSequence -> disposed
    Completed
    


    В этом примере я создал Observable с помощью createSequenceWithWait, чтобы элементы генерировались с разной задержкой, чтобы было видно как перемешиваются элементы.
    firstSequence успел сгенерировать 1 и 2, прежде чем secondSequence сгенерировал A, поэтому 1 отбросили, и первым выводом стало 2 — A




    concat



    SO = Observable<Observable<T>> или SO1, SO2 = Observable<T>
    RO = Observable<T>
    


    В RO элементы включают сначала все элементы первого Observable, и лишь затем следующего. Это означает, что если первый Observable никогда не сгенерирует Completed, — элементы второго никогда не поступят в RO. Ошибка в текущем Observable пробрасывается в RO



    example("concat object method") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
        let concatSequence = firstSequence.concat(secondSequence)
        concatSequence.subscribe { e in
            print(e)
        }
    }
    
    example("concat from array") {
        let firstSequence = Observable.of(1,2,3)
        let secondSequence = Observable.of(4,5,6)
        let concatSequence = Observable.of(firstSequence, secondSequence)
            .concat()
        
        concatSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- concat object method example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed
    
    --- concat from array example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(5)
    Next(6)
    Completed
    


    В первом примере мы присоединяем второй Observable к первому.
    Во втором генерируем последовательность из массива.




    merge



    SO = Observable<Observable<T>>
    RO = Observable<T>
    


    Элементы RO включают элементы из исходных Observable в том порядке, в котором они были выпущены в исходных Observable



    example("simple merge") {
        let firstSequence = Observable<AnyObject>.of(1, 2, 3)
        let secondSequence = Observable<AnyObject>.of("A", "B", "C")
    
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        
        mergedSequence.subscribe { e in
            print(e)
        }
    }
    
    example("merge with wait") {
        let firstSequence = createSequenceWithWait([1,2,3]) { element in
            "\(element)"
        }
        let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in
            "\(element)"
        }
    
        let bothSequence = Observable.of(firstSequence, secondSequence)
        let mergedSequence = bothSequence.merge()
        
        mergedSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- simple merge example ---
    Next(1)
    Next(2)
    Next(3)
    Next(A)
    Next(B)
    Next(C)
    Completed
    
    --- merge with wait example ---
    Next(1)
    Next(A)
    Next(2)
    Next(3)
    Next(B)
    Next(C)
    Completed
    


    В первом примере мы сливаем две последовательности созданные без задержки, в итоге первый Observable успевает сгенерировать все свои элементы перед тем как это начнет делать второй, результат оказывается идентичен concat
    Во втором же случае последовательности сделаны с задержкой в генерации, и видно что элементы в RO теперь вперемешку, в том порядке в котором они были сгенерированы в исходных Observable




    startWith



    SO = Observable<T>
    RO = Observable<T>
    


    В начало SO добавляются элементы переданные в качестве аргумента



    example("startWith") {
        let sequence = Observable.of(1, 2, 3).startWith(0)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- startWith example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Completed
    





    switchLatest



    SO = Observable<Observable<T>>
    RO = Observable<T> 
    


    Изначально подписываемся на O1 генерируемого SO, его элементы зеркально генерируются в RO. Как только из SO генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка от O1, подписываемся на O2 и так далее. Таким образом в RO — элементы лишь из последнего сгенерированного Observable



    example("switchLatest") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(100)
        
        let proxyVar = Variable(varA.asObservable())
        let concatSequence = proxyVar.asObservable().switchLatest()
        
        concatSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2
        varB.value = 3
        proxyVar.value = varB.asObservable()
        varB.value = 4
        varA.value = 5
    }
    
    
    example("switchLatest") {
        let observableA = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(10))
            }
            delay(3) {
                observer.on(.Next(20))
            }
            delay(5) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("oA")
        let observableB = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(100))
            }
            delay(1) {
                observer.on(.Next(200))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("oB")
        
        let observableC = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(1000))
            }
            delay(1) {
                observer.on(.Next(2000))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("oC")
        
            let subjects = [observableA, observableB, observableC]
            let sequence:Observable<Observable<Int>> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0}
            let switchLatestSequence:Observable<Int> = sequence.switchLatest()
            switchLatestSequence.subscribe { e in
                print(e)
            }
    }
    


    Консоль:
    
    --- switchLatest example ---
    Next(0)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed
    
    --- switchLatest example ---
    2016-04-12 17:15:22.710: oA -> subscribed
    2016-04-12 17:15:22.711: oA -> Event Next(10)
    Next(10)
    2016-04-12 17:15:23.797: oA -> disposed // происходит отписка как только сгенерировался oB
    2016-04-12 17:15:23.797: oB -> subscribed
    2016-04-12 17:15:23.797: oB -> Event Next(100)
    Next(100)
    2016-04-12 17:15:24.703: oB -> disposed // происходит отписка как только сгенерировался oC
    2016-04-12 17:15:24.703: oC -> subscribed 
    2016-04-12 17:15:24.703: oC -> Event Next(1000)
    Next(1000)
    2016-04-12 17:15:25.800: oC -> Event Next(2000)
    Next(2000)
    2016-04-12 17:15:26.703: oC -> Event Completed
    2016-04-12 17:15:26.703: oC -> disposed
    Completed
    


    В первом примере показано как команда работает в статике, когда мы руками переподключаем Observable.
    Во втором же у нас последовательности с задержками. observableA, observableB, observableC из SO генерируются раз в 1 секунду. Их же элементы генерируются с различными задержками.




    withLatestFrom



    SO1, SO2 = Observable<T>
    RO = Observable<f(T,T)>
    


    Как только O1 генерирует элемент проверяется сгенерирован ли хоть один элемент в O2, если да, то берутся последние элементы из O1 и O2 и используются в качестве аргументов для переданной функции, результат которой генерируется RO в качестве элемента



    example("withLatestFrom") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(10)
        
        let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) {
            "\($0) - \($1)"
        }
        withLatestFromSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2 
        varB.value = 20
        
        varB.value = 30
        varA.value = 5
        varA.value = 6
    }
    


    Консоль:
    --- withLatestFrom example ---
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    Next(5 - 30)
    Next(6 - 30)
    Completed
    





    zip



    SO = Observable<Observable<T>>
    RO = Observable<f(T,T)> 
    


    Элементы RO представляют собой комбинацию из элементов сгенерированных исходными Observable, объединение идет по индексу выпущенного элемента



    example("zip with simple Variable") {
        let varA = Variable<Int>(0)
        let varB = Variable<Int>(10)
        
        let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "\($0) - \($1)"
        }
        
        zippedSequence.subscribe { e in
            print(e)
        }
        varA.value = 1
        varA.value = 2
        varB.value = 20
        
        varB.value = 30
        varA.value = 3
        varA.value = 4
    }
    
    example("zip with PublishSubject") {
        let subjectA = PublishSubject<Int>()
        let subjectB = PublishSubject<Int>()
    
        let zippedSequence = Observable.zip(subjectA, subjectB) { "\($0) - \($1)"
        }
        
        zippedSequence.subscribe { e in
            print(e)
        }
        subjectA.onNext(0)
        subjectA.onNext(1)
        subjectA.onNext(2)
        subjectB.onNext(100)
        subjectB.onNext(101)
        subjectA.onNext(3)
        subjectB.onNext(102)
        subjectA.onNext(4)
    }
    


    Консоль:
    --- zip with simple Variable example ---
    Next(0 - 10)
    Next(1 - 20)
    Next(2 - 30)
    Completed
    
    --- zip with PublishSubject example ---
    Next(0 - 100)
    Next(1 - 101)
    Next(2 - 102)
    


    Из примеров видно, что элементы комбинируются в том порядке, в каком они были сгенерированы в исходных Observable

    Фильтрация






    distinctUntilChanged


    Пропускаем все повторяющиеся подряд идущие элементы



    example("distinctUntilChanged") {
        let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged()
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- distinctUntilChanged example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(1)
    Completed
    


    Здесь тонкий момент, что отбрасываются не уникальные для всей последовательности элементы, а лишь те, которые идут подряд.




    elementAt



    В RO попадает лишь элемент выпущенный N по счету



    example("elementAt") {
        let sequence = Observable.of(0, 10, 20, 30, 40)
            .elementAt(2)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- elementAt example ---
    Next(20)
    Completed
    





    filter



    Отбрасываются все элементы, которые не удовлетворяют заданным условиям



    example("filter") {
        let sequence = Observable.of(1, 20, 3, 40)
            .filter{ $0 > 10}
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- filter example ---
    Next(20)
    Next(40)
    Completed
    





    ignoreElements



    Отбрасывает все элементы, передаёт только терминальные сообщения Completed и Error



    example("ignoreElements") {
        let sequence = Observable.of(1, 2, 3, 4)
            .ignoreElements()
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- ignoreElements example ---
    Completed
    





    sample



    При каждом сгенерированном элементе последовательности семплера (воспринимать как таймер) — брать последний выпущенный элемент исходной последовательности и дублировать его в RO, ЕСЛИ он не был сгенерирован ранее



    example("sampler") {
        let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler")
        
        let sequence:Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- sampler example ---
    2016-04-12 18:28:20.322: sampler -> subscribed
    2016-04-12 18:28:21.323: sampler -> Event Next(0)
    Next(1)
    2016-04-12 18:28:22.324: sampler -> Event Next(1) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
    2016-04-12 18:28:23.323: sampler -> Event Next(2)
    Next(2)
    2016-04-12 18:28:24.323: sampler -> Event Next(3) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
    2016-04-12 18:28:25.323: sampler -> Event Next(4) // элемент в RO не был сгенерирован, т.к. он уже был сгенерирован ранее
    2016-04-12 18:28:26.323: sampler -> Event Next(5)
    Next(3)
    ...
    





    single



    Из исходной последовательности берется единственный элемент, если элементов > 1 — генерировать ошибку. Есть вариант с предикатом



    example("single generate error") {
        let sequence = Observable.of(1, 2, 3, 4).single()
        sequence.subscribe { e in
            print(e)
        }
    }
    
    example("single") {
        let sequence = Observable.of(1, 2, 3, 5).single {
            $0 % 2 == 0
        }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- single generate error example ---
    Next(1)
    Error(Sequence contains more than one element.)
    
    --- single example ---
    Next(2)
    Completed
    


    В первом примере в исходной последовательности оказалось больше 1 элемента, поэтому была сгенерирована ошибка в момент генерирования в SO второго элемента
    Во втором примере условиям предиката удовлетворил всего 1 элемент, поэтому ошибки сгенерировано не было




    skip


    Из SO отбрасываем первые N элементов



    example("skip") {
        let sequence = Observable.of(1, 2, 3, 4).skip(2)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- skip example ---
    Next(3)
    Next(4)
    Completed
    





    skip (duration)



    Из SO отбрасываем первые элементы, которые были сгенерированы в первые N



    example("skip duration with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- skip duration with wait example ---
    Next(3)
    Next(4)
    Completed
    





    skipUntil



    Отбрасываем из SO элементы, которые были сгенерированы до начала генерации элементов последовательностью переданной в качестве параметра



    example("skipUntil") {
        let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let secondSequence = Observable.just(1)
            .delaySubscription(1, scheduler: MainScheduler.instance)
        let skippedSequence = firstSequence.skipUntil(secondSequence)
        
        skippedSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- skipUntil example ---
    Next(3)
    Next(4)
    Completed
    


    Генерация элементов в secondSequence была отложена на 1 секунду с помощью команды delaySubscription, таким образом элементы из firstSequence стали дублироваться в RO лишь через 1 секунду




    skipWhile



    Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией



    example("skipWhile") {
        let firstSequence = [1,2,3,4,0].toObservable()
        let skipSequence = firstSequence.skipWhile { $0 < 3 }
        
        skipSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- skipWhile example ---
    Next(3)
    Next(4)
    Next(0)
    Completed
    





    skipWhileWithIndex


    Отбрасываем из SO элементы до тех пор, пока истинно условие возвращаемой переданное функцией. Отличие от skipWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента



    example("skipWhileWithIndex") {
        let firstSequence = [1,2,5,0,7].toObservable()
        let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in
            value < 4 || idx < 2
        }
        skipSequence.subscribe { e in
            print(e)
        }
    }
    
    


    Консоль:
    --- skipWhileWithIndex example ---
    Next(5)
    Next(0)
    Next(7)
    Completed
    





    take


    Из SO берутся лишь первые N элементов



    example("take") {
        let sequence = Observable.of(1, 2, 3, 4).take(2)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- take example ---
    Next(1)
    Next(2)
    Completed
    





    take (duration)



    Из SO берутся лишь элементы сгенерированные в первые N секунд



    example("take duration with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let takeSequence = sequence.take(2, scheduler: MainScheduler.instance)
        takeSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- take duration with wait example ---
    Next(1)
    Next(2)
    Completed
    





    takeLast



    Из SO берутся лишь последние N элементов. Что означает, если SO никогда не закончит генерировать элементы — в RO не попадет ни одного элемента.



    example("takeLast") {
        let sequence = Observable.of(1, 2, 3, 4).takeLast(2)
        sequence.subscribe { e in
            print(e)
        }
    }
    
    example("takeLast with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let takeSequence = sequence.takeLast(2)
        takeSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- takeLast example ---
    Next(3)
    Next(4)
    Completed
    
    --- takeLast with wait example ---
    Next(3)
    Next(4)
    Completed
    

    Второй пример приведен для иллюстрации в задержке генерации элементов в RO из за ожидания завершения генерации элементов в SO




    takeUntil



    Из SO берутся элементы, которые были выпущены до начала генерации элементов последовательностью переданной в качестве параметра



    example("takeUntil") {
        let stopSequence = Observable.just(1)
            .delaySubscription(2, scheduler: MainScheduler.instance)
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
            .takeUntil(stopSequence)
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- takeUntil example ---
    Next(1)
    Next(2)
    Completed
    





    takeWhile



    Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией



    example("takeWhile") {
        let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- takeWhile example ---
    Next(1)
    Next(2)
    Completed
    





    takeWhileWithIndex



    Из SO берутся Элементы до тех пор, пока истинно условие возвращаемой переданной функцией. Отличие от takeWhile в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента



    example("takeWhileWithIndex") {
        let sequence = [1,2,3,4,5,6].toObservable()
            .takeWhileWithIndex{ (val, idx) in
                val % 2 == 0 || idx < 3
        }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- takeWhileWithIndex example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed
    





    throttle



    Из SO берутся лишь элементы, после которых не было новых элементов N секунд.



    example("throttle") {
        let sequence = Observable.of(1, 2, 3, 4)
            .throttle(1, scheduler: MainScheduler.instance)
        sequence.subscribe { e in
            print(e)
        }
    }
    
    example("throttle with wait") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
            .throttle(0.5, scheduler: MainScheduler.instance)
        
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- throttle example ---
    Next(4)
    Completed
    
    --- throttle with wait example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Completed
    


    В первом случае SO генерирует элементы без задержек, поэтому лишь последний элемент не имеет после себя новых элементов.
    Во втором примере элементы генерируются медленнее, чем N секунд переданные в throttle, поэтому за каждым генерируемым элементом достаточный временной промежуток.

    Трансформация






    buffer



    SO = Observable<>>
    RO = Observable<[T]>
    


    Элементы из SO по определенным правилам объединяются в массивы и генерируются в RO. В качестве параметров передается count, — максимальное число элементов в массиве, и timeSpan время максимального ожидания наполнения текущего массива из элементов SO. Таким образом элемент RO, являет собой массив [T], длинной от 0 до count.



    example("buffer") {
        let varA = Variable<Int>(0)
        
        let bufferSequence = varA.asObservable()
            .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
        bufferSequence.subscribe { e in
            print("\(NSDate()) - \(e)")
        }
        varA.value = 1
        varA.value = 2
        varA.value = 3
        delay(3) {
            varA.value = 4
            varA.value = 5
            delay(5) {
                varA.value = 6
            }
        }
    }
    


    Консоль:
    --- buffer example ---
    2016-04-12 16:10:58 +0000 - Next([0, 1, 2])
    2016-04-12 16:11:01 +0000 - Next([3]) 
    2016-04-12 16:11:04 +0000 - Next([4, 5])
    2016-04-12 16:11:07 +0000 - Next([6])
    2016-04-12 16:11:07 +0000 - Completed
    
    

    Длина массива была указана как 3, — как только были сгенерированы 3 элемента — в RO сгенерировался элемент [0, 1, 2]
    После генерации элемента 3, — была задержка в 3 секунды, сработал таймаут, и массив не был заполнен полностью.
    То же касается и задержки после генерации элемента 5




    flatMap



    Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge)



    example("flatMap with wait") {
        let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
        let flatMapSequence:Observable<String> = sequence.flatMap{val in
            createSequenceWithWait([10,11,12], waitTime: 2) { element in
                "\(element) - \(val)"
            }
        }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- flatMap with wait example ---
    Next(10 - 0)
    Next(10 - 1)
    Next(11 - 0)
    Next(10 - 2)
    Next(11 - 1)
    Next(12 - 0)
    Next(11 - 2)
    Next(12 - 1)
    Next(12 - 2)
    Completed
    





    flatMapFirst



    Каждый элемент SO превращается в отдельный Observable.
    1) Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Пока O1 генерирует элементы — все последующие Observable сгенерированные из SO отбрасываются, на них не подписываемся.
    2) как только O1 оканчивается, — если будет сгенерирован новый Observable — на него подпишутся и его элементы будут дублироваться в RO.
    Повторяем пункт 1, но вместо O1 берем последний сгенерированный Observable



    example("flatMapFirst") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
            .debug("sequence")
        let flatMapSequence:Observable<String> = sequence
            .flatMapFirst{val in
                Observable.of(0, 1, 2)
                    .map{"\($0) - \(val)"
                }
        }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    
    
    example("flatMapFirst with delay") {
        let subjectA = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(10))
            }
            delay(1) {
                observer.on(.Next(20))
            }
            delay(7) {
                observer.onCompleted()
            }
            return NopDisposable.instance
        }.debug("sA")
        let subjectB = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(100))
            }
            delay(1) {
                observer.on(.Next(200))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
        }.debug("sB")
    
        let subjectC = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(1000))
            }
            delay(1) {
                observer.on(.Next(2000))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
        }.debug("sC")
    
        let subjects = [subjectA, subjectB, subjectC]
        let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:4){$0}
            .debug("sequence")
        let flatMapSequence:Observable<Int> = sequence.flatMapFirst{val in
            return subjects[val].asObservable()
            }.debug("flatMapSequence")
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    
    --- flatMapFirst example ---
    2016-04-12 19:19:46.915: sequence -> subscribed
    2016-04-12 19:19:46.916: sequence -> Event Next(10)
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    2016-04-12 19:19:46.918: sequence -> Event Next(20)
    Next(0 - 20)
    Next(1 - 20)
    Next(2 - 20)
    2016-04-12 19:19:46.919: sequence -> Event Next(30)
    Next(0 - 30)
    Next(1 - 30)
    Next(2 - 30)
    2016-04-12 19:19:46.921: sequence -> Event Completed
    Completed
    2016-04-12 19:19:46.921: sequence -> disposed
    
    --- flatMapFirst with delay example ---
    2016-04-12 19:19:46.925: flatMapSequence -> subscribed
    2016-04-12 19:19:46.926: sequence -> subscribed
    2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO генерирует 1й элемент
    2016-04-12 19:19:46.935: sA -> subscribed // на его основе генерируется Observable sA, на  который мы подписываемся
    2016-04-12 19:19:46.936: sA -> Event Next(10)
    2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10)
    Next(10)
    2016-04-12 19:19:47.936: sA -> Event Next(20)
    2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20)
    Next(20)
    2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO генерирует 2й элемент, но на этот момент sA еще генерирует элементы, поэтому подписки на sB не произошло, он просто отбрасывается
    2016-04-12 19:19:53.935: sA -> Event Completed 
    2016-04-12 19:19:53.936: sA -> disposed // sA закончил генерировать элементы, от него отписались
    2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO генерирует 3й элемент
    2016-04-12 19:19:55.137: sC -> subscribed // т.к. на этот момент нет активных Observable (от sA мы отписались, sB - мы отбросили) - мы можем подписаться на него
    2016-04-12 19:19:55.137: sC -> Event Next(1000)
    2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000)
    Next(1000)
    2016-04-12 19:19:56.236: sC -> Event Next(2000)
    2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000)
    Next(2000)
    2016-04-12 19:19:57.335: sC -> Event Completed
    2016-04-12 19:19:57.336: sC -> disposed
    2016-04-12 19:19:58.926: sequence -> Event Completed
    2016-04-12 19:19:58.926: flatMapSequence -> Event Completed
    Completed
    2016-04-12 19:19:58.926: sequence -> disposed
    


    Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — это уже разрешено, поэтому в RO попадают элементы со всех Observable
    А вот второй пример очень объемный, но позволят в подробностях наблюдать как происходят подписка/отписка и как это влияет на генерацию элементов




    flatMapLatest



    Каждый элемент SO превращается в отдельный Observable. Изначально подписываемся на O1, его элементы зеркально генерируются в RO. Как только из SO выпускается очередной элемент и на его основе генерируется очередной Observable — элементы предыдущего Observable отбрасываются, т.к. происходит отписка. Таким образом в RO — элементы из последнего генерированного Observable



    example("flatMapLatest") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
        let flatMapSequence = sequence.flatMapLatest{val in Observable.of(0, 1, 2)
            .map{"\($0) - \(val)"
            }
        }
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    
    example("flatMapLatest with delay") {
        let subjectA = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(10))
            }
            delay(3) {
                observer.on(.Next(20))
            }
            delay(5) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("sA")
        let subjectB = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(100))
            }
            delay(1) {
                observer.on(.Next(200))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("sB")
        
        let subjectC = Observable<Int>.create{ observer in
            delay(0) {
                observer.on(.Next(1000))
            }
            delay(1) {
                observer.on(.Next(2000))
            }
            delay(2) {
                observer.onCompleted()
            }
            return NopDisposable.instance
            }.debug("sC")
        
        let subjects = [subjectA, subjectB, subjectC]
        let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:1) {$0}
            .debug("sequence")
        let flatMapSequence:Observable<Int> = sequence.flatMapLatest{val in
            return subjects[val].asObservable()
            }.debug("flatMapSequence")
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- flatMapLatest example ---
    Next(0 - 10)
    Next(1 - 10)
    Next(2 - 10)
    Next(0 - 20)
    Next(1 - 20)
    Next(2 - 20)
    Next(0 - 30)
    Next(1 - 30)
    Next(2 - 30)
    Completed
    
    --- flatMapLatest with delay example ---
    2016-04-12 19:30:50.309: flatMapSequence -> subscribed
    2016-04-12 19:30:50.310: sequence -> subscribed
    2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO сгенерировала 1й элемент, и на его основе создали sA
    2016-04-12 19:30:50.319: sA -> subscribed // подписались на sA
    2016-04-12 19:30:50.319: sA -> Event Next(10) // он генерирует элемент
    2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap генерирует новый элемент 
    Next(10) // и он попадает в RO
    2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO сгенерировала 2й элемент, и на его основе создали sA
    2016-04-12 19:30:51.311: sA -> disposed // и хоть sA не успел сгенерировать все элементы, от него мы отписываемся
    2016-04-12 19:30:51.311: sB -> subscribed // и подписываемся на новый Observable sB
    2016-04-12 19:30:51.311: sB -> Event Next(100)
    2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100)
    Next(100)
    2016-04-12 19:30:52.310: sequence -> Event Next(2)
    2016-04-12 19:30:52.311: sB -> disposed
    2016-04-12 19:30:52.311: sC -> subscribed
    2016-04-12 19:30:52.311: sC -> Event Next(1000)
    2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000)
    Next(1000)
    2016-04-12 19:30:53.372: sequence -> Event Completed
    2016-04-12 19:30:53.372: sequence -> disposed
    2016-04-12 19:30:53.372: sC -> Event Next(2000)
    2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000)
    Next(2000)
    2016-04-12 19:30:54.501: sC -> Event Completed
    2016-04-12 19:30:54.501: sC -> disposed
    2016-04-12 19:30:54.501: flatMapSequence -> Event Completed
    Completed
    


    Первый пример показывает, что т.к. Observable успевают сгенерировать свои элементы, ко времени когда приходит очередь подписаться на новый Observable — предыдущий уже успел сгенерировать все свои элементы, поэтому в RO попадают элементы со всех Observable
    Во втором примере благодаря задержкам в генерации мы видим, что как только генерируется новый Observable — происходит отписка от предыдущего Observable




    flatMapWithIndex



    Каждый элемент SO превращается в отдельный Observable, и все элементы из [O1, O2, O3…] объединяются в RO. Порядок генерации элементов в RO зависит от времени их генерации в исходных [O1, O2, O3…] (как в команде merge). Отличие от flatMap в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента



    example("flatMapWithIndex") {
        let sequence:Observable<Int> = Observable.of(10, 20, 30)
        let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx  in Observable.of("A", "B", "C").map{"index: (\(idx)) - \($0) - \(val)"} }
        print(flatMapSequence.dynamicType)
        
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    
    example("flatMapWithIndex with wait") {
        let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 }
        let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in
            createSequenceWithWait(["A","B","C"], waitTime: 2) { element in
                "index: (\(idx)) - \(element) - \(val)"
            }
        }
        print(flatMapSequence.dynamicType)
        
        flatMapSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    FlatMapWithIndex<Int, Observable<String>>
    Next(index: (0) - A - 10)
    Next(index: (0) - B - 10)
    Next(index: (0) - C - 10)
    Next(index: (1) - A - 20)
    Next(index: (1) - B - 20)
    Next(index: (1) - C - 20)
    Next(index: (2) - A - 30)
    Next(index: (2) - B - 30)
    Next(index: (2) - C - 30)
    Completed
    
    --- flatMapWithIndex with wait example ---
    FlatMapWithIndex<Int, Observable<String>>
    Next(index: (0) - A - 0)
    Next(index: (1) - A - 1)
    Next(index: (0) - B - 0)
    Next(index: (2) - A - 2)
    Next(index: (1) - B - 1)
    Next(index: (0) - C - 0)
    Next(index: (2) - B - 2)
    Next(index: (1) - C - 1)
    Next(index: (2) - C - 2)
    Completed
    





    map



    Observable<T> -> Observable<U>
    


    Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов.



    example("map") {
        let sequence = Observable.of(1, 2, 3)
            .map{ "\($0 * 5)" }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- map example ---
    Next(5)
    Next(10)
    Next(15)
    Completed
    





    mapWithIndex



    Observable<T> -> Observable<U>
    


    Элементы SO преобразовываются, не меняя порядок их генерации. Можно менять не только значение, но и тип элементов. Отличие от map в том, что в качестве еще одного параметра переданного в функцию является индекс сгенерированного элемента



    example("mapWithIndex") {
        let sequence = Observable.of("A", "B", "C")
            .mapWithIndex({ "\($0) / \($1)" })
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- mapWithIndex example ---
    Next(A / 0)
    Next(B / 1)
    Next(C / 2)
    Completed
    





    window



    SO = Observable<T>
    RO = Observable<Observable<T>>
    


    Элемент из SO по определенным правилам передаются в генерирующиеся новые Observable. В качестве параметров передается count, — максимальное число элементов которые будут сгенерированы каждым Observable, и timeSpan — время максимального ожидания наполнения текущего Observable из элементов SO. Таким образом элемент RO, являет собой Observable число генерируемых элементов которого равно от 0 до N. Основным отличием от bufffer — элементы SO зеркалятся сгенерированными Observable моментально, а в случае buffer — мы вынуждены ждать указанное в качестве параметра максимальное время (если буфер не заполнится раньше)



    example("window") {
        let varA = Variable<Int>(0)
        
        let bufferSequence:Observable<Observable<Int>> = varA.asObservable()
            .window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance)
            .debug("bufferSequence")
        bufferSequence.subscribe { e in
            if case .Next(let observable) = e {
                print("\(NSDate()) - генерируем новый Observable")
                observable.subscribe { val in
                    print(val)
                    
                }
            }
        }
        varA.value = 1
        varA.value = 2
        varA.value = 3
        delay(4) {
            varA.value = 4
            
            varA.value = 5
            delay(4) {
                varA.value = 6
            }
        }
    }
    


    Консоль:
    
    --- window example ---
    2016-04-12 19:51:54.372: bufferSequence -> subscribed
    2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:54 +0000 - генерируем новый Observable
    Next(0)
    Next(1)
    Next(2)
    Completed
    2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:54 +0000 - генерируем новый Observable
    Next(3)
    Completed
    2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:51:57 +0000 - генерируем новый Observable
    Next(4)
    Next(5)
    Completed
    2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>)
    2016-04-12 16:52:00 +0000 - генерируем новый Observable
    Next(6)
    Completed
    2016-04-12 19:52:02.895: bufferSequence -> Event Completed
    


    В примере используются временные задержки что помогает добиться частичной наполненности генерируемых Observable

    Операторы математические и агрегирования






    reduce



    Каждый элемент SO преобразуется с помощью переданной функции, результат операции передается в качестве параметра в функцию на следующем шаге. Как только SO генерирует терминальное состояние, RO генерирует результат, т.е. RO сгенерирует лишь один элемент.



    example("reduce") {
        let sequence = Observable.of(1, 2, 3, 4)
            .reduce(1) { $0 * $1 }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- reduce example ---
    Next(24)
    Completed
    





    scan



    Каждый элемент SO преобразуется с помощью переданной функции, результат операции генерируется в RO, но кроме этого оно передается в качестве параметра в функцию на следующем шаге. В отличии от reduce число элементов в RO равно числу элементов в SO.



    example("scan") {
        let sequence = Observable.of(1, 2, 3).scan(10) { result, element in
            return result + element
        }
        sequence.subscribe { e in
            print(e)
        }
    }
    
    example("scan multiply") {
        let sequence = Observable.of(2, 3, 5).scan(10) { result, element in
            return result * element
        }
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- scan example ---
    Next(11)
    Next(13)
    Next(16)
    Completed
    
    --- scan multiply example ---
    Next(20)
    Next(60)
    Next(300)
    Completed
    





    toArray



    SO = Observable<T>
    RO = Observable<[T]>   
    


    Все элементы из SO после генерации терминального состояния объединяются в массив и генерируются RO



    example("toArray") {
        let sequence = Observable.of(1, 2, 3)
    
        let arraySequence = sequence.toArray()
        arraySequence.subscribe { e in
                print(e)
            }
    }
    


    Консоль:
    --- toArray example ---
    Next([1, 2, 3])
    Completed
    


    Работа с ошибками






    catchError



    Позволяет перехватить генерированную ошибку из SO и заменить ее на новый Observable, который теперь будет генерировать элементы



    example("with catchError") {
        let sequenceWithError = Observable<Int>.create { observer in
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Next(4))
            observer.onError(RxError.Unknown)
            observer.on(.Next(5))
            return NopDisposable.instance
        }
        let sequenceIgnoreError = sequenceWithError.catchError{ error in
            return Observable.of(10, 11, 12)
        }
        
        sequenceIgnoreError.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- with catchError example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4) 
    Next(10)
    Next(11)
    Next(12)
    Completed
    


    После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен новый Observable




    catchErrorJustReturn



    Позволяет перехватить генерированную ошибку из SO и заменить её на указанный элемент, после этого SO генерирует Completed



    example("with catchErrorJustReturn") {
        let sequenceWithError = Observable.of(1, 2, 3, 4)
            .concat(Observable.error(RxError.Unknown))
            .concat(Observable.just(5))
        let sequenceIgnoreError = sequenceWithError.catchErrorJustReturn(-1)
        sequenceIgnoreError.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- with catchErrorJustReturn example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(-1)
    Completed
    


    После генерации элемента 4, была сгенерирована ошибка RxError.Unknown, но мы её перехватили и вернули взамен элемент -1




    retry



    Позволяет перехватить генерированную ошибку из SO и в зависимости от переданного параметра попытаться запустить SO c начала нужное число раз в надежде что ошибка не повторится



    example("retry full sequence") {
        let sequenceWithError = Observable.of(1, 2, 3, 4).concat(Observable.error(RxError.Unknown))
        let wholeSequenceWithErrorRetry = sequenceWithError.retry(2)
        
        wholeSequenceWithErrorRetry.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- retry full sequence example ---
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Next(1)
    Next(2)
    Next(3)
    Next(4)
    Error(Unknown error occured.)
    

    Т.к. был применен оператор retry(2) — мы один раз повторили продписку на SO, но ошибка повторилась, и была сгенерирована в RO
    Таким образом retry(1) — не сделает ни одного повтора




    retryWhen



    Позволяет перехватить сгенерированную ошибку из SO и в зависимости от типа ошибки мы либо повторно генерируем ошибку, которая пробрасывается в RO и на этом выполнение заканчивается, либо генерируем Observable (tryObservable), генерация каждого корректного элемента которого выполнит повторную подписку на SO, в надежде что ошибка исчезнет. Если tryObservable заканчивается ошибкой — она пробрасывается в RO и на этом выполнение заканчивается



    example("retryWhen") {
        var counter = 0
        let sequenceWithError = Observable<Int>.create { observer in
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Next(4))
            counter += 1
            if counter < 3 {
                observer.onError(RxError.Unknown)
            } /*else {
                observer.onError(RxError.Overflow)
            }*/
            observer.on(.Next(5))
            return NopDisposable.instance
        }.debug("with error")
        let sequenceWithoutError = Observable<Int>.create { observer in
            observer.on(.Next(10))
    //observer.onError(RxError.NoElements)
            return NopDisposable.instance
            }.debug("without error")
        let retrySequence = sequenceWithError.retryWhen{ (error: Observable<RxError>) -> Observable<Int> in
            let seq:Observable<Int> = error.flatMap { (generatedError: RxError) -> Observable<Int> in
                if case .Unknown = generatedError {
                    return sequenceWithoutError
                }
                return Observable<Int>.error(generatedError)
            }
            return seq
    
        }//.debug()
     
        retrySequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    
    --- retryWhen example ---
    2016-04-12 20:18:04.484: with error -> subscribed
    2016-04-12 20:18:04.485: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.486: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.486: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.487: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.)
    2016-04-12 20:18:04.488: without error -> subscribed
    2016-04-12 20:18:04.488: without error -> Event Next(10)
    2016-04-12 20:18:04.489: with error -> disposed
    2016-04-12 20:18:04.489: with error -> subscribed
    2016-04-12 20:18:04.489: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.490: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.490: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.490: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.)
    2016-04-12 20:18:04.491: without error -> subscribed
    2016-04-12 20:18:04.492: without error -> Event Next(10)
    2016-04-12 20:18:04.492: with error -> disposed
    2016-04-12 20:18:04.492: with error -> subscribed
    2016-04-12 20:18:04.493: with error -> Event Next(1)
    Next(1)
    2016-04-12 20:18:04.493: with error -> Event Next(2)
    Next(2)
    2016-04-12 20:18:04.493: with error -> Event Next(3)
    Next(3)
    2016-04-12 20:18:04.494: with error -> Event Next(4)
    Next(4)
    2016-04-12 20:18:04.494: with error -> Event Next(5)
    Next(5)
    


    Я встроил инкремент переменной i в генерацию sequenceWithError, чтобы на 3й попытке — ошибка исчезла. Если раскоментировать генерацию ошибку RxError.Overflow — мы её не перехватим в операторе retryWhen и пробросим в RO

    Операторы для работы с Connectable Observable






    multicast



    Позволяет проксировать элементы из исходной SO на Subject переданный в качестве параметра. Подписываться нужно именно на этот Subject, генерация элементов Subject начнется после вызова оператора connect.

    example("multicast") {
        let subject = PublishSubject<Int>()
        let firstSequence = createSequenceWithWait([0,1,2,3,4,5]) { $0 }
            .multicast(subject)
        delay(2) {
            _ = subject.subscribe { e in
                print("first: \(e)")
            }
        }
        delay(3) {
            _ = subject.subscribe { e in
                print("second: \(e)")
            }
        }
        firstSequence.connect()
    }
    


    Консоль:
    --- multicast example ---
    first: Next(2)
    first: Next(3)
    second: Next(3)
    first: Next(4)
    second: Next(4)
    first: Next(5)
    second: Next(5)
    first: Completed
    second: Completed
    





    publish



    publish = multicast + replay subject
    Позволяет создавать Connectable Observable, которые не генерируют события даже после subscribe. Для старта генерации таким Observable нужно дать команду connect. Это позволяет подписать несколько Observer к одному Observable и начать генерировать элементы одновременно, вне зависимости от того, когда был выполнен subscribe



    example("subscribe connectable sequnce with connect") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence").publish()
        var disposable1: Disposable!
        var disposable2: Disposable!
        disposable1 = sequence.subscribe { e in
            print("first: \(e)")
        }
        delay(2) {
            disposable2 = sequence.subscribe { e in
                print("second: \(e)")
            }
        }
        
        delay(4) {
            sequence.connect()
        }
        
        delay(8) {
            disposable1.dispose()
            disposable2.dispose()
        }
    }
    


    Консоль:
    --- subscribe connectable sequnce with connect example ---
    2016-04-12 21:35:32.130: sequence -> subscribed
    2016-04-12 21:35:33.131: sequence -> Event Next(0)
    first: Next(0)
    second: Next(0)
    2016-04-12 21:35:34.131: sequence -> Event Next(1)
    first: Next(1)
    second: Next(1)
    2016-04-12 21:35:35.132: sequence -> Event Next(2)
    first: Next(2)
    second: Next(2)
    2016-04-12 21:35:36.132: sequence -> Event Next(3)
    2016-04-12 21:35:37.132: sequence -> Event Next(4)
    


    Как видно, хоть подписка была произведена в разное время, пока не вызвали команду connect — генерация элементов не началась. Зато благодаря команде debug видно, что даже после того как все отписались — последовательность продолжила генерировать элементы




    refCount



    Позволяет создать обычный Observable из Connectable. После первого вызова subscribe к этому обычному Observable — происходит подписка Connectable на SO.
    Получается что то вроде
    publishSequence = SO.publish()
    refCountSequence = publishSequence.refCount()

    SO будет продолжать генерировать элементы до тех пор, пока есть хотя бы один подписанный на refCountSequence. Как только все подписки на refCountSequence аннулируются, — происходит отписка и publishSequence от SO



    example("with refCount") {
        let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence")
        let publishSequence = sequence.publish() // создаем Connectable Observable
        let refCountSequence = publishSequence.refCount().debug("refCountSequence")
        let subscription1 = refCountSequence.subscribe{ e in
            print("first: \(e)")
        }
        let subscription2 = refCountSequence.subscribe{ e in
            print("second: \(e)")
        }
        
        delay(2) {
            subscription1.dispose() // отписываемся в первый раз
            
        }
        delay(3) {
            subscription2.dispose() // здесь мы отписываемся во второй, больше подписок на Observable созданный с помощью refCount  нет. Поэтому этот Observable отпсиывается от SO
            
        }
        delay(5) {
            _ = refCountSequence.subscribe { e in
                print("after: \(e)")
            }
        }
    }
    
    


    Консоль:
    --- with refCount example ---
    2016-04-12 20:25:24.154: refCountSequence -> subscribed // подписались на refCountSequence в 1й раз
    2016-04-12 20:25:24.155: sequence -> subscribed // после этого publishSequence подписался на SO
    2016-04-12 20:25:24.156: refCountSequence -> subscribed // // подписались на refCountSequence во 2й раз
    2016-04-12 20:25:25.156: sequence -> Event Next(0)
    2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
    first: Next(0)
    2016-04-12 20:25:25.156: refCountSequence -> Event Next(0)
    second: Next(0)
    2016-04-12 20:25:26.156: sequence -> Event Next(1)
    2016-04-12 20:25:26.156: refCountSequence -> Event Next(1)
    first: Next(1)
    2016-04-12 20:25:26.157: refCountSequence -> Event Next(1)
    second: Next(1)
    2016-04-12 20:25:26.353: refCountSequence -> disposed // отписались от refCountSequence в 1й раз
    2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO продолжает генерировать элементы, т.к. есть еще одна подписка на refCountSequence
    2016-04-12 20:25:27.157: refCountSequence -> Event Next(2)
    second: Next(2)
    2016-04-12 20:25:27.390: refCountSequence -> disposed // отписались от refCountSequence во 2й раз
    2016-04-12 20:25:27.390: sequence -> disposed // подписок на refCountSequence больше н еосталось, поэтому publishSequence отписался на SO
    2016-04-12 20:25:29.157: refCountSequence -> subscribed // подписались на refCountSequence снова
    2016-04-12 20:25:29.157: sequence -> subscribed // т.к. это первая подписка - publishSequence заново подписался на SO
    2016-04-12 20:25:30.158: sequence -> Event Next(0)
    2016-04-12 20:25:30.159: refCountSequence -> Event Next(0)
    after: Next(0)
    2016-04-12 20:25:31.158: sequence -> Event Next(1)
    2016-04-12 20:25:31.159: refCountSequence -> Event Next(1)
    after: Next(1)
    2016-04-12 20:25:32.159: sequence -> Event Next(2)
    2016-04-12 20:25:32.159: refCountSequence -> Event Next(2)
    after: Next(2)
    .... 
    далее бесконечно генерируются элементы
    





    replay



    Если SO обычный, — конвертирует его в Connectable. И после этого все кто подпишутся на него после вызова connect() — мгновенно получат в качестве первых элементов последние генерированные N элементов. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы



    example("replay") {
        let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(2)
        
        let firstDisposable = firstSequence.subscribe { e in
            print("first: \(e)")
        }
        
        firstSequence.connect()
        
        var secondDisposable: Disposable!
        delay(3) {
            secondDisposable = firstSequence.subscribe { e in
            print("second: \(e)")
            }
        }
        
        delay(4) {
            firstDisposable.dispose()
        }
        delay(5) {
            secondDisposable.dispose()
        }
        
        delay(7) {
            firstSequence.subscribe { e in
                print("third: \(e)")
            }
        }
    }
    


    Консоль:
    --- replay example ---
    first: Next(0)
    first: Next(1)
    first: Next(2)
    second: Next(1)
    second: Next(2)
    first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
    second: Next(3)
    second: Next(4) //после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
    //тут мы после задержки подписываемся в третий раз
    third: Next(5)
    third: Next(6)
    





    replayAll



    Если SO обычный, — конвертирует его в Connectable. Все кто подпишутся на него после вызова connect() — получат сначала все элементы, которые были генерированы ранее. Даже если отпишутся все, — Connectable будет продолжать генерировать элементы



    example("replayAll") {
        let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replayAll()
        
        let firstDisposable = firstSequence.subscribe { e in
            print("first: \(e)")
        }
        
        firstSequence.connect()
        
        var secondDisposable: Disposable!
        delay(3) {
            secondDisposable = firstSequence.subscribe { e in
                print("second: \(e)")
            }
        }
        
        delay(4) {
            firstDisposable.dispose()
        }
        delay(5) {
            secondDisposable.dispose()
        }
        
        delay(7) {
            firstSequence.subscribe { e in
                print("third: \(e)")
            }
        }
    }
    


    Консоль:
    --- replayAll example ---
    first: Next(0)
    first: Next(1)
    first: Next(2)
    second: Next(0)
    second: Next(1)
    second: Next(2)
    first: Next(3) // после генерации этого элемента мы отписываемся во 1й раз
    second: Next(3)
    second: Next(4)
    //после генерации этого элемента мы отписываемся во 2й раз, но SO продолжает генерировать элементы
    //тут мы после задержки подписываемся в третий раз
    third: Next(0)
    third: Next(1)
    third: Next(2)
    third: Next(3)
    third: Next(4)
    third: Next(5)
    third: Next(6)
    third: Next(7)
    


    Вспомогательные методы






    debug



    RO полностью дублирует SO, но логируются все события с временной меткой

    example("debug") {
        let sequence = Observable<AnyObject>.of(1, 2, 3)
            .debug("sequence")
            .subscribe{}
    }
    


    Консоль:
    --- debug example ---
    2016-04-12 21:41:08.467: sequence -> subscribed
    2016-04-12 21:41:08.469: sequence -> Event Next(1)
    2016-04-12 21:41:08.469: sequence -> Event Next(2)
    2016-04-12 21:41:08.469: sequence -> Event Next(3)
    2016-04-12 21:41:08.469: sequence -> Event Completed
    





    do / doOnNext



    RO полностью дублирует SO, но мы встраиваем перехватчик всех событий из жизненного цикла SO

    example("simple doOn") {
        let firstSequence = Observable.of(1,2).doOn{e in
            print(e)
        }
        firstSequence.subscribeNext{ e in // замечу что логирование производится не в методе subscribe, как обычно, а в doOn
        }
    }
    


    Консоль:
    --- simple doOn example ---
    Next(1)
    Next(2)
    Completed
    





    delaySubscription



    Дублирует элементы из SO в RO, но с временной задержкой указанной в качестве параметра



    example("delaySubscription") {
        let sequence = Observable.of(1, 2, 3).debug("sequence")
            .delaySubscription(3, scheduler: MainScheduler.instance).debug("delayed sequence")
        
        
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- delaySubscription example ---
    2016-04-12 21:44:05.226: delayed sequence -> subscribed // подписка на delayed sequence произошла в 5 секунд
    2016-04-12 21:44:08.228: sequence -> subscribed // а подписка на SO произошла только через указанных 3 секунды
    2016-04-12 21:44:08.229: sequence -> Event Next(1)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(1)
    Next(1)
    2016-04-12 21:44:08.229: sequence -> Event Next(2)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(2)
    Next(2)
    2016-04-12 21:44:08.229: sequence -> Event Next(3)
    2016-04-12 21:44:08.229: delayed sequence -> Event Next(3)
    Next(3)
    2016-04-12 21:44:08.230: sequence -> Event Completed
    2016-04-12 21:44:08.230: delayed sequence -> Event Completed
    Completed
    2016-04-12 21:44:08.230: sequence -> disposed
    





    observeOn



    Указывает на каком Scheduler выполнять свою работу Observer, особенно критично при работе с GUI

    example("without observeOn") {
        let sequence = Observable<AnyObject>.of(1, 2, 3)
        
        sequence.subscribe { e in
            print("\(NSThread.currentThread())\(e)")
        }
    }
    
    example("with observeOn") {
        let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let sequence = Observable<AnyObject>.of(1, 2, 3)
    
        sequence.observeOn(ConcurrentDispatchQueueScheduler.init(queue: queue))
            .subscribe { e in
                print("\(NSThread.currentThread())\(e)")
        }
    }
    
    


    Консоль:
    --- without observeOn example ---
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3)
    <NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed
    
    --- with observeOn example ---
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3)
    <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed
    


    Как видно, благодаря observeOn мы смогли выполнить код внутри subscribe на другом потоке




    subscribe



    Оператор, связывающий Observable с Observer, позволяет подписаться на все события из Observable

    example("subscribe") {
        let firstSequence = Observable.of(1)
        
        firstSequence.subscribe { e in
            print(e)
        }
        
        firstSequence.subscribeCompleted {
            print("!completed")
        }
        
        firstSequence.subscribeNext{next in
            print("next: \(next)")
        }
    }
    
    example("subscribeNext") {
        let firstSequence = Observable.of(1)
        
        firstSequence.subscribeNext{next in
            print("next: \(next)")
        }
    }
    
    example("subscribeCompleted") {
        let firstSequence = Observable.of(1)
        
        firstSequence.subscribeCompleted {
            print("!completed")
        }
    }
    
    example("subscribeError") {
        let firstSequence = Observable<Int>.error(RxError.ArgumentOutOfRange)
        
        firstSequence.subscribeError {e in
            print("!error \(e)")
        }
    }
    


    Консоль:
    
    --- subscribe example ---
    Next(1)
    Completed
    !completed
    next: 1
    
    --- subscribeNext example ---
    next: 1
    
    --- subscribeCompleted example ---
    !completed
    
    --- subscribeError example ---
    !error Argument out of range.
    
    


    Продемонстрированы 4 формы: subscribe, subscribeNext, subscribeCompleted, subscribeError




    subscribeOn



    Указывает на каком Scheduler выполнять свою работу Observable, особенно критично при работе с GUI

    example("with subscribeOn and observeOn") {
        let queue1 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let queue2 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        print("init thread: \(NSThread.currentThread())")
        let sequence = Observable<Int>.create { observer in
            print("observable thread: \(NSThread.currentThread())")
            observer.on(.Next(1))
            observer.on(.Next(2))
            observer.on(.Next(3))
            observer.on(.Completed)
            return NopDisposable.instance
            }
            .subscribeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue1"))
            .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue2"))
        
        
        sequence.subscribe { e in
            print("observer thread: \(NSThread.currentThread()) \(e)")
        }
    }
    


    Консоль:
    --- with subscribeOn and observeOn example ---
    init thread: <NSThread: 0x7ff6914132b0>{number = 1, name = main} // изначально мы были на потоке #1
    observable thread: <NSThread: 0x7ff6916a0cb0>{number = 4, name = (null)} // за счет subscribeOn код внутри Observable выполняется на потоке #4 а не #1
    observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(1) // а код observer'а выполняется и не в 1м и не в 4м, а в других за счет observeOn. Если бы мы не указали observeOn, номер потока был такой же как после subscribeOn #4
    observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(2)
    observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Next(3)
    observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Completed
    





    timeout



    Дублирует элементы из SO в RO, но если в течении указанного времени SO не сгенерировало ни одного элемента — RO генерирует ошибку



    example("failed timeout ") {
        let sequence = createSequenceWithWait([1,2,3,4]) { $0 }
        let timeoutSequence = sequence.timeout(0.9, scheduler: MainScheduler.instance)
        timeoutSequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- failed timeout  example ---
    Next(1)
    Error(Sequence timeout.)
    





    using



    Позволяет проинструктировать Observable создать ресурс, который будет жить лишь пока жив RO, в качестве параметров передаются 2 фабрики, одна генерирует ресурс, вторая — Observable, у которых будет единое время жизни

    class FakeDisposable: Disposable {
        func dispose() {
            print("disposed")
        }
    }
    
    example("using") {
        let sequence = Observable.using({
            return FakeDisposable()
            }, observableFactory: { d in
                Observable.just(1)
        }) as Observable<Int>
    
        
        sequence.subscribe { e in
            print(e)
        }
    }
    


    Консоль:
    --- using example ---
    Next(1)
    Completed
    disposed
    


    Как видно, после того как Observable закончил генерировать элементы, у нашего ресурса FakeDisposable был вызван метод dispose
    Поделиться публикацией
    Ой, у вас баннер убежал!

    Ну. И что?
    Реклама
    Комментарии 21
      0
      -
        0
        Это стёртый комментарий, или что то не так в статье?
          0
          Все, увидел в почте. Спасибо за RxMarbles на iOS, по первости очень помогало )
          0
          Спасибо за труд. Что касается PDF-ки, то Adobe Acrobat Reader утверждает что она испорченная.
            0
            Странно. Вы скачиваете с гитхаба и смотрите на PC, Маке или iPad? PDF я ровнял в деме Acrobat DC под маком. Открывается встроенным просмотрщиком.
            На PC у меня стоит PDF-XChange View, показывает прекрасно даже при заходе через браузер в предпросмотре. Поставил читалку от Adobe — тоже все читает.
            Попробуйте обновить Acrobat или сменить PDF читалку и отпишитесь о результате, если не сложно. А может PDF скачался ошибками?
            0
            Спасибо!
              0
              Рад, что пригодилось )
              0
              Спасибо за статью.
              Скажите, в чем создавали графики?
                0
                Рад, что не зря заморочился )
                Sketch на маке, рекомендую. Хоть это и полноценный графический редактор, а не построитель диаграмм, но благодаря его простоте, после пары минут тюнинга — это стало не важно.
                0
                Спасибо
                  0
                  Всегда пожалуйста.
                  0
                  Автор, ты большой молодец! Это действительно большой труд и лично от себя говорю огромное спасибо! Я сам писал статью про RxSwift и рад, что в русскоязычном сообществе наконец-то подхватили эту тему :-)
                    0
                    Спасибо на добром слове. Тема действительно очень интересная, но при этом относиться к ней нужно с осторожностью. Мало того что нужно переформатировать в голове подход к написанию кода, есть очень много нюансов: потоки, использование последних значений, горячая/холодная подписка и т.д. Чтобы написать качественный материал, нужно самому хорошо во всем разобраться. Сейчас отвлекся на другие проекты, но надеюсь попозже осветить RxSwift и со стороны GUI.
                    0
                    хотелось бы взять решение изначально спроектированное с учетом всех плюшек языка

                    Сильное заявление, конечно, особенно учитывая тот факт, что RAC 4 использует куда больше плюшек языка, несмотря на то что «вырос с Objective-C».
                      0
                      Мог быть не прав, судил по тем обзорам, что были на момент выбора.
                      Если у вас есть опыт, — может поделитесь чем RAC лучше RxSwift? Я думаю всем будет интересно, мне в первую очередь.
                        +1
                        • RAC рзаличает холодные и горячие observables на уровне типов, их нельзя просто так скомпоновать, что увеличивает количество ошибок, обнаруживаемых компилятором. Однако RxSwift считает это недостатком, и это можно понять, так как это увеличивает порог входа для новых разработчиков.
                        • RAC позволяет задать тип ошибки сигнала. Если использовать тип NoError (специальный тип, фактически enum без единого кейза) в качестве типа ошибки, получим compile-time гарантию того, что этот сигнал действительно никогда не завершится ошибкой. Это очень важно, например, для UI-биндингов.

                        В целом сложно однозначно сказать, что хуже, а что лучше. RxSwift — почти идетничный порт Rx.NET, что дает пользователям RxSwift возможность легко переносить опыт с других платформ, на которых есть Rx, это однозначно плюс.


                        RAC же, в свою очередь, пытается решить проблемы оригинального Rx, предоставить больше compile-time гарантий ценой ухода от оригинального Rx API.

                          0
                          Да, про эти два различия я уже встречал. И пожалуй по обоим пунктах мне более импонирует подход RAC.
                          Т.к. все что отслеживается на уровне компиляции — благо.
                          В RxSwift нужно при работе с GUI постоянно перехватывать ошибки и наворачивать защиту вида catchError и возвращать другой Observable, или полноценно работать с retry/retryWhen.

                          С другой стороны — переносимость Rx меня и подкупила.
                          Но вы упомянули про большее количество Swift плюшек у RAC 4, не моли бы вы привести примеры, если не сложно.
                            0

                            Ну, по мне эти фичи как раз показывают разумное использование системы типов (а это вполне себе Swift-плюшка) во благо.


                            Ну и конечно protocol extension, кастомные операторы для биндинга, etc. Но это и в RxSwift есть.

                              0
                              Ясно, спасибо за то, что поделились опытом. При наличии свободно времени — посмотрю в сторону RAC. В любом случае выбор между RxSwift и RAC изначально был сделан не по религиозным причинам, так что препятствий для изучения обоих фрейморков нет.
                      +1
                      Мы выложили RxMarbles в опенсорс https://github.com/RxSwiftCommunity/RxMarbles.
                        0
                        Отлично, большое спасибо )

                      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                      Самое читаемое