Лето — лучшее время для сплава. Поэтому, если вы пока не в отпуске, давайте устроим короткий сплав по асинхронным потокам данных.
Переход из привычной императивной парадигмы иногда бывает сложным, поэтому сначала поговорим о терминах.
Термины
Kotlin Flow — это API для работы с асинхронными потоками данных, построенное поверх корутин. Kotlin Flow реализует парадигму реактивного программирования.
Реактивное программирование — это парадигма, в которой все данные и события рассматриваются не как единичные значения, а как асинхронные потоки, протекающие через приложение. Каждый компонент «подписан» на поток входных событий и автоматически обновляет своё состояние в ответ на любые изменения.
Реактивное программирование можно рассматривать как синтез трех подходов:

Как и в декларативной парадигме, в реактивном программировании описывается «что», а не «как». Вместо явных циклов вы просто собираете цепочку операторов (map, filter, flatMap и т. д.), задавая желаемую трансформацию потока.
Как и в программировании потоков данных, в реактивном программировании каждый поток и каждый оператор образуют «узлы» и «ребра» графа, по которому данные протекают от источников к потребителям.
Как и в событийно-ориентированной парадигме, в реактивном программировании потоки часто реализуют модель публикации-подписки.
Простейший пример реактивной парадигмы мы все видели в Excel, где при обновлении значения ячейки автоматически пересчитываются все остальные ячейки, которые ссылаются на нее через формулы.
Холодные и горячие потоки
Поток данных (stream) — это последовательность значений, которые поступают со временем, зачастую асинхронно.
Потоки данных делятся на горячие и холодные. Это различие описывает, когда именно начинается генерация данных и что происходит при подписке на поток.
Горячие потоки генерируют данные независимо от наличия подписчиков, а холодные потоки ленивы и начинают генерировать их только по запросу.

Слева — «горячий поток» в виде извергающегося вулкана, непрерывно выбрасывающего данные (желтые листки), а справа — «холодный поток» в виде ледяного крана, из которого данные капают только при повороте вентиля.
«Холодные потоки, горячие каналы»
Так называется статья Романа Елизарова, опубликованная в 2019 году. Дело в том, что в Kotlin Flow есть и горячие (SharedFlow и StateFlow), и холодные (Flow) потоки данных. Кроме того в Kotlin есть еще одна реализация горячих потоков — Channels.
Чтобы не запутаться, давайте посмотрим на схему:

Объект Flow в Kotlin — это реализация холодного потока данных, который производит значения асинхронно и лениво. Таким образом, он берет лучшее и от корутин, и от последовательностей: значения вычисляются «на лету» и при этом используются корутины для асинхронной обработки данных.
За информацией о каналах отсылаю вас к своему курсу по корутинам на Степике. В данной статье мы будем говорить только о Flow и его вариациях.
Создание Flow
Хрестоматийный пример для объяснения работы Flow — вычисление чисел Фибоначчи:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun fibonacciFlow(): Flow<Long> = flow {
var a = 0L
var b = 1L
emit(a)
delay(100)
emit(b)
while (true) {
val next = a + b
emit(next)
a = b
b = next
delay(100)
}
}
fun main() = runBlocking {
fibonacciFlow().take(10).collect { println(it) }
}
Здесь мы создаем поток данных с помощью билдера flow
, получающего на вход лямбду с приемником. Как известно, если последний параметр функции в Kotlin сам является функцией, то ее можно в виде лямбда-выражения вынести за круглые скобки (trailing lambda). Поэтому вместо
flow() { ... }
лямбда-аргумент можно записать внутри фигурных скобок, следующих за именем функции flow
, что придает синтаксису выразительность.
Функция emit()
отправляет очередное значение в поток, и оно плывет к подписчику — тому, кто запросил это значение. Так как emit()
— это suspend-функция, она приостанавливает выполнение корутины после передачи каждого значения до следующего запроса.
Но весь этот механизм лежит мертвым грузом до вызова терминальной операции, которая запускает выполнение потока в корутине и получает результаты его работы. В данном случае эту роль играет функция collect
, вызов которой инициирует сбор потока, т.е. генерацию значений и получение результатов. При этом лямбда-функция, переданная в collect
, подписывается на поток, то есть становится получателем значений, которые им генерируются.
Структура Flow
Каждый Flow делится на три части:
эмиттер — блок, в котором генерируются данные и, как правило, вызывается функция
emit()
для отправки значений;промежуточные операторы — операторы, которые обрабатывают данные, проходящие через поток;
коллектор — терминальный оператор, который запускает сбор потока и получает итоговые значения.
Взаимоотношения между терминальным и промежуточными операторами выглядят примерно так:

Возвращаясь к нашему примеру с числами Фибоначчи, эмиттер — это блок внутри flow{}:
var a = 0L
var b = 1L
emit(a)
delay(100)
emit(b)
while (true) {
val next = a + b
emit(next)
a = b
b = next
delay(100)
}
Промежуточный оператор представлен вызовом функции take()
:
.take(10)
А о коллекторе вы уже догадались по названию.
Многие промежуточные операторы называются так же, как и методы коллекций: map
, filter
, take
и т.д. Однако они адаптированы для работы с асинхронными потоками данных и используют возможности корутин.
Например, создадим поток углов в градусах, затем последовательно применим преобразования с помощью оператора map
:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.math.*
fun main() = runBlocking {
flowOf(0, 30, 45, 60, 90)
.map { it.toDouble() } // преобразуем целые числа в Double
.map { Math.toRadians(it) } // переводим градусы в радианы
.map { sin(it) } // вычисляем синус угла
.collect { println(it) } // выводим результат на экран
}
Когда оператор collect
запускает выполнение потока в корутине, каждое число генерируется, последовательно проходит по всей цепочке преобразований и передается в лямбду, переданную в collect
.
Как в случае и с коллекциями, подобные цепочки реализуют декларативную парадигму программирования: указано, что сделать, а не как сделать.
Пример
Предположим, у нас есть Android-приложение, в котором нам нужно отображать на главном экране актуальное количество записей из какой-то таблицы в БД. Актуальное — то есть при изменении таблицы это значение должно оперативно пересчитываться.
@Dao
interface WordDao {
@Query("SELECT COUNT(id) FROM words")
fun count(): Flow<Int>
}
С помощью Flow мы следим за таблицей words
и автоматически эмиттим обновленный count
при каждом изменении данных.
@Singleton
class WordRepository @Inject constructor(
private val wordDao: WordDao
) {
fun countWordsFlow(): Flow<Int> =
wordDao.count().
.distinctUntilChanged()
.flowOn(Dispatchers.IO)
}
Метод countWordsFlow()
позволяет подписаться на обновления количества слов в базе. Как только данные изменятся, в соответствующий Flow поступит новое значение — но только если оно реально изменилось (за счет вызова distinctUntilChanged
). Все это выполняется асинхронно и не блокирует основной поток приложения. Поэтому нам не приходится каждый раз обращаться к базе данных вручную.
Итог
Flow — мощный инструмент реактивного программирования, позволяющий просто и эффективно обрабатывать асинхронные потоки данных, реагировать на их изменения в реальном времени, а также писать лаконичный, безопасный и читаемый код без необходимости отслеживать изменения вручную.