Вероятнее всего у вас спрашивали на собесе «как работают корутины под капотом?», вы не долго думая выбрасывали что‑то в стиле «там под капотом стейт‑машина, она определяет какая suspend функция будет выполняться», но понимали ли вы на самом деле всё о чем говорили? Возможно, только вам это известно, но если честно я очень плохо понимал собственные ответы на такие вопросы как бы это парадоксально не звучало и даже после десятка пройденных собесов у меня не было полноценной картины как работает внутрянка этой поистине невероятной библиотеки «сладкой асинхронщины».
Ладно, вступление и так получилось слишком затянутым, как это бывает в непрофессиональных фильмах от которых ждешь экшена половину хронометража, а потом получаешь унылые бои на светящихся палках, погнали короче разбираться!
Знакомство с крутыми перцами: CoroutineContext и CoroutineScope
Начнём с простенького примера:
fun main() = runBlocking {
// запускаем новую корутину
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
Пока нас интересует только функция launch
, которая чаще всего используется для создания корутин, давайте провалимся в её исходники:
// (1) launch является Kotlin Extension функцией для CoroutineScope
fun CoroutineScope.launch(
// (2) контекст похож на HashMap'у, также хранит всякие штуки по ключу
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// (3) при создании новой корутины контекст может быть изменён
val newContext = newCoroutineContext(context)
// к остальной части кода вернёмся позже
}
Пройдёмся подробнее по пунктам, которые я выделил в комментах к коду:
1) Чтобы создать новую корутину нужно вызвать launch
в пределах CoroutineScope
, никак иначе, так было сделано чтобы корутина смогла получить контекст и пробрасывать его в дочерние корутины, если глянуть исходник CoroutineScope
, то всё станет очевидным:
interface CoroutineScope {
val coroutineContext: CoroutineContext
}
При запуске корутины берётся текущий контекст из CoroutineScope
в котором был вызван launch
.
2) CoroutineContext
не является хэш-таблицей, а реализован на основе паттерна Компоновщик:
/*
основной прикол паттерна Компоновщик: создать дерево
из вложенных друг в друга объектов
чтобы реализовать такой паттерн нужен общий родитель,
которым в данном случае является CoroutineContext
*/
/*
обычные элементы контекста, такие как Job, CoroutineName и тд являются
простыми объектами, которые не содержат другие или в терминах паттерна
листовыми узлами
*/
interface Element : CoroutineContext {
val key: Key<*>
/*
для удобного доступа к элементам контекста переопределён оператор get
это позволяет делать такие штуки:
coroutineContext[Job] вместо coroutineContext.get(Job)
*/
override operator fun <E : Element> get(key: Key<E>): E? =
if (this.key == key) this as E else null
}
/*
помимо листовых узлов есть комплексные наборы данных,
которые могут в себя включать другие такие наборы
и простые объекты
*/
class CombinedContext(
val left: CoroutineContext,
val element: CoroutineContext.Element
) : CoroutineContext {
override fun <E : Element> get(key: Key<E>): E? {
/*
логика простая: проверяем сначала простой объект,
если ключи не совпадают, смотрим left, если он является
комплексным узлом CombinedContext, рекурсивно повторяем
*/
var currentContext = this
while (true) {
currentContext.element[key]?.let { return it }
val next = currentContext.left
if (next is CombinedContext) {
currentContext = next
} else {
return currentContext.get(key)
}
}
}
}
class CoroutineName(val name: String) : CoroutineContext.Element {
/*
в качестве ключей для обычных элементов CoroutineContext
используются названия самих классов, что очень удобно
поле key требует наследника Key<*> который определён ниже
через companion object, это работает даже если на первый взгляд
выглядит сомнительно и неоднозначно
*/
override val key: Key<*> = CoroutineName
companion object Key : Key<CoroutineName>
}
fun main() {
// Job и CoroutineDispatcher являются элементами CoroutineContext
val combinedContext = CombinedContext(
CombinedContext(
Job(),
Dispatchers.Default
),
CoroutineName("My name's Kotlin coroutine")
)
/*
в итоге мы можем положить в CoroutineContext то что нужно корутинам:
Job, CoroutineName, CoroutineDispatcher, CoroutineExceptionHandler и тд,
а затем прокидывать контекст через CoroutineScope в сами корутины
*/
val job = combinedContext[Job]
}
3) При создании новой корутины можно изменить CoroutineContext
:
launch(CoroutineName("I'm a parent coroutine")) {
launch(CoroutineName("I'm child coroutine"))) {
// ...
}
}
Логика работы в таком случае следующая: корутина получает текущий контекст из CoroutineScope
и складывает его с контекстом, переданным в качестве параметра функции launch
, таким образом дочерняя корутина из примера содержит другое имя.
Важный момент: не все элементы CoroutineContext
могут быть корректно изменены, например при указании у дочерней корутины другой Job
вы можете разрушить принцип Structured Concurrency, который заключается в каскадной отмене всех корутин, покажу на примере:
val topLevelJob = viewModelScope.launch {
/*
тут всё ок, при создании новой корутины
возьмётся Job'а из текущего контекста, в нашем случае это
topLevelJob, затем корутина будет добавлена как дочерняя Job'а и
связь родитель - ребёнок не нарушится
*/
launch {
println("I'm coroutine #1")
}
/*
не ок, так как мы создаём Job'у которая не привязана
к родительской, грубо говоря мы не сделали
parentJob.attachChild(childJob), который кстати нельзя
сделать потому что это internal api библиотеки
*/
launch(Job()) {
println("I'm coroutine #2")
}
}
// вторая дочерняя корутина не будет отменена
topLevelJob.cancel()
Подведём итоги:
корутина создаётся через Kotlin Extension функцию
CoroutineScope.launch
CoroutineScope
является простым интерфейсом, который предоставляет корутине контекстCoroutineContext
нужен для хранения всяких полезных штук при выполнении корутины:CoroutineName
,CoroutineDispatcher
,Job
,CoroutineExceptionHandler
и тд, важно добавить что вы можете легко создать свой элемент контекста, унаследовавCoroutineContext.Element
интерфейс.при создании корутины можно передать новый контекст, это приведёт к созданию контекста, основанного на текущем с изменёнными элементами, взятыми из нового
не все элементы контекста можно адекватно изменить, например нужно быть осторожным при указании другой
Job
'ы, отличным решением будет придерживаться правила: менятьJob
только для самых высокоуровневых корутин, в идеале при созданииCoroutineScope
Continuation интерфейс и реализация suspend блока
Возвращаемся снова к начальному примеру:
fun main() = runBlocking {
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
public fun CoroutineScope.launch(
// ...
block: suspend CoroutineScope.() -> Unit
): Job {}
Обратите внимание на ключевое слово suspend
, именно благодаря тому что launch
выполняет лямбду помеченную этим ключевым словом мы можем запускать suspend
функции в пределах корутины, что ещё интереснее эта лямбда во время компиляции превращается в нечто интересное:
BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label = 0;
public final Object invokeSuspend(Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "Some";
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
// этот метод будет использоваться для создания Continuation объекта
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
}), 3, (Object)null);
Ага, вот она та самая стейт-машина (switch блок), про которую на собесе вскользь упонимают, сейчас она ничего сложного не делает, кроме как выводит текст в консоль и возвращает пустой результат.
После достаточно долгих вечеров копания в исходниках и процесса дебага я всё таки выяснил, что сгенерированный выше код это ничто иное как реализация абстрактного класса ContinuationImpl
, одного из наследников Continuation
:
/*
само название говорит за себя, что это "продолжение" после приостановки
корутины, да та самая магическая приостановка или SUSPENDED состояние о
которой говорят постоянно, позже узнаем что никакой магии тут нет
*/
public interface Continuation<in T> {
/*
как мы выяснили в предыдущем разделе CoroutineContext содержит
важные штуки для корутин, например CoroutineDispatcher, который
может пригодиться для переключения потоков
*/
public val context: CoroutineContext
/*
вызов этого метода происходит после возвращения корутины из состояния
приостановки, именно сюда кладутся результаты suspend функций, также
дочерние корутины могут вызывать этот метод у родительских для
продолжения работы последних
*/
public fun resumeWith(result: Result<T>)
}
Без такой штуки как Continuation
корутины не могли бы возвращаться туда где произошла приостановка и следовательно мы не могли бы выполнять код на разных потоках, используя последовательную запись кода, напомню что одной из ключевых идей как раз и является выполнение асинхронного кода, как последовательного, небольшой пример:
// создаём корутину на главном потоке
launch {
// вызываем функцию fetchAndroidUnderTheHoodPosts() в background потоке
val myPosts = fetchAndroidUnderTheHoodPosts()
/*
чтобы следующий код получил результат нужно как минимум
каким-то образом получить его и не забыть переключиться на главный поток,
а так как fetchAndroidUnderTheHoodPosts() выполняется на другом потоке
и неизвестно когда функция закончит своё выполнение,
остаётся только вариант передачи в функцию callback'а,
который будет вызван когда она завершится,
таким callback'ом является Continuation объект
*/
println(myPosts)
}
После того как функция fetchAndroidUnderTheHoodPosts
завершит выполнение своего кода, результат будет передан через Continuation.resumeWith()
, что приведёт к дальнейшему выполнение корутины, в текущем примере - вывод всех постов в консоль.
Окей, мы определились что без Continuation
ничего не выйдет и даже узнали что suspend блок в launch
функции на самом деле тоже является Continuation
объектом и наследуется от ContinuationImpl
, но основная реализация содержится в BaseContinuationImpl
классе, от которого наследуется ContinuationImpl
, сложно? Ничего, привыкайте, это хардкор, а не няшный мульт про поней.
Идём смотреть на реализацию Continuation
для suspend блока:
/*
компилятор генерирует реализацию этого
абстрактного класса для suspend блоков
*/
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
// контекст берётся из корутины, дальше это увидим
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
private var intercepted: Continuation<Any?>? = null
/*
ContinuationInterceptor это штука которая оборачивает
текущий Continuation объект в новый, например DispatchedContinuation
и возвращает его, такой механизм используется для переключения потоков
через CoroutineDispatcher, кстати оборачивание одного объекта
в другой с общим интерфейсом (Continuation) ничто иное как
паттерн Декоратор
*/
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// очистка обёрнутого Continuation объекта:
// зануление ненужных ссылок и тд
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
// корутина закончила своё выполнение
this.intercepted = CompletedContinuation
}
}
/*
если в ContinuationImpl реализована поддержка CoroutineDispatcher'ов,
то в BaseContinuationImpl содержится основная логика работы с состоянием
приостановки
*/
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {и
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
/*
текущая реализация Continuation требует в качестве completion
более высокоуровневый Continuation объект, обычно им является
сама корутина
напоминаю что мы сейчас рассматриваем реализацию
suspend блока в launch функции, а не саму корутину
*/
val completion = completion!!
val outcome: Result<Any?> =
try {
/*
вот тут происходит самое интересное, invokeSuspend
выполняет внутри себя тот самый сгенерированный код
в котором содержатся наши suspend функции и если одна
из них перешла в состояние приостановки,
метод тупо делает return, Continuation в данном
случае не продолжит своё выполнение пока
не будет снова вызван resumeWith()
*/
val outcome = invokeSuspend(param)
/*
COROUTINE_SUSPENDED - зарезервированная константа,
сигнализирующая о том, что внутри invokeSuspend
произошла приостановка
*/
if (outcome === COROUTINE_SUSPENDED) return
// если invokeSuspend вернул результат получаем его
Result.success(outcome)
} catch (exception: Throwable) {
// если произошло исключение тоже получаем его
Result.failure(exception)
}
/*
так как текущий BaseContinuationImpl получил результат,
значит suspend блок в корутине завершился, поэтому
текущий объект BaseContinuationImpl больше не нужен,
а следовательно всякие используемые штуки,
такие как CoroutineDispatcher'ы например должен быть очищены
*/
releaseIntercepted()
/*
если мы запустили корутину в другой корутине, то в качестве
completion будет suspend блок родительской корутины:
launch { родительский suspend блок BaseContinuationImpl
launch { дочерний suspend блок BaseContinuationImpl
}
}
*/
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
/*
вызывается самый высокоуровневый Continuation объект,
в большинстве случаев это сама корутина
*/
completion.resumeWith(outcome)
return
}
}
}
}
// тот самый метод, переопределённый в сгенерированном коде
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
Суммируем:
мы имеем
Continuation
интерфейс, позволяющий продолжить выполнение корутины после её приостановкидля suspend блока генерируется специальная реализация
ContinuationImpl
со стейт-машиной (switch или when конструкцией) в переопределённом методеinvokeSuspend()
когда suspend функция приостанавливается происходят следующие вещи:
-invokeSuspend()
возвращает специальное значениеCOROUTINE_SUSPENDED
-BaseContinuationImpl
завершается через return и ожидает следующего вызоваresumeWith()
логика обработки состояния приостановки содержится в
BaseContinuationImpl
, а логика переключения потоков с помощьюCoroutineDispatcher
'ов происходит в наследникеContinuationImpl
.
Что же такое корутина?
Теперь мы можем разобраться чем же на самом деле является корутина и как происходит выполнение suspend блока в ней, для этого снова возвращаемся к исходникам launch
функции:
public fun CoroutineScope.launch(
// ...
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
/*
мы не будем рассматривать LazyStandaloneCoroutine, так как эта штука
очень похожа на базовую реализацию корутины StandaloneCoroutine,
только запускается по требованию
*/
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
/*
запускаем корутину, LazyStandaloneCoroutine не запустится таким образом,
нужно будет вручную вызвать метод start() у объекта Job,
который возвращает launch
*/
coroutine.start(start, coroutine, block)
return coroutine
}
Помните в BaseContinuationImpl
был такой параметр как completion, я ещё говорил что им может быть сама корутина, так вот StandaloneCoroutine
и есть реализация корутины, короче не будем тянуть, идём смотреть исходники:
// корутина наследует Job, Continuation и CoroutineScope
class StandaloneCoroutine<in T>(
parentContext: CoroutineContext,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
/*
кладёт Job текущей корутины в Job'у родительской
это нужно чтобы родительские корутины знали о
дочерних и не завершались раньше них
*/
initParentJob(parentContext[Job])
}
/*
так можно сделать потому что StandaloneCoroutine
является реализацией Job, а Job является одним из элементов
CoroutineContext'а
*/
override val context: CoroutineContext = parentContext + this
// вы уже знаете, что когда корутина выходит из состояния приостановки,
// вызывается метод Continuation.resumeWith()
override fun resumeWith(result: Result<T>) {
// makeCompletingOnce пытается завершить Job'у корутины
val state = makeCompletingOnce(result.toState())
// если у текущей Job'ы есть дочерние и они не были завершены,
// корутина не может быть завершена
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
/*
запуск suspend блока в корутине происходит в CoroutineStart enum'е,
в качестве параметра receiver передаётся сама корутина, это нужно
чтобы suspend блок получил CoroutineContext, если уже забыли про это,
возвращайтесь к исходнику ContinuationImpl
completion это как раз то самый высокоуровневый Continuation объект,
вызываемый из сгенерированного Continuation объекта для suspend блока,
как я уже говорил ранее им является сама корутина
*/
fun <R> start(
start: CoroutineStart,
receiver: R,
block: suspend R.() -> T
) {
start(block = block, receiver = receiver, competion = this)
}
}
// поначалу я долго искал где запускается корутина, так как не сразу
// заметил что у CoroutineStart переопределён invoke оператор
enum class CoroutineStart {
// я опустил остальные варианты, оставил только базовый
DEFAULT;
operator fun <R, T> invoke(
block: suspend R.() -> T,
receiver: R,
completion: Continuation<T>
): Unit =
when (this) {
// для suspend блока будет сгенерированна ContinuationImpl
// реализация, которая запустится при вызове launch функции
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
else -> Unit
}
}
Осталось только разобраться как все таки запускается suspend блок, для этого проваливаемся ещё на уровень ниже, в startCoroutineCancellable()
функцию:
// обратите внимание, что startCoroutineCancellable() является
// Kotlin Extension функцией для suspend блока
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R,
completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
// первым делом из suspend блока создаётся Continuation объект
createCoroutineUnintercepted(receiver, completion)
/*
далее оборачивается в DispatchedContinuation, если используется
CoroutineDispatcher, а он в большинстве случаев используется
*/
.intercepted()
/*
ну и происходит вызов Continuation.resumeWith()
в зависимости от типа Continuation, чтобы блок в корутине
начал выполняться, иначе ничего не произойдёт
*/
.resumeCancellableWith(Result.success(Unit), onCancellation)
// создаёт Continuation из suspend блока
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
/*
в нашем случае suspend блок является объектом ContinuationImpl,
а это наследник BaseContinuationImpl, поэтому выполняется
первая ветка
*/
return if (this is BaseContinuationImpl)
// метод create будет сгенерирован компилятором для ContinuationImpl
// реализации, кстати ранее уже был пример сгенерированного кода
create(receiver, completion)
else
...
}
/*
делает вызов resumeWith в зависимости от типа Continuation:
1) DispatchedContinuation может передать вызов resumeWith()
CoroutineDispatcher'у, который выполнит код на другом потоке
2) обычный вызов Continuation.resumeWith() произойдёт без
смены потоков и тд
*/
fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
Можно сказать что вы прошли джедайское обучение и готовы к настоящему бою, разберём самый первый пример:
fun main() = runBlocking {
// запускаем новую корутину
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
Если посмотреть на пример с точки зрения внутрянки корутин, то получим примерно следующий код:
fun main() {
SuspendLaunchBlock(
// (2)
completion = StandaloneCoroutine()
).resumeWith(Result.success(Unit)) // (3)
}
// (1)
class SuspendLaunchBlock(
completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {
var label = 0
// (5)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (newResult === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
// (4)
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("Hello, I'm a Kotlin coroutine, how are you?")
return Unit
}
else -> error("Illegal state")
}
}
}
class StandaloneCoroutine<T>(...) : Continuation<T> {
// (6)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
Давайте по порядку:
Создаётся сгенерированная реализация
SuspendLaunchBlock
для suspend блока вlaunch
функции со стейт-машиной или конкретнее when конструкцией.Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вSuspendLaunchBlock
Запускается корутина через вызов
SuspendLaunchBlock.resumeWith()
метода, который далее выполняет сгенерированныйinvokeSuspend()
методВ
invokeSuspend()
выполняется единственная ветка в when блоке - вывод в консоль и возвращение пустого результатаПосле завершения
invokeSuspend()
вSuspendLaunchBlock.resumeWith()
происходит сначала проверка на состояние приостановки, в данном случаеinvokeSuspend()
выполнилась без приостановки, поэтому сразу вызываетсяcompletion.resumeWith()
, а так как completion этоStandaloneCoroutine
, то вызываетсяStandaloneCoroutine.resumeWith()
реализацияStandaloneCoroutine.resumeWith()
проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение
Вы практически магистр джедаев! Сделайте паузу, заварите чай или кофе, скушайте шоколадку и ещё раз пройдитесь по примеру, очень важно понять что механизм корутин заключается в переходах между состояниями приостановки через вызов Continuation.resumeWith()
метода, а завершение происходит в корневом Continuation
объекте и самое главное, что нет никакой магии.
А если в корутине цепочка из suspend функций?
Усложним пример из прошлого раздела, добавив две suspend функции:
// опустим подробности реализации
suspend fun fetchAuthToken() = ...
suspend fun fetchProfileData(token: String) = ...
fun main() = runBlocking {
// запускаем корутины с двумя suspend функциями
launch {
val token = fetchAuthToken()
val profile = fetchProfileData(token = token)
println(profile)
}
}
Интересно какой теперь будет код с точки зрения внутрянки корутин, смотрим:
fun main() {
SuspendLaunchBlock(
// (2)
completion = StandaloneCoroutine()
).resumeWith(Result.success(Unit)) // (3)
}
// (5)
suspend fun fetchAuthToken(continuation: SuspendLaunchBlock): Any? {
/*
любое асинхронное выполнение кода приводит к состоянию приостановки
это необязательно использование многопоточности, дальше вы это увидите
runCodeInBackground - магический метод, который выполняет
блок кода в фоновом потоке
*/
runCodeInBackground {
val token = ...
runCodeInMain {
// (6)
// магический метод, который выполняет блок кода на главном потоке
continuation.resumeWith(token)
}
}
return COROUTINE_SUSPENDED
}
suspend fun fetchProfileData(token: String) = ...
// (1)
class SuspendLaunchBlock(
completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (9)
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
// while(true) нужен чтобы выполнять ветки дальше,
// если suspend функция не перешла в состояние приостановки
while (true) {
when(label) {
// (4)
0 -> {
throwIfFailureResult(result)
label = 1
// Continuation передаётся в качестве
// аргумента suspend функции
val state = fetchAuthToken(this)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (7)
1 -> {
throwIfFailureResult(result)
label = 2
val token = result.unwrap()
val state = fetchProfileData(token, this)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (8)
2 -> {
throwIfFailureResult(result)
val profile = result.unwrap()
println(profile)
break
}
else -> error("Illegal state")
}
}
return Unit
}
}
class StandaloneCoroutine<T>(...) : Continuation<T> {
// (10)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
Чтобы было поинтереснее будем считать, что обе suspend функции переходят в состояние приостановки, смотрим логику:
Создаётся сгенерированная реализация
SuspendLaunchBlock
для suspend блока вlaunch
функции со стейт-машиной или конкретнее when конструкцией.Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вSuspendLaunchBlock
Запускается корутина через вызов
SuspendLaunchBlock.resumeWith()
метода, который далее выполняет сгенерированныйinvokeSuspend()
методВ
invokeSuspend()
выполняется первая ветка (label == 0), где происходит вызов функцииfetchAuthToken()
, в качестве единственного параметра передаётся текущийContinuation
объект, в данном случае этоSuspendLaunchBlock
, значение переменной label меняется на 1Функция
fetchAuthToken()
возвращает значениеCOROUTINE_SUSPENDED
, что свидетельствует о состоянии приостановки, важно что здесь нет никакой магии, выполнение кода происходит в другом потоке, а так как это асинхронное выполнение, внешнему коду можно только передать результат через callback, которым кстати являетсяSuspendLaunchBlock
После выполнение своего кода
fetchAuthToken()
вызывает методSuspendLaunchBlock.resumeWith()
с результатом своей работы, в примере это строка с токеномSuspendLaunchBlock.resumeWith()
возобновляет своё выполение и повторно вызывает invokeSuspend(), где уже выполняется вторая ветка (label == 1), в ней происходит вызов fetchProfileData() метода, в качестве первого параметра он принимает токен от предыдущей suspend функцииfetchAuthToken()
, а в качестве второго ссылку на Continuation объект, которым как мы уже знаем являетсяSuspendLaunchBlock
, методfetchProfileData()
выполняется аналогичноfetchAuthToken()
, label становится равным 2В последней ветке
invokeSuspend()
, где label == 2, происходит вывод в консоль результата функцииfetchProfileData()
и возвращение пустого значенияНа этот раз возвращенное значение из
invokeSuspend()
не являетсяCOROUTINE_SUSPENDED
, поэтому выполнениеSuspendLaunchBlock
завершается, дальнейшее управление передаётсяStandaloneCoroutine
через вызовcompletion.resumeWith()
StandaloneCoroutine.resumeWith()
проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение
Отлично, теперь вы знаете как происходят переходы между отдельными suspend функциями, поздравляю, вы можете смело умничать на собесах, но не забывайте что скромность красит человека)
Переключение потоков, delay() и CoroutineDispatcher
Мы забыли о самом важном ради чего в принципе используются корутины - выполнение suspend функций на других потоках, отсюда собственно и возникает потребность приостановить выполнение текущей корутины, как мы уже выяснили для этого нужно передать Continuation
объект suspend функции и ждать пока она сама не вызовет Continuation.resumeWith()
метод:
fun fetchAuthToken(continuation: Continuation<Any?>): Any? {
// магический метод, который выполняет блок кода в фоновом потоке
runCodeInBackground {
val token = ...
// магический метод, который выполняет блок кода на главном потоке
runCodeInMain {
/*
чтобы сообщить корутине что suspend функция завершила
своё выполнение нужно вызвать Continuation.resumeWith()
с результатом работы функции
*/
continuation.resumeWith(token)
}
}
// так как функция не может сразу вернуть результат,
// она переходит в состояние приостановки
return COROUTINE_SUSPENDED
}
Это достаточно упрощенная версия кода, но зато она отражают общий механизм, а самое главное показывает что состояние приостановки это ничто иное как выполнение некоторого кода на другом потоке и возвращение результата через Continuation.resumeWith
как через обычный callback.
В прошлом разделе я вскользь упомянул, что любое асинхронное выполнение приводит к состоянию приостановки, но не раскрыл эту тему, давайте разбираться.
Что нужно чтобы код стал асинхронным? Правильно сделать его выполнение независимым от текущей точки выполнения, например создать новый поток:
fun main() {
// создаётся новый поток и сразу запускается
Thread {
val sum = 3 + 7
println(sum)
}.start()
// код main() продолжает выполняться независимо от того,
// выполнился ли весь код в Thread
val mul = 3 * 7
println(mul)
// функция main() может завершиться раньше созданного потока
}
Но давайте на минутку забудем про многопоточность и вспомним главный поток Android, он позволит нам выполнить код асинхронно? Конечно же да, можно это сделать через старый добрый Handler
:
// Handler поставит выполнение кода в очередь главного потока
handler.post {
println("I'll run soon")
}
Кстати Handler
используется в реализации функции delay
для главного потока Android, если забыли, то эта функция позволяет сделать задержку без блокировки текущего потока:
fun delay(continuation: Continuation<Any?>): Any? {
val block = Runnable {
// после того как задержка пройдёт, выполнится этот блок кода
// и корутина продолжит своё выполнение после приостановки
continuation.resumeWith(Result.success(Unit))
}
// handler.postDelayed() выполняет block через указанный
// промежуток времени в миллисекундах
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
// если корутина была отменена нужно отменить задержку
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
// отменяет текущую корутину, так как Handler для главного потока
// был закрыт
cancelOnRejection(continuation.context, block)
}
return COROUTINE_SUSPENDED
}
Вот и вся суть асинхронного выполнения кода, им может быть любой механизм, позволяющий выполнить X код независимо от Y кода и если попробовать дать определение корутинам сейчас, то оно будет примерно такое:
Корутина — это абстракция, которая оборачивает некоторую асинхронную работу, это может быть выполнение кода в другом потоке или использование очереди главного потока, например MessageQueue из Android, в удобный последовательный код с механизмами отмены и другими прикольными фишками.
Ладно, вроде бы разобрались что такое приостановка корутины и асинхронное выполнение кода, можем переходить к более прикладным вещам, например к функции withContext()
, чаще всего используемой для изменения CoroutineDispatcher
'а:
suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
/*
как мы знаем Continuation работает под капотом корутин и его нельзя
получить явно в прикладном коде, поэтому была придумана inline функция
suspendCoroutineUninterceptedOrReturn (и не одна кстати),
которая после компиляции подставит текущий Continuation объект
*/
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
/*
если вы ещё не забыли то новый контекст производится
путём сложения двух контекстов, в качестве результата
мы имеем контекст в котором старые элементы заменены новыми,
например Dispatchers.Main можно поменять на Dispatchers.Default
*/
val newContext = oldContext.newCoroutineContext(context)
// проверка что корутина все ещё выполняется
newContext.ensureActive()
// мы не будем рассматривать все ветки, нас интересует только
// переключение потоков через CoroutineDispatcher
if (newContext === oldContext) {
...
}
// CoroutineDispatcher является наследником ContinuationInterceptor
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
...
}
// ну вот опять создаётся какая-то неизвестная нам корутина,
// не беспокойтесь там достаточно простая логика
val coroutine = DispatchedCoroutine(newContext, uCont)
// стартуем также как и обычную корутину
block.startCoroutineCancellable(coroutine, coroutine)
/*
если есть возможность сразу отдать результат без приостановки корутины
то withContext сразу завершится, в противном случае корутина,
содержащая withContext() вызов перейдёт в состояние приостановки
*/
coroutine.getResult()
}
}
Функция withContext
делает 3 простые вещи:
Получает текущий
Continuation
объект, он может быть взят из параметра suspend функции или из текущей корутины в которой была вызвана функцияwithContext
Берёт контекст из
Continuation
объекта и складывает с контекстом, переданным в качестве параметра, в результате создаётся новый контекстНа основе нового контекста создаёт определённый вид корутин, например если был изменён
CoroutineDispatcher
будет создана корутинаDispatchedCoroutine
Что ж давайте теперь глянем исходники DispatchedCoroutine
:
/*
чаще всего в качестве continuation параметра выступает
Continuation объект, который генерируется для suspend блока в корутине,
если забыли, то это реализация абстрактного класса ContinuationImpl
*/
internal class DispatchedCoroutine<in T> internal constructor(
context: CoroutineContext,
continuation: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
/*
метод afterResume() вызывается перед завершением Continuation.resumeWith(),
когда корутина закончила выполнять все свои suspend функции и
у неё больше нет дочерних корутин в состоянии выполнения
*/
override fun afterResume(state: Any?) {
/*
метод afterResume() может быть вызван раньше getResult(),
например если блок кода в withContext() очень быстро выполнился
в таком случае результат вернёт getResult()
*/
if (tryResume()) return
/*
Я уже вскользь упоминал что делает каждый метод в этой цепочке,
когда мы рассматривали как стартует корутины, ещё раз повторим:
intercepted() оборачивает continuation в DispatchedContinuation,
который реализует логику работы с CoroutineDispatcher'ами
resumeCancellableWith() вызывает resumeWith() в зависимости от типа
Continuation, в данном случае будет вызван метод
DispatchedContinuation.resumeCancellableWith()
*/
continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
internal fun getResult(): Any? {
// если нельзя сразу вернуть результат корутина приостанавливается
if (trySuspend()) return COROUTINE_SUSPENDED
// результат выполнения withContext()
val state = ...
return state as T
}
}
Суммируем, DispatchedCoroutine
выполняет две ключевые задачи:
Добавляет возможность вернуть результат без перехода в состояние приостановки, если такая возможность есть, для этого используется
getResult()
метод.Переключает
Continuation
объект, в котором был вызванwithContext()
, на родной поток, например если ваша корутина выполняется на главном потоке, затем вызываетwithContext()
на фоновом, то результат должен вернуться снова на главный.
Ладно, с DispatchedCoroutine
более менее разобрались, чтобы понять как на самом деле происходит переключение потоков в корутинах провалимся в DispatchedContinuation
, в который оборачиваются другие Continuation
объекты:
/*
DispatchedContinuation принимает на вход:
dispatcher - выполняет блок кода, чаще всего на другом потоке
или с использованием очереди, например Handler / MessageQueue из Android
continuation - Continuation объект, работа с которым будет происходить
через указанный диспатчер
*/
internal class DispatchedContinuation<in T>(
val dispatcher: CoroutineDispatcher,
val continuation: Continuation<T>
) : Continuation<T> by continuation {
/*
логика resumeWith() идентична resumeCancellableWith() с отличием только
в разных режимах resumeMode, обычно чаще всего вызывается
именно resumeCancellableWith, так как режим MODE_CANCELLABLE
позволяет прокинуть CancellationException для отмены корутины
*/
override fun resumeWith(result: Result<T>) { ... }
internal inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
/*
CoroutineDispatcher имеет два логически связанных метода:
isDispatchNeeded() решает выполнять код в диспатчере или нет
dispatch() выполняет код в диспатчере: код может выполниться на
другом потоке, поставлен в очередь и тд
*/
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
// для наглядности я упростил блок кода и написал его здесь
val block = Runnable {
continuation.resumeWith(state)
}
dispatcher.dispatch(context, block)
} else {
/*
если диспатчер не хочет выполнять код, а такое может быть
например если диспатчер переключает на главный поток, а мы уже
на главном потоке и внутри диспатчера реализована проверка,
то isDispatchNeeded() вернёт false, в таком случае выполнение
корутины будет добавлено в EventLoop
*/
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}
Как видите логика CoroutineDispatcher
'а достаточно простая:
Вызывается метод
isDispatchNeeded()
чтобы понять отдавать выполнение кода диспатчеру или нет, это нужно чтобы избежать лишних вызововdispatch()
, например не делать переключение на главный поток, если мы уже находимся на нёмЕсли
isDispatchNeeded()
вернул true разумеется отдаём выполнение кода диспатчеру, вызвав методdispatch()
Если
isDispatchNeeded()
вернул false запускаем корутину наEventLoop
'е, об этом в следующем разделе
В качестве примера рассмотрим такие интересные диспатчеры из Android, как Dispatchers.Main
и Dispatchers.Main.immediate
:
val mainLooper = Looper.getMainLooper()
val handler = Handler(mainLooper)
// реализация для Dispatchers.Main
override fun isDispatchNeeded(...) = true
// реализация для Dispatchers.Main.immediate
override fun isDispatchNeeded(...): Boolean {
// сравнивает Looper текущего потока с главным
return Looper.myLooper() != mainLooper
}
override fun dispatch(
context: CoroutineContext,
block: Runnable
) {
// handler.post() выполняет блок кода на главном потоке
handler.post(block)
}
Вот и вся разница между ними: Dispatchers.Main
всегда переключает выполнение кода на главный поток через Handler.post()
, а Dispatchers.Main.immediate
только, если код не выполняется на главном потоке.
Для закрепления знаний попробуем собрать всё воедино и описать логику для следующего примера:
// примерно такой код можно встретить в рабочих проектах
viewModelScope.launch {
// я не стал выносить в отдельную функцию, чтобы не усложнять пример
val posts = withContext(Dispatchers.IO) {
try {
// получаем список постов в background потоке
apiService.fetchPosts()
} catch (exception: Exception) {
// важно прокидывать CancellationException дальше
// так как это часть механизма отмены корутины
if (exception is CancellationException) throw exception
emptyList()
}
}
// отображаем данные на главном потоке
println(posts)
}
Под капотом весь этот код будет выглядить примерно как-то так:
// (1)
class ViewModelScopeLaunchBlock(
completion: Continuation<Any?>
) : ContinuationImpl(completion) {
var label = 0
// (5)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (17)
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
// while(true) нужен чтобы выполнять ветки дальше,
// если suspend функция не перешла в состояние приостановки
while (true) {
when(label) {
// (5)
0 -> {
throwIfFailureResult(result)
label = 1
// Continuation передаётся в качестве
// аргумента suspend функции
val state = fetchPosts(this)
// (10)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (16)
1 -> {
throwIfFailureResult(result)
val profile = result.unwrap()
println(profile)
break
}
else -> error("Illegal state")
}
}
return Unit
}
}
class StandaloneCoroutine(...) {
// (18)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
// (6)
class WithContextBlock(
completion: DispatchedCoroutine
) : ContinuationImpl(completion) {
var label = 0
// (12)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (12)
completion.resumeWith(newResult)
}
// (11)
fun invokeSuspend(result: Result<Any?>): Any? {
try {
val posts = apiService.fetchPosts()
return posts
} catch (exception: Exception) {
// важно прокидывать CancellationException дальше так как это часть
// механизма отмены корутины, вспомните resumeCancellableWith
if (exception is CancellationException) throw exception
return emptyList()
}
}
}
class DispatchedCoroutine(
...
// (7)
val continuation: ViewModelScopeLaunchBlock
): ScopeCoroutine(context, continuation) {
// (13)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
// (14)
override fun afterResume(state: Any?) {
if (tryResume()) return
continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
}
class DispatchedContinuation<in T>(
val dispatcher: CoroutineDispatcher,
val continuation: Continuation<T>
) : Continuation<T> by continuation {
// (4, 9, 15)
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
val block = Runnable {
continuation.resumeWith(state)
}
// (9, 15)
dispatcher.dispatch(context, block)
} else {
// (4)
continuation.resumeWith(state)
}
}
}
// (2)
val topLevelCoroutine = StandaloneCoroutine(...)
val viewModelScopeLaunchBlock = ViewModelScopeLaunchBlock(
// (2)
completion = topLevelCoroutine
)
// (3)
DispatchedContinuation(
dispatcher = Dispachers.Main.immediate
continuation = viewModelScopeLaunchBlock
).resumeCancellableWith(Result.success(Unit))
val withContextBlock = WithContextBlock(
// (7)
completion = DispatchedCoroutine(viewModelScopeLaunchBlock)
)
// (8)
DispatchedContinuation(
dispatcher = Dispatchers.IO,
continuation = withContextBlock
).resumeCancellableWith(Result.success(Unit))
Очень запутанно? Согласен, давайте по порядку:
Генерируется реализация
ViewModelScopeLaunchBlock
для suspend блока вviewModelScope.launch()
функции со стейт-машиной или конкретнее when конструкцией.Создаётся
StandaloneCoroutine
и передаётся в качестве параметра completion вViewModelScopeLaunchBlock
ViewModelScopeLaunchBlock
оборачивается вDispatchedContinuation
, в качестве диспатчера передаётся тот, что был указан вviewModelScope
, для Android этим диспатчером будетDispatchers.Main.immediate
Вызывается
DispatchedContinuation.resumeCancellableWith()
дляViewModelScopeLaunchBlock
, где происходит проверка на главный поток, но так какviewModelScope.launch()
и так выполняется на главном потоке, то не будет никакого переключения через диспатчер, а методViewModelScopeLaunchBlock.resumeWith()
будет вызван напрямуюВ
ViewModelScopeLaunchBlock.resumeWith()
происходит вызовinvokeSuspend()
, где начинает выполняться первая ветка (label == 0), в которой вызываетсяfetchPosts()
, в качестве первого параметра передаётся ссылка на текущийContinuation
объект, в данном случае этоViewModelScopeLaunchBlock
, переменная label становится равной 1В функции
fetchPosts()
вызываетсяwithContext()
, который также как иviewModelScope.launch()
принимает suspend блок, поэтому генерируется реализацияWithContextBlock
Создаётся
DispatchedCoroutine
и передаётся в качестве параметра completion вWithContextBlock
.
Обратите внимание, чтоDispatchedCoroutine
в качестве объектаContinuation
принимаетViewModelScopeLaunchBlock
, блок из которого вызывается функцияwithContext()
, так как нам нужно каким-то образом вернуть результат обратно.WithContextBlock
запускается аналогичноViewModelScopeLaunchBlock
, также оборачивается вDispatchedContinuation
, только теперь в качестве диспатчера передаётсяDispatchers.IO
.Вызывается
DispatchedContinuation.resumeCancellableWith()
дляWithContextBlock
, где происходит переключение главного потока на background поток черезDispatchers.IO
диспатчер,WithContextBlock.resumeWith()
теперь будет выполняться на background потокеDispatchedCoroutine
не может отдать сразу результат запроса и поэтомуfetchPosts()
вViewModelScopeLaunchBlock.invokeSuspend()
возвращаетCOROUTINE_SUSPENDED
, что приводит к приостановке корутиныМетод
WithContextBlock.invokeSuspend()
выполняет единственную ветку кода - запрос в сеть и получение ответа на background потоке.Когда запрос завершится метод
WithContextBlock.invokeSuspend()
вернёт результат вWithContextBlock.resumeWith()
, где произойдёт дальнейшая отправка результата через вызовcompletion.resumeWith()
в объект корутины, в данном случае этоDispatchedCoroutine
В
DispatchedCoroutine.resumeWith()
сначала произойдёт проверка на дочерние корутины в состоянии выполнения и если их нет, как в данном примере, выполнится кодDispatchedCoroutine.afterResume()
Метод
DispatchedCoroutine.afterResume()
должен вернуть результат вViewModelScopeLaunchBlock
, но здесь есть проблема:WithContextBlock
сейчас выполняется на background потоке, который предоставилDispatchers.IO
, аViewModelScopeLaunchBlock
должен получить результат на главном, поэтому вызывается цепочкаcontinuation.intercepted().resumeCancellableWith()
, методintercepted()
не будет повторно оборачиватьViewModelScopeLaunchBlock
вDispatchedContinuation
, это сделано для оптимизацииСнова вызывается
DispatchedContinuation.resumeCancellableWith()
дляViewModelScopeLaunchBlock
, но только теперь происходит переключение на главный поток через диспатчерDispatchers.Main.immediate
, если вы не забыли там под капотомHandler.post()
вызов, в итогеViewModelScopeLaunchBlock.resumeWith()
выполняется на главном потоке, а в качестве результата передаётся список постовВ
ViewModelScopeLaunchBlock.resumeWith()
происходит второй вызовViewModelScopeLaunchBlock.invokeSuspend()
, теперь уже выполняется вторая ветка (label == 1), которая берёт список постов и выводит его в консоль.Метод
ViewModelScopeLaunchBlock.invokeSuspend()
завершается успешно без приостановки, поэтомуViewModelScopeLaunchBlock.resumeWith()
заканчивает своё выполнение и делает вызовcompletion.resumeWith()
, где в качестве completion выступает корутинаStandaloneCoroutine
В
StandaloneCoroutine.resumeWith()
происходит проверка на дочерние корутины в состоянии выполнения, их в данном примере нет, корутина завершается.
Если вы дошли до этого момента, то вы явно не простой перец, обязательно сделайте перерыв с шоколадкой и попробуйте аналогичным образом проследить путь выполнение корутин в своём коде.
К сожалению это ещё не конец, корутины могут запускаться в других корутинах и не по одной, а целыми пачками, здесь работает похожий механизм с диспатчерами, но помимо него есть ещё и EventLoop
, если вы уже отдохнули и готовы сделать последний рывок, продолжаем!
Дочерние корутины, EventLoop и runBlocking
Выполнение дочерних корутин важно разграничивать на два вида:
Дочерняя корутина запускается через диспатчер
Дочерняя корутина выполняется на
EventLoop
'е
Первый случай возникает, когда диспатчер всегда переключает выполнение корутины, даже если она выполняется на правильном потоке, например Dispatchers.Main
, второй же свойственен для диспатчеров, где переключение происходит только по необходимости, яркий пример из Android: Dispatchers.Main.immediate
, который переключает выполнение корутины на главный поток, только если она не выполняется на нём.
Что ж, рассмотрим по порядку оба случая, начнём с первого:
/*
Dispatchers.Main всегда переключает выполнение корутины
на главный поток через Handler.post() механизм,
даже если корутина и так выполняется на главном
*/
val uiScope = CoroutineScope(Dispatchers.Main + Job())
uiScope.launch {
launch {
println("I'm child coroutine #1")
}
launch {
println("I'm child coroutine #2")
}
println("I'm parent coroutine")
}
// примерно во что всё это превратится
class UiScopeParentBlock(
completion: StandaloneCoroutine
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm parent coroutine")
return Unit
}
else -> error("Illegal state")
}
}
}
class UiScopeChild1Block(
completion: UiScopeParentBlock
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm child coroutine #1")
return Unit
}
else -> error("Illegal state")
}
}
}
class UiScopeChild2Block(
completion: UiScopeParentBlock
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm child coroutine #2")
return Unit
}
else -> error("Illegal state")
}
}
}
class StandaloneCoroutine(...) {
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
/*
resumeWith() не завершится полностью пока
дочерние корутины не закончат своё выполнение
когда мы рассматривали код StandaloneCoroutine, то можно было
увидеть как происходит добавление Job'ы дочерней корутины в Job'у
родительской, поэтому родительская корутина знает состояния
своих дочерних корутин
*/
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
// родительская корутина
val parentCoroutine = StandaloneCoroutine(...)
val uiScopeParentBlock = UiScopeParentBlock(
completion = parentCoroutine
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeParentBlock
).resumeCancellableWith(Result.success(Unit))
// первая дочерняя корутина
val childCoroutine1 = StandaloneCoroutine(...)
val uiScopeChild1Block = UiScopeChild1Block(
completion = childCoroutine1
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))
// вторая дочерняя корутина
val childCoroutine2 = StandaloneCoroutine(...)
val uiScopeChild1Block = UiScopeChild1Block(
completion = childCoroutine2
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))
Я не буду расписывать отдельные шаги, как это было в прошлом разделе, вы сами прекрасно можете справиться с этим, к тому же это будет хорошей практикой для закрепления знаний, вкратце основная суть:
Родительская корутина
StandaloneCoroutine
не будет завершена до тех пора пока все её дочерние корутины не выполнились. При создании новой корутины её объектJob
добавляется в качестве дочернего элемента в объектJob
родительской корутины, благодаря этому корутины могут отслеживать состояния своих детей.suspend блоки дочерних корутин
UiScopeChild1Block
иUiScopeChild2Block
будут обёрнуты вDispatchedContinuation
и переключены на главный поток черезHandler.post()
в не зависимости была ли родительская корутина изначательно на главном потоке или нет,Dispatchers.Main
в отличии отDispatchers.Main.immediate
всегда делает переключение.Объект
Continuation
родительской корутины никак не связан сContinuation
объектами дочерних корутин, поэтому когда завершатся последние результат не будет проброшен обратно вUiScopeParentBlock
, да и в этом особо нет смысла, как например сwithContext()
, который гарантирует последовательный порядок выполнения с возвращением результата.Диспатчеры в принципе не могут гарантировать порядок выполнения, так как выполняют код асинхронно, тот же метод
Handler.post()
из Android не даёт 100% уверенности, что код всегда будет выполняться в том порядке, который мы запланировали.
С запуском дочерних корутин через диспатчеры в целом разобрались, но что произойдёт если например у нас диспатчер Dispatchers.Main.immediate
и все корутины выполняются на главном потоке, дочерние корутины ведь не будут переключаться снова через Handler.post()
как это было с Dispatchers.Main
, в таком случае начинает работать так называемый EventLoop
:
// метод из DispatchedContinuation
internal inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
// ...
} else {
/*
executeUnconfined делает одну из двух вещей:
1) выполняет лямбду на EventLoop'е
2) ставит лямбду в очередь EventLoop'а
*/
executeUnconfined(state, MODE_CANCELLABLE) {
continuation.resumeWith(result)
}
}
}
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
): Boolean {
// EventLoop - штука куда кладутся лямбды в очередь на исполнение
val eventLoop = ThreadLocalEventLoop.eventLoop
// isUnconfinedLoopActive изначательно равен false поэтому для
// родительской корутины срабатывает вторая ветка, а для дочерних - первая
return if (eventLoop.isUnconfinedLoopActive) {
_state = contState
resumeMode = mode
// выполнение дочерних корутин ставится в очередь EventLoop'а
eventLoop.dispatchUnconfined(this)
true
} else {
/*
выполняет Continuation.resumeWith() для родительской корутины,
дочерние в этот момент создаются и кладутся в очередь EventLoop'а,
после завершения инициализация родительской корутины дочерние
по очереди берутся из EventLoop'а и выполняются
*/
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}
Считайте что EventLoop
это простая очередь задач или лямбд как в нашем случае, куда кладутся запросы на выполнение дочерних корутин, затем они выполняются в том порядке, в котором были добавлены в очередь, как раз такая организация обеспечивает последовательное выполнение корутин:
/*
viewModelScope под капотом использует Dispatchers.Main.immediate,
который не будет переключать дочерние корутины, так как они и так
находятся на главном потоке, поэтому будет задействован
механизм EventLoop'а
*/
viewModelScope.launch {
launch {
println("I'm the second!")
}
launch {
println("I'm the third!")
}
/*
весь блок кода в родительской корутине выполняется
до момента выполнения дочерних корутин, это необходимо
чтобы поставить дочерние корутины в очередь,
а потом начать их выполнять
*/
println("I'm the first!")
}
В качестве дополнения приведу ещё парочку интересных особенностей EventLoop
'а:
EventLoop
наследуется отCoroutineDispatcher
и может быть использован в корутинах, например так работаетrunBlocking()
EventLoop
создаётся для каждого потока чере механизмThreadLocal
переменных, как например экземплярLooper
класса из AndroidПод капотом
EventLoop
лежитArrayDeque
из Kotlin коллекций для формирования очереди задач.
Напоследок рассмотрим как runBlocking()
ждёт завершения своих корутин и соблюдает их порядок, хотя ответ очевиден - используется EventLoop
:
/*
EventLoop создаётся на основе переданного CoroutineContext'а и кладётся
в специальную корутину BlockingCoroutine, затем вызывается
метод joinBlocking(), который ждёт пока все дочерние корутины
не выполнятся
*/
actual fun <T> runBlocking(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
val currentThread = Thread.currentThread()
val eventLoop: EventLoop = ...
val newContext: CoroutineContext = ...
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
/*
ожидание происходит в while(true) цикле, а любой бесконечный цикл
как вы уже догадываетесь блокирует текущий поток, отсюда
и название runBlocking
чтобы завершить цикл ожидания нужно поменять isCompleted на false,
это произойдет только когда все дочерние корутины завершатся
*/
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
eventLoop?.processNextEvent()
if (isCompleted) break
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
}
Именно благодаря бесконечному циклу runBlocking()
дожидается выполнения всех дочерних корутин, при этом блокируя текущий поток.
Хотелось бы добавить что порядок в runBlocking()
будет гарантироваться только когда корутины выполняются без каких либо асинхронных вызовов и переключений на другие диспатчеры, например здесь порядок не будет соблюдён:
fun main() = runBlocking<Unit> {
launch {
val result = withContext(Dispatchers.IO) {
delay(500)
"I'm the second!"
}
println(result)
}
launch {
println("I'm the first!")
}
}
Заключение
Не думал что и сам дотяну до этого момента, статья получилась очень объёмной и без капли самоуверенности заявляю охренительно полезной!
В качестве последних слов я собрал парочку фактов:
Корутина - это всего лишь удобная абстракция c крутыми фишками над асинхронным выполнением кода, примером асинхронного выполнения может быть переключение на другой поток, использование очереди главного потока (MessageQueue из Android) и тд.
Continuation
— кирпичик на котором построена практически вся библиотека корутин, является простейшим интерфейсом с единственным методомresumeWith()
, данный метод вызывается когда происходит переход между состоянием приостановки корутины и состоянием её выполнения.Состояние приостановки - так как корутины позволяют писать асинхронный код в последовательном стиле, то необходим механизм возвращения к точкам выполнения этого кода, в большинстве случаев такой механизм реализуется с помощью callback'ов, которыми как раз и являются
Continuation
реализации.К реализациям
Continuation
интерфейса относятся: обычная корутинаStandaloneCoroutine
, сгенерированный suspend блок на базеContinuationImpl
, реализация для работы диспатчеровDispatchedContinuation
, корутина используемая вrunBlocking()
методе -BlockingCoroutine
и другие.DispatchedContinuation
оборачивает другиеContinuation
объекты, чтобы передать выполнениеContinuation.resumeWith()
метода диспатчеру.CoroutineDispatcher
в большинстве случаев используется для переключения корутины на другой или другие потоки, но и есть и исключения, такие какEventLoop
например, который позволяет выполнить корутины в правильном порядке.EventLoop
- это простая очередь задач, куда кладутся запросы на выполнение корутин, а затем они выполняются в том порядке, в котором были добавлены в очередь, такая организация обеспечивает правильный порядок выполнения, но это работает только если ни в одной корутине не будет переключений через диспатчеры.
Полезные ссылки:
Пишите в комментах ваше мнение и всем хорошего кода!