Современный подход к конкурентности в Android: корутины в Kotlin

Автор оригинала: Geoffrey Métais
  • Перевод
Привет, Хабр!

Напоминаем, что у нас уже открыт предзаказ на долгожданную книгу о языке Kotlin из знаменитой серии Big Nerd Ranch Guides. Сегодня мы решили предложить вашему вниманию перевод статьи, рассказывающей о корутинах Kotlin и о правильной работе с потоками в Android. Тема обсуждается очень активно, поэтому для полноты картины также рекомендуем посмотреть эту статью с Хабра и этот подробный пост из блога компании Axmor Software.

Современный фреймворк для обеспечения конкурентности в Java/Android учиняет ад обратных вызовов и приводит к блокирующим состояниям, так как в Android нет достаточно простого способа гарантировать потокобезопасность.

Корутины Kotlin – это очень эффективный и полный инструментарий, позволяющий гораздо проще и производительнее управлять конкурентностью.

Приостановка и блокирование: в чем разница

Корутины не заменяют потоков, а скорее дают фреймворк для управления ими. Философия корутин заключается в определении контекста, позволяющего ожидать, пока завершатся фоновые операции, не блокируя при этом основного потока.

Цель корутин в данном случае – обойтись без обратных вызовов и упростить конкуренцию.

Простейший пример

Для начала возьмем самый простой пример: запустим корутину в контексте Main (главный поток). В нем мы извлечем изображение из потока IO и отправим это изображение на обработку обратно в Main.

launch(Dispatchers.Main) {
    val image = withContext(Dispatchers.IO) { getImage() } // получить из контекста IO
    imageView.setImageBitmap(image) // Возвращаемся в главный поток
}

Код прост как однопоточная функция. Причем, пока getImage выполняется в выделенном пуле потоков IO, главный поток свободен и может взяться за любую другую задачу! Функция withContext приостанавливает текущую корутину, пока работает ее действие (getImage()). Как только getImage() возвратится и looper из главного потока станет доступен, корутина возобновит работу в главном потоке и вызовет imageView.setImageBitmap(image).

Второй пример: теперь нам требуется, чтобы были выполнены 2 фоновые задачи, чтобы ими можно было воспользоваться. Мы применим дуэт async/await, чтобы две эти задачи выполнялись параллельно, и воспользуемся их результатом в главном потоке, как только обе задачи будут готовы:

val job = launch(Dispatchers.Main) {
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}
job.join() // приостанавливает выполнение актуальной корутины, пока задача не будет выполнена

async подобен launch, но возвращает deferred (сущность Kotlin, эквивалентная Future), поэтому ее результат можно получить при помощи await(). При вызове без параметров она работает в контексте, задаваемом по умолчанию для текущей области видимости.

Опять же, главный поток остается свободен, пока мы дожидаемся наших 2 значений.
Как видите, функция launch возвращает Job, который можно использовать для ожидания, пока операция завершится – это делается при помощи функции join(). Она работает как и в любом другом языке, с той оговоркой, что просто приостанавливает корутину, а не блокирует поток.

Диспетчеризация

Диспетчеризация – ключевая концепция при работе с корутинами. Это действие, позволяющее «перепрыгнуть» от одного потока к другому.

Рассмотрим, как в java выглядит эквивалент для диспетчеризации в Main, то есть,

runOnUiThread:
public final void runOnUiThread(Runnable action) {
    if (Thread.currentThread() != mUiThread) {
        mHandler.post(action); // Диспетчеризация
    } else {
        action.run(); // Немедленное выполнение
    }
}

Реализация контекста Main для Android – это диспетчер на основе Handler. Итак, это действительно очень подходящая реализация:

launch(Dispatchers.Main) { ... }
        vs
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) { ... }


// Начиная с kotlinx 0.26:
launch(Dispatchers.Main.immediate) { ... }

launch(Dispatchers.Main) посылает Runnable в Handler, так что его код выполняется не сразу.

launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) немедленно выполнит свое лямбда-выражение в текущем потоке.

Dispatchers.Main гарантирует, что когда корутина возобновит работу, она будет направлена в главный поток; кроме того, Handler используется здесь как нативная реализация Android для отправки в цикл событий приложения.

Точная реализация выглядит так:

val Main: HandlerDispatcher = HandlerContext(mainHandler, "Main")

Вот хорошая статья помогающая разобраться в тонкостях диспетчеризации в Android:
Understanding Android Core: Looper, Handler, and HandlerThread.

Контекст корутины

Контекст корутины (он же – диспетчер корутины) определяет, в каком потоке будет выполняться ее код, что делать, если будет выброшено исключение, и обращается к родительскому контексту для распространения отмены.

val job = Job()
val exceptionHandler = CoroutineExceptionHandler {
    coroutineContext, throwable -> whatever(throwable)
}

launch(Disaptchers.Default+exceptionHandler+job) { ... }

job.cancel() отменит все корутины, родителем которых является job. A exceptionHandler получит все исключения, выброшенные в этих корутинах.

Область видимости

Интерфейс coroutineScope упрощает обработку ошибок:
Если откажет какая-либо из его дочерних корутин, то откажет и вся область видимости, и все дочерние корутины будут отменены.

В примере async, если извлечь значение не удалось, а другая задача при этом продолжила работу – у нас возникает поврежденное состояние, и с этим надо что-то делать.

При работе с coroutineScope функция useValues будет вызываться лишь в случае, если извлечение обоих значений прошло успешно. Также, если deferred2 откажет, deferred1 будет отменена.

coroutineScope { 
    val deferred1 = async(Dispatchers.Default) { getFirstValue() }
    val deferred2 = async(Dispatchers.IO) { getSecondValue() }
    useValues(deferred1.await(), deferred2.await())
}

Также можно “поместить в область видимости” целый класс, чтобы задать для него контекст CoroutineContext по умолчанию и использовать его.

Пример класса, реализующего интерфейс CoroutineScope:

open class ScopedViewModel : ViewModel(), CoroutineScope {
    protected val job = Job()
    override val coroutineContext = Dispatchers.Main+job

    override fun onCleared() {
        super.onCleared()
        job.cancel()
    }
}

Запуск корутин в CoroutineScope:

Диспетчер launch или async, задаваемый по умолчанию, теперь становится диспетчером актуальной области видимости.

launch {
    val foo = withContext(Dispatchers.IO) { … }
    // лямбда-выражение выполняется в контексте CoroutineContext области видимости 
    …
}

launch(Dispatchers.Default) {
    // лямбда-выражение выполняется в задаваемом по умолчанию пуле потоков
    …
}

Автономный запуск корутины (вне какого-либо CoroutineScope):

GlobalScope.launch(Dispatchers.Main) {
    // лямбда-выражение выполняется в главном потоке.
    …
}

Можно даже определить область видимости для приложения, задав диспетчер Main по умолчанию:

object AppScope : CoroutineScope by GlobalScope {
    override val coroutineContext = Dispatchers.Main.immediate
}

Замечания

  • Корутины ограничивают интероперабельность с Java
  • Ограничивают изменяемость во избежание блокировок
  • Корутины предназначены для ожидания, а не для организации потоков
  • Избегайте I/O в Dispatchers.DefaultMain…) — для этого предназначен Dispatchers.IO
  • Потоки ресурсозатратны, поэтому используются однопоточные контексты
  • Dispatchers.Default основан на ForkJoinPool, появившемся в Android 5+
  • Корутины можно использовать посредством каналов

Избавляемся от блокировок и обратных вызовов при помощи каналов

Определение канала из документации JetBrains:

Канал Channel концептуально очень похож на BlockingQueue. Ключевое отличие заключается в том, что он не блокирует операцию put, он предусматривает приостанавливающий send (или неблокирующий offer), а вместо блокирования операции take предусматривает приостанавливающий receive.


Акторы

Рассмотрим простой инструмент для работы с каналами: Actor.

Actor, опять же, очень похож на Handler: мы определяем контекст корутины (то, есть, поток, в котором собираемся выполнять действия) и работаем с ним в последовательном порядке.

Разница, конечно же, заключается в том, что здесь используются корутины; можно указать мощность, а выполняемый код – приостановить.

В принципе, actor будет переадресовывать любую команду каналу корутины. Он гарантирует выполнение команды и ограничивает операции в ее контексте. Такой подход отлично помогает избавиться от вызовов synchronize и держать все потоки свободными!

protected val updateActor by lazy {
    actor<Update>(capacity = Channel.UNLIMITED) {
        for (update in channel) when (update) {
            Refresh -> updateList()
            is Filter -> filter.filter(update.query)
            is MediaUpdate -> updateItems(update.mediaList as List<T>)
            is MediaAddition -> addMedia(update.media as T)
            is MediaListAddition -> addMedia(update.mediaList as List<T>)
            is MediaRemoval -> removeMedia(update.media as T)
        }
    }
}
// используем
fun filter(query: String?) = updateActor.offer(Filter(query))
// или
suspend fun filter(query: String?) = updateActor.send(Filter(query))

В данном примере мы пользуемся запечатанными классами Kotlin, выбирая, какое именно действие выполнить.

sealed class Update
object Refresh : Update()
class Filter(val query: String?) : Update()
class MediaAddition(val media: Media) : Update()

Причем, все эти действия будут поставлены в очередь, параллельно выполняться они никогда не будут. Это удобный способ добиться ограничения изменяемости.

Жизненный цикл Android + корутины

Акторы могут очень пригодиться и для управления пользовательским интерфейсом Android, упрощают отмену задач и предотвращают перегрузку главного потока.
Давайте это реализуем и вызовем job.cancel() при уничтожении активности.

class MyActivity : AppCompatActivity(), CoroutineScope {
    protected val job = SupervisorJob() // экземпляр Job для данной активности
    override val coroutineContext = Dispatchers.Main.immediate+job


    override fun onDestroy() {
        super.onDestroy()
        job.cancel() // отмена задачи при уничтожении активности
    }
}

Класс SupervisorJob похож на обычный Job с тем единственным исключением, что отмена распространяется только в нисходящем направлении.

Поэтому мы не отменяем всех корутин в Activity, когда одна из них отказывает.

Чуть лучше дела обстоят с функцией расширения, позволяющей открыть доступ к этому CoroutineContext из любого View в CoroutineScope.

val View.coroutineContext: CoroutineContext?
    get() = (context as? CoroutineScope)?.coroutineContext

Теперь мы можем все это скомбинировать, функция setOnClick создает объединенный actor для управления ее действиями onClick. В случае множественных нажатий промежуточные действия будут игнорироваться, исключая таким образом ошибки ANR (приложение не отвечает), и эти действия будут выполняться в области видимости Activity. Поэтому при уничтожении активности все это будет отменено.

fun View.setOnClick(action: suspend () -> Unit) {
    // запускаем один актор в качестве родителя задачи контекста
    val scope = (context as? CoroutineScope)?: AppScope
    val eventActor = scope.actor<Unit>(capacity = Channel.CONFLATED) {
        for (event in channel) action()
    }
    // устанавливаем слушатель для активации этого актора
    setOnClickListener { eventActor.offer(Unit) }
}

В данном примере мы задаем для Channel значение Conflated, чтобы он игнорировал часть событий, если их будет слишком много. Можно заменить его на Channel.UNLIMITED, если вы предпочитаете ставить события в очередь, не теряя ни одного из них, но все равно хотите защитить приложение от ошибки ANR.

Также можно комбинировать корутины и фреймворки Lifecycle, чтобы автоматизировать отмену задач, связанных с пользовательским интерфейсом:

val LifecycleOwner.untilDestroy: Job get() {
    val job = Job()

    lifecycle.addObserver(object: LifecycleObserver {
        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun onDestroy() { job.cancel() }
    })

    return job
}
// использование
GlobalScope.launch(Dispatchers.Main, parent = untilDestroy) {
    /* здесь происходят удивительные вещи! */
}

Упрощаем ситуацию с обратными вызовами (часть 1)

Вот как можно преобразить использование API, основанного на обратных вызовах, благодаря Channel.

API работает вот так:

  1. requestBrowsing(url, listener) инициирует синтаксический разбор папки, расположенной по адресу url.
  2. Слушатель listener получает onMediaAdded(media: Media) для любого медиа-файла, обнаруженного в этой папке.
  3. listener.onBrowseEnd() вызывается по завершении синтаксического разбора папки

Вот старая функция refresh в поставщике контента для обозревателя VLC:

private val refreshList = mutableListOf<Media>()

fun refresh() = requestBrowsing(url, refreshListener)

private val refreshListener = object : EventListener{
    override fun onMediaAdded(media: Media) {
        refreshList.add(media))
    }
    override fun onBrowseEnd() {
        val list = refreshList.toMutableList()
        refreshList.clear()
        launch {
            dataset.value = list
            parseSubDirectories()
        }
    }
}

Как это улучшить?

Создаем канал, который будет запускаться в refresh. Теперь обратные вызовы обозревателя будут лишь направлять медиа в этот канал, а затем закрывать его.

Теперь функция refresh стала понятнее. Она создает канал, вызывает обозреватель VLC, затем формирует список медиа-файлов и обрабатывает его.

Вместо функций select или consumeEach можно использовать for для ожидания медиа, и этот цикл будет разрываться, как только канал browserChannel закроется.

private lateinit var browserChannel : Channel<Media>

override fun onMediaAdded(media: Media) {
    browserChannel.offer(media)
}

override fun onBrowseEnd() {
    browserChannel.close()
}

suspend fun refresh() {
    browserChannel = Channel(Channel.UNLIMITED)
    val refreshList = mutableListOf<Media>()
    requestBrowsing(url)
    // Приостанавливается на каждой итерации в ожидании медиа
    for (media in browserChannel) refreshList.add(media)
    // Канал закрыт
    dataset.value = refreshList
    parseSubDirectories()
}

Упрощаем ситуацию с обратными вызовами (часть 2): Retrofit

Второй подход: мы вообще не используем корутины kotlinx, зато применяем корутинный core-фреймворк.

Смотрите, как на самом деле работают корутины!

Функция retrofitSuspendCall оборачивает запрос на вызов Retrofit Call, чтобы сделать из него функцию suspend.

При помощи suspendCoroutine мы вызываем метод Call.enqueue и приостанавливаем корутину. Предоставленный таким образом обратный вызов обратится к continuation.resume(response), чтобы возобновить корутину откликом от сервера, как только тот будет получен.

Далее нам остается просто объединить наши функции Retrofit в retrofitSuspendCall, чтобы с их помощью возвращать результаты запросов.

suspend inline fun  <reified T> retrofitSuspendCall(request: () -> Call <T>
) : Response <T> = suspendCoroutine { continuation ->
    request.invoke().enqueue(object : Callback<T> {
        override fun onResponse(call: Call<T>, response: Response<T>) {
            continuation.resume(response)
        }
        override fun onFailure(call: Call<T>, t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}

suspend fun browse(path: String?) = retrofitSuspendCall {
    ApiClient.browse(path)
}

// использование (в контексте корутины Main)
livedata.value = Repo.browse(path)

Таким образом, вызов, блокирующий сеть, делается в выделенном потоке Retrofit, корутина находится здесь, ожидая отклика от сервера, а использовать ее в приложении – проще некуда!

Такая реализация вдохновлена библиотекой gildor/kotlin-coroutines-retrofit.

Также имеется JakeWharton/retrofit2-kotlin-coroutines-adapter с другой реализацией, дающей аналогичный результат.

Эпилог

Channel можно использовать и многими другими способами; посмотрите в BroadcastChannel более мощные реализации, которые могут вам пригодиться.

Также можно создавать каналы при помощи функции Produce.

Наконец, при помощи каналов удобно организовать коммуникацию между компонентами UI: адаптер может передавать события нажатий в свой фрагмент/активность через Channel или, например, через Actor.
Издательский дом «Питер»
281,27
Компания
Поделиться публикацией

Похожие публикации

Комментарии 29

    –5
    >> Причем, пока getImage выполняется в выделенном пуле потоков IO, главный поток свободен и может взяться за любую другую задачу!

    Ну бред же.

    Этот «главный поток» есть недоделанное детище ведро-архитекторов. Все остальные о таком чуде просто не знают (ибо вредные вещи им просто не нужны).

    На самом деле поток исполнения блокируется до получения результата. А как там при этом перераспределяются потоки операционной системы — это уже второстепенно. И даже скорее вредно отделять потоки оси от порождаемых виртуальных потоков исполнения кода, ибо всем новичкам будет казаться, что они написали «всё правильно», а на самом деле они видят виртуальную картинку, которую сочинители котлина вынуждены были сочинить из-за убогости ведра, под которое всё же приходится писать (ибо широко распространённое поделие).

    В общем — тупо прячем под ковёр гнилые потроха ведроида с его дичайшей асинхронностью и смертью всего на свете из-за лени архитекторов (не захотели под UI выделить отдельный поток).
      +1

      "Главный поток" — это общая черта любых платформ GUI, а не только Андроида… Как минимум, он присутствует в WINAPI, XLib, WinForms (как следствие WINAPI), WPF, AWT, Qt...

        –1
        >> это общая черта любых платформ GUI

        Внимательно читаем о том, что такое windows, а потом смотрим на приведённый список аббревиатур и… В общем — не надо звонить, не обладая информацией. Затем читаем про систему X Window. Ну и доказываем, как же она подходит под «это общая черта любых платформ GUI».
          +2

          А может, Вы сами приведете примеры на которые Вы ссылаетесь?

            –3
            А каких ещё примеров не хватает? Или вам здесь нужно статью из википедии скопировать?
              0

              Процитируйте ключевое на что вы ссылаетесь.
              Бегло я не увидел на википедии ничего про потоки.


              сколько не видел туториалов по иксам, там так же идет цикл событий.
              Их можно сделать несколько, но везде вижу рекомендации: выделить поток под обработку событий, чтоб не парится с синхронизацией своих данных.


              Замечу, что кроме Windows, Android, есть еще iOS/MacOS.

                –1
                >> сколько не видел туториалов по иксам, там так же идет цикл событий

                Читайте обзорную архитектуру. Иксы — это сервер. Он может быть вообще на другой стороне земли. Пояснять далее, почему на другой стороне земли будет отдельный поток?
                  +1

                  А причем тут сервер, к клиенту?

                    0
                    Да, здесь всё безнадёжно…
                0

                Примеров GUI платформ, где не используется цикл событий в выделенном под него потоке.

                  –1
                  Иксы
                    +1

                    Сервер иксов да, а клиент?


                    Вы сейчас сравниваете внутреннюю кухню сервера иксов с тем как работает клиент.
                    Апельсины с помидорами не очень сравниваются.

                      –1
                      Всё со всем можно сравнить. И иксы отлично вписываются в данную тему. Но некоторые почему-то считают, что можно кодить только в одном единственном стиле, когда один глупый программист может повесить всю систему.
                        0

                        О какой "всей системе" речь? "Главный поток" в Андроиде свой собственный для каждого приложения. И повесить глупый программист может только свою собственную программу.

          +1
          На самом деле поток исполнения блокируется до получения результата.

          Он действительно свободен. иначе бы у вас интерфейс завис.
          По сути тот код передает исполнение в поток из пула Dispatchers.IO.
          Потом управление передается в Dispatchers.Main, этот диспатчер по сути оберка над runOnUiThread процитированным выше.
          Все довольно просто.

            –1
            Кто свободен? Читайте внимательно википедию про «текущий поток исполнения».

            Интерфейс виснет от кривого архитектурного решения в ведроиде — UI ждёт, пока неграмотный программист закончит выполнять неэффективно написанную процедуру.
              +2

              Читайте внимательно код, и что в каком потоке выполняется.


              Отсылок на википедию от вас уже достаточно.
              А по делу, как не было, так и нет.
              одни нападки.

          0
          Оглавление книги на вашем сайте not found.
            –2
            Интересно, что Издательский дом предпочитает изобретать свою терминологию «корутины» вместо устоявшийся "Сопрограммы" :) Боюсь покупать книги такого издательства с необычными терминами — куплю и ничего не пойму.
              0
              «Корутинами» их называют создатели языка.
                0
                Всё-таки речь идёт про Kotlin. А в сообществе Kotlin слово «корутины» вполне общепринято и встречается чаще, чем «сопрограммы».
                  +1
                  Спасибо за ответы. Но ведь каждое сообщество хочет расширяться, хочет чтобы его ЯП стал еще более популярным. А каждый дублирующий термин может создаь проблему для понимания новичком в этом сообществе. В статье при первом появлении такого термина можно написать «корутины (сопрограммы)».
                  С наилучшими пожеланиями сообществу Kotlin. (Я совершенно искренне).
                0
                А вот у меня такой вопрос: вот есть код из статьи:
                launch(Dispatchers.Main) {
                    val image = withContext(Dispatchers.IO) { getImage() } // контекст IO
                    imageView.setImageBitmap(image) // контекст Main
                }
                


                Почему для I/O операций необходимо указывать свой контекст? Разве такой код в Android не будет работать/что-то заблокирует?
                launch(Dispatchers.Main) {
                    val image = getImage() // контекст Main, но это I/O операция
                    imageView.setImageBitmap(image) // контекст Main
                }
                


                Если getImage() — это метод асинхронного ввода-вывода, разве он не приостановится при выполнении чтения как любая другая асинхронная функция в контексте main? Или это какой-то задел для мультиплатформы или типа того?
                  0

                  Так можно делать, чтобы ограничить число параллельных блокирующих IO операций.


                  Иначе:


                  • Блокирующие операции могут остановить один из общих потоков (т.е. приложение не сможет выполнить простую операцию просто потому, что все потоки ждут, или другими словами — процессор свободен, однако программа не может его использовать). Следовательно — все блокирующие вещи должны быть отдельно.
                  • Много IO операций с диском всё равно не смогут выполняться параллельно (дисков-то не так много), значит желательно ограничить параллелизм, чтобы:
                    • Соседние операции не мешали друг другу
                    • Не создавать много потоков, каждый из которых кушает около 2 Мб (для стандартной Java).

                  Следовательно, всё блокирующее IO взаимодействие лучше выделить в отдельный пул потоков — Dispatchers.IO


                  Аналогичная идея есть у .Net с SynchronizationContext.

                    0
                    Следовательно, всё блокирующее IO взаимодействие лучше выделить в отдельный пул потоков — Dispatchers.IO

                    В этом собственно и был вопрос: а стоит ли метод асинхронного ввода-вывода исполнять в контексте "блокирующего IO взаимодействия"?

                    В теории, блокирующие I/O операции вообще не должны использоваться, и вместо них необходимо использовать их неблокирующие аналоги. А для операций, которых не имеют таких аналогов (не приходит ничего на ум) — использовать Dispatcher.IO.
                    Хотя вот тоже спорно — стоило ли вообще делать отдельный диспетчер для ввода-вывода, если Default справляется точно так же? Только ради ограничения по количеству параллельно выполняемых операций? Но в Default по идее также должно стоять такое же ограничение по количеству параллельно выполняющихся блокирующих потоков…
                      0
                      Авторы языка решили, что Dispatcher.IO почему-то удобнее именно для IO, хотя Default работает полностью идентично. Причина такого решения абсолютно непонятна. Точнее — авторы не захотели её пояснять в доках по API.

                      В целом имеем полное копирование истории с ведроидом — такие же неполноценные доки без пояснения выбранных архитекторами концепций.
                  0
                  Спасибо за книгу про Kotlin!
                  Планируете ли вы издавать похожую книгу, только о программировании под Android?

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                Самое читаемое