Как стать автором
Обновить

Решение проблемы race condition в Kotlin корутинах

Уровень сложностиСредний
Время на прочтение4 мин
Количество просмотров3.7K
Автор оригинала: Anton Shcherbakov

Синхронизация корутин в Android имеет огромное значение для обеспечения безопасности и эффективности многопоточности. Коррутины упрощают управление асинхронными задачами, но без должной синхронизации могут возникнуть проблемы, такие как race condition, что приведет к неправильному поведению приложения.

Представим, что нам нужно изменить переменную из нескольких корутин в таком коде:

var counter = 0

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
                counter++
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

В этом фрагменте кода мы объявляем глобальную переменную counter и запускаем 2 корутины для увеличения counter. Мы ожидаем, что вывод будет 2000. Но на самом деле результат будет совсем другим - от 1000 до 2000. Этот код - простой и очевидный пример race condition. Race condition возникает, когда несколько корутин пытаются получить доступ к общим данным или изменить их одновременно, и это может привести к неверным и непредсказуемым результатам.

Существует 6 способов решения проблемы состояния гонки (по крайней мере известных мне).

Mutex

Цель Mutex - взаимное исключение (аналогично блокировкам в многопоточности), чтобы предотвратить одновременный доступ нескольких coroutine к общим ресурсам. Mutex (mutual exclusion lock ) гарантирует, что только одна корутина может удерживать блокировку в каждый момент времени. Корутины, которые пытаются получить заблокированный Mutex, будут приостановлены до тех пор, пока он не будет освобожден.

var counter = 0
val mutex = Mutex()


suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            mutex.withLock {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

Пример использования: Защита общего изменяемого состояния для предотвращения race condition.

Атомарные операции

Атомарные переменные позволяют безопасно выполнять изменения без необходимости явных блокировок. Используя такие классы, как AtomicInt или AtomicReference, вы можете безопасно обновлять значения из нескольких coroutine lock-free, thread-safe способом.

var counter = AtomicInt(0)

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            counter.incrementAndGet()
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

Вариант использования: синхронизация без блокировки для легковесных обновлений общего состояния.

Channels

Channels используются для отправки и получения данных между корутинами потокобезопасным способом, часто для шаблонов производитель-потребитель. Channel позволяет корутинам отправлять данные друг другу асинхронно. Принимающая сопрограмма приостанавливается, пока данные не станут доступны.

var counter = 0

suspend fun increment(channel: Channel<Int>) {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            channel.send(1) // Send increment request to the channel
        }
    }
}

fun main() = runBlocking {
    val channel = Channel<Int>()

    val receiver = launch {
        for (value in channel) {
            counter += value
        }
    }

    List(2) {
        launch { increment(channel) }
    }.onEach { it.join() }

    channel.close()
    receiver.join()
    println(counter)
}

Вариант использования: Синхронизация потока данных между корутинами (например, производитель-потребитель).


Actors

Actors предоставляют основанный на сопрограммах способ реализации модели actor-a для параллелизма, где корутины взаимодействуют путем отправки сообщений. Actor - это сопрограмма, которая обрабатывает сообщения по одному за раз, гарантируя последовательное обновление внутреннего состояния, избегая необходимости в блокировках.

sealed class CounterMsg
object Increment : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor(): SendChannel<CounterMsg> = actor<CounterMsg> {
    var counter = 0
    val receiver = launch {
        for (msg in channel) {
            when (msg) {
                is Increment -> counter++
                is GetCounter -> msg.response.complete(counter)
            }
        }
    }
    receiver.join()
}

suspend fun increment(channel: SendChannel<CounterMsg>) {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            channel.send(Increment)
        }
    }
}

fun main() = runBlocking {
    val counterActor = counterActor()

    List(2) {
        launch { increment(counterActor) }
    }.onEach { it.join() }

    val response = CompletableDeferred<Int>()
    counterActor.send(GetCounter(response))
    counterActor.close()
    val result = response.await()
    println(result)
}

Вариант использования: Управление состоянием с помощью передачи сообщений.

Semaphore

Semaphores используются для ограничения количества корутин, которые могут одновременно получать доступ к общему ресурсу. Semaphore поддерживает определенное количество разрешений, и корутины могут получать или освобождать их. Корутины, которые пытаются получить разрешения, когда их нет, будут приостановлены.

var counter = 0
val semaphore = Semaphore(1)

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            semaphore.withPermit {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

Вариант использования: ограничение количества одновременных доступов к ресурсу (например, подключений к базе данных, сетевых запросов).

SharedFlow или StateFlow

SharedFlow и StateFlow также используются для совместного использования состояния и передачи обновлений между несколькими корутинами.

val counterFlow = MutableSharedFlow<Int>()

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            counterFlow.emit(1)
        }
    }
}

fun main() = runBlocking {
    var counter = 0

    val collectorJob = launch {
        counterFlow.collect { value ->
            counter += value
        }
    }

    List(2) {
        launch { increment() }
    }.onEach { it.join() }

    collectorJob.cancel()

    println(counter)
}

Заключение:

Kotlin предоставляет несколько способов синхронизации корутинв зависимости от варианта использования, включая Mutex для взаимного исключения, Channels для безопасной связи, Semaphore для ограничения параллелизма и многое другое. Эти инструменты гарантируют, что сопрограммы могут работать вместе, не вызывая race condition или несогласованных состояний в параллельных программах.

Теги:
Хабы:
+8
Комментарии3

Публикации

Истории

Работа

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань