В этой статье мы рассмотрим, как и почему изменилась реализация примитивов синхронизации из стандартной библиотеки Java и пакета java.util.concurrent для Kotlin Coroutines и для языка Kotlin в целом. Сразу хочу предупредить: рассматриваемые в статье библиотеки и классы будут оцениваться не с точки зрения поддержки legacy-функциональности и возможности использовать их в Java, а с точки зрения эффективности и возможности использовать их в корутинах и Kotlin Multiplatform. Поэтому эта статья будет больше полезна тем, кто собирается писать новые проекты на языке Kotlin.
В рамках данной статьи будут рассмотрены:
критические секции;
атомарные переменные;
реактивные переменные;
барьерная синхронизация.
Критические секции
Здесь мы не будем очень подробно останавливаться на стандартных реализациях критической секции в языке Java, таких как synchronized и ReentrantLock. Написано множество статей на эту тему, и я уверен, они всем известны. Рассмотрим, почему их использование для синхронизации корутин нежелательно, и приведем альтернативу.
synchronized / @Synchronized
Разработаем простой тест, где будем на разных потоках делать инкремент переменной value:
fun main() = runBlocking {
var obj = Any()
var value = 0
Array(100_000) {
async(Dispatchers.Default) {
synchronized(obj) {
++value
}
}
}.forEach { it.await() }
assertEquals(100_000, value)
}
Данный код запускает 100 000 параллельно выполняющихся корутин (настолько параллельно, насколько это позволяет количество потоков в Dispatchers.Default). В данном примере синхронизация блоком synchronized будет работать корректно, и тест будет пройден. Модифицируем этот код, добавив suspend функцию delay внутрь блока synchronized:
...
synchronized(obj) {
delay(1)
++value
}
...
Данный код даже не скомпилируется с ошибкой:
The 'delay' suspension point is inside a critical section
Все дело в том, что мы не можем вызывать никакие suspend функции внутри блока синхронизации, потому что текущий поток корутины должен уметь освобождаться для использования в других корутинах в момент начала активного ожидания. Стандартные реализации критической секции не рассчитаны на то, что в середине блока синхронизации поток может освободиться для других задач, а после ожидания suspend функции выполнение может быть продолжено на совершенно другом потоке.
ReentrantLock
В примере выше нас спас компилятор. Давайте посмотрим, что произойдет при использовании библиотечной реализации критической секции, например, ReentrantLock:
...
val lock = ReentrantLock()
...
lock.lock()
try {
delay(1)
++value
} finally {
lock.unlock()
}
...
Такой код скомпилируется, но синхронизация не будет работать, а может, даже произойдет вылет с IllegalMonitorStateException. Что интересно, если в данном примере использовать lock.withLock { ... }
, мы получим уже знакомую нам ошибку компиляции.
То же самое касается и полезнейшего класса ReentrantReadWriteLock, который позволяет экономить на операциях read after read. К сожалению, для синхронизации корутин он не будет работать по той же причине.
Mutex
В примерах с synchronized/ReentrantLock и suspend функциями нас мог кое-где спасти компилятор, а в худшем случае мы бы получили несинхронизированный код с вероятным вылетом в рантайме. К счастью для нас, разработчики корутин подумали о таком сценарии и специально для этого разработали интерфейс Mutex и его стандартную имплементацию. Модифицируем код при помощи Mutex:
...
val mutex = Mutex()
...
mutex.withLock {
delay(1)
++value
}
...
Такой код отработает корректно: операции над переменной value синхронизированы, suspend функция delay не мешает синхронизации.
К сожалению, класс ReadWriteMutex (аналогичный ReentrantReadWriteLock) по состоянию на июль 2023 года еще не добавили, однако обсуждение давно ведется, и есть уже минимум одна реализация в PullRequest к GitHub Kotlin/kotlinx.coroutines.
Вы можете сказать, что для данного примера синхронизация единственной переменной value при помощи критической секции — не самое концептуально правильное решение. И будете правы, поэтому синхронизацию одиночных переменных рассмотрим в следующем разделе.
Атомарные переменные
Начнем этот раздел со спойлера: volatile (в котлине @Volatile) и классы пакета java.util.concurrent.atomic (например AtomicReference, AtomicInteger...) при правильном применении будут работать с корутинами не хуже, чем со стандартными Java потоками или какими-нибудь многопоточными фреймворками.
volatile / @Volatile
Для начала вспомним, для чего нам вообще нужен @Volatile. Разные потоки для ускорения работы могут кэшировать у себя значения глобальных переменных. Из-за чего в других потоках при обращении к этим переменным мы не всегда будем видеть их актуальное значение. Рассмотрим код, иллюстрирующий суть проблемы:
@Volatile var value1 = true
@Volatile var value2 = true
fun main() = runBlocking {
repeat(1_000) {
value1 = true
value2 = true
val job1 = async(Dispatchers.Default) {
value2 = false
while(value1);
}
val job2 = async(Dispatchers.Default) {
value1 = false
while(value2);
}
job1.await()
job2.await()
}
println("Success!")
}
Это правильный код, который успешно выполнится, но, если убрать аннотацию @Volatile хотя бы с одной переменной, то тест бесконечно зависнет в одном из циклов while, потому что поток корутины использует кэшированное значение общей переменной.
Однако и @Volatile можно использовать неправильно. Рассмотрим другой пример:
@Volatile var value = true
fun main() = runBlocking {
Array(10_000) {
async(Dispatchers.Default) {
value = !value
}
}.forEach { it.await() }
assertTrue(value)
}
В данном тесте запускается 10 000 конкурирующих корутин, которые меняют значение value на противоположное. Так как количество корутин чётное, итоговое значение не должно измениться и должно остаться равным true. Однако в половине случаев тест падает, даже несмотря на @Volatile. Дело в том, что операция value=!value — неатомарная, то есть состоит из нескольких операций: чтение value, потом запись нового value. Если неатомарные операции часто вызываются в любой многопоточной среде, есть риск получить неправильное значение переменной.
java.util.concurrent.atomic
Большую гибкость по сравнению с @Volatile дают классы пакета java.util.concurrent.atomic, которые позволяют делать атомарными операции инкремента, декремента, compareAndSet (атомарно сравнить с предыдущим значением и записать), getAndSet (атомарное получение старого значения и запись нового) и ряд других. Рассмотрим как можно более эффективно и просто синхронизировать уже знакомый нам код из раздела про критические секции:
fun main() = runBlocking {
var value = 0
Array(100_000) {
async(Dispatchers.Default) {
++value
}
}.forEach { it.await() }
assertEquals(100_000, value)
}
В данном примере кода переменная value не синхронизирована, поэтому значение к концу теста value=100_000 не гарантировано. Так как инкремент по умолчанию — неатомарная операция (++value — это по сути value=value+1 чтение и запись), то и @Volatile тут не поможет. Однако, если модифицировать этот код с помощью класса AtomicInteger:
fun main() = runBlocking {
var value = AtomicInteger(0)
Array(100_000) {
async(Dispatchers.Default) {
value.incrementAndGet()
}
}.forEach { it.await() }
assertEquals(100_000, value.get())}
Благодаря атомарности операции incrementAndGet в классе AtomicInteger синхронизация начинает работать правильно. Если задача не ограничивается настолько простыми операциями, и нужно синхронизировать целые участки кода, на помощь приходят уже рассмотренные нами критические секции.
kotlinx.atomicfu
Как я писал в начале раздела, если использовать volatile и классы пакета java.util.concurrent.atomic правильно, то они будет корректно работать и в корутинах. Тогда зачем же нужно что-то еще? — спросите вы.
Дело в том, что этот тезис касается JVM и Android, однако Kotlin и корутины уже давно вышли за пределы этих двух платформ. Если вы уже разрабатываете на Kotlin Multiplaform или потенциально планируете переход, то без kotlinx.atomicfu вам не обойтись:
fun main() = runBlocking {
var value = atomic(0)
Array(100_000) {
async(Dispatchers.Default) {
value.incrementAndGet()
}
}.forEach { it.await() }
assertEquals(100_000, value.value)
}
В любом случае, переходя на kotlinx.atomicfu для JVM и Android, вы ничего не теряете — все функции библиотеки помечены как actual и внутри используют тот же пакет java.util.concurrent.atomic.
Реактивные переменные
Атомарные переменные — это хорошо, но часто необходимо также подписываться на изменение переменной и выполнять в обработчике какой-то код. В этом разделе мы рассмотрим реализации реактивных переменных, то есть классов, которые позволяют:
через generic задать тип значения;
записывать значение синхронно;
получать значение синхронно;
хранить это значение как внутреннее состояние;
подписываться на изменения значения.
Реализации, не обладающие всеми этими свойствами, в данном разделе рассмотрены не будут.
LiveData
Первое решение, которое мы рассмотрим, — это LiveData. Уверен, многие Android-разработчики не раз сталкивались с этим классом. LiveData позволяет хранить значение, получать и менять его напрямую через геттер\сеттер, подписываться на изменение значения, привязывать подписку к LifecycleOwner.
Очень полезный класс, долгое время спасавший Android-разработчиков, работает с Android Data Binding и с Jetpack Compose, но не без недостатков. Их мы сейчас и рассмотрим.
Первый недостаток очевиден из того, что я писал выше: класс LiveData — штука специфичная и доступна только для платформы Android, нигде больше использовать его не получится.
Из первого вытекает и второй недостаток: основные методы setValue \ observe должны вызываться только на Main потоке приложения (Dispatchers.Main), иначе будет вылет в рантайме.
val liveData = MutableLiveData(true)
withContext(Dispatchers.Default) {
// IllegalStateException: Cannot invoke setValue on a background thread
liveData.value = false
}
Конечно, если использовать LiveData по основному назначению, например, для связи ViewModel + Fragment\Activity, скорее всего, никаких проблем не возникнет. Однако мой опыт говорит о том, что порой разработчики пытаются использовать этот класс еще и на уровне интеракторов\репозиториев и вообще повсюду, где может понадобиться реактивность, что порождает вылеты и замедление Main потока лишними вычислениями. Есть, конечно, метод postValue, который можно вызывать на любом потоке, но не без нюансов:
val liveData = MutableLiveData(true)
withContext(Dispatchers.Default) {
println(liveData.value) // Prints true
liveData.postValue(false)
println(liveData.value) // Also prints true
}
Значение false запланировано для записи в liveData на Main потоке, но сразу значение false мы не увидим.
Еще один недостаток: LiveData не дает такой гибкости как, например, RxJava с ее методами преобразования вроде map, filter, distinct, debounce и др. Ну это и понятно: LiveData — это не полноценный фреймворк, а класс, созданный для решения конкретных проблем.
Ну и последнее. Значение LiveData всегда может быть null. Даже если в Kotlin явно указать тип значения без вопроса, например, LiveData<Int>, это не мешает нам записать и считать null.
BehaviorSubject
Похожая на LiveData штука — BehaviorSubject из RxJava. Этот класс хранит значение, позволяет его менять, получать и подписываться на него. В то же время он дает возможность пользоваться всей функциональной мощью RxJava и может применяться для JVM, а не только для платформы Android. Еще один неоспоримый плюс: методы getValue и onNext (по сути setValue) синхронизированы и могут свободно использоваться в любой многопоточной среде, а во время подписки можно выбирать поток обработки с помощью метода observeOn.
Из явных минусов можно выделить то, что nullable значения при записи в BehaviorSubject нужно упаковывать в какой-то буферный класс (а потом распаковывать при получении), потому что null не является валидным значением для этого класса.
Так как в этой статье мы рассматриваем в качестве основного фреймворка для многопоточной разработки именно корутины и Coroutines Flow, было бы странно, особенно для новых проектов, добавлять в проект еще и библиотеку RxJava. Поэтому разработчики корутин разработали собственную реализацию реактивных переменных, которую мы рассмотрим в следующем пункте.
StateFlow
Интерфейс StateFlow и его реализация являются частью Coroutines Flow и решают все проблемы LiveData и BehaviorSubject. Класс хранит значение, позволяет его получать, менять, а также подписываться на его изменение, как и в уже рассмотренных выше классах. Однако есть и ряд преимуществ:
Значение value (запись и получение) синхронизировано и может использоваться в любой многопоточной среде.
Возможность записи\чтения null контролируется на уровне языка через generics. Если объявить StateFlow<Int?>, то value будет nullable, а если StateFlow<Int>, то нет.
StateFlow включает в себя все преимущества Coroutines Flow и возможность вызова suspend функций внутри преобразующих методов или прямо внутри collect.
Для StateFlow реализован потокобезопасный метод compareAndSet, пришедший из атомарных переменных. Также есть реализации построенных на нем потокобезопасных функций обновления значения: update, getAndUpdate, updateAndGet.
При подписке на изменения есть возможность привязки к LifecycleOwner (через lifecycleScope) для Android.
Работает вместе с Android Data Binding, Jetpack Compose и даже Compose Multiplatform.
StateFlow работает не только для Android и JVM, но еще и для других платформ фреймворка Kotlin Multiplatform.
StateFlow — достаточно мощный и универсальный инструмент, который может использоваться для широкого спектра задач, таких как: связь ViewModel и View в архитектуре MVVM, хранение изменяющихся данных на уровне репозиториев с возможностью подписки на них, преобразование данных через функции Coroutines Flow и даже конструирование примитивов синхронизации, на чем мы подробно остановимся в следующем разделе.
Барьерная синхронизация
В данном разделе будет больше креатива — в корутинах пока еще нет стандартных реализаций барьерной синхронизации, поэтому нам придется самим написать этот код. Обычно под барьерной синхронизацией понимается следующее: несколько разных потоков ждут какого-то события, а когда оно происходит, потоки одновременно выходят из ожидания и продолжают свою работу. В случае с корутинами будет примерно то же самое, но ждать события будут не потоки, а, собственно, корутины. Значит, и метод ожидания будет suspend функцией. Выразим это через интерфейс:
interface Barrier {
suspend fun await()
@Throws(TimeoutCancellationException::class)
suspend fun await(timeout: Duration)
}
Очень простой интерфейс всего с двумя функциями: одна ждет с таймаутом, вторая без. Далее в этом разделе мы будем реализовывать недостающие нам классы барьерной синхронизации из пакета java.util.concurrent при помощи этого интерфейса.
CountDownLatch
Класс CountDownLatch из пакета java.util.concurrent действует следующим образом: в одних потоках мы уменьшаем счетчик на 1, а в других потоках ожидаем, пока счетчик не станет равным 0. Напомним функциональность этого класса:
задаем счетчик один раз в конструкторе, значение счетчика больше или равно нулю;
уменьшение счетчика на единицу (метод countDown);
получение значения счетчика синхронно (метод getCount);
ожидание обнуления счетчика с таймаутом (метод await).
Достаточно простой и полезный класс, но есть проблема, которая не позволяет эффективно использовать этот класс в корутинах. Проблема — в методе ожидания обнуления счетчика (метод await). Он не является suspend функцией и блокирует поток, а таймаут вызывает InterruptedException. Из-за этого эффективность корутин падает: текущий поток корутины не освобождается для задач в других корутинах. Также страдает и отменяемость: при вызове метода cancel (на текущей корутине или на всем scope целиком) корутина, зависшая в блокирующем методе await, не сможет отмениться, по крайней мере не сразу.
StateFlow — универсальный и мощный класс, который позволяет нам написать собственную реализацию CountDownLatch меньше чем за 50 строчек кода.
class CountDownBarrier(count: UInt) : Barrier {
private val stateFlow = MutableStateFlow(count)
val counterValue: UInt
@Synchronized get() = stateFlow.value
@Synchronized
fun countDown() {
if (stateFlow.value > 0u) {
--stateFlow.value
}
}
override suspend fun await() {
internalAwait()
}
@Throws(TimeoutCancellationException::class)
override suspend fun await(timeout: Duration) {
if (counterValue > 0u) {
withTimeout(timeout) { internalAwait() }
}
}
private suspend fun internalAwait() {
if (counterValue > 0u) {
// Await first value lower than 0 (suspend function).
stateFlow.first { it <= 0u }
}
}
}
В данной реализации вся изначальная функциональность CountDownLatch сохранилась, зато метод await теперь является suspend функцией. Использование @Synchronized тут оправдано, потому что должна быть возможность вызывать метод countDown и геттер на counterValue и вне корутин. Кроме того, внутри блоков @Synchronized никаких suspend функций не вызывается. Разработаем тест для класса CountDownBarrier:
fun main() = runBlocking {
val сountDownBarrier = CountDownBarrier(5u)
val awaitTasks = Array(10) { index ->
async(Dispatchers.Default) {
сountDownBarrier.await()
println("Await finished: index=$index, counter=${сountDownBarrier.counterValue}")
}
}
val countDownTasks = Array(1000) { index ->
async(Dispatchers.Default) {
println("Count down started: index=$index, counter=${сountDownBarrier.counterValue}")
сountDownBarrier.countDown()
println("Count down finished: index=$index, counter=${сountDownBarrier.counterValue}")
}
}
awaitTasks.forEach { it.await() }
countDownTasks.forEach { it.await() }
println("Success: counter=${сountDownBarrier.counterValue}")
}
Счетчик CountDownBarrier задан на 5, значит, не раньше чем через 5 вызовов метода countDown мы должны увидеть первые логи Await finished. Посмотрим, что выведет этот код:
Count down started: index=0, counter=5
Count down started: index=0, counter=5
Count down finished: index=0, counter=4
Count down started: index=1, counter=4
Count down finished: index=1, counter=3
Count down started: index=2, counter=3
Count down finished: index=2, counter=2
Count down started: index=3, counter=2
Count down finished: index=3, counter=1
Count down started: index=4, counter=1
Count down finished: index=4, counter=0
Await finished: index=9, counter=0
Await finished: index=8, counter=0
...
Счетчик обнулился, и только после этого ожидающие корутины среагировали на это и вышли из await, значит, класс CountDownBarrier работает корректно.
CyclicBarrier
Класс CyclicBarrier похож на CountDownLatch, но значение внутреннего счетчика меняется не публичным методом countDown, а в зависимости от количества ожидающих потоков. Когда количество ожидающих потоков становится равно заданному значению, все они одновременно выходят из ожидания. После освобождения всех потоков внутренний счетчик возвращается в начальное значение. Реализуем примитив барьера для корутин при помощи StateFlow:
class CoroutinesBarrier(
val initialCoroutinesCount: UShort
) : Barrier {
// @Volatile is not necessary here: this field is only used in @Synchronized blocks.
private var stateFlow = MutableStateFlow(initialCoroutinesCount)
val countLeftToReleaseBarrier: UShort
@Synchronized get() = stateFlow.value
override suspend fun await() {
internalAwait()
}
@Throws(TimeoutCancellationException::class)
override suspend fun await(timeout: Duration) {
withTimeout(timeout) { internalAwait() }
}
private suspend fun internalAwait() {
val (flowToAwait, countLeftToRelease) = countDownOrResetBarrier()
if (countLeftToRelease > 0u) {
// Await first value lower than 0 (suspend function).
flowToAwait.first { it <= 0u }
}
}
@Synchronized
private fun countDownOrResetBarrier(): Pair<Flow<UShort>, UShort> {
if (stateFlow.value > 0u) {
--stateFlow.value
}
val result = stateFlow to stateFlow.value
// Reset flow right before releasing awaiting coroutines.
if (stateFlow.value <= 0u) {
stateFlow = MutableStateFlow(initialCoroutinesCount)
}
return result
}
}
В отличие от CyclicBarrier, в конструкторе CoroutinesBarrier мы задаем не количество Java потоков, а количество корутин, при котором барьер освобождается. Чтобы можно было сбрасывать значение счетчика в начальное значение, stateFlow объявлена как var. При сбросе счетчика мы записываем в нее новый экземпляр MutableStateFlow. Дополнительная синхронизация (например, @Volatile) на переменную stateFlow не нужна, так как все обращения к этой переменной происходят только внутри блоков @Synchronized. Разработаем тест для класса CoroutinesBarrier:
fun main() = runBlocking {
val startTimeMillis = System.currentTimeMillis()
val coroutinesBarrier = CoroutinesBarrier(3u)
val awaitTasks = Array(6) { index ->
async(Dispatchers.Default) {
println("Coroutine started.")
delay(index * 100L)
coroutinesBarrier.await(5_000.milliseconds)
val durationMillis = System.currentTimeMillis() - startTimeMillis
println("Coroutine released: duration=$durationMillis.")
}
}.forEach { it.await() }
val durationMillis = System.currentTimeMillis() - startTimeMillis
println("Test finished: duration=$durationMillis.")
}
В этом тесте мы запускаем шесть корутин с увеличивающейся задержкой (100, 200 ... 600 миллисекунд). Сразу после задержки выставлен барьер на три корутины. Посмотрим, что выведет тест:
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine started.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=285.
Coroutine released: duration=583.
Coroutine released: duration=583.
Coroutine released: duration=583.
Test finished: duration=583.
Из результатов видно, что барьер работает. Корутины выходили из барьера по три штуки: первые три — спустя 285 миллисекунд, вторые три — спустя 583 миллисекунды. Однако, если в этом тесте увеличить количество корутин с шести до семи, то вместо вывода Test finished программа вылетит с ошибкой спустя пять секунд:
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms
Это логично, так как семь на три не делится нацело, значит две пачки корутин по три штуки пройдут барьер, а одна последняя корутина будет ждать до таймаута.
Заключение
В данной статье мы рассмотрели, как правильно применять прототипы синхронизации потоков в контексте корутин, выделили плюсы и минусы существующих решений, а также написали собственную реализацию примитивов барьерной синхронизации для корутин. В следующей статье мы разберем реактивные потоки, раздельный доступ, семафоры и акторы.