Пробуем и разбираемся с StateFlow

Всем привет.

Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.

Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.

Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.

Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:

  • Более простая и garbage-free внутренняя реализация.
  • Необходимость наличия элемента по-умолчанию. Null также возможен.
  • Разделение на read-only и read-write интерфейсы.
  • Сравнение элементов через equality вместо сравнения ссылок.

Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Получаем данные:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:



Заглянем теперь внутрь и посмотрим, как это реализовано.

К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.

И так, основным классом реализации является класс StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state — атомик ссылка для хранения нашего состояния.

sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots

Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:

private val _state = atomic<Any?>(null)

Плюс методы для смены состояний слота.

Каждый слот может быть в одном из состояний:

null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.

Рассмотрим, что происходит при установке нового значения:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.

Можно выделить несколько шагов:

  1. Установка нового значения.
  2. Установка маркера sequence — в нечётное значение обозначающее, что мы уже в процессе апдейта.
  3. makePending() — установка всех состояний слотов(т.е. всех подключений) в PENDING — скоро отправим новое значение.
  4. Цикл проверки sequence == curSequence, что все задачи выполнены и установка sequence чётным числом.

Что же происходит в методе collect:

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

Основная задача — отправить начальное значение по умолчанию и ждать новые значения:

  1. Создаём или переиспользуем слот нового подключения.
  2. Проверяем состояние на null или на смену состояния. Эмиттим новое значение.
  3. Проверяем, есть ли слоты готовые для обновления (PENDING состояние) и если нет — подвешиваем слот в ожидании новых значений.

В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.

Спасибо.
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 13

    0
    вопрос — как запустить StateFlow, чтоб поведение было аналогично LiveData c привязкой к lifecycle?
    Например, чтоб результат приходил только в промежутке между onStart/onStop, а если приходит в бекграунде, то постился автоматически при переходе в фореграунд?
      0

      Если кратко, то из коробки никак:)
      Сам по себе StateFlow никак не связан с Андроид и не знает о жизненных циклах приложения.
      Единственное, что может работать — если collect запускать на скоупе либо lifecycleScope либо на скоупе ViewModel. Тогда "отмена" подписки будет происходить сама на onDestroy/onCleared.
      А чтобы сделать аналогично LiveData — только самим ручками, либо возможно Гугл сделает что-то с этим сам.

        0
        Так LiveData сам по себе тоже ничего не знает о сменах состояния lifecycle, пока ему явно не передашь LifecycleOwner, подписавшись на него вызовом метода observe().
        Похожая (но немного перевернутая) ситуация и со StateFlow — сам StateFlow ничего не знает о lifecycle, но подписавшись на него вызовом collect() в рамках «нужного» coroutine-контекста мы обеспечим его правильную работу с lifecycle
          0

          Не совсем так.
          StateFlow будет "знать" только о create/destroy. Загляните, например, внутрь реализации lifecycleScope.
          LiveData же умеет не оповещать если в фоне..StateFlow такое сам не сможет, скойуп не закэнселится в этом случае.

            +1
            Корутины тоже не стоят на месте) Вот так скоуп будет работать только в started-состоянии. Есть еще whenCreated{} и whenResumed{}
            lifecycleScope.launch {
              whenStarted {
                ...
              }
            }

            И да, StateFlow без coroutine-контекста не знает об изменениях состояния, так же как и LiveData без LifecycleOwner
              0

              Есть такое, но тут главное не путаться с "будет работать только в started-состоянии"
              Лучше сказать, что стартанёт в started(created/resumed) состоянии, но cancel будет только в destroy.

                +1
                Не только стартанет. С whenStarted корутина переходит в suspended-состояние, когда lifecycle уходит в onStop, и (State)Flow.collect() перестает получать изменения. Т.е. получаем поведение аналогичное LiveData
                  0

                  Вы правы, давно не заглядывал в исходник Lifecycle.
                  Только там хитрее. Там используется специальный PausingDispatcher через который проходят все события и они либо выполняются, либо если состояние lifecycle иное, то просто стопается вся очередь и копится, а потом отдаётся.
                  Думал, там ещё тоже самое что и с ViewModel. Гляну и её, может тоже наконец-то что-то изменилось и в ней..

                +1
                Спасибо. Т.е получается что данный код более гибкий:
                lifecycleScope.launchWhenResumed {
                            viewModel.loadingStateFlow
                                .collect { Log.d("MainFragment","Loading state: $it") }
                        }

                Т.к. мы можем наблюдать значения только на скоупе onResume-onPause, или onCreate-onDestroy. А LiveData такого не умеет, так как по умолчанию она умеет слушать только в onStart-onStop.
                P.S. ещё заметил интересную особенность StateFlow — когда постишь одно и то же значение несколько раз, то collect будет вызван только 1 раз
                  0
                  да, теперь не сработает костыль с liveData.value = liveData.value, чтобы дернуть подписчиков)
                    0
                    Если вам надо так делать, то скорее всего в коде что-то не так. Скорее всего нужен не стейт, а отдельная команда для обновления чего-то.
                      0
                      Я так и написал — костыль) С другой стороны, если вместо стейта использовать команду, то при пересоздании UI мы не получим предыдущий стейт
                    0
                    ещё заметил интересную особенность StateFlow — когда постишь одно и то же значение несколько раз, то collect будет вызван только 1 раз

                    Да, там сравнивается через equality.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое