Как стать автором
Поиск
Написать публикацию
Обновить
511.83
OTUS
Развиваем технологии, обучая их создателей

flowOn vs withContext: правильное переключение потоков в Flow в Kotlin

Уровень сложностиПростой
Время на прочтение6 мин
Количество просмотров194

Привет, Хабр!

Сегодня мы рассмотрим, как правильно переключать контексты в Kotlin Flow и почему flowOn — это не то же самое, что withContext.

Зачем вообще переключать контексты

В Kotlin‑корутинах потоки сами по себе недорогие — запускать их можно десятками тысяч. Но сама по себе корутина не даёт гарантий, что она работает в нужном месте. Контекст (то есть CoroutineDispatcher) — это фактический выбор, на каком треде выполнится код. И если вы запустили flow из UI, но не передали тяжёлую логику в Dispatchers.IO, весь код, включая медленные запросы к базе или файловую систему, поедет в главном потоке.

Второй момент — инварианты потока. Kotlin Flow внутри себя строго следит за тем, чтобы всё исполнялось последовательно и в одном контексте. Если внезапно воткнуть withContext или emit() из другого диспатчера — получите IllegalStateException: Flow invariant is violated. Эта ошибка как раз и говорит: «ты нарушил правила — emit пошёл с другого потока». Решается она только одним способом — использовать flowOn, который создаёт буфер и грамотно отделяет upstream от downstream.

Ну и третий момент — гонки данных. Если в процессе эмита вы параллельно переключаете контексты, обрабатываете значения и отправляете их в общий StateFlow, при неправильной синхронизации можно легко поймать гонку: два потока записывают одно значение одновременно. На тестах всё будет хорошо, а в работе может появится редкий баг, который трудно отловить. Поэтому переключать контекст нужно централизованно, понятно и через flowOn — это безопасный, ожидаемый и проверяемый способ.

Как работает flowOn

flowOn — это оператор, который определяет где и на каком контексте будет исполняться апстрим. Апстрим — это всё, что идёт выше по цепочке после flow {} или любого другого оператора, возвращающего Flow. Если вы пишете flowOn(Dispatchers.IO), это значит: «всё, что выше — emit, map, filter и прочее — пусть выполняется на Dispatchers.IO». А вот всё, что ниже, то есть после flowOn, останется в том контексте, где находится коллектор.

Пример:

val usersFlow = flow {
    logThread("start emit")      
    emit(loadUsers())            
}.flowOn(Dispatchers.IO)         
 .map { user -> enrich(user) }   

Что произойдёт:

  • loadUsers() будет вызван на IO

  • map { enrich(...) } выполнится уже в том контексте, где вы коллектите flow (например, Main или Default)

Эта особенность — разделение контекста между апстримом и даунстримом — реализуется через вставку промежуточного буфера. Именно flowOn делает паузу между частями цепочки и не позволяет апстриму блокировать даунстрим. Если loadUsers() вдруг начнёт тормозить, это не заблокирует collect, потому что между ними есть буфер. Именно поэтому flowOn работает адекватно с тяжёлыми операциями.

Также стоит понимать, что flowOn влияет только на цепочку выше себя. Он не захватывает весь флоу целиком. Пример неправильного ожидания:

flow {
    emit(doSomething())
}.map { heavyCompute(it) }
 .flowOn(Dispatchers.Default)

heavyCompute выполнится не на Dispatchers.Default, а в том же контексте, где идёт collect, потому что flowOn влияет только на emit и всё выше. Если вы хотите, чтобы и emit, и map были на Default, нужно переставить flowOn выше:

flow {
    emit(doSomething())
}
    .flowOn(Dispatchers.IO)
    .map { heavyCompute(it) }
    .flowOn(Dispatchers.Default)

Можно ставить flowOn несколько раз — и это как раз и позволяет строить гибкие пайплайны, где каждый участок выполняется на подходящем пуле. Главное помнить то, что читаются flowOn слева направо, и каждый из них влияет только на всё, что выше.

flowOn — это единственный рекомендуемый способ менять контекст выполнения кода внутри Flow, без риска нарушить потоковую модель, потерять элементы или нарваться на IllegalStateException.

Как работает withContext (и почему это не вариант)

withContext — это suspend‑функция, которая временно переключает выполнение кода в другой CoroutineContext. Она идеально подходит для задач вроде withContext(Dispatchers.IO) { readFromDisk() }, когда нужно выполнить единичную тяжёлую операцию в нужном диспатчере. Но внутри Flow всё совсем не так просто.

Если вы вызываете withContext внутри тела flow {} или любого оператора обработки типа map, вы не просто меняете контекст. Вы вставляете suspend‑точку на каждый элемент и переключаете тред каждый раз, когда обрабатывается элемент. Это значит, что если у вас тысяча элементов — вы сделаете тысячу переключений тредов. А это уже неплохая такая нагрузка.

val wrongFlow = flow {
    (1..1000).forEach { id ->
        emit(withContext(Dispatchers.IO) {
            loadById(id)
        })
    }
}

Выглядит невинно, но фактически вы делаете Dispatchers.IOflow контекст → Dispatchers.IO… тысячу раз. В многопоточном приложении это может привести к:

  1. лишней нагрузке на пул потоков, особенно если он ограничен;

  2. задержке исполнения, если withContext создаёт очередь задач;

  3. непредсказуемому поведению и просадке пропускной способности — особенно при комбинировании с другими suspend‑функциями.

Самое неприятное: это может не вызывать ошибок прямо сейчас, но ухудшает масштабируемость и стабильность. Если при этом emit() происходит с другого контекста, нарушается инвариант потока. Именно об этом предупреждает официальная документация: emit должен вызываться в том же контексте, что и запуск collect, если flowOn не применялся. А withContext ломает это правило — и вы получаете IllegalStateException: Flow invariant is violated.

Ещё пример:

flow {
    val data = withContext(Dispatchers.IO) {
        heavyQuery()
    }
    emit(data)
}

На первый взгляд безопасно. Но если collect вызывается в другом контексте (например, в Main), emit(data) происходит не в том же контексте, где collect, и Kotlin Flow выбрасывает исключение.

withContext — инструмент не для Flow. Он хорош в точках входа (например, внутри ViewModel), но внутри цепочки его почти всегда нужно заменять на flowOn. Так вы сохраняете потоковую модель, избегаете лишней нагрузки на планировщик и не рискуете получить баги, которые появятся только в нагрузке или в бою.

Мини-бенчмарк

Упрощённая метрика, чтобы почувствовать проблему переключений:

suspend fun benchmark() {
    val flowOnTime = measureTimeMillis {
        (1..1_000_000).asFlow()
            .map { it + 1 }                        // CPU
            .flowOn(Dispatchers.Default)
            .collect()
    }

    val withContextTime = measureTimeMillis {
        (1..1_000_000).asFlow()
            .map { withContext(Dispatchers.Default) { it + 1 } }
            .collect()
    }

    println("flowOn:       $flowOnTime ms")
    println("withContext:  $withContextTime ms")
}
flowOn:       150–250 ms
withContext:  3000–6000 ms

В среднем на десктопе flowOn‑вариант быстрее, потому что переключается ровно один раз и держит CPU‑часть отдельной цепочкой.

flowOn(Dispatchers.Default) один раз переключает апстрим на нужный диспатчер, и вся обработка выполняется поточно, без лишних переключений между тредами.

withContext(Dispatchers.Default) внутри map делает миллион тред‑свитчей, по одному на каждый элемент. Каждый withContext — это suspend, переключение, возврат, и всё это в цепочке. Суммарные накладные расходы колоссальны.

Если дополнительно включить профилировку корутин (-Dkotlinx.coroutines.debug или setprop debug.coroutines.enable_creation_stack_trace true), можно увидеть, как создаются миллионы короткоживущих корутин и как они давят на планировщик.


Итоги

flowOn и withContext решают схожую задачу, но делают это кардинально разными путями. flowOn — это декларативное переключение апстрима с сохранением инвариантов и минимальными затратами; withContext — точечное, иногда дорогое и часто ненужное. Если держать в голове эту разницу и не забывать про backpressure, ваши Flows будут быстрыми и адеквтаными.

Если вы работаете с Java и задумывались о переходе на Kotlin для серверной разработки, приглашаем вас на два открытых урока курса Kotlin Backend Developer. Professional:

Кроме того, пройдите вступительное тестирование, чтобы оценить свой уровень и узнать, подойдет ли вам программа курса «Kotlin Backend Developer. Professional».

Теги:
Хабы:
0
Комментарии1

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS