company_banner

Корутины и синхронизация. Лучше не смешивать

Original author: https://blog.danlew.net/author/dan-lew/
  • Translation

Предположим, у вас в коде есть критическая секция, которая не должна выполняться более, чем одним потоком одновременно.

В мире Java одним из стандартных решений является добавление ключевого слова synchronized к сигнатуре метода. В Kotlin для получения того же эффекта используется аннотация @Synchronized

repeat(2) {
  thread { criticalSection() }
}

@Synchronized
fun criticalSection() {
  println("Starting!")
  Thread.sleep(10)
  println("Ending!")
}

Данный код выведет следующее:

Starting!
Ending!
Starting!
Ending!

Видно, что оба вызова отработали последовательно, один после другого.

Теперь предположим, что мы хотим использовать корутины. Что у нас получится?

val scope = CoroutineScope(Job())

repeat(2) {
  scope.launch { criticalSectionSuspending() }
}

@Synchronized
suspend fun criticalSectionSuspending() {
  println("Starting!")
  delay(10)
  println("Ending!")
}

А получится, что вызовы критической секции пересекутся, что очень не здорово.

Starting!
Starting!
Ending!
Ending!

Понять, что же происходит, можно, разобравшись, как устроены корутины под капотом. Они реализованы с использованием подхода передачи продолжения. (Краткое объяснение можно посмотреть в моём докладе про корутины Grokking Coroutines, Dan Lew, а для более глубокого понимания рекомендую посмотреть доклад Романа Елизарова Deep Dive into Coroutines.)

В рамках этой статьи всё, что вам надо знать — это то, что останавливаемые функции на самом деле не исполняются последовательно, строка за строкой. Когда останавливаемая функция доходит до точки остановки, она ставится на паузу и передаёт управление какой-то другой функции (позже, когда управление будет возвращено этой функции, — она продолжит выполнение).

Таким образом, во втором примере кода на самом деле происходит следующее:

1. criticalSectionSuspending() стартует, забирает блокировку и печатает Starting!

2. Доходит до delay() (который является точкой остановки), выходит из функции и отдаёт блокировку.

3. Так как блокировка свободна, начинается второй запуск criticalSectionSuspending(), которая забирает блокировку, печатает Starting!, останавливается и тоже отдаёт блокировку.

4. Когда delay() заканчивается, criticalSectionSuspending() запускается снова, но уже с предыдущего места остановки.

Для большей наглядности привожу временную диаграмму выполнения одной функции.

Как видите, время остановки проходит вне критической секции и её блокировка не удерживается. Именно поэтому разные потоки имеют доступ к синхронизированной останавливаемой функции — они выполняют её не одновременно.

Это известная проблема в Kotlin. Фактически, компилятор не позволит использовать synchronized() {} с точкой останова внутри. Такой код не скомпилируется с ошибкой "The 'delay' suspension point is inside a critical section"

Я убеждён, что в случае использования аннотации @Synchronized поведение компилятора должно быть аналогичным. Проблема заведена в YouTrack, но особых подвижек пока нет.

Как же быть?

Во-первых, следует признать, что проблема не в том, что «мы не можем использовать synchronized». synchronized — это просто средство обеспечения работоспособности критических секций. И единственная причина того, что у нас есть эти критические секции в том, что у нас есть общее изменяемое состояние. Соответственно, проблема звучит следующим образом: «нам нужен способ управления общим изменяемым состоянием в многопоточной среде».

К счастью, у нас есть официальное руководство по Kotlin, в котором есть раздел Shared mutable state and concurrency, описывающий несколько неплохих вариантов действий. Для нас более всего подходит секция про мьютексы, так как они наиболее похожи на синхронизацию.

val mutex = Mutex()
val scope = CoroutineScope(Job())

repeat(2) {
  scope.launch { criticalSectionSuspendingLocked() }
}

suspend fun criticalSectionSuspendingLocked() {
  mutex.withLock {
    println("Starting!")
    delay(10)
    println("Ending!")
  }
}

Код выше работает ровно так, как нам нужно, — выводит не перемешанные сообщения.

Я не хочу, чтобы люди считали, что использование мьютексов является универсальным решением при работе с общим изменяемым состоянием. В документации описано несколько подходов, и вы должны самостоятельно оценить, какой из них подходит к вашей конкретной ситуации. Но в случае, когда вы использовали синхронизированную функцию и теперь хотите её приостановить, мьютекс — это самый естественный выбор.

От переводчика:

Эту статью я нашёл во время поисков ответа на вопрос: Почему мьютекс в моём конкретном случае работает сильно медленнее (в полтора - два раза), чем синхронизация? Просто имейте в виду, если у вас нет точек останова в горячей критической секции, то лучше использовать синхронизацию, так как мьютекс может сильно просадить производительность.

FunCorp
Разработка развлекательных сервисов

Comments 24

    +5

    Не разрабатываю на Kotlin, но насколько я понимаю корутины могут использовать один и тот же поток. А синхронизация с помощью synchronize блока или метода обладает reentrance свойством, то есть один и тот же поток может заходить в synchronize блок, не снимая блокировку. А вот мютексы таким свойством не обладают.

      0
      корутины могут использовать один и тот же поток

      Могут один, а могут и разные. Мало того, корутина может "проснуться" не в том потоке, где засыпала.

      +2

      переведена статья о том, что синхронизация jvm уровня просто НЕ работает в корутинах (без объяснения, чем кооперативная многозадачность отличается от принудительной). Переводчик зачем-то добавил, что "синхронизацию" лучше использовать только при отсутствии точек останова. Сразу вопрос — если она (synchronized секция) не работает вообще, то....???? Либо я не понял статью, либо переводчик. И да, "синхронизация" — это, в том числе, и мьютекс.

        +1
        Сразу вопрос — если она (synchronized секция) не работает вообще,

        Откуда вы такое взяли? Она вполне себе работает, если корутина не саспендится внутри неё.


        И да, "синхронизация" — это, в том числе, и мьютекс.

        Речь про конкретные synchronization и котлиновский Mutex для корутин

          –1
          Слово «синхронизация», которое у вас в названии написано — оно ну очень широкий смысл имеет. Так что если речь про конкретные частные виды — то лучше было бы так и написать, меньше бы путало.
            +2
            «работает если» — значит не работает. Согласен, что такой код не должен компилиться, тогда можно будет говорить «работает».
              –1

              Чисто формально — всё работает ровно так, как оно реализовано. Т.е. это не баг и не UB, а особенности реализации. То, что компилятор ругается на suspension point внутри блока synchronized — это скорее костыль, поддерживающий… А вот что? Недостаточно подробную документацию и нежелание разработчиков вникать в детали?


              И даже, наверное, можно придумать разумное применение такому поведению. Но я тоже полностью согласен, что получилось мега-неочевидно и допускать компиляцию подобного не желательно. В крайнем случае, если кому прям вот очень надо — отдать на откуп опциям компилятора.

          +5

          Нда. Как можно было изобретая корутины, сломать семантику synchronized в языке. Она же в языке, и не должна зависеть от способа реализации concurrency, вытесняющая там многопоточность или кооперативная. А по факту mutex-ы, которые отдельные объекты, как раз и работают с корутинами, встроенными в язык, а synchronized нет. Это прямо провал какой-то.
          Будем надеяться разработчики loom в Java такой фигни не сделают.

            0

            Это только кажется, что она в языке — а на самом деле там ещё и JVM сильно старается. Разумеется, исправить JVM разработчики Kotlin не могут...

              0

              Мне кажется все равно можно было что-нибудь придумать, например поскольку это аннотация — дополнительно обернуть synchronized блок от JVM в mutex. Не уверен что так сработает.

                0

                Не сработает. Выше уже написали, что разработчику нужно понимать разницу между кооперативной многозадачностью и вытесняющей. И не использовать примитивы второго типа чтобы синхронизировать первый.


                В котлине должен быть аналог SemaphoreSlim.WaitAsync() из .net, который решает проблемы критических секций в асинхронном коде.

                  +1
                  Выше уже написали, что разработчику нужно понимать разницу между кооперативной многозадачностью и вытесняющей. И не использовать примитивы второго типа чтобы синхронизировать первый.

                  Зачем? Если он разрабатывает JVM, то да. Если разрабатывает обычное приложение, то нет, зачем ему знать детали реализации синхронизации, ему достаточно того что она работает.

                    0

                    Например, потому что они ортогональны — одна предназначена для I/O-bound задач, а вторая — для cpu-bound, они имеют разную природу и разные способы управления. Операции, являющиеся синхронизирующими для одной многозадачности могут либо вообще никак не воздействовать на другую, либо наоборот вызывать в ней дедлок.


                    Если программист, не зная деталей, взялся писать многозадачное приложение, то выйдет в лучшем случае средне. Как, впрочем, и с любой другой областью.


                    Если же производительность вам не важна, просто не используйте многозадачность, меньше "странных" эффектов получите.

                      –2

                      Способы управления может и разные, но ни на один из них программист влияния не имеет. Потоками управляет операционная система, корутинами виртуальная машина. Для cpu-bound задач надо просто не запускать корутин, которые сильно нагружают процессор, больше чем ядер в процессоре, вот и все. Потоки на самом деле нафик не нужны, достаточно корутин. Потоки просто оставили для совместимости с Java видимо.

              0

              Номинально в котлине нет примитивов синхронизации на уровне языка. Аннотация не работает, synchronized с лямбдой — работает ок. Достаточно компиляторную магию к аннотации применить, и станет ок.

                +2
                Эта аннотация сделана именно что для совместимости с джавой и делает ровно одну вещь — в генерируемом для JVM таргета коде добавляет к методу ключевое слово.
                Для всего остального в котлине и так есть как минимум 4 способа синхронизации с помощью родных для него инструментов
                0

                Ох уж эти переименованные картинки с подписями и прочий "мемасный постмодерн"...

                  0
                  del
                    –1
                    Не понял, а зачем делать функцию, которая не должна разрываться разрываемой? То есть, сначала мы делаем её synchronized, а потом делаем так, что она не синхронизирована. Зачем???
                      –2

                      ну, оригинальная статья не говорит о том, что не надо саспендится в synchronized секции — это уже придумал переводчик. Статья про то, что привычные жависту примитивы синхронизации в контексте корутин не работают так, как ожидается жуниорами. То есть про то, что надо пользоваться другими средствами. А так совершенно верно — делать функцию suspend без явного разрыва имеет смысл только если разрыв ожидается неявно в одном из вызовов внутри.

                        0
                        ну, оригинальная статья не говорит о том, что не надо саспендится в synchronized секции — это уже придумал переводчик.

                        Я бы попросил вас подтверждать обвинения фактами. Перевод практически дословен, от и до. От меня тут только то, что в секции "от переводчика".

                          –2

                          это факт — именно это и написано в "от переводчика", а в переводе этого нет.

                        +1

                        Инертность мышления, стремление мозга действовать по накатанной. Предположим: спринг с webflux на колтине. Всё от начала и до конца suspend — ну "по красоте" же. И вдруг где-то нужна синхронизация и рука на автомате тянется писать @Synchronized.

                        0
                        Елизаров говорил давно, вроде даже в этом же видосе который в посте, что delay не останавливает поток, а вот Thread.sleep внутри корутины остановит. Понятно, что это будет работать от случая, так как не ясно на каком потоке какая корутина будет работать, но все же. Ещё вопрос в использовании общего состояния в параллельном коде. Функциональный подход решает данный вопрос, если, конечно, нет критической необходимости делать общее состояние

                        Only users with full accounts can post comments. Log in, please.