Как стать автором
Обновить

Создание своего Publisher'a в Combine

Время на прочтение13 мин
Количество просмотров8.7K


Сегодня я хотел бы показать вам, как создать свой собственный Publisher в новом фреймворке от Apple Combine.


И так для начала нам нужно кратко вспомнить, как взаимодействуют между собой фундаментальные части Combine, а именно Publisher, Subscription, Subscriber.


  • Subscriber присоединяется к Publisher'у
  • Publisher отправляет Subscription Subscriber'у
  • Subscriber запрашивает N значений у Subscription
  • Publisher отправляет N значений или меньше
  • Publisher отправляет сигнал о завершении

Publisher


Что же приступим созданию нашего Publisher'a. Если обратиться к документации Apple, то мы увидим, что Publisher это протокол.


public protocol Publisher {

    associatedtype Output
    associatedtype Failure : Error

    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}

Где Output — тип передаваемых значений этим Publisher'ом, Failure — тип ошибки, который должен следовать протоколу Error.


И функция receive(_: Subscriber), которая будет вызвана для добавления Subscriber'a к этому Publisher'у с помощью subscribe(_:).


В качестве примера реализуем Publisher, который будет генерировать для нас числа Фибоначчи.


struct FibonacciPublisher: Publisher {  
    typealias Output = Int
    typealias Failure = Never
}

Так как последовательность состоит из чисел, то в качестве выдаваемого значения Output будет тип Int, в качестве Failure будет особый тип Never, говорящий о том, что данный Publisher никогда не завершится с ошибкой.


Для гибкости укажем предельное количество элементов, которые мы хотим получить и обернем это значение в некий объект конфигурации нашего Publisher.


struct FibonacciConfiguration {
    var count: UInt
}

Рассмотрим этот код более внимательно, var count: UInt выглядит неплохим вариантом, но его использование ограничивает нас областью допустимых значений типа UInt и так же не совсем понятно, что указать, если мы хотим все же иметь неограниченную последовательность.


Вместо UInt воспользуемся типом Subscribers.Demand, который определен в Combine, там же он описывается как тип, который посылается от Subscriber'a к Publisher'у через Subscription. Если говорить по простому, то он показывает потребность в элементах, сколько элементов запрашивает Subscriber. unlimited — не ограничено, none — нисколько, max(N) — не больше N раз.


    public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {
        public static let unlimited: Subscribers.Demand
        public static let none: Subscribers.Demand /// Эквивалент Demand.max(0)
        @inlinable public static func max(_ value: Int) -> Subscribers.Demand
        ....
    }

Перепишем FibonacciConfiguration поменяв тип на новый у count.


struct FibonacciConfiguration {
    var count: Subscribers.Demand
}

Вернемся к Publisher'у и реализуем метод receive(_: Subscriber), как мы помним этот метод нужен для того, чтобы добавить Subscriber к Publisher'у. И делает он это с помощью подписки Subscription, Publisher должен создать подписку и передать эту подписку подписчику.


    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)
        subscriber.receive(subscription: subscription)
    }

Это дженерик функция, которая принимает в качестве параметра Subscriber, причем выходные значения Publisher'a должны соответствовать входным значениям Subscriber'a (Output == S.Input), тоже самое и для ошибок. Это необходимо для "соединения" Publisher'a и Subscriber'a.


В самой функции создаем подписку FibonacciSubscription, в конструкторе передаем подписчика и конфигурацию. После этого подписка передается подписчику.


Наш Publisher готов, в итоге имеем:


struct FibonacciPublisher: Publisher {

    typealias Output = Int
    typealias Failure = Never

    var configuration: FibonacciConfiguration

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)
        subscriber.receive(subscription: subscription)
    }
}

Как вы видите сам Publisher не содержит никакой логики по генерации последовательности Фибоначчи, вся логика будет в классе подписки — FibonacciSubscription.


Как вы уже догадываетесь, класс FibonacciSubscription будет следовать протоколу Subscription, посмотрим на определение этого протокола.


public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
    func request(_ demand: Subscribers.Demand)
}

Функция request(_: Subscribers.Demand) сообщает Publisher'у, что он может отправить больше значений подписчику. Именно в этом методе и будет логика отправки чисел Фибоначчии.
Так же нам нужно реализовать следование протоколу Cancellable и реализовать функцию cancel().


public protocol Cancellable {
    func cancel()
}

И так же нужно следовать протоколу CustomCombineIdentifierConvertible и определить read-only переменную combineIdentifier.


public protocol CustomCombineIdentifierConvertible {
    var combineIdentifier: CombineIdentifier { get }
}

Тут есть уточнение, если прокрутить чуть ниже определения протокола CustomCombineIdentifierConvertible в Combine, то можно увидеть, что Combine предоставляет расширение для этого протокола, которое имеет вид -


extension CustomCombineIdentifierConvertible where Self : AnyObject {
    public var combineIdentifier: CombineIdentifier { get }
}

Что говорит нам о том, что определение переменной combineIdentifier: CombineIdentifier предоставляется по умолчанию, если тип, который следуют этому протоколу, так же следует протоколу AnyObject, а именно если этот тип класс. FibonacciSubscription — класс, поэтому мы получаем определение переменной по умолчанию.


Subscription


И так начнем реализовывать наш FibonacciSubscription.


private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {

    var subscriber: S?
    var configuration: FibonacciConfiguration
    var count: Subscribers.Demand

    init(subscriber: S?, configuration: FibonacciConfiguration) {
        self.subscriber = subscriber
        self.configuration = configuration
        self.count = configuration.count
    }

    ...
}

Как можно видеть FibonacciConfiguration содержит сильную ссылку на Subscriber, иначе говоря является владельцем Subscriber. Это важный момент, подписка ответственна за удержание подписчика, причем удерживать его она должна до тех пор, пока он не завершит работу, не завершится с ошибкой или с отменной.


Далее реализуем метод cancel() из протокола Cancellable.


func cancel() {
    subscriber = nil
}

Установка subscriber в nil делает его недоступным подписке.


Теперь мы готовы приступить к самой реализации отправки чисел Фибоначчи.
реализуем метод request(_: Subscribers.Demand).


func request(_ demand: Subscribers.Demand) {
        // 1
        guard count > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
        // 2
        count -= .max(1)
        subscriber?.receive(0)
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }
        // 3
        count -= .max(1)
        subscriber?.receive(1)
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }
        // 4
        var prev = 0
        var current = 1
        var temp: Int
        while true {
            temp = prev
            prev = current
            current += temp
            subscriber?.receive(current)
            count -= .max(1)
            if count == .none {
                subscriber?.receive(completion: .finished)
                return
            }
        }
    }

1) С начала мы проверяем, сколько элементов может нам предоставить Publisher, если нисколько, то завершаем отправку и посылаем Subscriber'у сигнал о завершении отправки чисел.
2) Если потребность есть, то уменьшаем на единицу общее количество запрашиваемых чисел, отправляем Subscriber'у первый элемент последовательности Фибоначчи, а именно 0 и далее опять проверяем сколько еще элементов может нам предоставить Publisher, если нисколько, то отправляем Subscriber'у сигнал о завершении.
3) Такой же подход, как и во 2) пункте, но только для второго элемента в последовательности Фибоначии.
4) Если требуется больше, чем 2 элемента, то мы реализуем итеративный алгоритм нахождения чисел Фибоначчи, где на каждом шаге будем передавать очередное число из последовательности Фибоначии Subscriber'y и так же проверять сколько элементов еще может предоставить Publisher. Если Publisher больше не предоставляет новые числа, то отправляем Subscriber'у сигнал о завершении.


На данный момент мы написали такой код
struct FibonacciConfiguration {
    var count: Subscribers.Demand
}

struct FibonacciPublisher: Publisher {

    typealias Output = Int
    typealias Failure = Never

    var configuration: FibonacciConfiguration

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)
        subscriber.receive(subscription: subscription)
    }

}

private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {

    var subscriber: S?
    var configuration: FibonacciConfiguration
    var count: Subscribers.Demand

    init(subscriber: S?, configuration: FibonacciConfiguration) {
        self.subscriber = subscriber
        self.configuration = configuration
        self.count = configuration.count
    }

    func cancel() {
        subscriber = nil
    }

    func request(_ demand: Subscribers.Demand) {
        // 1
        guard count > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
        // 2
        count -= .max(1)
        subscriber?.receive(0)
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }
        // 3
        count -= .max(1)
        subscriber?.receive(1)
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }
        // 4
        var prev = 0
        var current = 1
        var temp: Int
        while true {
            temp = prev
            prev = current
            current += temp
            subscriber?.receive(current)
            count -= .max(1)
            if count == .none {
                subscriber?.receive(completion: .finished)
                return
            }
        }
    }
}

Первое тестирование


Теперь протестируем, что у нас получилось, свой Publisher и Subscription у нас есть, не хватает Sibscriber'a, Combine предоставляет 2 Sibscriber'a из коробки это sink и assign.


  • sink — этот метод создает подписчика и сразу запрашивает неограниченное число значений.
  • assign — устанавливает каждый элемент от Publisher'a к свойству объекта.

sink хорошо подходит для нашей цели, особое внимание нужно обратить на то, что он запрашивает unlimited количество значений.


И тут нужно провести важное различие, наш Publisher в переменной count определяет количество элементов который может отдать наш Publisher и это условия определенно нами самими. В принципе мы могли обойтись и без этой переменной и не ограничиваться в передаче чисел Фибоначчи, но довольно скоро мы вышли бы за пределы области допустимых значений типа Int.
Случай с sink другой, каждый Subscriber определяет сколько значений он хочет получить, sink запрашивает неограниченное число значений, это значит что он будет получать значения до тех пор, пока не придет сигнал о завершении, ошибке или отмены.


Для удобства использования нашего Publisher'a добавим его создание в расширение протокола Publishers.


extension Publishers {
    private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher {
        FibonacciPublisher(configuration: configuration)
    }

    static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher {
        FibonacciPublisher(configuration: FibonacciConfiguration(count: count))
    }
}

И так опробуем наш Publisher


Publishers.fibonacci(count: .max(10))
    .sink { value in
        print(value, terminator: " ")
}
// print 0 1 1 2 3 5 8 13 21 34 - OK

А теперь граничные случаи


Publishers.fibonacci(count: .max(1))
    .sink { value in
        print(value, terminator: " ")
}
// prinst 0 - OK

Publishers.fibonacci(count: .max(2))
    .sink { value in
        print(value, terminator: " ")
}
// prints 0 1 - OK

Publishers.fibonacci(count: .none)
    .print() // логирует все события publisher'a
    .sink { value in
        print(value, terminator: " ")
}
// prints receive finished - OK

А что будет, если указать .unlimited?


Publishers.fibonacci(count: .unlimited)
    .print()
    .sink { value in
        print(value, terminator: " ")
}
// prints 0 1 1 2 3 5 8 13 21 ... и потом креш, вышли за диапазон типа Int.

Как же можно использовать .unlimited, но иметь возможность вывода нескольких чисел? Для этого нам понадобится оператор .prefix(_), который работает аналогичным образом как .prefix(_) из коллекций, а именно оставляет только первые N элементов.


Publishers.fibonacci(count: .unlimited)
    .print()
    .prefix(5)
    .sink { _ in }
// prints 0 1 1 2 3 cancel и потом креш, похоже опять вышли за диапазон типа Int.

В чем же проблема? Может в .prefix(_)? Проведем маленький эксперимент на на стандартной последовательность из Foundation.


// бесконечная последовательность 1 2 3 4 5 6 7 8 ...
1... 
.publisher
.print()
.prefix(5)
    .sink { _ in }
// prints 1 2 3 4 5 cancel - ОК

Как мы можем видеть, вышенаписанный код отработал корректно, значит проблема в нашей реализации Publisher.
Глянем на логи от .print() и увидим, что после N запросов, из .prefix(_) происходит вызов cancel() у нашего FibonacciSubscription, где мы устанавливаем subscriber в nil.


    func cancel() {
        subscriber = nil
    }

Если открыть стек вызовов, то можно увидеть, что cancel() вызывается из request(_:), а именно во время вызова subscriber?.receive(_). Из чего мы можем сделать вывод, что в некий момент времени внутри request(_:) subscriber может стать nil и тогда нужно прекратить работу генерации новых чисел. Добавим это условие в наш код.


    func request(_ demand: Subscribers.Demand) {
        // 1
        guard count > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
        // 2
        count -= .max(1)
        subscriber?.receive(0)
        guard let _ = subscriber else { return } // new
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        // 3
        count -= .max(1)
        subscriber?.receive(1)
        guard let _ = subscriber else { return } // new
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        // 4
        var prev = 0
        var current = 1
        var temp: Int
        while let subscriber = subscriber { // new
            temp = prev
            prev = current
            current += temp
            subscriber.receive(current)
            count -= .max(1)
            if count == .none {
                subscriber.receive(completion: .finished)
                return
            }
        }
    }

Теперь запустим наш тестовый код.


Publishers.fibonacci(count: .unlimited)
    .print()
    .prefix(5)
    .sink { _ in }
// prints 0 1 1 2 3 cancel - ОК

Получили ожидаемое поведение.


Subscriber


И так наш FibonacciSubscription готов? Не совсем, в наших тестах мы только использовали подписчика sink, который запрашивает .unlimited количество чисел, а что если вместо него использовать подписчика, который будет ожидать некоторое ограниченное число чисел. Combine не предоставляет такого подписчика, но что мешает нам написать свой? Внизу реализация нашего FibonacciSubscriber'a.


class FibonacciSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    var limit: Subscribers.Demand

    init(limit: Subscribers.Demand) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        subscription.request(limit)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        print("Subscriber's completion: \(completion)")
    }
}

И так наш FibonacciSubscriber имеет свойство limit, которое определяет сколько элементов хочет получить данный Subscriber. И делается это в методе receive(_: Subscription), где мы сообщаем подписке сколько нам нужно элементов. Так же надо отметить функцию receive(_: Input) -> Subscribers.Demand, эта функция вызывается когда получено новое значение, в качестве возвращаемого значения мы указываем сколько дополнительный элементов мы хотим получить: .none — нисколько, .max(N) N штук, итого общее количество принимаемых элементов будет равно сумме значения посылаемого подписке в receive(_: Subscription) и всем возвращаемым значениям из receive(_: Input) -> Subscribers.Demand.


Второе тестирование


Попробуем использовать FibonacciSubscriber.


let subscriber = FibonacciSubscriber(limit: .max(3))
Publishers.fibonacci(count: .max(5))
    .print()
    .subscribe(subscriber)
// prints 0 1 1 2 3 - а должно быть только 0 1 1

Как мы видим наш Publisher отправил 5 значений, вместо 3. Почему так? Потому что в методе request(_: Subscribers.Demand) FibonacciSubscription'a никак не учитывается потребность подписчика, давайте же это исправим, для этого добавим дополнительное свойство requested, через которое будем отслеживать потребность подписчика.


private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {

    var subscriber: S?
    var configuration: FibonacciConfiguration
    var count: Subscribers.Demand
    var requested: Subscribers.Demand = .none // new

    init(subscriber: S?, configuration: FibonacciConfiguration) {
        self.subscriber = subscriber
        self.configuration = configuration
        self.count = configuration.count
    }

    func cancel() {
        subscriber = nil
    }

    func request(_ demand: Subscribers.Demand) {
        guard count > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
        requested += demand // new
        count -= .max(1)
        requested -= .max(1) // new
        requested += subscriber?.receive(0) ?? .none // new
        guard let _ = subscriber, requested > .none else { return } // new
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        count -= .max(1)
        requested -= .max(1) // new
        requested += subscriber?.receive(1) ?? .none // new
        guard let _ = subscriber, requested > .none else { return } // new
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        var prev = 0
        var current = 1
        var temp: Int
        while let subscriber = subscriber, requested > .none { // new
            temp = prev
            prev = current
            current += temp
            requested += subscriber.receive(current) // new
            count -= .max(1)
            requested -= .max(1) // new
            if count == .none {
                subscriber.receive(completion: .finished)
                return
            }
        }
    }
}

Третье тестирование


let subscriber = FibonacciSubscriber(limit: .max(3))
Publishers.fibonacci(count: .max(5))
    .print()
    .subscribe(subscriber)
// prints 0 1 1 - OK

Теперь Publisher отрабатывает корректно.


Финальный код
import Foundation
import Combine

struct FibonacciConfiguration {
    var count: Subscribers.Demand
}

struct FibonacciPublisher: Publisher {

    typealias Output = Int
    typealias Failure = Never

    var configuration: FibonacciConfiguration

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration)
        subscriber.receive(subscription: subscription)
    }

}

private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {

    var subscriber: S?
    var configuration: FibonacciConfiguration
    var count: Subscribers.Demand
    var requested: Subscribers.Demand = .none

    init(subscriber: S?, configuration: FibonacciConfiguration) {
        self.subscriber = subscriber
        self.configuration = configuration
        self.count = configuration.count
    }

    func cancel() {
        subscriber = nil
    }

    func request(_ demand: Subscribers.Demand) {
        guard count > .none else {
            subscriber?.receive(completion: .finished)
            return
        }
        requested += demand
        count -= .max(1)
        requested -= .max(1)
        requested += subscriber?.receive(0) ?? .none
        guard let _ = subscriber, requested > .none else { return }
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        count -= .max(1)
        requested -= .max(1)
        requested += subscriber?.receive(1) ?? .none
        guard let _ = subscriber, requested > .none else { return }
        if count == .none {
            subscriber?.receive(completion: .finished)
            return
        }

        var prev = 0
        var current = 1
        var temp: Int
        while let subscriber = subscriber, requested > .none {
            temp = prev
            prev = current
            current += temp
            requested += subscriber.receive(current)
            count -= .max(1)
            requested -= .max(1)
            if count == .none {
                subscriber.receive(completion: .finished)
                return
            }
        }
    }
}

extension Publishers {
    private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher {
        FibonacciPublisher(configuration: configuration)
    }

    static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher {
        FibonacciPublisher(configuration: FibonacciConfiguration(count: count))
    }
}

class FibonacciSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    var limit: Subscribers.Demand

    init(limit: Subscribers.Demand) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        subscription.request(limit)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
       .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        print("Subscriber's completion: \(completion)")
    }
}

Publishers.fibonacci(count: .max(4))
    .print()
    .sink { _ in }

let subscriber = FibonacciSubscriber(limit: .max(3))
Publishers.fibonacci(count: .max(5))
    .print()
    .subscribe(subscriber)

Результат


Я надеюсь, что эта статья принесла вам больше понимания, что такое Publisher, Subscription и Subscriber, как они взаимодействуют между собой и на какие моменты нужно обратить внимание, когда вы решили реализовать свой Publisher. Любые замечания, уточнения к статье приветствуются.

Теги:
Хабы:
Всего голосов 4: ↑3 и ↓1+6
Комментарии9

Публикации

Истории

Работа

Swift разработчик
20 вакансий
iOS разработчик
18 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань