Как стать автором
Обновить
1325.92
Яндекс
Как мы делаем Яндекс

Осторожно, Combine! Откуда берутся утечки памяти, потеря эвентов, нестабильность операторов и что с этим всем делать

Уровень сложностиСредний
Время на прочтение27 мин
Количество просмотров3.4K

Привет, меня зовут Никита, я iOS‑разработчик в Яндекс Диске. В прошлый раз я рассказывал, как мы начали свой путь в SwiftUI. Теперь настало время рассказать ещё об одном фреймворке, который мы затащили в свой проект параллельно со SwiftUI, — Combine.

В этой статье разберу проблемы, с которыми столкнулась наша команда, расскажу, как их исправили, а также к каким выводам после этого пришли. А ещё поделюсь библиотекой с фиксами и экстеншенами к Combine.

Начнём с небольшой предыстории

Combine — это реактивный фреймворк, который предоставляет декларативный API для обработки событий во времени. Combine был выпущен Apple в 2019 году одновременно со SwiftUI, поэтому оба этих фреймворка были тесно связаны до 2023-го и выхода Observation. По этой причине во всех проектах со SwiftUI использовался Combine: как минимум — для работы с UI, а как максимум — для работы с асинхронным кодом.

В нашем случае в проекте уже был third‑party‑фреймворк — RxSwift. Поэтому начать использовать Combine означало не только стандартизировать работу с асинхронным кодом, но и избавиться от ненужной тяжелой third‑party‑зависимости.

By adopting Combine, you’ll make your code easier to read and maintain, by centralizing your event-processing code and eliminating troublesome techniques like nested closures and convention-based callbacks. Apple Developer

«Внедрив Combine, вы упростите чтение и обслуживание своего кода», — говорят нам в документации. Но что там на самом деле и действительно ли всё стало проще, разберёмся дальше.

Subjects: как решали проблему потокобезопасности

CurrentValueSubject

Начнём обзор проблем Combine с самого популярного паблишера (publisher), а именно — с CurrentValueSubject. Он хранит текущее значение (value), позволяет его изменять, а ещё сразу же отправляет это значение всем новым подписчикам.

Такое поведение делает его очень удобным, чтобы хранить состояние UI. Например, ячейки коллекции после переиспользования могут взять последнее значение из стейта UI, синхронно его отобразить и дальше реагировать на обновления.

А ещё такой паблишер удобен для связки обычного кода с реактивным, особенно когда важно сохранить последнее значение. Например, когда у вас есть сервис, делающий запрос в сеть, и вам необходимо отправить результат запроса всем подписчикам.

CurrentValueSubject
CurrentValueSubject

Проблема этого сабжекта (subject) — отсутствие потокобезопасности того самого текущего значения. Хотя разработчики Combine утверждают обратное.

Суть проблемы — в гонке между получением начального значения подписчиком и отправкой ему новых. Для иллюстрации рассмотрим небольшой тест:

func testAsyncSubscribe() {
    let subject = CurrentValueSubject<Int, Never>(0)
    let finishExpectation = expectation(description: "Subject finished")
    
    var results: [Int] = []
    
    // Асинхронно отправляем новое значение
    DispatchQueue.global().async {
        subject.send(1)
        subject.send(completion: .finished)
    }
    
    // Подписываемся на `subject`
    let cancellable = subject
        .sink { completion in
            finishExpectation.fulfill()
        } receiveValue: { value in
            results.append(value)
        }
    
    // Ждём получения `.finished`
    waitForExpectations(timeout: 2)
    
    let expectedResults: Set = [
        [],
        [1],
        [0, 1],
    ]
    
    XCTAssertTrue(expectedResults.contains(results), "Expected: \(expectedResults) - Received: \(results)")
    
    cancellable.cancel()
}

В нём мы подписываемся на наш сабжект, параллельно отправляя в него новое значение, а затем completion.

Этот тест устраивает race condition между отправкой значений и подпиской, поэтому в результате мы ожидаем один из трёх вариантов:

  1. [], если мы подпишемся на сабжект после того, как он получит completion.

  1. [1], если мы подпишемся после получения сабжектом 1, но перед получением completion.

  1. [0, 1], если мы подпишемся раньше, чем сабжект получил новые значения.

Запускаем тест на 10 000 повторений и получаем фейл. Потому что результат оказался [1, 1]. Как сказали бы эксперты, сомнительно, но ОКЭЙ. Добавляем в expectedResults — [1, 1] и перезапускаем тест.

Снова фейл. В этот раз мы получаем [1, 0]. Что ещё более странно, потому что теперь вообще нет никакой гарантии, в каком порядке придут эвенты. Добавим [1, 0] в expectedResults и перезапустим тест.

К счастью, теперь он проходит, и мы не получим совсем безумных кейсов в виде [0] и [0, 0]. Спасибо и на этом.

Теперь немного наблюдений, почему это происходит.

Важная ремарка: так как Combine не является библиотекой с открытым исходным кодом, то понять, как реализованы операторы, можно только по косвенным признакам, например стек‑трейсу вызовов или Memory Graph. Поэтому все описания того, как паблишеры работают под капотом, могут содержать неточности.

Все подписки, которые отдаёт своим подписчикам CurrentValueSubject, используют os_unfair_lock, чтобы сериализовать доступ к текущему значению. А также os_unfair_recursive_lock — для отправки новых значений подписчикам. Это нужно для гарантии потокобезопасности методов request(_:) и offer(_:).

request(_:) используется для запроса новых значений, а также для отправки текущего значения. offer(_:)используется для отправки новых значений подписчику. Каждый из методов использует оба лока следующим образом:

  1. Захватить Lock (os_unfair_lock).

  2. Проверить, сколько значений нужно отправить подписчику.

  3. [request(_:)] Взять текущее значение из CurrentValueSubject.

  4. Отпустить Lock.

  5. Захватить RecursiveLock (os_unfair_recursive_lock).

  6. Отправить значение подписчику.

  7. Отпустить RecursiveLock.

Между вызовами методов request(_:) и offer(_:) и происходит гонка, а конкретно — при отправке значения подписчику (начиная с 5-го пункта). Нет никаких гарантий, какой из потоков успеет первым захватить RecursiveLock и отправить новое значение подписчику, отсюда и получается [1, 0].

Следующий лог хорошо иллюстрирует проблему:

REQUEST: Acquire Lock
REQUEST: Update demand // Проверили, нужно ли что-то отправлять подписчику
REQUEST: Get current value // Получили начальное значение
REQUEST: Relinquish Lock

OFFER: Acquire Lock
OFFER: Update demand // Проверили, нужно ли что-то отправлять подписчику
OFFER: Relinquish Lock

OFFER: Wait RecursiveLock // Начинаем ждать лок на отправку нового значения
REQUEST: Wait RecursiveLock // Начинаем ждать лок на отправку начального значения

OFFER: Acquire RecursiveLock
OFFER: Send new value // Сначала отправили новое значение
OFFER: Relinquish RecursiveLock

REQUEST: Acquire RecursiveLock
REQUEST: Send initial value // Затем отправили начальное
REQUEST: Relinquish RecursiveLock

Этот же лог, но в визуальном формате:

И для наглядности картинка с тем, как всё должно быть:

В случае с [1, 1] всё то же самое, просто мы при подписке успеваем захватить новое значение вместо старого.

А ещё теперь понятно, почему в принципе возможны только [1, 0] и [1, 1], а [0] и [0, 0] мы никогда не получим.

Проблема здесь заключается в неатомарности операций подписки и отправки эвентов. У подписки нет информации о том, что в данный момент кто‑то уже отправляет новое значение, и ей не нужно пытаться отправить текущее.

Какой вывод?

Всегда слать события и подписываться на CurrentValueSubject только с одного потока. Такое поведение обязательно соблюдать в RxSwift, и в случае с Combine его тоже стоит придерживаться. Иначе есть немалая вероятность получить race condition, из‑за которого ваш UI или сервис будет работать не так, как ожидается.

Operators: как исправить потерю деманда и утечки памяти

CombineLatest

Следующий оператор — CombineLatest. Основная задача этого оператора — комбинировать эвенты от двух и более паблишеров, чтобы всегда получать набор из последних эвентов от всех паблишеров.

И его первая проблема — потеря деманда (demand) для одного из апстримов (upstream), из‑за чего CombineLatest не получает вообще ни одного эвента.

Пока не будем углубляться в детали работы Combine с Back Pressure. Разберём эту тему подробнее в другой части статьи. Пока достаточно знать, что каждый подписчик сначала запрашивает определённое количество эвентов у паблишера, и только после этого паблишер посылает эвенты подписчику, но не больше, чем у него попросили. Число эвентов в таком запросе и называют demand.

Напишем простенький тест, чтобы проверить это:

func testCombineLatest() {
    let finishExpectation = expectation(description: "Subject finished")
    let queue = DispatchQueue(label: "testCombineLatest")
    
    let publisher0 = [0].publisher
    let publisher1 = [1].publisher
    
    var results: [Int] = []
    
    let cancellable = publisher0
        .subscribe(on: queue)
        .combineLatest(publisher1)
        .sink { value in
            results.append(value.0)
            results.append(value.1)
            
            finishExpectation.fulfill()
        }
    
    waitForExpectations(timeout: 1) // Asynchronous wait failed
    
    XCTAssertEqual([0, 1], results) // []
    
    cancellable.cancel()
}

Тест падает на попытке дождаться выполнения экспектейшена. Потому что никогда не получит значение от publisher0.

Это происходит из‑за того, что CombineLatest перед тем, как начать запрашивать эвенты у своих апстримов, должен дождаться получения от них подписок. И только после этого посылать им запросы с demand. Однако на практике CombineLatest ничего не ждёт и сразу пытается переслать запросы с demand своим апстримам, из‑за чего такие запросы и могут быть потеряны. Например, если подписка на один из апстримов ещё не была получена.

На картинке ниже представлена визуализация этого процесса: запрос эвентов, (request) происходит до того, как завершён процесс получения подписки (subscribe) одного из паблишеров. Из‑за чего мы не получим значения A1.

Что с этим можно сделать?

Одно из решений — написать свою реализацию CombineLatest, которая будет корректно обрабатывать запросы деманда. Но такое решение будет достаточно сложным и объёмным, учитывая число перегрузок этого оператора. Для убедительности вот реализация CombineLatest в CombineX.

Поэтому можно пойти более простым путём и исправить баги в работе subscribe(on:options:), тем более что они более критические и вносят проблемы в работу не только CombineLatest, но и других операторов. О подробностях читайте в следующей главе.

Здесь могла быть ваша реклама, но пока только моя

SubscribeOn

Следующей проблемой, с которой мы столкнулись, были утечки памяти, а их виновником оказался SubscribeOn.

Небольшая справка о важности этого оператора. SubscribeOn нужен, чтобы перенести действия, связанные с подпиской, запросом деманда и отменой, на какой‑то шедулер, чаще всего бэкграунд‑очередь. Это нужно, чтобы все сайд‑эффекты, связанные с созданием подписки, не блокировали подписчика. Например, если ваш паблишер при создании делает синхронный запрос к БД, то без SubscribeOn вам точно не обойтись.

Что не так с SubscribeOn? Почему он вызывает утечки памяти?

Всё дело в том, что он подписывается на апстрим асинхронно (кто бы мог подумать?). И несмотря на то, что это концептуально не нарушает никакие контракты Combine, на практике возникают разные проблемы: скажем, потеря demand, из‑за которой операторы могут перестать отправлять какие‑либо эвенты, например CombineLatest. А также невозможность отменить подписку, пока SubscribeOn не закончит своё выполнение. В результате подписка утечёт и продолжит работать, в том числе отправлять эвенты, без возможности её отменить когда‑либо.

Снова напишем небольшой тест, чтобы проникнуться:

func testSubscriptionLeaks() {
    let subscribeExpectation = expectation(description: "Sink subscribed")
    let serialQueue = DispatchQueue(label: "testSubscriptionLeaks")
    let subject = PassthroughSubject<Int, Never>()
    var results: [Int] = []
    
    let cancellable = subject
        .subscribe(on: serialQueue)
        .handleEvents(receiveSubscription: { _ in
            subscribeExpectation.fulfill()
        })
        .sink { value in
            results.append(value)
        }
    
    // Пробуем отменить подписку до её получения
    cancellable.cancel()
    
    wait(for: [subscribeExpectation], timeout: 1)
    
    // Ещё раз пробуем отменить подписку уже после получения
    cancellable.cancel()
    
    // Проверяем, отправляются ли значения
    subject.send(1)
    
    XCTAssertEqual([], results) // [1]
}

Тест достаточно прямолинейный: подписываемся на наш сабжект и тут же отменяем подписку. Затем дожидаемся, пока sink получит подписку, и начинаем отправлять эвенты в сабжект.

В итоге, несмотря на то что подписка уже отменена, мы всё равно продолжим получать эвенты. Тест упадёт с ошибкой XCTAssertEqual failed: (“[]”) is not equal to (“[1]”).

Дела становятся ещё интереснее, если вспомнить, что все паблишеры держат сильную ссылку на апстрим. А это значит, что у нас может потечь какой‑нибудь таймер и остановить его будет уже невозможно. Это иллюстрирует следующий тест:

func testSubscriptionLeaks2() {
    let cancelExpectation = expectation(description: "Subscription cancelled")
    let serialQueue = DispatchQueue(label: "testSubscriptionLeaks2")
    var results: [Int] = []
    
    let timer = Timer.publish(every: 1, on: .main, in: .common)
        .autoconnect()
        .map { _ in 1 }
        .scan(0, +)
    
    let cancellable = timer
        .subscribe(on: serialQueue)
        .handleEvents(receiveCancel: {
            cancelExpectation.fulfill()
        })
        .sink { value in
            results.append(value)
        }
    
    cancellable.cancel()
    
    wait(for: [cancelExpectation], timeout: 5) // Asynchronous wait failed
    
    XCTAssertEqual([], results) // [1, 2, 3, 4, 5]
}

Как и предполагалось, тест падает на ожидании отмены подписки, а в массиве results содержатся числа [1, 2, 3, 4, 5].

О причинах такого поведения можно только догадываться. Но на важную мысль может натолкнуть реализация этого оператора в OpenCombine, потому что у него ровно такие же проблемы, как и у оригинальной версии.

Мы уже разобрались, что подписка на апстрим создаётся асинхронно, и, пока она не создана, все запросы деманда от даунстрима (downstream), а также вызовы cancel() будут проигнорированы. Потому что пересылать эти запросы некому — подписка ещё не создана.

Возникает вопрос: почему после получения подписки вызовы cancel() не имеют никакого эффекта? А ответ на него дают сами Apple в документации к cancel().

After you receive one call to cancel(), subsequent calls shouldn’t do anything. Canceling should also eliminate any strong references it currently holds.

Подписка после отмены очищает все ссылки и переходит в отменённое состояние, которое является финальным. И все последующие вызовы не делают ничего, кроме проверки, что подписка уже была отменена.

Из‑за этого апстрим будет продолжать работать, пока не завершится. В случае с такими паблишерами, как таймер, подписчик NotificationCenter или KVO — пока приложение не будет убито системой или пользователем.

Что с этим можно сделать?

  • Можно просто не использовать subscribe(on:options:), заменив его на старый добрый DispatchQueue.async(). Но это не будет полноценной заменой и скорее вызовет больше проблем.

  • Всегда вызывать cancel() на той же очереди, что и был выполнен subscribe(on:options:). Но здесь есть два подводных камня:

    • Очередь обязательно должна быть последовательной, иначе всё ещё можно потерять подписки. Потому что в параллельной очереди задачи выполняются параллельно (не благодарите) и гонка между подпиской и отменой никуда не исчезнет.

    • Не всегда известно, на какой очереди произошла подписка, особенно если паблишер создаётся где‑то в другом месте. Из‑за чего проконтролировать это становится намного сложнее.

  • Написать безопасную версию subscribe(on:options:), которая будет корректно обрабатывать запросы деманда и вызовы cancel(), что мы и сделали. Смотрите реализацию по ссылке. Да, здесь тоже могла быть ваша реклама, но все еще только моя.

Multicast

О сложностях работы с памятью в Combine и неочевидных вещах, которые могут привести к её утечкам, Мэтт Галлахер писал ещё пять лет назад в 22 short tests of combine. В своей статье он пришёл к выводам, что для нормальной работы без утечек памяти нужно иметь хотя бы один AnyCancellable в вашем графе подписок.

Но есть нюанс, и имя ему multicast(_:) — для него это правило не работает. Но обо всем по порядку. Сначала разберёмся, что такое multicast(_:) и зачем он нужен.

Multicast
Multicast

Multicast — один из важнейших операторов в реактивных фреймворках. Его основная задача — шерить подписку на апстрим между всеми подписчиками. Такое поведение может пригодиться, если апстрим выполняет какие‑то ресурсоёмкие операции и их результат дешевле пошерить между подписчиками, чем считать отдельно для каждого. Классический пример таких операций — запросы к бэкенду. Например, нам нужно пошерить результат запроса к бэкенду между двумя экранами, но мы не хотим выполнять два одинаковых сетевых запроса. На помощь приходит Multicast, который одной строчкой кода решает эту проблему.

Теперь, когда нам понятна важность этого оператора, а также что это один из ключевых операторов, которые часто используются в коде, можно начать говорить о проблемах с утечками памяти. Заметить эти утечки, на самом деле, не так уж и просто, потому что они не отмечаются как утечки в Memory Graph и Xcode считает эти объекты валидными.

Итак, давайте снова напишем тест, на этот раз он будет достаточно большим
func testSubscriptionLeaks3() {
    let subject = PassthroughSubject<Int, Never>()
    
    var upstreamResults: [Int] = []
    var upstreamCancelled: Bool = false
    
    var sinkResults: [Int] = []
    var sinkCancelled: Bool = false
    var sinkCancellable: (any Cancellable)?
    
    var multicastCancellable: (any Cancellable)?
    
    do {
        // Создаём multicast
        let multicast = subject
            .print("Upstream")
            .handleEvents(receiveOutput: {
                upstreamResults.append($0)
            }, receiveCancel: {
                upstreamCancelled = true
            })
            .multicast(subject: PassthroughSubject())
        
        // Создаём подписчика
        sinkCancellable = multicast
            .print("Sink")
            .handleEvents(receiveCancel: {
                sinkCancelled = true
            })
            .sink { sinkResults.append($0) }
        
        // Активируем multicast
        multicastCancellable = multicast.connect()
    }
    
    subject.send(1)
    
    // Последняя сильная ссылка на multicast исчезает здесь
    sinkCancellable?.cancel()
    
    // Теперь токен бесполезен
    multicastCancellable?.cancel()
    
    // Отправляем ещё одно значение, чтобы убедиться
    subject.send(2)
    
    XCTAssertTrue(sinkCancelled) // true
    XCTAssertTrue(upstreamCancelled) // false
    
    XCTAssertEqual([1], sinkResults) // [1]
    XCTAssertEqual([1], upstreamResults) // [1, 2]
}

В итоге тест всегда будет фейлиться из‑за того, что Multicast не прекращает работать после вызова cancel(). И внимательный читатель уже понял почему, но давайте для начала подробнее рассмотрим, что здесь вообще происходит.

Sink: receive subscription: (Multicast)
Sink: request unlimited

Upstream: receive subscription: (PassthroughSubject)
Upstream: request unlimited
Upstream: receive value: (1)

Sink: receive value: (1)
Sink: receive cancel

Upstream: receive value: (2)

Подписка, созданная Multicast, получает событие об отмене, но при этом оно почему‑то не пересылается дальше апстриму.

Чтобы понять, почему так происходит, достаточно открыть Memory Graph. Мы увидим, что токен от multicast.connect() держит на сам Multicast слабую ссылку. И так как при выходе из замыкания do { } держателем единственной сильной ссылки на Multicast остаётся подписчик, то отмена его подписки становится не просто ошибкой, а фатальной ошибкой.

Теперь сильных ссылок на Multicast нет, и он деинится. А после этого токен для отмены подписки на апстрим, который возвращается из multicast.connect(), становится бесполезным. Из‑за чего апстрим продолжает работать без возможности остановиться.

Какой вывод?

А выводы неутешительные. Если создаёте Multicast, то всегда нужно держать на него сильную ссылку, иначе все его подписки утекут и отменить их больше не получится.

Если вы создаёте Multicast, то при его активации обязательно замыкайте его сильной ссылкой. Например, так:

let multicastCancellable = multicast.connect()
let cancellable = AnyCancellable { [multicast] in
    _ = multicast // !!! DO NOT DELETE THIS LINE !!!
    multicastCancellable.cancel()
}

Главное — не забыть оставить комментарий, чтобы кто‑то случайно не удалил эту магическую строку.

Combine Contracts: Как не терять эвенты 

Back Pressure

Продолжим традицию, небольшая историческая справка.

Back Pressure в Combine — это контракт между паблишером и сабскрайбером, при помощи которого последний может сказать о том, сколько эвентов он хочет получить. Такой запрос, как мы уже знаем, называется демандом, и каждый паблишер старается максимально его удовлетворить. Всё это нужно, чтобы паблишер вдруг не начал отправлять эвентов больше, чем подписчик может обработать, иначе это может вызвать много проблем, чаще всего связанных с производительностью.

Например, если мы отображаем какой‑то прогресс, то нет никакого смысла пытаться обновлять UI чаще, чем экран устройства может отобразить. Мы просто потратим ресурсы впустую.

В этом пункте поговорим о работе с Back Pressure и о том, насколько на неё можно положиться.

Спойлер: не стоит ожидать чего‑то большего, чем пропуска эвентов, которые не были запрошены. Потому что даже основные операторы работают с Back Pressure по‑разному, кто‑то с багами (например, CombineLatest, Zip, Merge), а кто‑то ещё и не самым очевидным способом (Zip, Merge). Поэтому самые нетерпеливые могут сразу перейти к выводам, потому что дальше мы будем подробно рассматривать поведение самых популярных операторов.

Пройдёмся по нескольким операторам и тому, как они работают с Back Pressure.

Zip. Он просто пересылает деманд каждому из апстримов и пытается их буферизовать. На первый взгляд, всё логично, ведь задача этого оператора — сопоставить элементы по индексам. Но есть один нюанс: если мы запросим меньше элементов, чем придёт от апстрима, то лишние будут отброшены.

На картинке ниже мы пропускаем эвент 1B, потому что он не был запрошен:

Пример теста с потерей эвентов, в нём мы получим только 1 и 2:

func testZip() {
    let publisher1 = PassthroughSubject<Int, Never>()
    let publisher2 = PassthroughSubject<String, Never>()
    
    var results: [String] = []
    var subscription: (any Subscription)!
    
    publisher1
        .zip(publisher2)
        .subscribe(
            AnySubscriber { s in
                subscription = s
            } receiveValue: { value in
                results.append("\(value.0)\(value.1)")
                return .none
            }
        )
    
    subscription.request(.max(1))
    
    publisher1.send(0)
    publisher2.send("A")
    
    publisher1.send(1) // Будет проигнорирован
    publisher2.send("B") // Будет проигнорирован
    
    subscription.request(.max(1))
    
    XCTAssertEqual(["0A", "1B"], results) // ["0A"]
}

А если запросим сразу много, то Zip может просто умереть, чего на самом деле делать не должен. Пример теста, в котором паблишер запрашивает слишком много (осторожно, его лучше не запускать):

func testZip() {
    let otherPublisher = PassthroughSubject<Int, Never>()
    
    (0 ... Int.max).publisher
        .zip(otherPublisher)
        .sink { _ in }
}

CombineLatest. Снова вспомним про «самый стабильный оператор». Мы уже говорили о том, что он может потерять первоначальный деманд. Но это ещё полбеды, ведь он теряет и все остальные запросы. Потому что запрашивает деманд только у второго апстрима, полностью игнорируя первый.

На картинке ниже мы пропускаем эвент B1, потому что запрос деманда не был отправлен первому паблишеру:

Например, такой тест будет падать с ошибкой XCTAssertEqual failed: ("["A1", "B1", "B2"]") is not equal to ("["A1", "A2"]"):

func testCombineLatest2() {
    let publisher1 = PassthroughSubject<String, Never>()
    let publisher2 = PassthroughSubject<Int, Never>()
    
    var results: [String] = []
    
    publisher1
        .combineLatest( publisher2)
        .subscribe(
            // Подписчик, запрашивающий элементы по одному
            AnySubscriber { subscription in
                subscription.request(.max(1))
            } receiveValue: { value in
                results.append("\(value.0)\(value.1)")
                return .max(1)
            }
        )
    
    publisher1.send("A")
    publisher2.send(1)
    
    publisher1.send("B") // Будет проигнорирован
    publisher2.send(2)
    
    XCTAssertEqual(["A1", "B1", "B2"], results) // ["A1", "A2"]
}

Merge. У Merge поведение чем‑то похожее на Zip, потому что он тоже жадно запрашивает эвенты, но, в отличие от последнего, делает это строго по одному и буферизует результаты. Единственная неожиданность здесь: после каждого полученного значения Merge запрашивает ещё один эвент, превышая запрос даунстрима.

Иллюстрируя описание выше, следующий тест успешно проходит, и мы получим все элементы, которые отправляли:

func testMerge() {
    let publisher1 = PassthroughSubject<String, Never>()
    let publisher2 = PassthroughSubject<String, Never>()
    
    var results: [String] = []
    var subscription: (any Subscription)!
    
    publisher1
        .merge(with: publisher2)
        .subscribe(
            AnySubscriber { s in
                subscription = s
            } receiveValue: { value in
                results.append(value)
                return .none
            }
        )
    
    subscription.request(.max(2))
    
    publisher1.send("A")
    publisher2.send("B") // Demand был удовлетворён
    
    // Следующие эвенты должны быть проигнорированы
    publisher1.send("C")
    publisher2.send("D")
    
    subscription.request(.max(2))
    
    XCTAssertEqual(["A", "B", "C", "D"], results) // ["A", "B", "C", "D"]
}

Почему было выбрано именно такое поведение для каждого оператора, не ясно.
Merge, по идее, должен делить поровну деманд между всеми апстримами и не превышать деманд даунстрима.

Почему Zip, который должен сопоставлять элементы по индексам, теряет эвенты? А Merge, которому не важен порядок и который может спокойно пропускать лишние эвенты, буферизует больше, чем требуется?

Слишком много вопросов, ответы на которые я не нашёл. И, судя по ссылкам выше, не я один задаюсь этими вопросами.

Какие выводы?

Несмотря на то, что в Combine есть встроенный механизм для работы с Back Pressure, полагаться на него опасно, поэтому самым надёжным решением будет всегда запрашивать .unlimited‑деманд, как, к слову, и делает Sink, и использовать проверенные временем RxSwift throttle(for:scheduler:latest:), debounce(for:scheduler:options:) и collect(_:options:).

Combine Contracts: Как решали проблемы со стандартным шедулером

СoncurrentQueue и Scheduler

Пара слов о том, что такое Scheduler. В документации нам говорят, что это протокол, который определяет, как и когда выполнять замыкания. На практике это почти всегда привычные всем DispatchQueue, на тредах которых производят вычисления операторы Combine.

Для начала предлагаю вспомнить вот такой тред с форума Swift про Scheduler в Combine и пройтись по основным тезисам и проблемам, которые там обсуждают.

Первая проблема: отсутствие гарантии порядка получения эвентов при использовании СoncurrentQueue. Это ловушка, в которую очень легко попасть, ведь все глобальные очереди, которые доступны из коробки, — параллельные.

Поэтому следующий пример можно даже встретить на практике:

func testConcurrentJust() {
    let finishExpectation = expectation(description: "Publisher Finished")
    var results: [Int] = []
    
    let cancellable = Just(0)
        .receive(on: DispatchQueue.global())
        .sink { _ in
            finishExpectation.fulfill()
        } receiveValue: { value in
            results.append(value)
        }
    
    waitForExpectations(timeout: 1)
    
    XCTAssertEqual([0], results) // []
    
    cancellable.cancel()
}

Этот тест будет периодически падать из‑за того, что комплишен (completion) пришёл раньше, чем 0.

Вторая проблема: безопасность использования ConcurrentQueue. Разработчики Combine утверждают, что, несмотря на проблему с порядком эвентов, параллельные очереди использовать безопасно с точки зрения возникновения дата‑рейсов.

Но нужно быть осторожными при работе с Subscription. Потому что cancel() и request(_:) могут работать параллельно, поэтому должны быть потокобезопасными. Но, к сожалению, большинство стандартных операторов не соблюдают этот контракт, из‑за чего при использовании параллельных очередей всё ещё возможны дата‑рейсы, а значит, и креши.

Подробнее работу с Subscription и примеры паблишеров, которые делают это неправильно, мы разберём чуть позже. А пока стоит запомнить ещё один звоночек насчёт использования ConcurrentQueue.

Третья проблема: эффективность работы стандартных Scheduler, или default schedulers используют только async. Здесь раскрывается ещё один нюанс относительно subscribe(on:options:) и receive(on:options:) — это неоптимальный шедулинг тасок. Что выливается в две проблемы.

Первая заключается в том, что все операторы после receive(on:options:) будут срабатывать с задержкой. Независимо от того, с какого шедулера пришёл эвент.

Например, если мы отправляем эвент с основной очереди и наш подписчик обрабатывает события на ней же, то такой код никогда не отработает синхронно. Поэтому следующий тест будет всегда фейлиться:

func testDispatchQueueMain() {
    let subject = PassthroughSubject<Int, Never>()
    
    var results: [Int] = []
    
    let cancellable = subject
        .receive(on: DispatchQueue.main)
        .sink { value in results.append(value) }
    
    // Отправляем эвент с основной очереди
    subject.send(0)
    
    // Ожидаем сразу же его получить
    XCTAssertEqual([0], results) // []
    
    cancellable.cancel()
}

Вторая проблема связана с subscribe(on:options:). И для её понимания придётся немного погрузиться в процесс подписки на апстрим.

Сначала мы прикрепляем подписчика к паблишеру, затем подписчик дожидается, пока паблишер пришлёт ему подписку, через которую будет происходить всё дальнейшее взаимодействие. Только потом он начинает запрашивать эвенты. И в случае DispatchQueue в роли Scheduler это выливается в два вызова DispatchQueue.async: один — на прикрепление подписчика, а другой — на запрос эвентов или деманда.

Такое поведение будет прерывать синхронное выполнение кода даже в ситуациях, когда мы уже находимся на нужной очереди. А ещё может привести к неожиданным переключениям контекстов и более долгой подписке на апстрим. Особенно это болезненно с DispatchQueue.main и обновлениями UI и чревато промаргиваниями контента или прыгающими анимациями.

Чтобы посчитать число обращений к DispatchQueue.async, можно использовать враппер над Scheduler (тут снова реклама нашей библиотеки), который будет записывать выполняемые задачи. Следующий тест успешно проходит:

func testSchedulerCalls() {
    let scheduler = TestScheduler()
    var results: [Int] = []
    
    let cancellable = Just(1)
        .subscribe(on: scheduler)
        .sink { value in results.append(value) }
    
    scheduler.waitUntilAllScheduled()
    
    XCTAssertEqual([1], results) // [1]
    XCTAssertEqual(scheduler.schedulingCount, 0) // 0
    XCTAssertEqual(scheduler.scheduledCount, 2) // 2
    
    cancellable.cancel()
}

Какие выводы?

  • Никогда не использовать ConcurrentQueue при работе с Combine! Потому что эвенты начнут приходить в хаотичном порядке и начнутся креши из‑за дата‑рейсов в стандартных операторах. В общем и целом это правило касается и GCD, но об этом в другой раз.

  • Если важно не делать лишних переключений, когда вы уже находитесь на нужной очереди, например, при обновлении UI, то придётся написать свой аналог MainScheduler или UIScheduler. Или взять уже готовые из нашей библиотеки.

Combine Contracts: что делать, если нельзя полагаться на стабильность дефолтных операторов

Subscription

Это протокол для подписок. В Combine Subscription — это связующее звено между операторами. Именно при помощи него подписчики могут запрашивать эвенты, а также отменять подписки.

Протокол требует реализовать всего две функции:

  • func request(_ demand: Subscribers.Demand) — запрос у апстрима определённого количества элементов.

  • func cancel() — функция для отмены подписки и прекращения запроса/получения эвентов. Здесь есть важный момент: такая функция должна отрабатывать только один раз, все остальные вызовы не должны делать ничего. Ну и вдобавок не блокировать вызывающего.

А ещё эти функции объединяет то, что они обе должны быть потокобезопасными. О cancel() даже написано в документации, там же вскользь упомянуто, что она может выполняться параллельно с request(_:). Но понять, почему это так и почему request(_:) тоже должен быть полностью потокобезопасным, можно только из Swift‑форума.

Отлично, с тем, что это такое и как должно работать в теории, мы разобрались. А что же там на практике? Давайте напишем ещё пару тестов, чтобы всё проверить.

Начнём, пожалуй, с cancel() и посчитаем, сколько раз он вызывается:

func testAnySubscriberCancellation() {
    let finishExpectation = expectation(description: "Publisher finished")
    finishExpectation.expectedFulfillmentCount = 2
    
    var subscription: (any Subscription)?
    var cancellationCounter = 0
    
    Just(0)
        .handleEvents(receiveCancel: {
            cancellationCounter += 1
        })
        .subscribe(
            AnySubscriber { s in
                subscription = s
            }
        )
    
    DispatchQueue.global().async {
        subscription?.cancel()
        finishExpectation.fulfill()
    }
    
    DispatchQueue.global().async {
        subscription?.cancel()
        finishExpectation.fulfill()
    }
    
    waitForExpectations(timeout: 1)
    
    XCTAssertEqual(1, cancellationCounter) // 2
}

В тесте выше мы создаём простенького подписчика, который даже не запрашивает элементы у апстрима, а после параллельно два раза пытаемся отменить подписку. В итоге тест будет периодически падать с ошибкой о том, что cancel() был вызван два раза.

Но это ещё не самое интересное. Что будет, если мы удалим handleEvents() и ассерт в конце:

func testAnySubscriberCancellation2() {
    let finishExpectation = expectation(description: "Publisher finished")
    finishExpectation.expectedFulfillmentCount = 2
    
    var subscription: (any Subscription)?
    
    Just(0)
        .subscribe(
            AnySubscriber { s in
                subscription = s
            }
        )
    
    DispatchQueue.global().async {
        subscription?.cancel() // Возможный EXC_BAD_ACCESS
        finishExpectation.fulfill()
    }
    
    DispatchQueue.global().async {
        subscription?.cancel() // Возможный EXC_BAD_ACCESS
        finishExpectation.fulfill()
    }
    
    waitForExpectations(timeout: 1)
}

Теперь тест периодически начнёт крешить с EXC_BAD_ACCESS на одном из вызовов cancel(). О чём нам это говорит?

Во‑первых, о том, что не все стандартные операторы плохо соблюдают контракты Subscription. Например, handleEvents() честно отправлял только одну отмену апстриму, и если его вернуть, то крешей не будет. А вот Just, в отличие от других паблишеров и сабжектов, не сериализует вызовы request(_:) и cancel(), из‑за чего и происходит дата рейс, а в последствии креш. Но один ли Just ведёт себя так? Возможно, да, а возможно, нет — операторов и тем более их комбинаций слишком много, чтобы проверить все.

Креш можно исправить, если обернуть subscription в AnyCancellable, он тоже корректно обрабатывает параллельные вызовы cancel().

Во‑вторых, если даже стандартные операторы не соблюдают свой же контракт, то непонятно, как правильно писать экстеншены к Combine, ведь, как мы выяснили, полагаться на стабильность дефолтных операторов нельзя. Поэтому остаётся только всегда сериализовать вызовы всех методов Subscription, чтобы не получать дата‑рейсы и, как следствие, креши.

Возможно, для кого‑то этот пункт прозвучал очевидным, но для того, чтобы понять масштаб проблемы, достаточно посмотреть на самую популярную библиотеку с экстеншенами для Combine и увидеть, что у них нет ни одного оператора с потокобезопасным cancel() и request(_:).

Теперь проверим, насколько безопасен request(_:), проведя аналогичный предыдущему тест:

func testAnySubscriberRequest() {
    let finishExpectation = expectation(description: "Publisher finished")
    finishExpectation.expectedFulfillmentCount = 2
    
    var subscription: (any Subscription)?
    
    Just(0)
        .subscribe(
            AnySubscriber { s in
                subscription = s
            }
        )
    
    
    DispatchQueue.global().async {
        subscription?.request(.max(1)) // Возможный EXC_BAD_ACCESS
        finishExpectation.fulfill()
    }
    
    DispatchQueue.global().async {
        subscription?.request(.max(1)) // Возможный EXC_BAD_ACCESS
        finishExpectation.fulfill()
    }
    
    waitForExpectations(timeout: 1)
}

И да, здесь снова будет креш — из‑за параллельного вызова request(_:).

Я провёл аналогичный тест для Future, Deferred, Empty, Fail, Record, PassthroughSubject и CurrentValueSubject — все они работают, как ожидается. Проверку всех остальных операторов делать я не стал.

Стоит отметить, что Just — один из самых популярных паблишеров, если не самый. И наверное, не сосчитать, в скольких местах в коде он используется и сколько крешей из‑за него происходит.

Но проблема Subscription, которую я хотел подсветить, заключается в другом. Всё выглядит так, что разработчики Combine по какой‑то причине решили, что делать безопасные вызовы на каждом операторе — дорого. Поэтому сделали безопасными в основном только дефолтные паблишеры (не считая Just, про него почему‑то забыли), AnyCancellable и, возможно, какие‑то ещё операторы. А все остальные просто делегируют обработку этих вызовов апстриму, не сильно заботясь о последствиях. Из‑за чего добавление нового оператора в имеющуюся последовательность может привести к неожиданным дата‑рейсам и крешам.

Какие выводы?

Если вдруг решите писать свои паблишеры, нужно всегда помнить о том, что вызовы cancel() должны быть потокобезопасными — не только между собой, потому что могут быть вызваны несколько раз, но и между request(_ demand:), а также всеми внутренними сущностями, которые обращаются к тем же переменным.

API: отсутствие какого-либо дебага

Combine не предоставляет никакого API для тестов. Всё это придётся написать самим.

А ещё в нём отсутствуют какие‑либо рантайм‑проверки, например, для выявления реэнтранси (reentrance). Если вы вдруг написали какую‑то циклическую отправку эвентов, то поздравляю, вместо алерта, как, например, в RxSwift, вы получите ничего. И снова придётся очень долго разбираться с тем, почему, например, в CombineLatest приходят элементы в непонятном порядке.

Здесь нет какого‑то решения, потому что оно должно быть реализовано под капотом у каждого оператора. Поэтому остаётся только смириться.

API: отсутствие большинства важных операторов

Итак, небольшой списочек того, чего очень не хватает в Combine, и того, что вам, скорее всего, придётся написать самим:

  • AnyPublisher.create для связи императивного кода с реактивным. Он был в Xcode 11 beta 2, но исчез в beta 3.

  • RetryWhen, или ретраи по асинхронному условию. Вспомним супербазовый сценарий: нужно ретраить запрос к бэкенду, когда появилась сеть. Пойдём искать, как можно ретраить в Combine. И, кроме безусловного ретрая, не найдём ничего. Поэтому даже такой простой кейс не получится написать реактивно, придётся изощряться с рекурсией и catch(_:).

  • WithLatestFrom для комбинации эвентов, чтобы закрыть пробел между CombineLatest и Merge.

  • InclusivePrefixWhile — если вам вдруг понадобится инклюзивный префикс, то его придётся написать самому. На эту тему даже есть тред на форуме Swift.

  • Materialize/Dematerialize — очень полезный оператор, чтобы следить за стримом ошибок.

  • Scan — без мутабельного буфера, это, конечно, сильно. Этот оператор часто используется, чтобы собрать эвенты в какой‑то буфер, например Set. И очевидно, что если для каждого следующего элемента создавать новую копию Set, то рано или поздно мы столкнёмся с фризами.

  • MainScheduler или UIScheduler — без них производить операции со стейтом UI будет очень проблематично, и есть немалый риск начать писать код вроде этого:

publisher
    .sink {
        let action = { /* … */ }
        if Thread.isMainThread {
            action()
        } else {
            DispatchQueue.main.async(execute: action)
        }
    }

Заключение

Время подвести итоги. Combine не самая стабильная библиотека, которую выпускала Apple, у неё очень много проблем, которые начинаются с таких базовых вещей, как:

  • Subjects — рейс‑кондишены из‑за непотокобезопасного CurrentValueSubject.

  • Multicasts — утечки памяти при потере ссылки на Multicast.

  • Operators — потеря эвентов в СombineLatest, утечки памяти в SubscribeOn и отсутствие многих популярных операторов, написать которые не так уже и просто из‑за того, что большинство операторов не соблюдают свои же контракты.

  • Schedulers — рейс‑кондишены и дата‑рейсы с бэкграунд‑очередями, особенно с параллельными, а также неэффективные стандартные шедулеры.

  • Subscription — отсутствие сериализации на большинстве операторов, из‑за чего контракт не соблюдается.

  • Back Pressure — отсутствие консистентности в том, как операторы должны с ней работать, что ведет к неожиданным результатам. А также багам, например, с CombineLatest, Zip и Merge.

Все проблемы — это либо баги, которые не исправляют уже очень давно и, скорее всего, не исправят уже никогда, либо неочевидное поведение, узнать о котором можно лишь с Swift‑форума или Stack Overflow.

Создать какой‑то стабильный API, используя Combine, очень сложно, ведь всё может с лёгкостью перестать работать, потечь или начать крешить при добавлении нового оператора в цепочку, например, если вдруг в какой‑то момент вы решите изменить шедулер, на котором должны работать операторы, на любой, кроме DispatchQueue.main.

Готовы ли вы променять плюсы first‑party‑библиотеки (размер приложения, отсутствие third‑party‑зависимостей) на скорость и сложность разработки, а также проблемы с поддержкой уже написанного кода? На этот вопрос каждая команда должна ответить сама, а моё мнение, что:

  • Если вы уже используете в проекте какую‑то реактивщину, например RxSwift, то переход на Combine может стать для вас болезненным, не только в связи с большим числом багов, но и из‑за отсутствия части операторов. Теперь вы знаете о проблемах, с которыми можете столкнуться. Keep in mind.

  • Если вы думаете, на что переехать с GCD, и выбираете между Combine и Structured Concurrency, то, учитывая всё вышесказанное, стоит присмотреться ко второму варианту. Конечно, багов там уж точно не меньше, но Apple хотя бы активно занимаются их исправлением, в то же время все проблемы Combine актуальны с момента его релиза. А недостающий функционал придётся реализовывать в обоих случаях.

И на последок, еще раз поделюсь библиотекой с экстеншенами для Combine, которую мы написали. Что в ней уже есть:

  • AnyPublisher.create — для взаимодействия callback‑based кода с Combine;

  • Популярные операторы, которых нет в Combine: WithLatestFrom, RetryWhen и другие;

  • Более эффективаная и безопасная реализация SubscribeOnDiscardableSubscribeOn;

  • UIScheduler и MainScheduler для работы с UI.

Теги:
Хабы:
Всего голосов 17: ↑17 и ↓0+22
Комментарии2

Публикации

Информация

Сайт
www.ya.ru
Дата регистрации
Дата основания
Численность
свыше 10 000 человек
Местоположение
Россия