Как стать автором
Обновить

Kotlin Coroutines. От А до Я

Уровень сложностиПростой
Время на прочтение28 мин
Количество просмотров17K
Превью статьи
Превью статьи

Всем привет!

В этой статье я бы хотел осветить загадочный мир корутин перед своим читателем, рассказав о них от А до Я.

Я старался написать эту статью таким образом, чтобы она была полностью понятна новичкам и могла использоваться как шпаргалка на собеседовании для тех, кому уже знаком этот асинхронный мир. Для лучшего понимания, я буду периодически ссылаться на исходники библиотеки kotlinx.coroutines версии 1.7.1, так как уверен, что это поможет развеять загадочность всех корутинных мифов.

Статья получилась весьма подробной, поэтому запасайтесь кружчекой чая, и давайте начнем!

Оглавление

  1. Шпаргалка по корутинам

  2. Зачем нужны корутины?

  3. Suspend в Котлин

  4. Библиотека kotlinx.corourines

  5. Три способа войти в корутинный мир. runBlocking, CoroutineScope и suspend main

  6. Область видимости корутины. CoroutineScope и GlobalScope

  7. CoroutineContext или как настроить свою корутину. withContext

  8. Работа не Job, или все таки Job? Разбираемся в Job и Deferred

  9. Запуск корутины с помощью Coroutine Builders. launch и async

  10. Dispatchers. Основные виды диспетчеров в корутинах и область их применения

  11. Способы обработок ошибок в корутинах. try-catch, runCatching, CoroutineExceptionHandler, SupervisorJob / supervisorScope

  12. Отмена корутин

  13. Создание своего пользовательского CoroutineScope

1. Шпаргалка по корутнам

Этот раздел - обобщающий для всех последующих. Если вы только открыли статью и не до конца понимаете, что изображено на этой схеме - не переживайте, так как я тоже ниже обо всем это подробно расписано

Шпаргалка по корутинам
Шпаргалка по корутинам

2. Зачем нужны корутины?

Корутины нужны для управления асинхронностью в вашем коде. Прежде чем коснемся этого понятия, давайте представим такой пример:

Вы пишите свое собственное приложение для просмотра картинок из Интернета, которое то и дело общается  с сервером, посылая на него различные сетевые запросы. И проблема в том, что длительность таких запросов порой занимает секунды, в течение которых пользователь не может ничего сделать, так как его главный поток ожидает ответ от сервера и не способен далее обрабатывать код, пока не получит ответ.

Ожидание загрузки фото на главном потоке в течение 3 секунд
Ожидание загрузки фото на главном потоке в течение 3 секунд

Код такого сетевого запроса можно представить примерно так:

fun loadImage() {
    println("Начало загрузки фото")
    Thread.sleep(3000) // Ожидание сервера
    println("Фотография загружена")
}

fun main(args: Array<String>) {
    println("Программа начинает работу")
    loadImage()
    println("Продолжение работы программы...")
}

Метод loadImage() запрашивает у сервера фотографию и ждёт ее в течение 3 секунд (сейчас мы сделали это ожидание искусственно, чтобы упростить код для понимания), прежде чем сервер вернёт ответ. И только после этого наш код возвращает ответ и продолжает свое выполнение:

Скрытый текст

Программа начинает работу
Начало загрузки фото
// 3 секунды паузы
Фотография загружена
Продолжение работы программы...

Все бы ничего, но в течение этих 3 секунд пользователь не способен никак взаимодействовать с приложением, что несет в себе плохой user experience для вашего приложения, и нам бы хотелось убрать это ожидание ответа с главного потока, чтобы пользователь дальше смог продолжить работать с вашей программой.

Но как это сделать?

Есть, как минимум, два варианта:

  1. Сделать сетевой запрос в отдельном потоке, тем самым выполняя его параллельно

  2. Поместить этот запрос внутри корутин, сделав его асинхронным в нашей программе

Давайте подробнее остановимся на этом моменте.

Создавая новый поток, вы запускаете код в нем параллельно вашему основному потоку. Схематично это можно представить так:

Загрузка фото после добавления потоков
Загрузка фото после добавления потоков

Код такого подхода выглядел примерно бы так:

class BackgroundThread() : Thread() {
    override fun run() {
        loadImage()
    }
}

fun loadImage() {
    println("Начало загрузки фото")
    Thread.sleep(3000) // Ожидание сервера
    println("Фотография загружена")
}

fun main(args: Array<String>) {
    println("Программа начинает работу")

    //Создание нового потока для загрузки фото
    val newThread = BackgroundThread()
    newThread.start()

    println("Продолжение работы программы...")
}

Таким образом пользователю теперь не нужно ждать 3 секунды в ожидание ответа от сервера, так как теперь эта работа происходит параллельно (фоново) в другом потоке. Как результат, мы увидим в консоли следующее:

Скрытый текст

Программа начинает работу
Продолжение работы программы...
Начало загрузки фото
Фотография загружена

Схематично, данный код можно визуализировать следующим образом:

Многопоточная загрузка фотографии
Многопоточная загрузка фотографии

Но у такого подхода есть существенный недостаток: создание потоков в программировании далеко небесплатная штука с точки зрения производительности, и потому создание любого нового потока - это накладные ресурсы для вашего программы. Да, иногда без потока не обойтись, когда речь идёт о тяжёлых вычислениях, и они реально сильно ускорят вашу программу, сделав затраты на создание нового потока незначительными. Но в случае сетевого запроса это может быть избыточным, так как такой сетевой запрос не несёт большой нагрузки на процессор, ведь все время ожидания приходится на транспортировку вашего запроса по всемирной паутине туда и обратно, где устройство пользователя просто ждёт ответ, вообще ничего не делая.

В таких случаях лучше воспользоваться корутинами, которые в отличие от потоков не несут больших затрат на свое создание, и добавляют в код асинхронность.

Но что же такое асинхронность?

Асинхронность в программировании - это парадигма программирования, которая позволяет выполнять и обрабатывать операции или задачи без ожидания их завершения. То есть вместо последовательного выполнения задач (как мы это делали в привычном для нас синхронном коде, где каждая операция вызывается друз за другом) асинхронное программирование позволяет коду продолжать выполняться, пока некоторые асинхронные задачи выполняются в фоновом режиме (такие методы мы будем звать suspend, но об этом позже), и обрабатывать их результаты по мере их готовности.

Это достигается за счет использования неблокирующих операций, которые не приостанавливают поток выполнения текущей задачи, а продолжают выполнять код, пока асинхронная операция не завершается, уведомив нас об этом через механизм колбэков.

Наглядно пример асинхронности может продемонстрировать следующий рисунок:

Пример загрузки фото с асинхронностью
Пример загрузки фото с асинхронностью

Как видно из рисунка, когда у нас происходит сетевой вызов, мы не ждём его все 3 секунды на нашем потоке, заблокировав его, а выполняем наш код дальше. Но как только нам вернётся ответ, мы продолжим выполнять код, следующий после получения ответа от нашего сервера.

В этом и заключается главная фишка асинхронности - она не блокирует основной поток, а как бы нарезает его кусочками, что создаёт иллюзию перед пользователем параллельного выполнения кода, хотя по настоящему параллельный код не создаётся (в отличие от потоков).

Причем, если операций много и они ничего не ожидают (как в случае с ответом от сервера), то асинхронность будет достигаться за счет очень быстрого переключения между задачами:

Выполнение 3 задач в асинхронном режиме
Выполнение 3 задач в асинхронном режиме

Но за счёт быстрого переключения между этими операциями пользователь также не замечает разницы, что создаёт то самое впечатление выполнения асинхронности. Классно, да? И создать такой асинхронный код нам позволяют те самые корутины

3. Suspend в Kotlin

Для того, чтобы сказать нашей программе, что не нужно блокировать поток, дожидаясь ответа от определенного метода, используется ключевое слово suspend. Этим словом помечается тот метод, который как раз и способен заблокировать наш поток на длительное время (например, таким примером может послужить тот самый метод сетевого вызова, длительность которого составляет 3 секунды). Но давайте разберемся, как именно работает этот метод под капотом.

Вызывая suspend-метод в своем коде, Котлин под капотом создаёт нарезку из колбэков, примерно следующим образом.

Допустим у нас есть два suspend-метода сетевых вызовов getData1() и getData2(), который мы будем вызывать в нашем методе main():

fun main() = runBlocking {
    val data1: Int = getData1()
    println(data1)

    val data2: String = getData2(data1, "name")
    println(data2)
}

suspend fun getData1(): Int {
    delay(1000)
    return 42
}

suspend fun getData2(data1: Int, name: String): String {
    delay(500)
    return "Результат: $data1, имя: $name"
}

Ключевые слова suspend в этом коде - сигнал котлину о том, что нужно создать некоторый объект типа Continuation, который содержит в себе обратный вызов определенного кода (колбэк), после завершения выполнения suspend-метода.

Упрощённо, этот код преобразится в следующий, создав нарезку из двух колбэков с помощью Continuation:

fun main() = runBlocking {
    val continuation1 = Continuation<Int> { data1 ->
        println(data1) // Вывод data1 после завершения getData1

        val continuation2 = Continuation<String> { data2 ->
            println(data2) // Вывод data2 после завершения getData2
        }
        getData2(data1, "name", continuation2) // Запускаем getData2
    }
    getData1(continuation1) // Запускаем getData1
}

fun getData1(continuation: Continuation<Int>): Unit {
    if (continuation.isCompleted) {
        return
    }
    delayCallback(1000) {
        continuation.resume(42)
    }
}

fun getData2(data1: Int, name: String, continuation: Continuation<String>): Unit {
    if (continuation.isCompleted) {
        return
    }
    delayCallback(500) {
        continuation.resume("Результат: $data1, имя: $name")
    }
}

fun delayCallback(delay: Long, callback: () -> Unit) {
    Thread.sleep(delay)
    callback()
}

Как видно из кода, наши suspend-методы стали обычным методами, которые дополнительно принимают к себе на вход объект типа Continuation, который в свою очередь в конце метода просто вызывает метод resume(), возвращая результат выполнения suspend-метода. Этот возврат с помощью метода resume() - самый обыкновенный колбэк, который позволяет нам выполнять код дальше, после получения результата.

То есть вызывая метод getData1(), мы передаём ему некоторый объект Continuation вместе с колбэком, который говорит что после получения результата из getData1(), нашему коду необходимо вывести этот результат в консоль с помощью команды println() и запустить второй метод getData2(), который на вход примет 2 параметра и ещё один объект Continuation вместе с ещё одним колбэком:

  //...
  val continuation1 = Continuation<Int> { data1 ->
      println(data1) // Вывод data1 после завершения getData1
  
      val continuation2 = Continuation<String> { data2 ->
          //... Колбэк для getData2()
      }
  }
  
  getData1(continuation1) // Запускаем getData1

getData2() в свою очередь сделает аналогичным образом вызов колбэка, который запустит код колбэка (передаваемый в объект Continuation) только после окончания выполнения всего suspend-метода с помощью метода resume().

Вот и всё, никакой магии в suspend нет. С помощью механизма колбэков, которые неявно передаются в Continuation (который в свою очередь создаётся при вызове suspend метода), Котлин искусственно дожидается ответа от долгих suspend-методов, и получая эти результаты передает их дальше, вызывая колбэк с помощью метода resume(). На самом деле все немного сложнее, чем представлено в коде выше, однако этим примером я захотел подчеркнуть принцип работы suspend, для понимания, как он вообще работает, чтобы вы могли грамотно использовать его в своем коде, не помечая все подряд suspend'ом

Кстати, как вы могли заметить, delay(), который создаёт задержку на определенное число миллисекунд в коде - это тоже обыкновенный suspend-метод, работающий аналогичным образом. Под капотом у него тоже есть свой Continuation

Скрытый текст
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

И да, именно потому что delay является suspend методом, вы не сможете запустить его вне coroutineScope, о которых мы поговорим чуть дальше.

Стоит при этом отметить, что ключевое слово suspend встроено прямо в язык Котлин, что позволяет им пользоваться без подключения дополнительных зависимостей к вашему проекту. Это сделано в основном для совместимости вашего кода с другими библиотеками, которым может понадобиться вся мощь корутин, а без suspend её добиться невозможно. Однако многий другой функционал, облегчающий работу с асинхронным миром включен в другие библиотеки, которые предоставили нам в виде отдельных зависимостей разработчики JetBrains.

4. Библиотека kotlinx.corourines

Одну из таких библиотек мы и будем рассматривать в этой статье, поскольку она предоставляет нам много полезных фишек для работы с корутинами. Несмотря на то что, какая-то часть корутинного мира уже встроена на уровне языка Котлин, большая часть необходимого функционала вынесена в отдельную библиотеку kotlinx.coroutines, обновления которой выходят намного чаще, чем обновления языка.

Подключить эту зависимость весьма просто, написав следующий код, если у вас используется Gradle:

dependencies {
	...
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
}

Версию актуальной версии библиотеки вы можете взять из maven-репозитория.

Ну а теперь после ее подключения давайте двинемся дальше и разберемся с основными понятиями корутин.

4. Три способа войти в корутинный мир. runBlocking, CoroutineScope и suspend main

Если вы вызовите в методе main() любой suspend-метод, то компилятор незамедлительно покроет вас матом сообщит о ошибке, и сделает это вполне осознанно. Дело в том, что вызов любой suspend-функции возможен только внутри любого CoroutineScope, который представляет собой область видимости (о них мы поговорим ниже).

Но где взять этот CoroutineScope в самый первый раз? Существует 3 способа сделать это:

  1. С помощью runBlocking

  2. С помощью suspend main()

  3. С помощью coroutineScope

P. S. на самом деле только с помощью runBlocking, так как остальные два способа это переработанный runBlocking под капотом

Рассмотрим первый способ. runBlocking - это метод, который запускает корутину и предоставляет блоку кода внутри свой CoroutineContext. Любой вызов suspend-методов внутри runBlocking блокирует поток (заставляет ожидать выполнение кода). Но если он блокирует поток, зачем тогда он нужен и в чем его профит?

Дело в том, что это один из немногих способов попасть в корутинный мир, и никто не мешает нам в дальнейшем создать новые корутины внутри, переключив их на другие потоки. Поэтому самый простой способ попасть в корутинный мир выглядит именно так:

fun main(args: Array<String>) = runBlocking {
    println("Начало выполнения программы")

    //Запуск suspend функции
    printTextWithDelay()

    println("Продолжение выполнения программы")
}

suspend fun printTextWithDelay() {
    delay(3000)
    println("Текст, напечатанный просто задержки в 3 секунды")
}

Результат выполнения будет примерно следующим:

Скрытый текст

Начало выполнения программы
Текст, напечатанный просто задержки в 3 секунды
Продолжение выполнения программы

Почему здесь мы ждали 3 секунды и только потом напечатали "Продолжение выполнение программы"? Как раз-таки по причине, описанной выше: runBlocking блокирует поток, дожидаясь выполнения всех suspend-методов.

Аналогичного результата можно было бы добиться, написав тот же самый код с помощью suspend main():

suspend fun main(args: Array<String>) {
    println("Начало выполнения программы")

    //Запуск suspend функции
    printTextWithDelay()

    println("Продолжение выполнения программы")
}

suspend fun printTextWithDelay() {
    delay(3000)
    println("Текст, напечатанный просто задержки в 3 секунды")
}

Код выдаст аналогичный результат, как и в первом способе.

Ну и наконец можно воспользоваться CoroutineScope, написав примерно следующий код:

fun main(args: Array<String>) {
    println("Начало выполнения программы")

    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(Job() + Dispatchers.Default)

    // Запуск корутины в созданном scope
    scope.launch {
        printTextWithDelay()
    }

    println("Продолжение выполнения программы")
}

Результат ее выполнения будет таким:

Скрытый текст

Начало выполнения программы
Продолжение выполнения программы

В данном случае мы создаем новый CoroutineScope, определяя его контекст, после чего с помощью билдерп корутин launch() запускаем нашу корутину. Не переживайте если вы не поняли значение слов scope, context и builder, это будет рассмотрено уже ниже.

Почему не вывелся текст после задержки 3 секунд? Потому что программа завершилась раньше, нежели запустилась наша первая корутина с помощью билдера launch() (о нем мы поговорим чуть ниже, а пока можно запомнить - что с помощью launch мы способны создавать новые корутины, которые будут запускаться асинхронно). Но если мы дождемся наш scope явно с помощью метода .join(), то результат станет аналогичным первым двум:

fun main(args: Array<String>) = runBlocking {
    println("Начало выполнения программы")

    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(Job() + Dispatchers.Default)

    // Запуск корутины в созданном scope
    val job = scope.launch {
        printTextWithDelay()
    }
    job.join() // Дожидаеся выполнения корутины и идем дальше

    println("Продолжение выполнения программы")
}
Скрытый текст

Начало выполнения программы
Текст, напечатанный просто задержки в 3 секунды
Продолжение выполнения программы

6. Область видимости корутины. CoroutineScope и GlobalScope

Давайте теперь разберемся, что же такое область видимости корутины Scope и зачем она вообще нужна.

На самом деле CoroutineScope - это очень простая вещь, которая нужна только для 2х вещей:

  1. Она хранит в себе CoroutineContext

  2. Она гарантирует, что все корутины, связанные с данным Scope, точно выполнятся (если внутри них не возникнет ошибка или мы их явно не отменим)

Таким образом в рамках одного скоупа мы можем запускать множество корутин с помощью любых билдеров, и в любой момент отменять их в этом скоупе. Простыми словами - CoroutineScope - это место жизни наших корутин, которое описывается корутин контекстом, о котором мы поговорим чуть ниже.

Теперь давайте посмотрим на те CoroutineScope, которые мы можем с вами создать.

Первым и самым простым Scope является GlobalScope.

Скрытый текст
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

Данный Scope имеет в себе пустой корутин контекст, и будет выполняться до тех пор, пока не выполнятся все дочерние корутины внутри него. Если вы пришли из мира Android-разработки, то относитесь к данному Scope с осторожностью, так как он не привязан к жизненному циклу ваших экранов и поэтому ведёт себя как демон-поток (выполняясь фоново, даже когда метод, в котором вы его вызвали, уже завершён).

Когда может быть полезен этот Scope? Ответ простой: всегда, когда вам нужно что-то сделать, не привязываясь к конкретным жизненным циклами ваших компонентов. Нужно сделать длительную операцию во время работы вашего приложения или сделать что-то асинхронно не завязываясь на месте в UI вашей программы - смело можно использовать GlobalScope для этих задач. Главное помните, как именно он себя ведёт и используйте его в зависимости от контекста вашей задачи.

А что делать, если вам захочется использовать свой собственный корутин скоуп, который нужно будет гибко настроить в зависимости от специфики вашей программы? На помощь придет класс CoroutineScope.

С его помощью вы сами можете настроить свой CoroutineContext и запускать в нем свои корутины:

fun main() = runBlocking {
    // Создаем контекст с использованием Job и Dispatchers.IO
    val scope = CoroutineScope(Job() + Dispatchers.IO)

    // Запускаем корутину в заданном контексте
    val job = scope.launch {
        // Выполняем блокирующий код в корутине
        println("Запущен в ${Thread.currentThread().name}")
        delay(1000)
        println("Закончен в ${Thread.currentThread().name}")
    }

    // Ожидаем завершения job
    job.join()
    println("Все корутины завершены.")
}

То есть по сути, единственное отличие вашего GlobalScope от CoroutineScope заключается в том, что во втором случае вы можете самостоятельно настроить свой корутин контекст, в отличие от первого, где он всегда равен EmptyCoroutineContext

7. CoroutineContext или как настроить свою корутину. withContext

Каждая корутина имеет свой контекст. Даже когда вы создаёте новую корутину, она либо наследует родительский контекст, либо имеет EmptyCoroutineContext, если родителя просто нет. Но что же такое контекст?

Контекст корутины - это, по сути, обычный Map из объектов типа Element, которые описывают "характеристики" нашей корутины. В качестве таких Element характеристик могут быть:

  • CoroutineName

  • CoroutineId

  • Dispatcher корутины

  • Job корутины

  • и другие

То есть контекст корутины просто описывает нашу корутину: как она называется, на каких потоках выполняется, какую Job выполняет и так далее.

Возможно ли изменить этот контекст? Да, и на помощь здесь придет функция withContext(), в которую мы можем передать контекст, на который хотим заменить текущий контекст.

fun main() = runBlocking {
    println("Running in context: ${coroutineContext[CoroutineName]}")
    
    // Переключение контекста на Dispatchers.IO
    withContext(Dispatchers.IO) {
        println("Running in context: ${coroutineContext[CoroutineName]} on ${Thread.currentThread().name}")
        // Выполнение какой-то IO-операции
    }
    
    println("Back to context: ${coroutineContext[CoroutineName]}")
}

Заметьте, withContext() заменяет контекст, а не создаёт новую корутину.
Причем определяя новый контекст, вы можете пользоваться оператором плюс, как в примере выше. Почему это возможно? Потому что разработчики библиотеки позаботились о реализации данного оператора в своей библиотеке:

Скрытый текст
public operator fun plus(context: CoroutineContext): CoroutineContext =
	if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
		context.fold(this) { acc, element ->
			val removed = acc.minusKey(element.key)
			if (removed === EmptyCoroutineContext) element else {
				// make sure interceptor is always last in the context (and thus is fast to get when present)
				val interceptor = removed[ContinuationInterceptor]
				if (interceptor == null) CombinedContext(removed, element) else {
					val left = removed.minusKey(ContinuationInterceptor)
					if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
						CombinedContext(CombinedContext(left, element), interceptor)
				}
			}
		}

Проще говоря, код выше добавляет элемент контекста, если такого элемента еще не было, иначе заменяет его (например, если мы захотим переопределить в дальнейшем Dispatcher, речь о котором будет ниже).

Разобравшись зачем же нужен контекст, давайте подробнее поговорим о его основных Elemet'ах.

8. Работа не Job, или все таки Job? Разбираемся в Job и Deferred

Job - это Element контекста корутины, который описывает текущее состояние корутины (активна ли она в данный момент времени, отменена ли она, какие дочерние Job имеет и есть ли у нее родительская Job и тд).

Скрытый текст
public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key<Job>

    @ExperimentalCoroutinesApi
    public val parent: Job?

    public val isActive: Boolean

    public val isCompleted: Boolean

    public val isCancelled: Boolean

    @InternalCoroutinesApi
    public fun getCancellationException(): CancellationException

    public fun start(): Boolean
	
    public fun cancel(cause: CancellationException? = null)
	
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    public fun cancel(): Unit = cancel(null)
	
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    public fun cancel(cause: Throwable? = null): Boolean

    public val children: Sequence<Job>

    @InternalCoroutinesApi
    public fun attachChild(child: ChildJob): ChildHandle

    public suspend fun join()
	
    @Suppress("DeprecatedCallableAddReplaceWith")
    @Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
        "Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
        "The job to the right of `+` just replaces the job the left of `+`.",
        level = DeprecationLevel.ERROR)
    public operator fun plus(other: Job): Job = other
}

Как создавать и управлять Job, мы поговорим разделом ниже. Но прежде чем мы его коснемся обсудим ещё один важный тип - тип Deferred.

Если вспомнить определение корутин, то мы столкнемся с асинхронностью, из которой фактически следует, что никто не знает когда закончится та или иная асинхронная операция. Но как вернуть результат из корутины и/или дождаться его возвращения? На помощь придет тип Deferred, который является наследником Job (а значит наследует все его свойства и методы) и дополнительно имеет метод await, позволяющий нам дождаться выполнения работы и получить результат из корутины:

Скрытый текст
public interface Deferred<out T> : Job {
    public suspend fun await(): T

    public val onAwait: SelectClause1<T>

    @ExperimentalCoroutinesApi
    public fun getCompleted(): T

    @ExperimentalCoroutinesApi
    public fun getCompletionExceptionOrNull(): Throwable?
}

То есть Deferred - это обычная расширенная Job, которая дополнительно предоставляет нам возможность дождаться получения результата.

Окей, с этим разобрались. Но как создавать и работать с Job и Deferred? Забегая вперёд: с помощью специальных билдеров. Давайте теперь разберемся с этим.

9. Запуск корутины с помощью Coroutine Builders. launch и async

Для того, чтобы создать Job нужно воспользоваться корутин билдером launch(), который после вызова в каком-то контексте вернёт нам ссылку на полученную Job:

fun main(args: Array<String>) = runBlocking {
    println("Начало выполнения программы")

    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(Job() + Dispatchers.Default)

    // Создание и запуск корутины
    val job = scope.launch {
        printTextWithDelay()
    }

    println("Продолжение выполнения программы")
    job.join() // Дожидаеся выполнения корутины
}
Скрытый текст

Начало выполнения программы
Продолжение выполнения программы
Текст, напечатанный просто задержки в 3 секунды

Причем если мы посмотрим исходники этого метода:

Скрытый текст
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

то мы можем заметить, что:

  1. он обязательно должен вызываться на каком-то корутин контексте (чаще всего это унаследованный контекст от родительской корутины)

  2. Мы можем дополнительно указать дополнение к нашему CoroutineContext при вызове этого метода (именно дополнение контекста, а не новый):

val job = scope.launch(Dispatchers.IO) {
    printTextWithDelay()
}
  1. В качестве параметра мы можем дополнительно указать ленивый метод инициализации, который запустит корутину не сразу, а только после явного вызова метода start() на нашей Job:

fun main(args: Array<String>) = runBlocking {
    println("Начало выполнения программы")

    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(Job() + Dispatchers.Default)

    // Создание корутины
    val job = scope.launch(
        context = Dispatchers.IO,
        start = CoroutineStart.LAZY
    ) {
        printTextWithDelay()
    }

    println("Продолжение выполнения программы")
    job.start() // Запуск корутины
    job.join() // Дожидаеся выполнения корутины
}

Результат аналогичен предыдущему:

Скрытый текст

Начало выполнения программы
Продолжение выполнения программы
Текст, напечатанный просто задержки в 3 секунды

И да, помните про атрибут у Job, который содердит внутри себя все дочерние job? Так вот, в заполнении этих атрибутов никакой магии нет - этим как раз таки и занимается билдер launch. Это можно увидеть, провалившись в исходниках данного метода:

Скрытый текст
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

...

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        if (initParentJob) initParentJob(parentContext[Job])
    }
	...
}

...

protected fun initParentJob(parent: Job?) {
	...
	@Suppress("DEPRECATION")
	val handle = parent.attachChild(this)
	parentHandle = handle
	...
}

При этом родительская Job определяется в том же методе из initParentJob() из контекста:

Скрытый текст
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
    final override val key: CoroutineContext.Key<*> get() = Job
	
	override val parent: Job?
        get() = parentHandle?.parent 
		
	protected fun initParentJob(parent: Job?) {
		...
		@Suppress("DEPRECATION")
		val handle = parent.attachChild(this)
		parentHandle = handle
		...
	}
	
	...
}

С launch разобрались. А что насчёт async?

Async - это похожий билдер корутин, но возвращает он уже не Job, а его наследника Deffered. При этом параметры этого билдера очень похожи на launch:

Скрытый текст
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Работа с ним также очень похожа на работу с launch, только теперь у нас есть возможность дождаться и вернуть результат с помощью метода await():

fun main(args: Array<String>) = runBlocking {
    println("Начало выполнения программы")

    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(Job() + Dispatchers.Default + CoroutineName("123"))

    // Создание и запуск корутины
    val deferred = scope.async {
        delay(3000)
        return@async Math.random()
    }

    println("Продолжение выполнения программы")

    // Дожидаеся результата из корутины
    val result = deferred.await()
    println("Результат = $result")
}
Скрытый текст

Начало выполнения программы
Продолжение выполнения программы
Результат = 0.8867756197456322

Подытожим выше сказанное:

  1. Билдеры корутин позволяют создать новую корутину, предоставляя возможность для нее переопределить родительский контекст. Если такого переопределения нет, контекст будет автоматически унаследован от родителя

  2. Билдер launch возвращает объект типа Job, а билдер async - типа Deferred (предоставляя возможность вернуть значение из корутины)

  3. Используя launch и async можно выстроить свою иерархию работ, при этом мы можем быть уверены что атрибуты children и parent в Job будут заполнены правильно при вызове этих билдеров

10. Dispatchers. Основные виды диспетчеров в корутинах и область их применения

Давайте разберемся ещё с одним элементом контекста корутины, а именно с диспетчером.

Диспатчер определяет на каких потоке(ах) будет выполняться ваша корутина. Существует множество диспатчеров, но мы разберём основные 4 вида и рассмотрим область их применения:

  1. Diapatchers.Main - данный диспатчер говорит, что весь код вашей корутины должен выполняться в главном потоке. Его можно использовать, например, для работы с UI, так как в противном случае долгие операции (такие как сетевые вызовы, работа с базами данных или долгие расчеты) могут создать серьезную нагрузку на вашем главном потоке, а в следствие с этим и лаги на экране пользователя.

  2. Dispatchers.Default - данный вид диспатчера распределяет корутину на число потоков равное числу ядер. Его  рекомендуется использовать для долгих/интенсивных вычислений, связанных с работой CPU. Он использует ограниченный пул общих потоков

  3. Dispatchers.IO - этот вид диспатчера распределяет корутину на пуле общих потоков (обычно это 64 потока, но в зависимости от системы это число может изменяться). Под капотом он имеет сложную логику, которая рассчитывает, когда нужно добавить ещё один поток для выпололнения нашей корутины. И если не углубляться далеко в архитектуру нашего железа, то можно сказать, что этот вид диспатчера лучше применять для "ожидающих" вызовов (сетевые вызовы, работа с базами данных, запись и чтение в файлы). Это связано с тем что под капотом данного диспатчера также зашит таймер, который через время пробуждает так называемые "спящие" (ожидающие) корутины

  4. Diapatchera.Unconfined - диспатчер, который начинается на потоке, на котором его запустили и после первой приостановки возобновляется на любом случайном потоке. Некоторые источники рекомендуют его использовать, когда нам не важно, на каком потоке будет выполняться корутина, но на практике я лично так и не встретил такой ситуации когда бы он понадобился. Более подробно с ним можно ознакомиться на странице официальной документации

11. Способы обработок ошибок в корутинах. Try-catch, runCatching, CoroutineExceptionHandler, SupervisorJob / supervisorScope

Теперь коснемся одной из самых важных тем, от которой напрямую зависит качество вашего продукта. Мы поговорим о различных способах обработки ошибок в корутинах.

Давайте для начала рассмотрим, что произойдет если в корутине возникнет ошибка. Допустим, во время нашего сетевого вызова возникнет ошибка:

suspend fun loadImage() {
    withContext(Dispatchers.IO) {
        println("Начало загрузки фото")
        Thread.sleep(3000) // Ожидание сервера
        throw IOException("Ошибка во время сетевого запроса")
    }
}

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope
    val scope = CoroutineScope(EmptyCoroutineContext)
    
	// Запуск корутины с загрузкой фото
    val job1 = scope.launch { 
        loadImage()
    }
    
	// Дожидаемся выполнения job1
    job1.join()
}

Это может быть ошибка любого рода: исключение при работе с сетью, получение null'а там, где его не ждали и т.д. В данном случае корутина, на которой возникло это исключение, просто отменится, уничтожив нашу программу с ошибкой:

Скрытый текст

Начало загрузки фото
Exception in thread "main" java.io.IOException: Ошибка во время сетевого запроса

Но это пол беды. Если наша программа имела несколько корутин в одном Scope, то при возникновении ошибки в одной из корутин, она отменяет и родительские корутины, и дочерние.

suspend fun loadImage() {
    withContext(Dispatchers.IO) {
        println("Начало загрузки фото")
        Thread.sleep(3000) // Ожидание сервера
        throw IOException("Ошибка во время сетевого запроса")
    }
}

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запуск корутины с загрузкой фото
    val job1 = scope.launch {
        loadImage()
    }

    val job2 = scope.launch {
        println("Начало job2")
        delay(5000)
        println("Завершение job2")
    }

    // Дожидаемся выполнения job1
    job1.join()
    job2.join()
}

В результате мы увидим:

Начало job2
Начало загрузки фото
Exception in thread "DefaultDispatcher-worker-4" java.io.IOException: Ошибка во время сетевого запроса

Это не очевидный момент, который необходимо железобетонно запомнить, так как одно необработанное исключение может послужить проблемой в вашей программе.

Теперь давайте рассмотрим основные способы обработки исключений.

  1. Первый способ это стандартная обработка с помощью try-catch. Да, дёшево и сердито, без какой-либо магии мы можем избежать краша нашей программы, просто обработав возможную ошибку в стандартном блоке обработчике ошибок:

suspend fun loadImage() {
    withContext(Dispatchers.IO) {
        println("Начало загрузки фото")
        Thread.sleep(3000) // Ожидание сервера
        throw IOException("Ошибка во время сетевого запроса")
    }
}

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запуск корутины с загрузкой фото
    val job1 = scope.launch {
        try {
            loadImage()
        } catch (e: Exception) {
            println("Ошибка обработана")
        }
    }

    val job2 = scope.launch {
        println("Начало job2")
        delay(5000)
        println("Завершение job2")
    }

    // Дожидаемся выполнения job1
    job1.join()
    job2.join()
}

Результат:
Начало загрузки фото
Начало job2
Ошибка обработана
Завершение job2

Причем, как вы заметили, даже если мы вызывали withContext (который очень желателен для сетевых вызовов), то вам необязательно дважды обрабатывать ошибку в своем коде. withContext не создаёт новую корутину, он лишь меняет контекст, и ошибка также будет обработана как в примере выше.

  1. Второй способ - это использование runCatching:

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запуск корутины с загрузкой фото
    val job1 = scope.launch {
        val result = kotlin.runCatching {
            loadImage()
        }.onSuccess {
            // ... блок кода при успешном выполнении корутины
            println("Загрузка прошла успешно")
        }.onFailure {
            // ... блок кода при возникновении ошибки
            println("Ошибка во время загрузки")
        }
    }

    // Дожидаемся выполнения job1
    job1.join()
}

Результат:
Начало загрузки фото
Ошибка во время загрузки

Данный метод позволяет определить нам действия при успешном выполнении корутины и действия при ошибки.
Если же мы откроем исходники данного метода, то увидим, что никакой магии на самом деле нет:

Скрытый текст
public inline fun <R> runCatching(block: () -> R): Result<R> {
    return try {
        Result.success(block())
    } catch (e: Throwable) {
        Result.failure(e)
    }
}

Под капотом runCatching использует тот же самый try-catch, но, на мой взгляд, код с ним выглядит несколько красивее.

  1. Теперь рассмотрим подробнее обработку ошибок с помощью CoroutineExceptionHandler:

fun main(args: Array<String>) = runBlocking {
    // Создание CoroutineExceptionHandler
    val coroutineExceptionHandler = CoroutineExceptionHandler { context, throwable ->
        println("Ошибка $throwable в $context")
    }

    // Создание пользовательского scope c CoroutineExceptionHandler в контексте
    val scope = CoroutineScope(coroutineExceptionHandler)

    // Запуск корутины с загрузкой фото
    val job1 = scope.launch {
        loadImage()
    }

    // Дожидаемся выполнения job1
    job1.join()
}

Данный хэндлер является элементом контекста корутины, а потому мы можем применить его для всех корутин определенного Scope, указав, как именно обрабатывать любую ошибку, возникшую в этих корутинах.

Это может быть полезно, если мы хотим обработать разом вообще все ошибки в своем корутине контексте, как в примере выше. Но это можно же посчитать и минусом: мы не можем точно сказать, в какой именно корутине возникла ошибка.

  1. И, наконец, рассмотрим ещё один способ обработки корутин, с помощью SupervisorJob и supervisorScope. Начнем с первого.

SupervisorJob - это элемент контекста, указав который мы как бы говорим, что возникшую ошибку в корутине не нужно прокидывать вверх, как это происходит обычно. Нужно только отменить эту корутину и её дочерние. Давайте посмотрим пример:

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope с SupervisorJob в контексте
    val scope = CoroutineScope(SupervisorJob())

    val job1 = scope.launch {
        println("job1 start")
        delay(2000)
        throw IOException("Ошибка во время сетевого запроса")
    }

    val job2 = scope.launch {
        println("job2 start")
        delay(4000)
        println("job2 end")
    }

    val job3 = scope.launch {
        println("job3 start")
        delay(3000)
        println("job3 end")
    }

    // Дожидаемся выполнения всех job
    job1.join()
    job2.join()
    job3.join()
}

В результате выполнения ошибка возникающая в job1 не отменит остальные корутины в данном scope, так как в launch этой джобы мы передаём наш supervisorJob:

Результат:
job1 start
job2 start
job3 start
Exception in thread "DefaultDispatcher-worker-1" java.io.IOException: Ошибка во время сетевого запроса
job3 end
job2 end

Теперь давайте посмотрим на этот код:

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского scope с SupervisorJob в контексте
    val scope = CoroutineScope(SupervisorJob())

    val job = scope.launch {
        val job1 = launch {
            println("job1 start")
            delay(2000)
            throw IOException("Ошибка во время сетевого запроса")
        }

        val job2 = launch {
            println("job2 start")
            delay(4000)
            println("job2 end")
        }

        val job3 = launch {
            println("job3 start")
            delay(3000)
            println("job3 end")
        }

        job1.join()
        job2.join()
        job3.join()
    }

    // Дожидаемся выполнения всех job
    job.join()
}

Результат:
job1 start
job3 start
job2 start
Exception in thread "DefaultDispatcher-worker-4" java.io.IOException: Ошибка во время сетевого запроса

В результате выполнения все три джобы отменяется. Но почему в первом случае все работало нормально, а тут все отменилось? Казалось бы, мы же использовали SupervisorJob в первой джобе. Это и есть одна из самых распространенных ошибок: в данном случае вложенные launch - это дочерние корутины job1, а примером выше все три job принадлежат одному scope. Наглядно это можно изобразить так:

Отмена вложенных корутин
Отмена вложенных корутин

А в первом случае, когда мы запускали все корутины от одного Scope, это выглядело так:

Отмена корутин в единном Scope
Отмена корутин в единном Scope

Работа с SupervisorScope выглядит аналогично, за тем исключением, что мы работаем со скоупом:

fun main(args: Array<String>) = runBlocking {
    supervisorScope {
        val job1 = launch {
            println("job1 start")
            delay(2000)
            throw IOException("Ошибка во время сетевого запроса")
        }

        val job2 = launch {
            println("job2 start")
            delay(4000)
            println("job2 end")
        }

        val job3 = launch {
            println("job3 start")
            delay(3000)
            println("job3 end")
        }

        // Дожидаемся выполнения всех job
        job1.join()
        job2.join()
        job3.join()
    }
}

И опять никакой магии нет. Главное внимательно смотреть в каком scope вы создаёте и запускаете свои корутины, и этот корутинный мир станет чуточку лучше :)

12. Отмена корутин

Этот пункт я не случайно оставил последним, и сейчас объясню почему. Давайте посмотрим следующий пример отмены корутин:

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запускаем корутину
    val job1 = scope.launch {
        println("Начало job")

        launch {
            println("Старт дочерней корутины")
            delay(4000)
            println("Конец дочерней корутины")
        }

        delay(3000)
        println("Конец job")
    }

    // Старт другой корутины в том же Scope
    val job2 = scope.launch {
        println("Старт другой job")
        delay(5000)
        println("Конец другой job")
    }

    // Небольшая пауза, так как запуск корутин происходит не мгновенно
    // и отмена может быть раньше, чем job запустится
    delay(500)

    // Отменяем job1 (и как следствие, все дочерние корутины)
    job1.cancel()

    job1.join()
    job2.join()
}

Результат:
Начало job
Старт дочерней корутины
Старт другой job
Конец другой job

Для отмены job1 мы вызываем метод cancel(), который отменяет данную корутины и все дочерние корутины (job2 и job3, соответственно). Стоит отметить, что отмена происходит не мгновенно: мы как бы говорим котлину, что нужно отменить это джобу, а он уже при первой возможности этим займётся. Но как он понимает, какие корутины (а именно дочерние) нужно отменить?

А происходит это очень просто - путем генерации исключения CancelationException. Только данный вид исключения отменяет все корутины вниз, не отменяя их вверх и это зашито под капотом корутинного мира.

Вы можете это проверить, сгенерировав исключение CancelationException в любой вашей корутине, и увидев что родительская Job не будет отменяться, а вот дочерние закончат свое выполнение:

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запускаем корутину
    val job1 = scope.launch {
        println("Начало job")

        launch {
            println("Старт дочерней корутины")
            delay(4000)
            println("Конец дочерней корутины")
        }

        delay(3000)
        throw CancellationException() // Аналог cancel()
        println("Конец job")
    }

    // Старт другой корутины в том же Scope
    val job2 = scope.launch {
        println("Старт другой job")
        delay(5000)
        println("Конец другой job")
    }

    // Небольшая пауза, так как запуск корутин происходит не мгновенно
    // и отмена может быть раньше, чем job запустится
    delay(500)

    job1.join()
    job2.join()
}

Аналогично вы можете отменять не отдельные джобы, а целые scope, вызывая все тот же метод cancel():

fun main(args: Array<String>) = runBlocking {
    // Создание пользовательского CoroutineScope
    val scope = CoroutineScope(EmptyCoroutineContext)

    // Запускаем корутину
    scope.launch {
        println("Начало job")

        launch {
            println("Старт дочерней корутины")
            delay(4000)
            println("Конец дочерней корутины")
        }

        delay(3000)
        println("Конец job")
    }

    // Старт другой корутины в том же Scope
    scope.launch {
        println("Старт другой job")
        delay(5000)
        println("Конец другой job")
    }

    // Небольшая пауза, так как запуск корутин происходит не мгновенно
    // и отмена может быть раньше, чем job запустится
    delay(500)

    //Отменем все корутины в заданном scope
    scope.cancel()
}

Результат:
Начало job
Старт дочерней корутины
Старт другой job

13. Создание пользовательского Scope

Разобравшись со всеми основными моментами в корутинах, мы можем самостоятельно попробовать создать свой пользовательский CoroutineScope, сделав его контекст настраиваемым через параметры конструктора. Выглядеть это будет это примерно так:

/**
 * CoroutineScope для фоновой загрузки файлов в рамах отдельной фичи
 * Все возникшие сетевые ошибки обрабатываются в [errorCallback]
 */
class LoadCoroutineScope(
    private val parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO,
    private val coroutineName: CoroutineName = CoroutineName(DEFAULT_COROUTINE_NAME),
    private val errorCallback: (CoroutineContext, Throwable) -> Unit
) : CoroutineScope {

    override val coroutineContext: CoroutineContext
        get() = parentCoroutineContext + dispatcher + coroutineName + SupervisorJob() +
                CoroutineExceptionHandler { coroutineContext, throwable -> 
                    errorCallback(coroutineContext, throwable)
                }
    
    companion object {
        private const val DEFAULT_COROUTINE_NAME = "Loading CoroutineScope"
    }
}

Как видите, ничего сложного в своем Scope нет: нам достаточно просто настроить его контекст под наши цели и дальше запускать от него корутины. Теперь мы можем вызвать наш собственный CoroutineScope и при желании настроить его запуск и отмену в соответствии с жизненным циклом нашего приложения.

На самом деле, вы можете сделать даже собственный вид Job, но, пожалуй, это не будет входить в рамки данной статьи, а останется для кого-то домашним заданием.

Заключение

В этой статье я постарался раскрыть вам всю магию корутинного мира, обращаясь лишь к небольшой части исходников библиотеки. Надеюсь, для кого-то эти старания были не зря, и это непременно поможет им на практике и собеседованиях.

Но я дополнительно рекомендую после прочтения статьи немного попрактиковаться с созданием корутин, параллельно заглядывая в исходники библиотеки: это, на мой взгляд, поможет намного лучше осветить оставшиеся темные места, раскрыв окончательно пелену неизвестности корутинного мира.

Теги:
Хабы:
Всего голосов 25: ↑25 и ↓0+29
Комментарии12

Публикации

Истории

Работа

Ближайшие события

3 – 18 октября
Kokoc Hackathon 2024
Онлайн
10 – 11 октября
HR IT & Team Lead конференция «Битва за IT-таланты»
МоскваОнлайн
25 октября
Конференция по росту продуктов EGC’24
МоскваОнлайн
7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн