Как стать автором
Обновить
1123.59
OTUS
Цифровые навыки от ведущих экспертов

5 уроков, которые я извлек для себя, продолжая осваивать ZIO

Время на прочтение8 мин
Количество просмотров3.5K
Автор оригинала: Natan Silnitsky

Всем привет. В преддверии старта курса "Scala-разработчик" подготовили для вас полезный перевод.


Еще в январе 2020 года я написал два поста (I, II) о подводных камнях, с которыми могут столкнуться новички в начале работы с ZIO. Прошло 9 месяцев. На этот период пришелся релиз ZIO 1.0, и среда ZIO значительно улучшилась с внедрением ZLayer.

Как показал мой опыт, писать код с использованием ZIO — одно удовольствие. Особенно в тех случаях, когда приходится иметь дело со сложными требованиями в отношении одновременности/асинхронности, которые гораздо проще реализовать с помощью возможностей, предоставляемых этой библиотекой.

С января я написал порядочное количество кода с использованием ZIO и за это время успел совершить и исправить несколько ошибок. Ниже я расскажу еще о 5 уроках, которые я усвоил, работая с ZIO. Темы будут затронуты очень разные: от оптимальных методов написания рекурсивного кода с помощью ZIO до правильного формулирования тестовых assert-проверок при использовании TestClock.

Фотография Il Vagabiondo на Unsplash

1. Выполнение рекурсии, небезопасной для кучи

Рекурсия широко используется в функциональном программировании. Не являющиеся хвостовыми рекурсивные функции (т. е., такие, где рекурсивный вызов производится не в конце функции) могут приводить к утечке памяти и даже к ошибкам переполнения стека.

Интерпретатор ZIO обеспечивает безопасность стека рекурсий, переключая кадры вызовов со стека JVM на стек, размещенный в куче. Так мы избегаем ошибок переполнения стека, но вероятность утечки памяти все равно сохраняется.

Рассмотрим упрощенный пример кода нашей реализации опроса и обработки сообщений потребителей в Kafka.

Сначала мы попытались использовать рекурсивный вызов (pollLoop) для постоянного опроса. Но несмотря на то, что вызов самой рекурсивной функции идет последним, такая реализация все равно была небезопасна для кучи. И действительно, в ходе стресс-тестов мы выявили утечку сотен мегабайтов памяти за короткий промежуток времени.

def pollLoop(running: Ref[Boolean],
             consumer: Consumer
            ): UIO[Unit] =
  running.get.flatMap {
    case true => for {
      _ <- pollAndHandle(consumer)
      result <- pollLoop(running, consumer)
    } yield result
    case false => ZIO.unit
  }

Проблема заключалась в том, что мы использовали for-выражение, которое было реализовано способом, исключающим хвостовую рекурсию. For-выражение по сути является последовательностью flatMap, за которыми следует итоговый вызов map. Ниже приведен код, где показанное выше for-выражение развернуто в такую последовательность вызовов. Обратите внимание, что последний вызов функции — тождественное отображение map — не является рекурсивным.

def pollLoop(running: Ref[Boolean],
                 consumer: Consumer
                 ): UIO[Unit] =
  running.get.flatMap {
    case true => 
      pollAndHandle(consumer)
        .flatMap(_ => pollLoop(running, consumer))
        .map(result => result)
    case false => ZIO.unit
  }

Мы решили полностью отказаться от использования рекурсивных методов в нашей кодовой базе Greyhound и вместо них применять оператор ZIO doWhile, который гарантированно обеспечивает хвостовую рекурсию эффекта, безопасную для кучи. Для вышеприведенного случая мы изменили рекурсивный метод, чтобы он ограничивался только одной операцией опроса (pollOnce), а рекурсией управлял оператор doWhile:

pollOnce(running, consumer).doWhile(_ == true).forkDaemon

Таким образом, данная реализация pollOnce должна возвращать значение UIO[boolean], от которого зависит, будет ли рекурсия выполняться дальше:

def pollOnce(running: Ref[Boolean],
                 consumer: Consumer
                 ): UIO[Unit] =
  running.get.flatMap {
    case true => for {
      _ <- pollAndHandle(consumer)
    } yield true
    case false => UIO(false)
  }

И действительно, последующие стресс-тесты показали, что утечка памяти исчезла.

2. Использование побочного эффекта одновременно с повторяющимся эффектом

Смешивание «ленивого» и «жадного» кода внутри метода всегда вызывает проблемы (см. урок 2 в части 2), особенно в тех случаях, когда целью является повторение эффекта. Рассмотрим следующий пример кода:

def repeatedlyPublishQuote(stock: Stock) = {
  publishQuote(stock).repeat(Schedule.fixed(1.second))
}

def publishQuote(stock: Stock) = {
  println(s"getLatestQuote for $stock")
  for {
    quote <- grpcClient.getLatestQuote(stock)
    _ <- sendToWebsocket(quote)
  } yield ()
}

Любой побочный эффект не из библиотеки ZIO, выполняемый в методе publishQuote, не будет повторен оператором repeat в repeatedlyPublishQuote. Оператор repeat повторяет только функциональные эффекты.

Функциональные эффекты — это структуры данных, которые могут интерпретироваться и выполняться в среде выполнения ZIO. Однако с побочными эффектами, которые обычно выполняют некоторые операции ввода-вывода и не предоставляют никакой информации для среды выполнения, дела обстоят иначе.

В нашем примере, если вы хотите, чтобы повторяемый эффект постоянно делал записи в журнал, воспользуйтесь функциональным эффектом, таким как console.putStrLn, и поместите его внутрь for-выражения, как показано ниже:

def publishQuote(stock: Stock) = {
  val sendQuote = for {
    _ <- console.putStrLn(s"getLatestQuote for $stock")
    quote <- grpcClient.getLatestQuote(stock)
    _ <- sendToWebsocket(quote)
  } yield ()
  
  sendQuote.catchAll(_ => UIO.unit)
}

Если вам нужно обеспечить повторение эффекта, добавьте оператор catchAll после for-выражения, чтобы гарантировать отсутствие сбоев. В противном случае эффект перестанет повторяться при первой же ошибке.

3. Неумышленное использование TestClock в коде периодической assert-проверки

Допустим, мы хотим протестировать функцию под названием «потребитель сообщения с задержкой», суть которой заключается в том, что потребитель обрабатывает сообщения только после определенной задержки. В нашем тесте будут принимать участие потребитель consumer, для которого установлена задержка в 1 секунду, и производитель, публикующий сообщение.

Обработчик-заглушка messageHandler снабжен счетчиком, который позволяет тесту убедиться (assert), что обработка сообщения действительно произошла. После того как сообщение отправлено, мы с заданной периодичностью проверяем счетчик до тех пор, пока его значение не будет удовлетворять условию.

Примечание. Этот пример написан с использованием specs2, однако тот же принцип работает и для ScalaTest.

Для сокращения времени выполнения теста мы можем воспользоваться великолепной функцией TestClock, которую нам предлагает ZIO, чтобы вручную переводить время на секунду вперед. Однако использовать TestClock следует с особой осторожностью, поскольку последствия могут быть непредсказуемыми.

Рассмотрим реализацию eventuallyZ:

def eventuallyZ[T](f: UIO[T])(predicate: T => Boolean): ZIO[Clock, Throwable, Unit] = {
  f.repeat(Schedule.spaced(100.milliseconds) && Schedule.doUntil(predicate))
    .timeoutFail(new RuntimeException)(4.seconds)
    .unit
}

С помощью Schedule из библиотеки ZIO заданное условие вызывается каждые 100 миллисекунд до тех пор, пока оно не станет истинным или не наступит тайм-аут спустя 4 секунды.

Но здесь и кроется проблема. eventuallyZ использует часы Clock в среде, но без уточнения, какие именно. Необходимо использовать Live Clock, но в нашем примере, поскольку в тесте используется TestClock, eventuallyZ также будет использовать TestClock, а это означает, что фактически циклическая проверка происходить не будет, так как TestClock.adjust не вызывается.

Решением данной проблемы будет указание правильных часов с помощью оператора ZIO provideSomeLayer:

def eventuallyZ[T](f: UIO[T])(predicate: T => Boolean): ZIO[Clock, Throwable, Unit] = {
  f.repeat(Schedule.spaced(100.milliseconds) && Schedule.doUntil(predicate))
    .timeoutFail(new RuntimeException)(4.seconds)
    .provideSomeLayer(Clock.live)
    .unit
}

Теперь эффекты ZIO, выполняемые в области eventuallyZ, используют Live Clock и корректно проверяют условия каждые 100 миллисекунд. Это не влияет на остальной код теста, в котором можно и далее использовать TestClock.

Полный фрагмент кода этого примера можно посмотреть здесь.

4. Не забываем связывать assert-проверки ZIO Test между собой

К вопросу о тестах: у меня также была возможность поработать с замечательной библиотекой ZIO Test. Рассмотрим простой пример: проверка, является ли число положительным и четным:

object NumbersTest extends DefaultRunnableSpec {
  override def spec =
      testM("positive and even") {
        checkAll(Gen.fromIterable(Seq(0, 2, 4, 6))) { number =>
            assert(number)(isPositive)
            assert(number % 2 == 0)(Assertion.isTrue)
          }
      }
}

Как видите, ее можно легко провести на генерируемом потоке значений, который, разумеется, может быть потоком случайных значений для тестирования свойств (например, положительных целых чисел: Gen.anyInt.filter(_ > 0)). Однако в приведенном выше коде есть небольшая проблема. В действительности будет выполняться только проверка isEven, поскольку первый assert не связан со вторым. Таким образом, тест будет пройден, несмотря на то что 0 не является положительным числом.

+ positive and even after additionRan 1 test in 660 ms: 1 succeeded, 0 ignored, 0 failed

Чтобы это исправить, достаточно связать две assert-проверки с помощью оператора &&:

assert(number)(isPositive) && assert(number % 2 == 0 (Assertion.isTrue)

Теперь тест завершается неудачей:

Ran 1 test in 685 ms: 0 succeeded, 0 ignored, 1 failed
- positive and even after addition
Test failed after 1 iteration with input: 0
0 did not satisfy isGreaterThan(0)

5. Прерывание волокна, исходящего из области Managed#Acquire

Волокна (fibers) ZIO — это своего рода строительные блоки, из которых создаются функции для одновременного/асинхронного выполнения, предлагаемые ZIO.

Обычно мы создаем волокна с помощью встроенных операторов, таких как foreachPar.

Но иногда возникает потребность в ручном создании волокон. Для этого предназначены fork и forkDaemon (отделение от родительского волокна).В других же случаях нам нужно прервать созданные нами волокна, когда они перестают выполнять свою функцию (обычно волокно предполагает выполнение повторяющегося эффекта).

И вот здесь важно помнить, что иногда волокна нельзя прервать! Примерами тому являются области ZManaged acquire и release (необходимо контролировать, что ресурсы приобретаются и высвобождаются безопасным образом), а также те случаи, когда вы указываете свойство uninterruptible:

criticalEffect.uninterruptible

В примере ниже область действия Managed.acquire используется для создания нового волокна, которое периодически посылает сигнальное сообщение «heart-beat» некому серверу в целях поддержания связи. Область Managed.release используется для прерывания (по завершении работы приложения).

object Server extends zio.ManagedApp {
  def sendHeartBeat(): URIO[Console with Clock, Unit] =
    console.putStrLn("heart-beat").repeat(Schedule.fixed(1.second)).ignore

  override def run(args: List[String]): ZManaged[zio.ZEnv, Nothing, ExitCode] =
    Managed
      .make(sendHeartBeat().fork)(_.interrupt)
      .map(_ => ExitCode(0))
}

Проблема заключается в том, что данный fork нельзя прервать, поэтому приложение никогда не остановится и будет продолжать посылать сигнальные сообщения!

Решить эту проблему можно, явно указав интерпретатору ZIO, что это волокно должно быть прерываемым:

Managed.make(sendHeartBeat().fork.interruptible)(.interrupt)

Сокращенная запись для этого кода выглядит следующим образом:

sendHeartBeat().toManaged.fork

toManaged_ создает прерываемое волокно, в отличие от Managed.make.

Спасибо за внимание!

Предыдущие посты на эту тему:

Если вы хотите быть в курсе всех моих перипетий работы с ZIO, подписывайтесь на меня в Twitter и Medium. Если что-то показалось непонятным или у вас есть замечания к написанному, можете оставить комментарий ниже.

Узнать подробнее о курсе.

Читать ещё:

Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+7
Комментарии0

Публикации

Информация

Сайт
otus.ru
Дата регистрации
Дата основания
Численность
101–200 человек
Местоположение
Россия
Представитель
OTUS