Как стать автором
Обновить

Kotlin Coroutines под капотом

Время на прочтение36 мин
Количество просмотров17K

Вероятнее всего у вас спрашивали на собесе «как работают корутины под капотом?», вы не долго думая выбрасывали что‑то в стиле «там под капотом стейт‑машина, она определяет какая suspend функция будет выполняться», но понимали ли вы на самом деле всё о чем говорили? Возможно, только вам это известно, но если честно я очень плохо понимал собственные ответы на такие вопросы как бы это парадоксально не звучало и даже после десятка пройденных собесов у меня не было полноценной картины как работает внутрянка этой поистине невероятной библиотеки «сладкой асинхронщины».

Ладно, вступление и так получилось слишком затянутым, как это бывает в непрофессиональных фильмах от которых ждешь экшена половину хронометража, а потом получаешь унылые бои на светящихся палках, погнали короче разбираться!

Знакомство с крутыми перцами: CoroutineContext и CoroutineScope

Начнём с простенького примера:

fun main() = runBlocking {
    // запускаем новую корутину
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

Пока нас интересует только функция launch, которая чаще всего используется для создания корутин, давайте провалимся в её исходники:

// (1) launch является Kotlin Extension функцией для CoroutineScope
fun CoroutineScope.launch(
    // (2) контекст похож на HashMap'у, также хранит всякие штуки по ключу
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // (3) при создании новой корутины контекст может быть изменён
    val newContext = newCoroutineContext(context)
    // к остальной части кода вернёмся позже
}

Пройдёмся подробнее по пунктам, которые я выделил в комментах к коду:

1) Чтобы создать новую корутину нужно вызвать launch в пределах CoroutineScope, никак иначе, так было сделано чтобы корутина смогла получить контекст и пробрасывать его в дочерние корутины, если глянуть исходник CoroutineScope, то всё станет очевидным:

interface CoroutineScope {
    val coroutineContext: CoroutineContext
}

При запуске корутины берётся текущий контекст из CoroutineScope в котором был вызван launch.

2) CoroutineContext не является хэш-таблицей, а реализован на основе паттерна Компоновщик:

/*
основной прикол паттерна Компоновщик: создать дерево 
из вложенных друг в друга объектов

чтобы реализовать такой паттерн нужен общий родитель, 
которым в данном случае является CoroutineContext 
*/


/*
обычные элементы контекста, такие как Job, CoroutineName и тд являются 
простыми объектами, которые не содержат другие или в терминах паттерна 
листовыми узлами 
*/
interface Element : CoroutineContext {
    val key: Key<*>

    /*
    для удобного доступа к элементам контекста переопределён оператор get
    
    это позволяет делать такие штуки: 
    coroutineContext[Job] вместо coroutineContext.get(Job)
    */
    override operator fun <E : Element> get(key: Key<E>): E? =
        if (this.key == key) this as E else null
}

/*
помимо листовых узлов есть комплексные наборы данных, 
которые могут в себя включать другие такие наборы 
и простые объекты
*/
class CombinedContext(
    val left: CoroutineContext, 
    val element: CoroutineContext.Element
) : CoroutineContext {

    override fun <E : Element> get(key: Key<E>): E? {
        /*
        логика простая: проверяем сначала простой объект,
        если ключи не совпадают, смотрим left, если он является 
        комплексным узлом CombinedContext, рекурсивно повторяем
        */
        var currentContext = this
        while (true) {
            currentContext.element[key]?.let { return it }
            val next = currentContext.left
            if (next is CombinedContext) {
                currentContext = next
            } else {
                return currentContext.get(key)
            }
        }
    }
  
}

class CoroutineName(val name: String) : CoroutineContext.Element {
    /*
    в качестве ключей для обычных элементов CoroutineContext 
    используются названия самих классов, что очень удобно

    поле key требует наследника Key<*> который определён ниже 
    через companion object, это работает даже если на первый взгляд 
    выглядит сомнительно и неоднозначно
    */
    override val key: Key<*> = CoroutineName 

    companion object Key : Key<CoroutineName>
}

fun main() {
    // Job и CoroutineDispatcher являются элементами CoroutineContext
    val combinedContext = CombinedContext(
        CombinedContext(
            Job(),
            Dispatchers.Default
        ), 
        CoroutineName("My name's Kotlin coroutine")
    )

    /*
    в итоге мы можем положить в CoroutineContext то что нужно корутинам:
    Job, CoroutineName, CoroutineDispatcher, CoroutineExceptionHandler и тд,
    а затем прокидывать контекст через CoroutineScope в сами корутины
    */
    val job = combinedContext[Job]
}

3) При создании новой корутины можно изменить CoroutineContext:

launch(CoroutineName("I'm a parent coroutine")) {
    launch(CoroutineName("I'm  child coroutine"))) {
        // ...
    }
}

Логика работы в таком случае следующая: корутина получает текущий контекст из CoroutineScope и складывает его с контекстом, переданным в качестве параметра функции launch, таким образом дочерняя корутина из примера содержит другое имя.

Важный момент: не все элементы CoroutineContext могут быть корректно изменены, например при указании у дочерней корутины другой Job вы можете разрушить принцип Structured Concurrency, который заключается в каскадной отмене всех корутин, покажу на примере:

val topLevelJob = viewModelScope.launch {
    /*
    тут всё ок, при создании новой корутины 
    возьмётся Job'а из текущего контекста, в нашем случае это
    topLevelJob, затем корутина будет добавлена как дочерняя Job'а и
    связь родитель - ребёнок не нарушится
    */
    launch {
        println("I'm coroutine #1")
    }
    /*
    не ок, так как мы создаём Job'у которая не привязана
    к родительской, грубо говоря мы не сделали 
    parentJob.attachChild(childJob), который кстати нельзя
    сделать потому что это internal api библиотеки
    */
    launch(Job()) {
        println("I'm coroutine #2")
    }
}

// вторая дочерняя корутина не будет отменена
topLevelJob.cancel()

Подведём итоги:

  • корутина создаётся через Kotlin Extension функцию CoroutineScope.launch

  • CoroutineScope является простым интерфейсом, который предоставляет корутине контекст

  • CoroutineContext нужен для хранения всяких полезных штук при выполнении корутины: CoroutineName, CoroutineDispatcher, Job, CoroutineExceptionHandler и тд, важно добавить что вы можете легко создать свой элемент контекста, унаследовав CoroutineContext.Element интерфейс.

  • при создании корутины можно передать новый контекст, это приведёт к созданию контекста, основанного на текущем с изменёнными элементами, взятыми из нового

  • не все элементы контекста можно адекватно изменить, например нужно быть осторожным при указании другой Job'ы, отличным решением будет придерживаться правила: менять Job только для самых высокоуровневых корутин, в идеале при создании CoroutineScope

Continuation интерфейс и реализация suspend блока

Возвращаемся снова к начальному примеру:

fun main() = runBlocking {
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

public fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job {}

Обратите внимание на ключевое слово suspend, именно благодаря тому что launch выполняет лямбду помеченную этим ключевым словом мы можем запускать suspend функции в пределах корутины, что ещё интереснее эта лямбда во время компиляции превращается в нечто интересное:

BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    int label = 0;

    public final Object invokeSuspend(Object var1) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(var1);
                String var2 = "Some";
                System.out.println(var2);
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }

    // этот метод будет использоваться для создания Continuation объекта
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
    }

}), 3, (Object)null);

Ага, вот она та самая стейт-машина (switch блок), про которую на собесе вскользь упонимают, сейчас она ничего сложного не делает, кроме как выводит текст в консоль и возвращает пустой результат.

После достаточно долгих вечеров копания в исходниках и процесса дебага я всё таки выяснил, что сгенерированный выше код это ничто иное как реализация абстрактного класса ContinuationImpl, одного из наследников Continuation:

/*
само название говорит за себя, что это "продолжение" после приостановки 
корутины, да та самая магическая приостановка или SUSPENDED состояние о
которой говорят постоянно, позже узнаем что никакой магии тут нет
*/
public interface Continuation<in T> {
    /*
    как мы выяснили в предыдущем разделе CoroutineContext содержит
    важные штуки для корутин, например CoroutineDispatcher, который
    может пригодиться для переключения потоков
    */
    public val context: CoroutineContext

    /* 
    вызов этого метода происходит после возвращения корутины из состояния
    приостановки, именно сюда кладутся результаты suspend функций, также
    дочерние корутины могут вызывать этот метод у родительских для
    продолжения работы последних
    */
    public fun resumeWith(result: Result<T>)
}

Без такой штуки как Continuation корутины не могли бы возвращаться туда где произошла приостановка и следовательно мы не могли бы выполнять код на разных потоках, используя последовательную запись кода, напомню что одной из ключевых идей как раз и является выполнение асинхронного кода, как последовательного, небольшой пример:

// создаём корутину на главном потоке
launch {
    // вызываем функцию fetchAndroidUnderTheHoodPosts() в background потоке
    val myPosts = fetchAndroidUnderTheHoodPosts()
    /* 
    чтобы следующий код получил результат нужно как минимум
    каким-то образом получить его и не забыть переключиться на главный поток,
    а так как fetchAndroidUnderTheHoodPosts() выполняется на другом потоке 
    и неизвестно когда функция закончит своё выполнение, 
    остаётся только вариант передачи в функцию callback'а, 
    который будет вызван когда она завершится,
    таким callback'ом является Continuation объект
    */
    println(myPosts)
}

После того как функция fetchAndroidUnderTheHoodPosts завершит выполнение своего кода, результат будет передан через Continuation.resumeWith(), что приведёт к дальнейшему выполнение корутины, в текущем примере - вывод всех постов в консоль.

Окей, мы определились что без Continuation ничего не выйдет и даже узнали что suspend блок в launch функции на самом деле тоже является Continuation объектом и наследуется от ContinuationImpl, но основная реализация содержится в BaseContinuationImpl классе, от которого наследуется ContinuationImpl, сложно? Ничего, привыкайте, это хардкор, а не няшный мульт про поней.

Идём смотреть на реализацию Continuation для suspend блока:

/* 
компилятор генерирует реализацию этого 
абстрактного класса для suspend блоков
*/
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    // контекст берётся из корутины, дальше это увидим
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    private var intercepted: Continuation<Any?>? = null

    /*
    ContinuationInterceptor это штука которая оборачивает 
    текущий Continuation объект в новый, например DispatchedContinuation 
    и возвращает его, такой механизм используется для переключения потоков 
    через CoroutineDispatcher, кстати оборачивание одного объекта
    в другой с общим интерфейсом (Continuation) ничто иное как 
    паттерн Декоратор
    */
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    // очистка обёрнутого Continuation объекта: 
    // зануление ненужных ссылок и тд
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        // корутина закончила своё выполнение
        this.intercepted = CompletedContinuation
    }
}

/*
если в ContinuationImpl реализована поддержка CoroutineDispatcher'ов,
то в BaseContinuationImpl содержится основная логика работы с состоянием
приостановки
*/
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {и
    
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                /*
                текущая реализация Continuation требует в качестве completion
                более высокоуровневый Continuation объект, обычно им является 
                сама корутина
                напоминаю что мы сейчас рассматриваем реализацию 
                suspend блока в launch функции, а не саму корутину
                */
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        /*
                        вот тут происходит самое интересное, invokeSuspend 
                        выполняет внутри себя тот самый сгенерированный код
                        в котором содержатся наши suspend функции и если одна 
                        из них перешла в состояние приостановки, 
                        метод тупо делает return, Continuation в данном
                        случае не продолжит своё выполнение пока
                        не будет снова вызван resumeWith()
                        */
                        val outcome = invokeSuspend(param)
                        /*
                        COROUTINE_SUSPENDED - зарезервированная константа,
                        сигнализирующая о том, что внутри invokeSuspend 
                        произошла приостановка
                        */
                        if (outcome === COROUTINE_SUSPENDED) return
                        // если invokeSuspend вернул результат получаем его
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // если произошло исключение тоже получаем его
                        Result.failure(exception)
                    }
                /*
                так как текущий BaseContinuationImpl получил результат, 
                значит suspend блок в корутине завершился, поэтому 
                текущий объект BaseContinuationImpl больше не нужен,
                а следовательно всякие используемые штуки, 
                такие как CoroutineDispatcher'ы например должен быть очищены
                */
                releaseIntercepted()

                /*
                если мы запустили корутину в другой корутине, то в качестве
                completion будет suspend блок родительской корутины:
                  
                launch { родительский suspend блок BaseContinuationImpl
                  launch { дочерний suspend блок BaseContinuationImpl
                    
                  }
                }

                */
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    /*
                    вызывается самый высокоуровневый Continuation объект, 
                    в большинстве случаев это сама корутина
                    */
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // тот самый метод, переопределённый в сгенерированном коде
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

Суммируем:

  1. мы имеем Continuation интерфейс, позволяющий продолжить выполнение корутины после её приостановки

  2. для suspend блока генерируется специальная реализация ContinuationImpl со стейт-машиной (switch или when конструкцией) в переопределённом методе invokeSuspend()

  3. когда suspend функция приостанавливается происходят следующие вещи:
    -invokeSuspend() возвращает специальное значение COROUTINE_SUSPENDED
    -BaseContinuationImpl завершается через return и ожидает следующего вызова resumeWith()

  4. логика обработки состояния приостановки содержится в BaseContinuationImpl, а логика переключения потоков с помощью CoroutineDispatcher'ов происходит в наследнике ContinuationImpl.

Что же такое корутина?

Теперь мы можем разобраться чем же на самом деле является корутина и как происходит выполнение suspend блока в ней, для этого снова возвращаемся к исходникам launch функции:

public fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    /*
    мы не будем рассматривать LazyStandaloneCoroutine, так как эта штука
    очень похожа на базовую реализацию корутины StandaloneCoroutine,
    только запускается по требованию
    */
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    /*
    запускаем корутину, LazyStandaloneCoroutine не запустится таким образом,
    нужно будет вручную вызвать метод start() у объекта Job, 
    который возвращает launch
    */
    coroutine.start(start, coroutine, block)
    return coroutine
}

Помните в BaseContinuationImpl был такой параметр как completion, я ещё говорил что им может быть сама корутина, так вот StandaloneCoroutine и есть реализация корутины, короче не будем тянуть, идём смотреть исходники:

// корутина наследует Job, Continuation и CoroutineScope
class StandaloneCoroutine<in T>(
    parentContext: CoroutineContext,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        /*
        кладёт Job текущей корутины в Job'у родительской
        это нужно чтобы родительские корутины знали о 
        дочерних и не завершались раньше них
        */
        initParentJob(parentContext[Job])
    }

    /* 
    так можно сделать потому что StandaloneCoroutine 
    является реализацией Job, а Job является одним из элементов 
    CoroutineContext'а
    */
    override val context: CoroutineContext = parentContext + this

    // вы уже знаете, что когда корутина выходит из состояния приостановки,
    // вызывается метод Continuation.resumeWith()
    override fun resumeWith(result: Result<T>) {
        // makeCompletingOnce пытается завершить Job'у корутины
        val state = makeCompletingOnce(result.toState())
        // если у текущей Job'ы есть дочерние и они не были завершены, 
        // корутина не может быть завершена
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

    /*
    запуск suspend блока в корутине происходит в CoroutineStart enum'е,
    
    в качестве параметра receiver передаётся сама корутина, это нужно
    чтобы suspend блок получил CoroutineContext, если уже забыли про это, 
    возвращайтесь к исходнику ContinuationImpl
    
    completion это как раз то самый высокоуровневый Continuation объект,
    вызываемый из сгенерированного Continuation объекта для suspend блока, 
    как я уже говорил ранее им является сама корутина
    */
    fun <R> start(
        start: CoroutineStart, 
        receiver: R, 
        block: suspend R.() -> T
    ) {
        start(block = block, receiver = receiver, competion = this)
    }
}

// поначалу я долго искал где запускается корутина, так как не сразу
// заметил что у CoroutineStart переопределён invoke оператор
enum class CoroutineStart {

    // я опустил остальные варианты, оставил только базовый
    DEFAULT;

    operator fun <R, T> invoke(
        block: suspend R.() -> T, 
        receiver: R, 
        completion: Continuation<T>
    ): Unit =
        when (this) {
            // для suspend блока будет сгенерированна ContinuationImpl
            // реализация, которая запустится при вызове launch функции
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            else -> Unit
        }

}

Осталось только разобраться как все таки запускается suspend блок, для этого проваливаемся ещё на уровень ниже, в startCoroutineCancellable() функцию:

// обратите внимание, что startCoroutineCancellable() является
// Kotlin Extension функцией для suspend блока
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, 
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    // первым делом из suspend блока создаётся Continuation объект
    createCoroutineUnintercepted(receiver, completion)
        /*
        далее оборачивается в DispatchedContinuation, если используется
        CoroutineDispatcher, а он в большинстве случаев используется
        */
        .intercepted()
        /*
        ну и происходит вызов Continuation.resumeWith() 
        в зависимости от типа Continuation, чтобы блок в корутине
        начал выполняться, иначе ничего не произойдёт
        */
        .resumeCancellableWith(Result.success(Unit), onCancellation)

// создаёт Continuation из suspend блока
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    /* 
    в нашем случае suspend блок является объектом ContinuationImpl,
    а это наследник BaseContinuationImpl, поэтому выполняется 
    первая ветка
    */
    return if (this is BaseContinuationImpl)
        // метод create будет сгенерирован компилятором для ContinuationImpl
        // реализации, кстати ранее уже был пример сгенерированного кода
        create(receiver, completion)
    else
        ...
}

/*
делает вызов resumeWith в зависимости от типа Continuation:
1) DispatchedContinuation может передать вызов resumeWith() 
CoroutineDispatcher'у, который выполнит код на другом потоке
2) обычный вызов Continuation.resumeWith() произойдёт без
смены потоков и тд
*/
fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

Можно сказать что вы прошли джедайское обучение и готовы к настоящему бою, разберём самый первый пример:

fun main() = runBlocking {
    // запускаем новую корутину
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

Если посмотреть на пример с точки зрения внутрянки корутин, то получим примерно следующий код:

fun main() {
    SuspendLaunchBlock(
        // (2)
        completion = StandaloneCoroutine()
    ).resumeWith(Result.success(Unit)) // (3)
}

// (1)
class SuspendLaunchBlock(
    completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    // (5)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (newResult === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    // (4)
    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("Hello, I'm a Kotlin coroutine, how are you?")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class StandaloneCoroutine<T>(...) : Continuation<T> {

    // (6)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

Давайте по порядку:

  1. Создаётся сгенерированная реализация SuspendLaunchBlock для suspend блока в launch функции со стейт-машиной или конкретнее when конструкцией.

  2. Создаётся StandaloneCoroutine и передаётся в качестве параметра completion в SuspendLaunchBlock

  3. Запускается корутина через вызов SuspendLaunchBlock.resumeWith() метода, который далее выполняет сгенерированный invokeSuspend() метод

  4. В invokeSuspend() выполняется единственная ветка в when блоке - вывод в консоль и возвращение пустого результата

  5. После завершения invokeSuspend() в SuspendLaunchBlock.resumeWith() происходит сначала проверка на состояние приостановки, в данном случае invokeSuspend() выполнилась без приостановки, поэтому сразу вызывается completion.resumeWith(), а так как completion это StandaloneCoroutine, то вызывается StandaloneCoroutine.resumeWith() реализация

  6. StandaloneCoroutine.resumeWith() проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение

Вы практически магистр джедаев! Сделайте паузу, заварите чай или кофе, скушайте шоколадку и ещё раз пройдитесь по примеру, очень важно понять что механизм корутин заключается в переходах между состояниями приостановки через вызов Continuation.resumeWith() метода, а завершение происходит в корневом Continuation объекте и самое главное, что нет никакой магии.

А если в корутине цепочка из suspend функций?

Усложним пример из прошлого раздела, добавив две suspend функции:

// опустим подробности реализации
suspend fun fetchAuthToken() = ...
suspend fun fetchProfileData(token: String) = ...

fun main() = runBlocking {
    // запускаем корутины с двумя suspend функциями
    launch {
        val token = fetchAuthToken()
        val profile = fetchProfileData(token = token)
        println(profile)
    }
}

Интересно какой теперь будет код с точки зрения внутрянки корутин, смотрим:

fun main() {
    SuspendLaunchBlock(
        // (2)
        completion = StandaloneCoroutine()
    ).resumeWith(Result.success(Unit)) // (3)
}

// (5)
suspend fun fetchAuthToken(continuation: SuspendLaunchBlock): Any? {
    /*
    любое асинхронное выполнение кода приводит к состоянию приостановки
    это необязательно использование многопоточности, дальше вы это увидите
    
    runCodeInBackground - магический метод, который выполняет 
    блок кода в фоновом потоке
    */
    runCodeInBackground {
        val token = ...
        runCodeInMain {
            // (6)
            // магический метод, который выполняет блок кода на главном потоке
            continuation.resumeWith(token)
        }
    }
  
    return COROUTINE_SUSPENDED
}

suspend fun fetchProfileData(token: String) = ...

// (1)
class SuspendLaunchBlock(
    completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        } 
        // (9)
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        // while(true) нужен чтобы выполнять ветки дальше, 
        // если suspend функция не перешла в состояние приостановки
        while (true) {
            when(label) {
                // (4)
                0 -> {
                    throwIfFailureResult(result)
                    label = 1
                    // Continuation передаётся в качестве 
                    // аргумента suspend функции
                    val state = fetchAuthToken(this)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (7)
                1 -> {
                    throwIfFailureResult(result)
                    label = 2
                    val token = result.unwrap()
                    val state = fetchProfileData(token, this)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (8)
                2 -> {
                    throwIfFailureResult(result)
                    val profile = result.unwrap()
                    println(profile)
                    break
                }
                else -> error("Illegal state")
            }
        }
        
        return Unit
    }
  
}

class StandaloneCoroutine<T>(...) : Continuation<T> {

    // (10)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

Чтобы было поинтереснее будем считать, что обе suspend функции переходят в состояние приостановки, смотрим логику:

  1. Создаётся сгенерированная реализация SuspendLaunchBlock для suspend блока в launch функции со стейт-машиной или конкретнее when конструкцией.

  2. Создаётся StandaloneCoroutine и передаётся в качестве параметра completion в SuspendLaunchBlock

  3. Запускается корутина через вызов SuspendLaunchBlock.resumeWith() метода, который далее выполняет сгенерированный invokeSuspend() метод

  4. В invokeSuspend() выполняется первая ветка (label == 0), где происходит вызов функции fetchAuthToken(), в качестве единственного параметра передаётся текущий Continuation объект, в данном случае это SuspendLaunchBlock, значение переменной label меняется на 1

  5. Функция fetchAuthToken() возвращает значение COROUTINE_SUSPENDED, что свидетельствует о состоянии приостановки, важно что здесь нет никакой магии, выполнение кода происходит в другом потоке, а так как это асинхронное выполнение, внешнему коду можно только передать результат через callback, которым кстати является SuspendLaunchBlock

  6. После выполнение своего кода fetchAuthToken() вызывает метод SuspendLaunchBlock.resumeWith() с результатом своей работы, в примере это строка с токеном

  7. SuspendLaunchBlock.resumeWith() возобновляет своё выполение и повторно вызывает invokeSuspend(), где уже выполняется вторая ветка (label == 1), в ней происходит вызов fetchProfileData() метода, в качестве первого параметра он принимает токен от предыдущей suspend функции fetchAuthToken(), а в качестве второго ссылку на Continuation объект, которым как мы уже знаем является SuspendLaunchBlock, метод fetchProfileData() выполняется аналогично fetchAuthToken(), label становится равным 2

  8. В последней ветке invokeSuspend(), где label == 2, происходит вывод в консоль результата функции fetchProfileData() и возвращение пустого значения

  9. На этот раз возвращенное значение из invokeSuspend() не является COROUTINE_SUSPENDED , поэтому выполнение SuspendLaunchBlock завершается, дальнейшее управление передаётся StandaloneCoroutine через вызов completion.resumeWith()

  10. StandaloneCoroutine.resumeWith() проверяет нет ли незавершенных дочерних корутин, у нас их нет, и прекращает выполнение

Отлично, теперь вы знаете как происходят переходы между отдельными suspend функциями, поздравляю, вы можете смело умничать на собесах, но не забывайте что скромность красит человека)

Переключение потоков, delay() и CoroutineDispatcher

Мы забыли о самом важном ради чего в принципе используются корутины - выполнение suspend функций на других потоках, отсюда собственно и возникает потребность приостановить выполнение текущей корутины, как мы уже выяснили для этого нужно передать Continuation объект suspend функции и ждать пока она сама не вызовет Continuation.resumeWith() метод:

fun fetchAuthToken(continuation: Continuation<Any?>): Any? {
    // магический метод, который выполняет блок кода в фоновом потоке
    runCodeInBackground {
        val token = ...
        // магический метод, который выполняет блок кода на главном потоке
        runCodeInMain {
            /* 
            чтобы сообщить корутине что suspend функция завершила
            своё выполнение нужно вызвать Continuation.resumeWith() 
            с результатом работы функции
            */
            continuation.resumeWith(token)
        }
    }
    // так как функция не может сразу вернуть результат, 
    // она переходит в состояние приостановки
    return COROUTINE_SUSPENDED
}

Это достаточно упрощенная версия кода, но зато она отражают общий механизм, а самое главное показывает что состояние приостановки это ничто иное как выполнение некоторого кода на другом потоке и возвращение результата через Continuation.resumeWith как через обычный callback.

В прошлом разделе я вскользь упомянул, что любое асинхронное выполнение приводит к состоянию приостановки, но не раскрыл эту тему, давайте разбираться.

Что нужно чтобы код стал асинхронным? Правильно сделать его выполнение независимым от текущей точки выполнения, например создать новый поток:

fun main() {
    // создаётся новый поток и сразу запускается
    Thread {
        val sum = 3 + 7
        println(sum)
    }.start()
    // код main() продолжает выполняться независимо от того,
    // выполнился ли весь код в Thread
    val mul = 3 * 7
    println(mul)
    // функция main() может завершиться раньше созданного потока
}

Но давайте на минутку забудем про многопоточность и вспомним главный поток Android, он позволит нам выполнить код асинхронно? Конечно же да, можно это сделать через старый добрый Handler:

// Handler поставит выполнение кода в очередь главного потока
handler.post {
    println("I'll run soon")
}

Кстати Handler используется в реализации функции delay для главного потока Android, если забыли, то эта функция позволяет сделать задержку без блокировки текущего потока:

fun delay(continuation: Continuation<Any?>): Any? {
    val block = Runnable {
        // после того как задержка пройдёт, выполнится этот блок кода
        // и корутина продолжит своё выполнение после приостановки
        continuation.resumeWith(Result.success(Unit))
    }
 
    // handler.postDelayed() выполняет block через указанный 
    // промежуток времени в миллисекундах
    if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
        // если корутина была отменена нужно отменить задержку
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    } else {
        // отменяет текущую корутину, так как Handler для главного потока
        // был закрыт
        cancelOnRejection(continuation.context, block)
    }

    return COROUTINE_SUSPENDED
}

Вот и вся суть асинхронного выполнения кода, им может быть любой механизм, позволяющий выполнить X код независимо от Y кода и если попробовать дать определение корутинам сейчас, то оно будет примерно такое:

Корутина — это абстракция, которая оборачивает некоторую асинхронную работу, это может быть выполнение кода в другом потоке или использование очереди главного потока, например MessageQueue из Android, в удобный последовательный код с механизмами отмены и другими прикольными фишками.

Ладно, вроде бы разобрались что такое приостановка корутины и асинхронное выполнение кода, можем переходить к более прикладным вещам, например к функции withContext(), чаще всего используемой для изменения CoroutineDispatcher'а:

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    /*
    как мы знаем Continuation работает под капотом корутин и его нельзя
    получить явно в прикладном коде, поэтому была придумана inline функция
    suspendCoroutineUninterceptedOrReturn (и не одна кстати), 
    которая после компиляции подставит текущий Continuation объект
    */
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val oldContext = uCont.context
        /*
        если вы ещё не забыли то новый контекст производится 
        путём сложения двух контекстов, в качестве результата 
        мы имеем контекст в котором старые элементы заменены новыми, 
        например Dispatchers.Main можно поменять на Dispatchers.Default
        */
        val newContext = oldContext.newCoroutineContext(context)

        // проверка что корутина все ещё выполняется
        newContext.ensureActive()

        // мы не будем рассматривать все ветки, нас интересует только
        // переключение потоков через CoroutineDispatcher
        if (newContext === oldContext) {
            ...
        }
        // CoroutineDispatcher является наследником ContinuationInterceptor
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            ...
        }
        
        // ну вот опять создаётся какая-то неизвестная нам корутина,
        // не беспокойтесь там достаточно простая логика
        val coroutine = DispatchedCoroutine(newContext, uCont)
        // стартуем также как и обычную корутину
        block.startCoroutineCancellable(coroutine, coroutine)
        /*
        если есть возможность сразу отдать результат без приостановки корутины
        то withContext сразу завершится, в противном случае корутина, 
        содержащая withContext() вызов перейдёт в состояние приостановки
        */
        coroutine.getResult()
    }
}

Функция withContext делает 3 простые вещи:

  1. Получает текущий Continuation объект, он может быть взят из параметра suspend функции или из текущей корутины в которой была вызвана функция withContext

  2. Берёт контекст из Continuation объекта и складывает с контекстом, переданным в качестве параметра, в результате создаётся новый контекст

  3. На основе нового контекста создаёт определённый вид корутин, например если был изменён CoroutineDispatcher будет создана корутина DispatchedCoroutine

Что ж давайте теперь глянем исходники DispatchedCoroutine:

/*
чаще всего в качестве continuation параметра выступает 
Continuation объект, который генерируется для suspend блока в корутине, 
если забыли, то это реализация абстрактного класса ContinuationImpl
*/
internal class DispatchedCoroutine<in T> internal constructor(
    context: CoroutineContext,
    continuation: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

    /* 
    метод afterResume() вызывается перед завершением Continuation.resumeWith(),
    когда корутина закончила выполнять все свои suspend функции и 
    у неё больше нет дочерних корутин в состоянии выполнения
    */
    override fun afterResume(state: Any?) {
        /*
        метод afterResume() может быть вызван раньше getResult(), 
        например если блок кода в withContext() очень быстро выполнился
        в таком случае результат вернёт getResult()
        */
        if (tryResume()) return 
        /*
        Я уже вскользь упоминал что делает каждый метод в этой цепочке,
        когда мы рассматривали как стартует корутины, ещё раз повторим:
  
        intercepted() оборачивает continuation в DispatchedContinuation, 
        который реализует логику работы с CoroutineDispatcher'ами

        resumeCancellableWith() вызывает resumeWith() в зависимости от типа
        Continuation, в данном случае будет вызван метод
        DispatchedContinuation.resumeCancellableWith()
        */  
        continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

    internal fun getResult(): Any? {
        // если нельзя сразу вернуть результат корутина приостанавливается
        if (trySuspend()) return COROUTINE_SUSPENDED

        // результат выполнения withContext()
        val state = ...
        return state as T
    }
}

Суммируем, DispatchedCoroutine выполняет две ключевые задачи:

  1. Добавляет возможность вернуть результат без перехода в состояние приостановки, если такая возможность есть, для этого используется getResult() метод.

  2. Переключает Continuation объект, в котором был вызван withContext(), на родной поток, например если ваша корутина выполняется на главном потоке, затем вызывает withContext() на фоновом, то результат должен вернуться снова на главный.

Ладно, с DispatchedCoroutine более менее разобрались, чтобы понять как на самом деле происходит переключение потоков в корутинах провалимся в DispatchedContinuation, в который оборачиваются другие Continuation объекты:

/*
DispatchedContinuation принимает на вход:

dispatcher - выполняет блок кода, чаще всего на другом потоке 
или с использованием очереди, например Handler / MessageQueue из Android 
continuation - Continuation объект, работа с которым будет происходить
через указанный диспатчер
*/
internal class DispatchedContinuation<in T>(
    val dispatcher: CoroutineDispatcher,
    val continuation: Continuation<T>
) : Continuation<T> by continuation {

    /*
    логика resumeWith() идентична resumeCancellableWith() с отличием только
    в разных режимах resumeMode, обычно чаще всего вызывается 
    именно resumeCancellableWith, так как режим MODE_CANCELLABLE 
    позволяет прокинуть CancellationException для отмены корутины
    */
    override fun resumeWith(result: Result<T>) { ... }

    internal inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        /*
        CoroutineDispatcher имеет два логически связанных метода:
        
        isDispatchNeeded() решает выполнять код в диспатчере или нет
        dispatch() выполняет код в диспатчере: код может выполниться на
        другом потоке, поставлен в очередь и тд
        */
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // для наглядности я упростил блок кода и написал его здесь
            val block = Runnable {
                continuation.resumeWith(state)
            }
            dispatcher.dispatch(context, block)
        } else {
            /*
            если диспатчер не хочет выполнять код, а такое может быть
            например если диспатчер переключает на главный поток, а мы уже
            на главном потоке и внутри диспатчера реализована проверка, 
            то isDispatchNeeded() вернёт false, в таком случае выполнение
            корутины будет добавлено в EventLoop
            */
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

}

Как видите логика CoroutineDispatcher'а достаточно простая:

  1. Вызывается метод isDispatchNeeded() чтобы понять отдавать выполнение кода диспатчеру или нет, это нужно чтобы избежать лишних вызовов dispatch(), например не делать переключение на главный поток, если мы уже находимся на нём

  2. Если isDispatchNeeded() вернул true разумеется отдаём выполнение кода диспатчеру, вызвав метод dispatch()

  3. Если isDispatchNeeded() вернул false запускаем корутину на EventLoop'е, об этом в следующем разделе

В качестве примера рассмотрим такие интересные диспатчеры из Android, как Dispatchers.Main и Dispatchers.Main.immediate:

val mainLooper = Looper.getMainLooper()
val handler = Handler(mainLooper)

// реализация для Dispatchers.Main
override fun isDispatchNeeded(...) = true

// реализация для Dispatchers.Main.immediate
override fun isDispatchNeeded(...): Boolean {
    // сравнивает Looper текущего потока с главным
    return Looper.myLooper() != mainLooper
}

override fun dispatch(
    context: CoroutineContext, 
    block: Runnable
) {
    // handler.post() выполняет блок кода на главном потоке
    handler.post(block)
}

Вот и вся разница между ними: Dispatchers.Main всегда переключает выполнение кода на главный поток через Handler.post(), а Dispatchers.Main.immediate только, если код не выполняется на главном потоке.

Для закрепления знаний попробуем собрать всё воедино и описать логику для следующего примера:

// примерно такой код можно встретить в рабочих проектах 
viewModelScope.launch {
    // я не стал выносить в отдельную функцию, чтобы не усложнять пример
    val posts = withContext(Dispatchers.IO) {
        try {
            // получаем список постов в background потоке
            apiService.fetchPosts()
        } catch (exception: Exception) {
            // важно прокидывать CancellationException дальше 
            // так как это часть механизма отмены корутины
            if (exception is CancellationException) throw exception
            emptyList()
        }
    }
    // отображаем данные на главном потоке
    println(posts)
}

Под капотом весь этот код будет выглядить примерно как-то так:

// (1)
class ViewModelScopeLaunchBlock(
    completion: Continuation<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    // (5)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        // (17)
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        // while(true) нужен чтобы выполнять ветки дальше, 
        // если suspend функция не перешла в состояние приостановки
        while (true) {
            when(label) {
                // (5)
                0 -> {
                    throwIfFailureResult(result)
                    label = 1
                    // Continuation передаётся в качестве 
                    // аргумента suspend функции
                    val state = fetchPosts(this)
                    // (10)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (16)
                1 -> {
                    throwIfFailureResult(result)
                    val profile = result.unwrap()
                    println(profile)
                    break
                }
                else -> error("Illegal state")
            }
        }
        
        return Unit
    }
  
}

class StandaloneCoroutine(...) {

    // (18)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

// (6)
class WithContextBlock(
    completion: DispatchedCoroutine
) : ContinuationImpl(completion) {

    var label = 0

    // (12)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
             Result.failure(exception)
        }
        // (12)
        completion.resumeWith(newResult)
    }

    // (11)
    fun invokeSuspend(result: Result<Any?>): Any? {
        try {
            val posts = apiService.fetchPosts()
            return posts
        } catch (exception: Exception) {
            // важно прокидывать CancellationException дальше так как это часть
            // механизма отмены корутины, вспомните resumeCancellableWith
            if (exception is CancellationException) throw exception
            return emptyList()
        }
    }

}

class DispatchedCoroutine(
    ...
    // (7)
    val continuation: ViewModelScopeLaunchBlock
): ScopeCoroutine(context, continuation) {

    // (13)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

    // (14)
    override fun afterResume(state: Any?) {
        if (tryResume()) return 
        continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
  
}

class DispatchedContinuation<in T>(
    val dispatcher: CoroutineDispatcher,
    val continuation: Continuation<T>
) : Continuation<T> by continuation {

    // (4, 9, 15)
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            val block = Runnable {
                continuation.resumeWith(state)
            }
            // (9, 15)
            dispatcher.dispatch(context, block)
        } else {
            // (4)
            continuation.resumeWith(state)
        }
    }

}

// (2)
val topLevelCoroutine = StandaloneCoroutine(...)

val viewModelScopeLaunchBlock = ViewModelScopeLaunchBlock(
    // (2)
    completion = topLevelCoroutine
)

// (3)
DispatchedContinuation(
  dispatcher = Dispachers.Main.immediate
  continuation = viewModelScopeLaunchBlock
).resumeCancellableWith(Result.success(Unit))

val withContextBlock = WithContextBlock(
    // (7)
    completion = DispatchedCoroutine(viewModelScopeLaunchBlock)
)

// (8)
DispatchedContinuation(
    dispatcher = Dispatchers.IO,
    continuation = withContextBlock
).resumeCancellableWith(Result.success(Unit))

Очень запутанно? Согласен, давайте по порядку:

  1. Генерируется реализация ViewModelScopeLaunchBlock для suspend блока в viewModelScope.launch() функции со стейт-машиной или конкретнее when конструкцией.

  2. Создаётся StandaloneCoroutine и передаётся в качестве параметра completion в ViewModelScopeLaunchBlock

  3. ViewModelScopeLaunchBlock оборачивается в DispatchedContinuation, в качестве диспатчера передаётся тот, что был указан в viewModelScope, для Android этим диспатчером будет Dispatchers.Main.immediate

  4. Вызывается DispatchedContinuation.resumeCancellableWith() для ViewModelScopeLaunchBlock, где происходит проверка на главный поток, но так как viewModelScope.launch() и так выполняется на главном потоке, то не будет никакого переключения через диспатчер, а метод ViewModelScopeLaunchBlock.resumeWith() будет вызван напрямую

  5. В ViewModelScopeLaunchBlock.resumeWith() происходит вызов invokeSuspend(), где начинает выполняться первая ветка (label == 0), в которой вызывается fetchPosts(), в качестве первого параметра передаётся ссылка на текущий Continuation объект, в данном случае это ViewModelScopeLaunchBlock, переменная label становится равной 1

  6. В функции fetchPosts() вызывается withContext(), который также как и viewModelScope.launch() принимает suspend блок, поэтому генерируется реализация WithContextBlock

  7. Создаётся DispatchedCoroutine и передаётся в качестве параметра completion в WithContextBlock.
    Обратите внимание, что DispatchedCoroutine в качестве объекта Continuation принимает ViewModelScopeLaunchBlock, блок из которого вызывается функция withContext(), так как нам нужно каким-то образом вернуть результат обратно.

  8. WithContextBlock запускается аналогично ViewModelScopeLaunchBlock, также оборачивается в DispatchedContinuation, только теперь в качестве диспатчера передаётся Dispatchers.IO.

  9. Вызывается DispatchedContinuation.resumeCancellableWith() для WithContextBlock, где происходит переключение главного потока на background поток через Dispatchers.IO диспатчер, WithContextBlock.resumeWith() теперь будет выполняться на background потоке

  10. DispatchedCoroutine не может отдать сразу результат запроса и поэтому fetchPosts() в ViewModelScopeLaunchBlock.invokeSuspend() возвращает COROUTINE_SUSPENDED, что приводит к приостановке корутины

  11. Метод WithContextBlock.invokeSuspend() выполняет единственную ветку кода - запрос в сеть и получение ответа на background потоке.

  12. Когда запрос завершится метод WithContextBlock.invokeSuspend() вернёт результат в WithContextBlock.resumeWith(), где произойдёт дальнейшая отправка результата через вызов completion.resumeWith() в объект корутины, в данном случае это DispatchedCoroutine

  13. В DispatchedCoroutine.resumeWith() сначала произойдёт проверка на дочерние корутины в состоянии выполнения и если их нет, как в данном примере, выполнится код DispatchedCoroutine.afterResume()

  14. Метод DispatchedCoroutine.afterResume() должен вернуть результат в ViewModelScopeLaunchBlock, но здесь есть проблема: WithContextBlock сейчас выполняется на background потоке, который предоставил Dispatchers.IO, а ViewModelScopeLaunchBlock должен получить результат на главном, поэтому вызывается цепочка continuation.intercepted().resumeCancellableWith(), метод intercepted() не будет повторно оборачивать ViewModelScopeLaunchBlock в DispatchedContinuation, это сделано для оптимизации

  15. Снова вызывается DispatchedContinuation.resumeCancellableWith() для ViewModelScopeLaunchBlock, но только теперь происходит переключение на главный поток через диспатчер Dispatchers.Main.immediate, если вы не забыли там под капотом Handler.post() вызов, в итоге ViewModelScopeLaunchBlock.resumeWith() выполняется на главном потоке, а в качестве результата передаётся список постов

  16. В ViewModelScopeLaunchBlock.resumeWith() происходит второй вызов ViewModelScopeLaunchBlock.invokeSuspend(), теперь уже выполняется вторая ветка (label == 1), которая берёт список постов и выводит его в консоль.

  17. Метод ViewModelScopeLaunchBlock.invokeSuspend() завершается успешно без приостановки, поэтому ViewModelScopeLaunchBlock.resumeWith() заканчивает своё выполнение и делает вызов completion.resumeWith(), где в качестве completion выступает корутина StandaloneCoroutine

  18. В StandaloneCoroutine.resumeWith() происходит проверка на дочерние корутины в состоянии выполнения, их в данном примере нет, корутина завершается.

Если вы дошли до этого момента, то вы явно не простой перец, обязательно сделайте перерыв с шоколадкой и попробуйте аналогичным образом проследить путь выполнение корутин в своём коде.

К сожалению это ещё не конец, корутины могут запускаться в других корутинах и не по одной, а целыми пачками, здесь работает похожий механизм с диспатчерами, но помимо него есть ещё и EventLoop, если вы уже отдохнули и готовы сделать последний рывок, продолжаем!

Дочерние корутины, EventLoop и runBlocking

Выполнение дочерних корутин важно разграничивать на два вида:

  1. Дочерняя корутина запускается через диспатчер

  2. Дочерняя корутина выполняется на EventLoop

Первый случай возникает, когда диспатчер всегда переключает выполнение корутины, даже если она выполняется на правильном потоке, например Dispatchers.Main, второй же свойственен для диспатчеров, где переключение происходит только по необходимости, яркий пример из Android: Dispatchers.Main.immediate, который переключает выполнение корутины на главный поток, только если она не выполняется на нём.

Что ж, рассмотрим по порядку оба случая, начнём с первого:

/*
Dispatchers.Main всегда переключает выполнение корутины 
на главный поток через Handler.post() механизм, 
даже если корутина и так выполняется на главном
*/
val uiScope = CoroutineScope(Dispatchers.Main + Job())
uiScope.launch {
    launch {
        println("I'm child coroutine #1")
    }
    launch {
        println("I'm child coroutine #2")
    }
    println("I'm parent coroutine")
}

// примерно во что всё это превратится

class UiScopeParentBlock(
    completion: StandaloneCoroutine
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm parent coroutine")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class UiScopeChild1Block(
    completion: UiScopeParentBlock
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm child coroutine #1")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class UiScopeChild2Block(
    completion: UiScopeParentBlock
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm child coroutine #2")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class StandaloneCoroutine(...) {

    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        /*
        resumeWith() не завершится полностью пока 
        дочерние корутины не закончат своё выполнение

        когда мы рассматривали код StandaloneCoroutine, то можно было
        увидеть как происходит добавление Job'ы дочерней корутины в Job'у
        родительской, поэтому родительская корутина знает состояния 
        своих дочерних корутин
        */
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

// родительская корутина
val parentCoroutine = StandaloneCoroutine(...)

val uiScopeParentBlock = UiScopeParentBlock(
    completion = parentCoroutine
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeParentBlock
).resumeCancellableWith(Result.success(Unit))

// первая дочерняя корутина
val childCoroutine1 = StandaloneCoroutine(...)

val uiScopeChild1Block = UiScopeChild1Block(
    completion = childCoroutine1
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))

// вторая дочерняя корутина
val childCoroutine2 = StandaloneCoroutine(...)

val uiScopeChild1Block = UiScopeChild1Block(
    completion = childCoroutine2
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))

Я не буду расписывать отдельные шаги, как это было в прошлом разделе, вы сами прекрасно можете справиться с этим, к тому же это будет хорошей практикой для закрепления знаний, вкратце основная суть:

  1. Родительская корутина StandaloneCoroutine не будет завершена до тех пора пока все её дочерние корутины не выполнились. При создании новой корутины её объект Job добавляется в качестве дочернего элемента в объект Job родительской корутины, благодаря этому корутины могут отслеживать состояния своих детей.

  2. suspend блоки дочерних корутин UiScopeChild1Block и UiScopeChild2Block будут обёрнуты в DispatchedContinuation и переключены на главный поток через Handler.post() в не зависимости была ли родительская корутина изначательно на главном потоке или нет, Dispatchers.Main в отличии от Dispatchers.Main.immediate всегда делает переключение.

  3. Объект Continuation родительской корутины никак не связан с Continuation объектами дочерних корутин, поэтому когда завершатся последние результат не будет проброшен обратно в UiScopeParentBlock, да и в этом особо нет смысла, как например с withContext(), который гарантирует последовательный порядок выполнения с возвращением результата.

  4. Диспатчеры в принципе не могут гарантировать порядок выполнения, так как выполняют код асинхронно, тот же метод Handler.post() из Android не даёт 100% уверенности, что код всегда будет выполняться в том порядке, который мы запланировали.

С запуском дочерних корутин через диспатчеры в целом разобрались, но что произойдёт если например у нас диспатчер Dispatchers.Main.immediate и все корутины выполняются на главном потоке, дочерние корутины ведь не будут переключаться снова через Handler.post() как это было с Dispatchers.Main, в таком случае начинает работать так называемый EventLoop:

// метод из DispatchedContinuation
internal inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        // ...
    } else {
        /*
        executeUnconfined делает одну из двух вещей:
        
        1) выполняет лямбду на EventLoop'е
        2) ставит лямбду в очередь EventLoop'а
        */
        executeUnconfined(state, MODE_CANCELLABLE) {
            continuation.resumeWith(result)
        }
    }
}

private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    // EventLoop - штука куда кладутся лямбды в очередь на исполнение
    val eventLoop = ThreadLocalEventLoop.eventLoop

    // isUnconfinedLoopActive изначательно равен false поэтому для
    // родительской корутины срабатывает вторая ветка, а для дочерних - первая
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        // выполнение дочерних корутин ставится в очередь EventLoop'а
        eventLoop.dispatchUnconfined(this)
        true
    } else {
        /*
        выполняет Continuation.resumeWith() для родительской корутины,
        дочерние в этот момент создаются и кладутся в очередь EventLoop'а, 
        после завершения инициализация родительской корутины дочерние
        по очереди берутся из EventLoop'а и выполняются
        */
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

Считайте что EventLoop это простая очередь задач или лямбд как в нашем случае, куда кладутся запросы на выполнение дочерних корутин, затем они выполняются в том порядке, в котором были добавлены в очередь, как раз такая организация обеспечивает последовательное выполнение корутин:

/*
viewModelScope под капотом использует Dispatchers.Main.immediate,
который не будет переключать дочерние корутины, так как они и так
находятся на главном потоке, поэтому будет задействован 
механизм EventLoop'а
*/
viewModelScope.launch {
    launch {
        println("I'm the second!")
    }
    launch {
        println("I'm the third!")
    }
    /*
    весь блок кода в родительской корутине выполняется 
    до момента выполнения дочерних корутин, это необходимо
    чтобы поставить дочерние корутины в очередь, 
    а потом начать их выполнять
    */
    println("I'm the first!")
}

В качестве дополнения приведу ещё парочку интересных особенностей EventLoop'а:

  1. EventLoop наследуется от CoroutineDispatcher и может быть использован в корутинах, например так работает runBlocking()

  2. EventLoop создаётся для каждого потока чере механизм ThreadLocal переменных, как например экземпляр Looper класса из Android

  3. Под капотом EventLoop лежит ArrayDeque из Kotlin коллекций для формирования очереди задач.

Напоследок рассмотрим как runBlocking() ждёт завершения своих корутин и соблюдает их порядок, хотя ответ очевиден - используется EventLoop:

/*
EventLoop создаётся на основе переданного CoroutineContext'а и кладётся 
в специальную корутину BlockingCoroutine, затем вызывается 
метод joinBlocking(), который ждёт пока все дочерние корутины 
не выполнятся
*/
actual fun <T> runBlocking(
    context: CoroutineContext, 
    block: suspend CoroutineScope.() -> T
): T {  
    val currentThread = Thread.currentThread()
    val eventLoop: EventLoop = ...
    val newContext: CoroutineContext = ...
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val blockedThread: Thread,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {

    /*
    ожидание происходит в while(true) цикле, а любой бесконечный цикл 
    как вы уже догадываетесь блокирует текущий поток, отсюда 
    и название runBlocking
    чтобы завершить цикл ожидания нужно поменять isCompleted на false,
    это произойдет только когда все дочерние корутины завершатся
    */
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    eventLoop?.processNextEvent()
                    if (isCompleted) break
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }
}

Именно благодаря бесконечному циклу runBlocking() дожидается выполнения всех дочерних корутин, при этом блокируя текущий поток.

Хотелось бы добавить что порядок в runBlocking() будет гарантироваться только когда корутины выполняются без каких либо асинхронных вызовов и переключений на другие диспатчеры, например здесь порядок не будет соблюдён:

fun main() = runBlocking<Unit> {
    launch {
        val result = withContext(Dispatchers.IO) {
            delay(500)
            "I'm the second!"
        }
        println(result)
    }
    launch {
        println("I'm the first!")
    }
}

Заключение

Не думал что и сам дотяну до этого момента, статья получилась очень объёмной и без капли самоуверенности заявляю охренительно полезной!

В качестве последних слов я собрал парочку фактов:

  1. Корутина - это всего лишь удобная абстракция c крутыми фишками над асинхронным выполнением кода, примером асинхронного выполнения может быть переключение на другой поток, использование очереди главного потока (MessageQueue из Android) и тд.

  2. Continuation — кирпичик на котором построена практически вся библиотека корутин, является простейшим интерфейсом с единственным методом resumeWith(), данный метод вызывается когда происходит переход между состоянием приостановки корутины и состоянием её выполнения.

  3. Состояние приостановки - так как корутины позволяют писать асинхронный код в последовательном стиле, то необходим механизм возвращения к точкам выполнения этого кода, в большинстве случаев такой механизм реализуется с помощью callback'ов, которыми как раз и являются Continuation реализации.

  4. К реализациям Continuation интерфейса относятся: обычная корутина StandaloneCoroutine, сгенерированный suspend блок на базе ContinuationImpl, реализация для работы диспатчеров DispatchedContinuation, корутина используемая в runBlocking() методе - BlockingCoroutine и другие.

  5. DispatchedContinuation оборачивает другие Continuation объекты, чтобы передать выполнение Continuation.resumeWith() метода диспатчеру.

  6. CoroutineDispatcher в большинстве случаев используется для переключения корутины на другой или другие потоки, но и есть и исключения, такие как EventLoop например, который позволяет выполнить корутины в правильном порядке.

  7. EventLoop - это простая очередь задач, куда кладутся запросы на выполнение корутин, а затем они выполняются в том порядке, в котором были добавлены в очередь, такая организация обеспечивает правильный порядок выполнения, но это работает только если ни в одной корутине не будет переключений через диспатчеры.

Полезные ссылки:

  1. Мой телеграм канал

  2. Доклад от Яндекса ШМР 2024

  3. Курс по корутинам от Android Broadcast

  4. Официальная дока

  5. Крутой доклад от создателя библиотеки

  6. Репозиторий библиотеки на Github'е

Пишите в комментах ваше мнение и всем хорошего кода!

Теги:
Хабы:
Всего голосов 30: ↑30 и ↓0+36
Комментарии5

Публикации

Истории

Работа

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
19 сентября
CDI Conf 2024
Москва
20 – 22 сентября
BCI Hack Moscow
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн
25 сентября
Конференция Yandex Scale 2024
МоскваОнлайн
28 – 29 сентября
Конференция E-CODE
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
30 сентября – 1 октября
Конференция фронтенд-разработчиков FrontendConf 2024
МоскваОнлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн