Привет, Хабр!
В этой статье рассмотрим, как работает Kotlin Flow — инструмент для асинхронной обработки данных. Flow позволяет легко получать данные по мере их готовности, не блокируя основной поток, а также управлять отменой, обработкой ошибок и сменой контекста.
Основные концепции и функции Flow
Начнём с основ. Если вы знакомы с коллекциями, то Flow — это коллекция, которая выдаёт все элементы не сразу, а постепенно, по мере их готовности.
Для сравнения возьмём простой пример коллекции:
fun simpleList(): List<Int> = listOf(1, 2, 3) fun main() { simpleList().forEach { println(it) } }
Здесь все значения доступны сразу, и цикл отрабатывает синхронно. Но если вычисление элементов требует времени, можно воспользоваться последовательностями:
fun simpleSequence(): Sequence<Int> = sequence { for (i in 1..3) { Thread.sleep(100) // имитируем длительное вычисление yield(i) } } fun main() { simpleSequence().forEach { println(it) } }
Последовательности позволяют выдавать значения по одному, но при этом остаются синхронными и блокируют поток. Тут хорошо помогут suspend‑функции и, собственно, Flow.
Suspend‑функция позволяет выполнять операции асинхронно, не блокируя поток:
suspend fun simpleSuspend(): List<Int> { delay(1000) // имитируем асинхронную операцию return listOf(1, 2, 3) } fun main() = runBlocking { simpleSuspend().forEach { println(it) } }
Чтобы возвращать значения постепенно и асинхронно, мы используем Flow. Пример базового Flow:
fun simpleFlow(): Flow<Int> = flow { for (i in 1..3) { delay(100) // не блокируем поток, ждем готовности данных emit(i) // эмитируем значение в поток } } fun main() = runBlocking { simpleFlow().collect { value -> println(value) } }
Flow — это холодный поток, что означает, что код внутри flow {... } выполняется заново при каждом вызове collect().
Операторы и управление потоками
Flow предлагает богатый набор операторов для трансформации и управления данными, аналогичный тем, что используются в коллекциях, но с поддержкой асинхронных вызовов.
Фильтрация и преобразование данных. Например, если нужно отфильтровать чётные числа и преобразовать их в строки:
fun main() = runBlocking { (1..5).asFlow() .filter { println("Фильтруем: $it") it % 2 == 0 } .map { println("Мапим: $it") "Число: $it" } .collect { println("Коллектим: $it") } }
Оператор transform предоставляет возможность эмитировать произвольное количество значений на основе одного элемента:
fun main() = runBlocking { (1..3).asFlow() .transform { value -> emit("Начинаем обработку $value") delay(50) // можем вызывать suspend-функции внутри transform emit("Закончили обработку $value") } .collect { println(it) } }
Отмена и смена контекста. Flow поддерживает отмену, как и все корутины. Если требуется остановить сбор данных, можно использовать отмену внутри collect(). Например:
fun cancellableFlow(): Flow<Int> = flow { for (i in 1..5) { println("Эмитим $i") emit(i) } } fun main() = runBlocking { cancellableFlow().collect { value -> if (value == 3) cancel() // отменяем сбор при достижении определенного значения println("Получили $value") } }
Если в цикле нет suspend‑функций, отмена может не сработать мгновенно. Для таких случаев есть оператор cancellable():
fun main() = runBlocking { (1..5).asFlow() .cancellable() .collect { value -> if (value == 3) cancel() println("Число: $value") } }
Иногда нужно, чтобы эмитирование значений происходило в другом контексте. Не стоит пытаться менять контекст с помощью withContext внутри flow — используйте для этого оператор flowOn():
fun cpuIntensiveFlow(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(100) // эмулируем тяжелую работу println("Эмитим $i в потоке ${Thread.currentThread().name}") emit(i) } }.flowOn(Dispatchers.Default) fun main() = runBlocking { cpuIntensiveFlow().collect { value -> println("Получили $value в потоке ${Thread.currentThread().name}") } }
Для ситуации, когда эмиттер и коллекционер работают с разной скоростью, полезны операторы buffer(), conflate() и collectLatest(). Они позволяют избежать «узких мест» при обработке данных, буферизуя или пропуская промежуточные значения.
Пример интеграции
Допустим, есть у нас UI‑приложение, которое должно регулярно получать обновления с сервера. Вместо того, чтобы создавать бесконечный цикл в активности, можно просто создать Flow, который будет выдавать новые данные каждые несколько секунд.
Представим, что есть два источника новостей, и нужно объединить их потоки, фильтровать дубликаты и выводить данные в реальном времени:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.random.Random // Модель данных для новости data class NewsArticle( val id: Int, val source: String, val headline: String, val timestamp: Long ) // Имитация сетевого запроса для получения новостей с заданного источника object NetworkClient { suspend fun fetchNews(source: String): List<NewsArticle> { delay(500) // имитация задержки сети if (Random.nextInt(0, 10) < 2) { throw RuntimeException("Ошибка сети при получении новостей от $source") } val currentTime = System.currentTimeMillis() return List(3) { NewsArticle( id = Random.nextInt(1000, 9999), source = source, headline = "Срочная новость от $source №${Random.nextInt(100)}", timestamp = currentTime ) } } } // Функция-генератор потока новостей для заданного источника. Каждые intervalMs миллисекунд производится опрос сервера. fun newsFlow(source: String, intervalMs: Long): Flow<NewsArticle> = flow { while (true) { try { println("[$source] Опрашиваем источник...") val articles = NetworkClient.fetchNews(source) articles.forEach { article -> emit(article) } } catch (e: Exception) { println("[$source] Ошибка: ${e.message}") } delay(intervalMs) } }.flowOn(Dispatchers.IO) // Основной пример: агрегатор новостей. Объединяем потоки, фильтруем дубликаты и выводим данные. fun main() = runBlocking { println("=== Запуск агрегатора новостей ===") // Создаем потоки для двух источников с разными интервалами опроса val sourceAFlow = newsFlow("ИсточникA", intervalMs = 3000) val sourceBFlow = newsFlow("ИсточникB", intervalMs = 5000) // Объединяем потоки с помощью merge, буферизуем до 50 элементов и фильтруем дубликаты по id val mergedNewsFlow = merge(sourceAFlow, sourceBFlow) .buffer(50) .distinctBy { it.id } // Запускаем сбор новостей в отдельном Job для возможности отмены val aggregatorJob = launch { mergedNewsFlow .onEach { article -> // Здесь можно обновлять UI, сохранять в БД и пр. println("Получена новость: [${article.source}] ${article.headline} (время: ${article.timestamp})") } .catch { e -> println("Ошибка обработки новостей: ${e.message}") } .onCompletion { cause -> if (cause != null) println("Агрегатор завершился с ошибкой: $cause") else println("Агрегатор успешно завершил сбор новостей.") } .collect() } // Даем агрегатору работать 20 секунд, затем корректно останавливаем сбор данных delay(20000) println("=== Остановка агрегатора новостей ===") aggregatorJob.cancelAndJoin() println("=== Агрегатор остановлен ===") }
В консоли будут периодически появляться сообщения вроде «[ИсточникA] Опрашиваем источник...» и «Получена новость: [ИсточникB] Срочная новость...». Если при запросе возникнут ошибки, вы увидите соответствующие сообщения. По истечении 20 секунд работы агрегатора появится уведомление об остановке, и сбор новостей корректно завершится.
18 февраля в 19:00 пройдёт открытый урок ��а тему «Разработка монолитного приложения со Spring».
Поговорим о преимуществах и недостатках монолитной архитектуры и фреймворка Spring, а также об особенностях разработки со Spring в Kotlin. Попрактикуемся в разработке работающего монолитного приложения и разместим его в Docker-контейнере. Записаться
