Тема 1. Как выглядит Kotlin Coroutine без макияжа

Тема 2. Kotlin suspend функции

Код Kotlin корутин выполняется в потоках, но важно понимать, что корутины не привязаны жестко к конкретным потокам. Их выполнение управляется диспетчерами (Dispatchers), которые определяют, в каком потоке или пуле потоков будет работать корутина.

Как мы обсуждали в Как выглядит Kotlin Coroutine без макияжа при создании корутины создается объект Continuation, в котором содержится код, который выполняет корутина. Код делится на блоки - suspend-функции с помощью switch. Когда код доходит до suspend-функции она вызывается и в неё передается весь текущий объект Continuation и на этом это ответвление switch заканчивается. Suspend-функция по завершению своей работы вызывает метод resume у переданного ей объекта Continuation и таким образом начинается выполнение следующего ответвления switch'a.

Мы всё это вспомнили не просто так:

объект Continuation, который мы обсуждаем на всех этапах передается в специальной обёртке DispatchedContinuation.

Именно на этой обертке и вызывается метод resume, который выделяет поток и вызывает resume у объекта Continuation.

Исходя из изложенного выше мы можем понять механизм смены потока - он может меняться при каждом вызове метода resume у объекта DispatchedContinuation.

// Упрощенный пример DispatchedContinuation
class DispatchedContinuation<T>(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: Continuation<T>
) : Continuation<T> {
    
    override val context: CoroutineContext = continuation.context
    
    override fun resumeWith(result: Result<T>) {
        // 1. Диспетчер решает, в каком потоке выполнить код
        dispatcher.dispatch(context) {
            // 2. В выделенном потоке вызываем resume у оригинального Continuation
            continuation.resumeWith(result)
        }
    }
}

Доступны следующие диспетчеры:

Dispatchers.Default

Если не передать корутине в контекст диспетчер, то она использует диспетчер по умолчанию. Этот диспетчер представляет собой пул потоков, количество которых равно количеству ядер процессора (включая виртуальные). Его целесообразно использовать для CPU-интенсивных задач (сортировка, вычисления). Не рекомендуется запускать на этом диспетчере IO операции из-за относительно небольшого количества пула потоков, т.к. это ограничит параллельное выполнение операций - операции будут ждать в очереди освобождения занятых потоков.

Dispatchers.IO

Его лимит на потоки равен 64 (или числу ядер процессора, если их больше 64). Этот диспетчер целесообразно использовать для выполнения IO операций (сетевые запросы, работа с БД, чтение/запись файлов и т.п.). Не рекомендуется запускать на этом диспетчере CPU-интенсивные задачи, чтобы не перегрузить CPU.

Main

Main диспетчер выполняет код корутины в основном потоке.

Executor

Можно создать диспетчер с помощью Executor:
Executors.newSingleThreadExecutor().asCoroutineDispatcher().

Тогда все будет выполняться на одном потоке:

        val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        val scope = CoroutineScope(Job())
        // or val scope = CoroutineScope(dispatcher)
        repeat(6) {
            scope.launch(dispatcher) {
                Log.e("TAAAAAAAG", "Coroutine N$it, start")
                TimeUnit.MILLISECONDS.sleep(100)
                Log.e("TAAAAAAAG", "Coroutine N$it, end")
            }
        }

08:53:19.304  TAAAAAAAG   Coroutine N0, start
08:53:19.404  TAAAAAAAG   Coroutine N0, end
08:53:19.405  TAAAAAAAG   Coroutine N1, start
08:53:19.505  TAAAAAAAG   Coroutine N1, end
08:53:19.506  TAAAAAAAG   Coroutine N2, start
08:53:19.606  TAAAAAAAG   Coroutine N2, end
08:53:19.606  TAAAAAAAG   Coroutine N3, start
08:53:19.707  TAAAAAAAG   Coroutine N3, end
08:53:19.707  TAAAAAAAG   Coroutine N4, start
08:53:19.808  TAAAAAAAG   Coroutine N4, end
08:53:19.808  TAAAAAAAG   Coroutine N5, start
08:53:19.908  TAAAAAAAG   Coroutine N5, end

Unconfined

Запускает корутину в текущем потоке, но возобновляет в потоке, предоставленном suspend-функцией. 

Смена потока внутри корутины с suspend fun <T> withContext

withContext — это suspend-функция, которая:

  1. Временно меняет контекст выполнения (диспетчер) для блока кода.

  2. Выполняет переданный блок в указанном диспетчере.

  3. Возвращает результат выполнения блока.

  4. Автоматически возвращается в исходный диспетчер.

launch(Dispatchers.Main) {
    // 1. Начинаем в UI-потоке
    showLoading()
    
    // 2. Переключаемся на IO-поток для сетевого запроса
    val data = withContext(Dispatchers.IO) {
        fetchUserData() // Выполняется в IO-потоке
    }
    
    // 3. Автоматически возвращаемся в UI-поток
    updateUI(data) // Выполняется в Main-потоке
}

Как работает механизм переключения

Шаг 1: Приостановка корутины

Когда корутина доходит до withContext, она:

  • Сохраняет текущий контекст (диспетчер).

  • Приостанавливает выполнение текущего состояния корутины.

  • Освобождает текущий поток (если он не Dispatchers.Unconfined).

Шаг 2: Создание нового Continuation

Под капотом создается новый DispatchedContinuation, который:

  • Содержит переданный блок кода.

  • Связан с новым диспетчером (например, Dispatchers.IO).

  • Имеет callback для возврата результата.

// Упрощённый код
internal fun withContext(context: CoroutineContext, block: suspend () -> T): T {
    // 1. Получаем текущий контекст
    val oldContext = currentCoroutineContext()
    
    // 2. Создаем новую обертку Continuation
    val newContinuation = DispatchedContinuation(
        newDispatcher = context[ContinuationInterceptor] as CoroutineDispatcher,
        continuation = object : Continuation<T> {
          override val context: CoroutineContext = oldContext
            // Этот код выполнится после завершения блока
            override fun resumeWith(result: Result<T>) {
                // Вернемся в исходный контекст
                  oldContext[ContinuationInterceptor]?.interceptContinuation { continuation -> 
                    continuation.resumeWith(result)
                } ?: resumeUndispatchedWith(result)
            }
        }
    )
    
    // 3. Диспетчер запускает блок в своем потоке
    newContinuation.dispatcher.dispatch(block)
    
    // 4. Приостанавливаем текущую корутину
    return suspendCoroutine { /* ... */ }
}

Шаг 3: Выполнение в новом потоке

Новый диспетчер:

  1. Берет свободный поток из своего пула.

  2. Выполняет переданный блок кода.

  3. По завершении вызывает resume для продолжения.

Шаг 4: Возврат в исходный поток

После выполнения блока:

  1. Результат передается через цепочку Continuation.

  2. Специальный механизм (UndispatchedContext или аналогичный) определяет, нужно ли возвращаться в исходный диспетчер.

  3. Если исходный диспетчер отличается от текущего, задача ставится в очередь исходного диспетчера.

// Механизм возврата упрощённо
fun resumeInOriginalContext(result: Result<T>) {
    val originalDispatcher = savedOriginalDispatcher
    
    if (originalDispatcher != currentDispatcher) {
        // Ставим задачу в очередь исходного диспетчера
        originalDispatcher.dispatch {
            // Продолжаем выполнение корутины
            originalContinuation.resumeWith(result)
        }
    } else {
        // Остаемся в том же потоке
        originalContinuation.resumeWith(result)
    }
}