Параллельный доступ и ссылочная прозрачность
Для будущих учащихся на курсе «Scala-разработчик» приготовили перевод материала.
Приглашаем также на вебинар по теме «Эффекты в Scala». На занятии рассмотрим понятие эффекта и сложности, которые могут возникать при их наличии. Также введем понятие функционального эффекта, рассмотрим его свойства и реализуем свой небольшой функциональный эффект. Присоединяйтесь.
*Concurrency — конкурентность, допускающая одновременное выполнение нескольких вычислительных процессов.
Ref и Deferred являются основными строительными блоками в FP, используемыми параллельно, в манере concurrent. Особенно при использовании c tagless final (неразмеченной конечной) абстракцией, эти два блока, при построении бизнес-логики, могут дать нам и то, и другое: параллельный доступ (concurrent access) и ссылочную прозрачность (referential transparency), и мы можем использовать их для построения более продвинутых структур, таких как counters (счетчики) и state machines (конечные автоматы).
Перед тем, как мы углубимся в Ref и Deferred, нам полезно узнать, что concurrency в Cats строится на Java AtomicReference
, и здесь мы и начнем наше путешествие.
Atomic Reference
AtomicReference
— это один из элементов пакета java.util.concurrent.atomic
. В Oracle docs мы можем прочитать, что java.util.concurrent.atomic
— это:
Небольшой инструментарий классов, поддерживающих потокобезопасное программирование «без блоков» с одиночными переменными. По сути, классы в данном пакете расширяют понятие
volatile
значений, полей и элементов массива до тех, которые также обеспечивают условную операциюatomic
обновления…Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong, и AtomicReference обеспечивают доступ и обновление от одиночных переменных к соответствующему типу (функционального блока).
AtomicReference
с нами начиная с Java 1.5 и используется для получения лучшей производительности, чем синхронизации (хотя это не всегда так).
Когда вам приходится совместно использовать некоторые данные между нитями (threads), вы должны защитить доступ к этой части данных. Самым простым примером будет увеличение некоторого количества int: i = i + 1
. Наш пример состоит из фактически 3 операций, сначала мы читаем значение i
, затем добавляем 1
к этому значению, а в конце снова присваиваем вычисленное значение i
. В отношении многопоточных приложений, мы можем столкнуться с ситуацией, когда каждый thread будет выполнять эти 3 шага между шагами другого thread, а конечное значение i
предсказать не удастся.
Обычно в вашей голове появляется слово synchronised
или механизм класса lock
, но с atomic.*
вам больше не нужно беспокоиться о явной синхронизации, и вы можете перейти на предоставленные atomic (атомарные) типы утилит, где проверка выполнения операции в один шаг включается автоматически.
Давайте, возьмем для примера AtomicInteger.incrementAndGet
:
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
С помощью операции compareAndSet
мы либо обновляем наши данные, либо терпим неудачу, но никогда не заставляем thread ждать. Таким образом, если операция compareAndSet
в incrementAndGet
не удаётся, мы просто пытаемся повторить всю операцию заново, извлекая текущее значение наших данных с помощью функции get()
в начале. С другой стороны, при использовании синхронизированных механизмов нет ограничений на количество операторов (statement), которые вы хотите «выполнить» во время блокировки, но этот блок никогда не выйдет из строя и может заставить вызывающий thread ждать, предоставляя возможность заблокировать или снизить производительность.
Теперь, зная определенные основы, давайте перейдем к нашей первой мега-звезде concurrency.
Ref
Ref
в Cats очень похож на упомянутую выше atomic (атомарную) ссылку Java. Основные отличия заключаются в том, что Ref
используется с tagless final абстракцией F
. Он всегда содержит значение, а значение, содержащееся в Ref
— типа A
, всегда является неизменным (immutable).
abstract class Ref[F[_], A] {
def get: F[A]
def set(a: A): F[Unit]
def modify[B](f: A => (A, B)): F[B]
// ... and more
}
Ref[F[_], A]
— это функциональная изменяемая (mutable) ссылка:
Concurrent ( конкурентная)
Lock free ( “без блоков”)
Всегда содержит значение
Она создается путем предоставления начального значения, и каждая операция осуществляется в F
, например, cats.effect.IO
.
Если мы внимательно посмотрим на сопутствующий объект для Cats Ref
, мы увидим, что наша F
должна соответствовать некому требованию, а именно быть Sync
.
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
Вышеприведенный метод является лишь примером многих операций, доступных на нашем Ref
; он используется для построения Ref
с исходным значением.
Sync дает нам возможность приостанавливать любые побочные эффекты с помощью метода delay
для каждой операции на Ref
.
Ref
— довольно простая конструкция, мы можем сосредоточиться в основном на ее get
, set
и of
чтобы понять, как она работает.
Метод get
and set
Допустим, у нас есть объект (для этого блога мы назовем его Shared), который нужно обновить несколькими threads, и мы используем для этого наши методы get
и set
, создавая утилитный метод, который поможет нам в дальнейшем:
def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
for {
sh <- trace.get()
_ <- trace.set(Shared(sh, msg))
} yield ()
}
Наш Shared
объект может быть построен путем использования его предыдущего состояния и нового значения для создания нового экземпляра — Shared
, который может быть на самом деле всем, что мы хотим — простым списком, картой или чем угодно, к чему мы хотим получить одновременный безопасный доступ.
Я только что создал Shared(prev: Shared, msg: String)
для данной статьи.
В нашем примере выше F
был заменён конкретным IO из Cats Effect, но имейте в виду, что Ref
является полиморфным в F и может быть использован с другими библиотеками.
С помощью monadic
(монадический) IO мы применяем функцию flatMap
на каждом шаге и устанавливаем значение, сохраненное в нашем Ref
на желаемое значение — или... подождите, может быть, мы этого не делаем.
При таком подходе, когда modifyShared
будет вызываться одновременно, и мы можем потерять обновления! Это происходит потому, что мы можем столкнуться с ситуацией, когда, например, двое threads могут прочитать значение с помощью get
и каждый из них будет выполнять set
одновременно. Методы get
и set
не вызываются атомарно (atomically) вместе.
Atomic (атомарный) update
Конечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref
. Для совместной реализации get
и set
мы можем использовать update
.
def update(f: A => A): F[Unit]
Это решит нашу проблему с обновлением значения, однако update
имеет свои недостатки. Если мы захотим обратиться к переменной сразу после обновления, аналогично тому, как мы использовали get
и set
, мы можем в итоге получить устаревшие данные, допустим, наш Ref
будет содержать ссылку на Int
:
for {
_ <- someRef.update(_ + 1)
curr <- someRef.get
_ <- IO { println(s"current value is $curr")}
} yield ()
Нас спасет modify
Мы можем немного улучшить вышеупомянутую ситуацию, используя modify
, которая будет делать то же самое, что и update
, но тем не менее, modify
вернет нам обновленное значение для дальнейшего использования.
def modify[B](f: A => (A, B)): F[B] = {
@tailrec
def spin: B = {
val c = ar.get
val (u, b) = f(c)
if (!ar.compareAndSet(c, u)) spin
else b
}
F.delay(spin)
}
Как видите, это практически та же имплементация, что и в примере с AtomicInteger.incrementAndGet
, который я показывал в начале, но только в Scala. Нам четко видно, что для выполнения своей работы Ref
также работает на основе AtomicReference
.
Ref ограничения
Вы, вероятно, уже заметили, что в случае неудачи при обновлении значения функция, переданная update
/ modify
, должна быть запущена недетерминированно (nondeterministically) и, возможно, должна быть запущена несколько раз. Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее, чем стандартный механизм блокировки и синхронизации, и гораздо безопаснее, так как это решение не может быть заблокировано.
Как только мы узнаем, как работает простой Ref
, мы можем перейти к другому классу Cats Concurrent: Deferred
(Отложенный вызов).
Deferred
В отличие от Ref
, Deferred
:
создается «пустым» (отложенный результат выполнения)
может быть выполнен один раз
и после установки его нельзя изменить или снова сделать «пустым».
Эти свойства делают Deferred
простым и в то же время довольно интересным.
abstract class Deferred[F[_], A] {
def get: F[A]
def complete(a: A): F[Unit]
}
Deferred
используется для явной функциональной синхронизации. Когда мы вызываем get
в «пустой» Deferred
мы устанавливаем блокировку до того момента, как значение станет вновь доступно. В соответствии с документацией из самого класса:
Блокировка указана только семантическая, никакие реальные threads (нити) не блокируются имплементацией
Тот же вызов get
«непустого» Deferred
немедленно вернет сохраненное значение.
Другой метод — complete
— заполнит значение, если экземпляр пуст и при вызове «непустого» Deferred
приведет к сбою (неудачная попытка IO).
Здесь важно отметить, что Deferred
требует, чтобы F
было Concurrent
, что означает, что его можно отменить.
Хорошим примером использования Deferred
является ситуация, когда одна часть вашего приложения должна ждать другую.
Пример ниже взят из великолепного выступления Фабио Лабеллы на выставке Scala Italy 2019 — Composable Concurrency with Ref + Deferred available at Vimeo
def consumer(done: Deferred[IO, Unit]) = for {
c <- Consumer.setup
_ <- done.complete(())
msg <- c.read
_ <- IO(println(s"Received $msg"))
} yield ()
def producer(done: Deferred[IO, Unit]) = for {
p <- Producer.setup()
_ <- done.get
msg = "Msg A"
_ <- p.write(msg)
_ <- IO(println(s"Sent $msg"))
} yield ()
def prog = for {
d <- Deferred[IO, Unit]
_ <- consumer(d).start
_ <- producer(d).start
} yield ()
В приведенном выше примере у нас есть producer (производитель) и consumer (потребитель), и мы хотим, чтобы producer ждал, пока consumer setup закончится, прежде чем писать сообщения, в противном случае все, что бы мы ни написали в producer, будет потеряно. Для преодоления этой проблемы мы можем использовать общий экземпляр Deferred
и блокировать get
до тех пор, пока не будет заполнен экземпляр done
Deferred
со стороны consumer (значение в данном случае простая Unit ()
).
Конечно, вышеуказанное решение не обошлось без проблем, когда consumer setup
никогда не прекращался, мы застревали в ожидании, а producer
не мог отправлять сообщения. Чтобы преодолеть это, мы можем использовать таймаут с get
, а также использовать Either[Throwable, Unit]
или какую-либо другую конструкцию вместо простой Unit
внутри нашего объекта Deferred
.
Deferred
довольно прост, но в сочетании с Ref
он может быть использован для построения более сложных структур данных, таких как semaphores (семафоры).
Для получения более подробной информации я рекомендую вам ознакомиться с самой документацией о Cats, где вы можете узнать больше о Cats concurrency и структуре данных, которые она предоставляет.
Узнать подробнее о курсе «Scala-разработчик».
Смотреть открытый вебинар по теме «Эффекты в Scala».