Остров Котлин
Предыдущие тексты этой серии: про AsyncTask, про Loaders, про Executors и EventBus, про RxJava.
Итак, этот час настал. Это статья, ради которой была написана вся серия: объяснение, как новый подход работает «под капотом». Если вы пока не знаете и того, как им пользоваться, вот для начала полезные ссылки:
А освоившись с корутинами, вы можете задаться вопросом, что позволило Kotlin предоставить эту возможность и как она работает. Прошу заметить, что здесь речь пойдёт только о стадии компиляции: про исполнение можно написать отдельную статью.
Первое, что нам нужно понять — в рантайме вообще-то не существует никаких корутин. Компилятор превращает функцию с модификатором suspend в функцию с параметром Continuation. У этого интерфейса есть два метода:
abstract fun resume(value: T)
abstract fun resumeWithException(exception: Throwable)
Тип T — это тип возвращаемого значения вашей исходной suspend-функции. И вот что на самом деле происходит: эта функция выполняется в определённом потоке (терпение, до этого тоже доберёмся), и результат передаётся в resume-функцию того continuation, в контексте которого вызывалась suspend-функция. Если функция не получает результат и выбрасывает исключение, то вызывается resumeWithException, пробрасывая ошибку вызывавшему коду.
Хорошо, но откуда взялось continuation? Разумеется, из корутиновского builder! Давайте посмотрим на код, создающий любую корутину, к примеру, launch:
public actual fun launch(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Тут builder создаёт корутину — экземпляр класса AbstractCoroutine, который, в свою очередь, реализует интерфейс Continuation. Метод start принадлежит интерфейсу Job. Но найти определение метода start весьма затруднительно. Но мы можем зайти тут с другой стороны. Внимательный читатель уже заметил, что первый аргумент функции launch — это CoroutineContext, и по умолчанию ему присвоено значение DefaultDispatcher. «Диспетчеры» — это классы, управляющие исполнением корутин, так что они определённо важны для понимания происходящего. Давайте посмотрим на объявление DefaultDispatcher:
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool
Так что, по сути, это CommonPool, хотя java-доки и говорят нам, что это может измениться. А что такое CommonPool?
Это диспетчер корутин, использующий ForkJoinPool в качестве реализации ExecutorService. Да, это так: в конечном счёте все ваши лямбда-корутины — это просто Runnable, попавшие в Executor с набором хитрых трансформаций. Но дьявол как всегда в мелочах.
Fork? Или join?
Судя по результатам опроса в моём твиттере, тут требуется вкратце объяснить, что представляет собой FJP :)
В первую очередь, ForkJoinPool — это современный executor, созданный для использования с параллельными стримами Java 8. Оригинальная задача была в эффективном параллелизме при работе со Stream API, что по сути означает разделение потоков для обработки части данных и последующее объединение, когда все данные обработы. Упрощая, представим, что у вас есть следующий код:
IntStream
.range(1, 1_000_000)
.parallel()
.sum()
Сумма такого стрима не будет вычислена в одном потоке, вместо этого ForkJoinPool рекурсивно разобьёт диапазон на части (сначала на две части по 500 000, затем каждую из них на 250 000, и так далее), посчитает сумму каждой части, и объединит результаты в единую сумму. Вот визуализация такого процесса:
Потоки разделяются для разных задач и вновь объединяются после завершения
Эффективность FJP основана на алгоритме «похищения работы»: когда у конкретного потока кончаются задачи, он отправляется в очереди других потоков пула и похищает их задачи. Для лучшего понимания можно посмотреть доклад Алексея Шипилёва или проглядеть презентацию.
Отлично, мы поняли, что выполняет наши корутины! Но как они там оказываются?
Это происходит внутри метода CommonPool#dispatch:
_pool.execute(timeSource.trackTask(block))
Метод dispatch вызывается из метода resume (Value: T) в DispatchedContinuation. Звучит знакомо! Мы помним, что Continuation — это интерфейс, реализованный в AbstractCoroutine. Но как они связаны?
Трюк заключён внутри класса CoroutineDispatcher. Он реализует интерфейс ContinuationInterceptor следующим образом:
public actual override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
Видите? Вы предоставляете в builder корутин простой блок. Вам не требуется реализовывать никакие интерфейсы, о которых вы знать ничего не хотите. Это всё берёт на себя библиотека корутин. Она
перехватывает исполнение, заменяет continuation на DispatchedContinuation, и отправляет его в executor, который гарантирует наиболее эффективное выполнение вашего кода.
Теперь единственное, с чем нам осталось разобраться — как dispatch вызывается из метода start. Давайте восполним этот пробел. Метод resume вызывается из startCoroutine в extension-функции блока:
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnchecked(receiver, completion).resume(Unit)
}
А startCoroutine, в свою очередь, вызывается оператором "()" в перечислении CoroutineStart. Ваш builder принимает его вторым параметром, и по умолчанию это CoroutineStart.DEFAULT. Вот и всё!
Вот по какой причине меня восхищает подход корутин: это не только эффектный синтаксис, но и гениальная реализация.
А тем, кто дочитал до конца, достаётся эксклюзив: видеозапись моего доклада «Скрипач не нужен: отказываемся от RxJava в пользу корутин в Котлине» с конференции Mobius. Наслаждайтесь :)