Как стать автором
Обновить
1294.25
OTUS
Цифровые навыки от ведущих экспертов

Основы Cat Concurrency с Ref и Deferred

Время на прочтение7 мин
Количество просмотров4.6K
Автор оригинала: Krzysztof Grajek

Параллельный доступ и ссылочная прозрачность

Для будущих учащихся на курсе «Scala-разработчик» приготовили перевод материала.

Приглашаем также на вебинар по теме
«Эффекты в Scala». На занятии рассмотрим понятие эффекта и сложности, которые могут возникать при их наличии. Также введем понятие функционального эффекта, рассмотрим его свойства и реализуем свой небольшой функциональный эффект. Присоединяйтесь.


*Concurrency — конкурентность, допускающая одновременное выполнение нескольких вычислительных процессов.

Ref и Deferred являются основными строительными блоками в FP, используемыми параллельно, в манере concurrent. Особенно при использовании c tagless final (неразмеченной конечной) абстракцией, эти два блока, при построении бизнес-логики, могут дать нам и то, и другое: параллельный доступ (concurrent access) и ссылочную прозрачность (referential transparency), и мы можем использовать их для построения более продвинутых структур, таких как counters (счетчики) и state machines (конечные автоматы).

Перед тем, как мы углубимся в Ref и Deferred, нам полезно узнать, что concurrency в Cats строится на Java AtomicReference, и здесь мы и начнем наше путешествие.

Atomic Reference

AtomicReference — это один из элементов пакета java.util.concurrent.atomic. В Oracle docs мы можем прочитать, что java.util.concurrent.atomic — это:

Небольшой инструментарий классов, поддерживающих потокобезопасное программирование «без блоков» с одиночными переменными. По сути, классы в данном пакете расширяют понятие volatile значений, полей и элементов массива до тех, которые также обеспечивают условную операцию atomic обновления…

Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong, и AtomicReference обеспечивают доступ и обновление от одиночных переменных к соответствующему типу (функционального блока).

AtomicReference с нами начиная с Java 1.5 и используется для получения лучшей производительности, чем синхронизации (хотя это не всегда так).

Когда вам приходится совместно использовать некоторые данные между нитями (threads), вы должны защитить доступ к этой части данных. Самым простым примером будет увеличение некоторого количества int: i = i + 1. Наш пример состоит из фактически 3 операций, сначала мы читаем значение i , затем добавляем 1 к этому значению, а в конце снова присваиваем вычисленное значение i . В отношении многопоточных приложений, мы можем столкнуться с ситуацией, когда каждый thread будет выполнять эти 3 шага между шагами другого thread, а конечное значение i предсказать не удастся.

Обычно в вашей голове появляется слово synchronised или механизм класса lock, но с atomic.* вам больше не нужно беспокоиться о явной синхронизации, и вы можете перейти на предоставленные atomic (атомарные) типы утилит, где проверка выполнения операции в один шаг включается автоматически.

Давайте, возьмем для примера AtomicInteger.incrementAndGet:

/**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

С помощью операции compareAndSet мы либо обновляем наши данные, либо терпим неудачу, но никогда не заставляем thread ждать. Таким образом, если операция compareAndSet  в incrementAndGet не удаётся, мы просто пытаемся повторить всю операцию заново, извлекая текущее значение наших данных с помощью функции get() в начале. С другой стороны, при использовании синхронизированных механизмов нет ограничений на количество операторов (statement), которые вы хотите «выполнить» во время блокировки, но этот блок никогда не выйдет из строя и может заставить вызывающий thread ждать, предоставляя возможность заблокировать или снизить производительность.

Теперь, зная определенные основы, давайте перейдем к нашей первой мега-звезде concurrency.

Ref

Ref в Cats очень похож на упомянутую выше atomic (атомарную) ссылку Java. Основные отличия заключаются в том, что Ref используется с tagless final абстракцией F . Он всегда содержит значение, а значение, содержащееся в Ref — типа A, всегда является неизменным (immutable).

abstract class Ref[F[_], A] {
  def get: F[A]
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
  // ... and more
}

Ref[F[_], A] — это функциональная изменяемая (mutable) ссылка:

  • Concurrent ( конкурентная)

  • Lock free ( “без блоков”)

  • Всегда содержит значение

Она создается путем предоставления начального значения, и каждая операция осуществляется в
F, например, cats.effect.IO.

Если мы внимательно посмотрим на сопутствующий объект для Cats Ref, мы увидим, что наша F должна соответствовать некому требованию, а именно быть Sync.

def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))

Вышеприведенный метод является лишь примером многих операций, доступных на нашем Ref; он используется для построения Ref с исходным значением. 

Sync дает нам возможность приостанавливать любые побочные эффекты с помощью метода
delay для каждой операции на Ref.

Ref — довольно простая конструкция, мы можем сосредоточиться в основном на ее  get, set и of чтобы понять, как она работает.

Метод get and set 

Допустим, у нас есть объект (для этого блога мы назовем его Shared), который нужно обновить несколькими threads, и мы используем для этого наши методы get и set , создавая утилитный метод, который поможет нам в дальнейшем:

def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
	for {
		sh <- trace.get()
		_ <- trace.set(Shared(sh, msg))
	} yield ()
}

Наш Shared объект может быть построен путем использования его предыдущего состояния и нового значения для создания нового экземпляра — Shared, который может быть на самом деле всем, что мы хотим — простым списком, картой или чем угодно, к чему мы хотим получить одновременный безопасный доступ. 

Я только что создал Shared(prev: Shared, msg: String) для данной статьи.

В нашем примере выше F был заменён конкретным IO из Cats Effect, но имейте в виду, что Ref является полиморфным в F и может быть использован с другими библиотеками.

С помощью monadic (монадический) IO мы применяем функцию flatMap на каждом шаге и устанавливаем значение, сохраненное в нашем Ref на желаемое значение — или... подождите, может быть, мы этого не делаем.

При таком подходе, когда modifyShared будет вызываться одновременно, и мы можем потерять обновления! Это происходит потому, что мы можем столкнуться с ситуацией, когда, например, двое threads могут прочитать значение с помощью get и каждый из них будет выполнять set одновременно. Методы get и set не вызываются атомарно (atomically) вместе.

Atomic (атомарный) update

Конечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref. Для совместной реализации get и set мы можем использовать update.

def update(f: A => A): F[Unit] 

Это решит нашу проблему с обновлением значения, однако update имеет свои недостатки. Если мы захотим обратиться к переменной сразу после обновления, аналогично тому, как мы использовали get и set , мы можем в итоге получить устаревшие данные, допустим, наш Ref будет содержать ссылку на Int:

for {
		_ <- someRef.update(_ + 1)
		curr <- someRef.get
		_ <- IO { println(s"current value is $curr")}
	} yield ()

Нас спасет modify 

Мы можем немного улучшить вышеупомянутую ситуацию, используя modify , которая будет делать то же самое, что и update , но тем не менее,  modify вернет нам обновленное значение для дальнейшего использования.

def modify[B](f: A => (A, B)): F[B] = {
      @tailrec
      def spin: B = {
        val c = ar.get
        val (u, b) = f(c)
        if (!ar.compareAndSet(c, u)) spin
        else b
      }
      F.delay(spin)
    }

Как видите, это практически та же имплементация, что и в примере с AtomicInteger.incrementAndGet, который я показывал в начале, но только в Scala. Нам четко видно, что для выполнения своей работы Ref также работает на основе AtomicReference .

Ref ограничения

Вы, вероятно, уже заметили, что в случае неудачи при обновлении значения функция, переданная update/ modify, должна быть запущена недетерминированно (nondeterministically) и, возможно, должна быть запущена несколько раз. Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее, чем стандартный механизм блокировки и синхронизации, и гораздо безопаснее, так как это решение не может быть заблокировано.

Как только мы узнаем, как работает простой Ref, мы можем перейти к другому классу Cats Concurrent: Deferred (Отложенный вызов).

Deferred

В отличие от Ref, Deferred:

  • создается «пустым» (отложенный результат выполнения)

  • может быть выполнен один раз

  • и после установки его нельзя изменить или снова сделать «пустым».

Эти свойства делают Deferred простым и в то же время довольно интересным.

abstract class Deferred[F[_], A] {
  def get: F[A]
  def complete(a: A): F[Unit]
}

Deferred используется для явной функциональной синхронизации. Когда мы вызываем get в «пустой» Deferred мы устанавливаем блокировку до того момента, как значение станет вновь доступно. В соответствии с документацией из самого класса

  • Блокировка указана только семантическая, никакие реальные threads (нити) не блокируются имплементацией

Тот же вызов get «непустого» Deferred немедленно вернет сохраненное значение.

Другой метод — complete — заполнит значение, если экземпляр пуст и при вызове «непустого» Deferred приведет к сбою (неудачная попытка IO).

Здесь важно отметить, что Deferred требует, чтобы F было Concurrent, что означает, что его можно отменить.

Хорошим примером использования Deferred является ситуация, когда одна часть вашего приложения должна ждать другую. 

Пример ниже взят из великолепного выступления Фабио Лабеллы на выставке Scala Italy 2019 — Composable Concurrency with Ref + Deferred available at Vimeo

def consumer(done: Deferred[IO, Unit]) = for {
	c <- Consumer.setup
	_ <- done.complete(())
	msg <- c.read
	_ <- IO(println(s"Received $msg"))
} yield ()

def producer(done: Deferred[IO, Unit]) = for {
	p <- Producer.setup()
	_ <- done.get
	msg = "Msg A"
	_ <- p.write(msg)
	_ <- IO(println(s"Sent $msg"))
} yield ()

def prog = for {
  d <- Deferred[IO, Unit]
  _ <- consumer(d).start
  _ <- producer(d).start
} yield ()

В приведенном выше примере у нас есть producer (производитель) и consumer (потребитель), и мы хотим, чтобы producer ждал, пока consumer setup закончится, прежде чем писать сообщения, в противном случае все, что бы мы ни написали в producer, будет потеряно. Для преодоления этой проблемы мы можем использовать общий экземпляр Deferred и блокировать get до тех пор, пока не будет заполнен экземпляр done Deferred со стороны consumer (значение в данном случае простая Unit () ).

Конечно, вышеуказанное решение не обошлось без проблем, когда consumer setup никогда не прекращался, мы застревали в ожидании, а producer не мог отправлять сообщения. Чтобы преодолеть это, мы можем использовать таймаут с get , а также использовать Either[Throwable, Unit] или какую-либо другую конструкцию вместо простой Unit внутри нашего объекта Deferred.

Deferred довольно прост, но в сочетании с Ref он может быть использован для построения более сложных структур данных, таких как semaphores (семафоры).

Для получения более подробной информации я рекомендую вам ознакомиться с самой документацией о Cats, где вы можете узнать больше о Cats concurrency и структуре данных, которые она предоставляет.


Узнать подробнее о курсе «Scala-разработчик».

Смотреть открытый вебинар по теме «Эффекты в Scala».

Теги:
Хабы:
Всего голосов 9: ↑7 и ↓2+7
Комментарии0

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS