Всем привет.
Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.
Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.
Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:
Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.
Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:
Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:
Получаем данные:
И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:
Заглянем теперь внутрь и посмотрим, как это реализовано.
К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.
И так, основным классом реализации является класс StateFlowImpl:
_state — атомик ссылка для хранения нашего состояния.
sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots
Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:
Плюс методы для смены состояний слота.
Каждый слот может быть в одном из состояний:
null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.
Рассмотрим, что происходит при установке нового значения:
Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.
Можно выделить несколько шагов:
Что же происходит в методе collect:
Основная задача — отправить начальное значение по умолчанию и ждать новые значения:
В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.
Спасибо.
Несколько дней назад JetBrains выпустил новую версию корутин — 1.3.6 и одним из нововведении стал новый подвид Flow — StateFlow, который приходит на замену ConflatedBroadcastChannel. Я решил попробовать StateFlow в действии и изучить внутреннее устройство.
Думаю многие, кто использует Котлин при разработке под Андроид или в MPP знакомы с этими терминами, кто нет — данные сущности являются близкими аналогами BehaviorProcessor/BehaviorSubject из RxJava и LiveData/MutableLiveData из Jetpack.
Сам по себе StateFlow является простым расширением интерфейса Flow и представлен в двух видах:
public interface StateFlow<out T> : Flow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
/**
* The current value of this state flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
*/
public override var value: T
}
Идея такая же как и у LiveData/MutableLiveData — через один интерфейс может только прочитать текущее состояние, а через другой ещё и установить.
Что же нам предлагает StateFlow по сравнению с ConflatedBroadcastChannel:
- Более простая и garbage-free внутренняя реализация.
- Необходимость наличия элемента по-умолчанию. Null также возможен.
- Разделение на read-only и read-write интерфейсы.
- Сравнение элементов через equality вместо сравнения ссылок.
Попробуем теперь реализовать простое использование StateFlow. Для этого я сделал элементарную обёртку с возможность установить любой тип с null элементом по-умолчанию:
class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Получаем данные:
lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
И выводим на экран с простейшим интерфейсом для тестов, никаких проблем это не вызывает и всё работает как часы:
Заглянем теперь внутрь и посмотрим, как это реализовано.
К моему удивлению — реализация действительно очень простая и заняла на текущий момент всего 316 строчек, из которых 25% — джавадоки.
И так, основным классом реализации является класс StateFlowImpl:
private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue) // T | NULL
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0 // number of allocated (!free) slots
private var nextIndex = 0 // oracle for the next free slot index
. . .
}
_state — атомик ссылка для хранения нашего состояния.
sequence — вспомогательный индикатор, который в зависимости от чётности/нечётности сообщает о текущем процессе обновления состояния
slots — массив/пул StateFlowSlot. StateFlowSlot — вспомогательная абстракция каждого «подключения» к StateFlow.
nSlots, nextIndex — вспомогательные переменные для работы с расширяемым массивом slots
Заранее рассмотрим StateFlowSlot. Он представляет всего навсего:
private val _state = atomic<Any?>(null)
Плюс методы для смены состояний слота.
Каждый слот может быть в одном из состояний:
null — создан, но не используется
NONE — используется коллектором
PENDING — в ожидании отправки нового значения в коллектор
CancellableContinuationImpl — саспендед состояние, близкое предназначение с PENDING, подвешиваем коллектор пока не придёт новое состояние в StateFlow.
Рассмотрим, что происходит при установке нового значения:
public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return // Don't do anything if value is not changing
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
curSequence++ // make it odd
sequence = curSequence
} else {
// update is already in process, notify it, and return
sequence = curSequence + 2 // change sequence to notify, keep it odd
return
}
curSlots = slots // read current reference to collectors under lock
}
/*
Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
Loop until we're done firing all the changes. This is sort of simple flat combining that
ensures sequential firing of concurrent updates and avoids the storm of collector resumes
when updates happen concurrently from many threads.
*/
while (true) {
// Benign race on element read from array
for (col in curSlots) {
col?.makePending()
}
// check if the value was updated again while we were updating the old one
synchronized(this) {
if (sequence == curSequence) { // nothing changed, we are done
sequence = curSequence + 1 // make sequence even again
return // done
}
// reread everything for the next loop under the lock
curSequence = sequence
curSlots = slots
}
}
}
Основная задача здесь — устаканить изменения состояния StateFlow из разных потоков для последовательных вызовов FlowCollector.
Можно выделить несколько шагов:
- Установка нового значения.
- Установка маркера sequence — в нечётное значение обозначающее, что мы уже в процессе апдейта.
- makePending() — установка всех состояний слотов(т.е. всех подключений) в PENDING — скоро отправим новое значение.
- Цикл проверки sequence == curSequence, что все задачи выполнены и установка sequence чётным числом.
Что же происходит в методе collect:
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
try {
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
val newState = _state.value
// Conflate value emissions using equality
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
if (!slot.takePending()) { // try fast-path without suspending first
slot.awaitPending() // only suspend for new values when needed
}
}
} finally {
freeSlot(slot)
}
}
Основная задача — отправить начальное значение по умолчанию и ждать новые значения:
- Создаём или переиспользуем слот нового подключения.
- Проверяем состояние на null или на смену состояния. Эмиттим новое значение.
- Проверяем, есть ли слоты готовые для обновления (PENDING состояние) и если нет — подвешиваем слот в ожидании новых значений.
В целом, это всё. Мы не рассмотрели, как происходит аллокации слотов и смена их состояний, но я посчитал, что для общей картины StateFlow это не принципиально.
Спасибо.