Всем привет. В преддверии старта курса "Scala-разработчик" подготовили для вас полезный перевод.
Еще в январе 2020 года я написал два поста (I, II) о подводных камнях, с которыми могут столкнуться новички в начале работы с ZIO. Прошло 9 месяцев. На этот период пришелся релиз ZIO 1.0, и среда ZIO значительно улучшилась с внедрением ZLayer.
Как показал мой опыт, писать код с использованием ZIO — одно удовольствие. Особенно в тех случаях, когда приходится иметь дело со сложными требованиями в отношении одновременности/асинхронности, которые гораздо проще реализовать с помощью возможностей, предоставляемых этой библиотекой.
С января я написал порядочное количество кода с использованием ZIO и за это время успел совершить и исправить несколько ошибок. Ниже я расскажу еще о 5 уроках, которые я усвоил, работая с ZIO. Темы будут затронуты очень разные: от оптимальных методов написания рекурсивного кода с помощью ZIO до правильного формулирования тестовых assert-проверок при использовании TestClock.
Фотография Il Vagabiondo на Unsplash
1. Выполнение рекурсии, небезопасной для кучи
Рекурсия широко используется в функциональном программировании. Не являющиеся хвостовыми рекурсивные функции (т. е., такие, где рекурсивный вызов производится не в конце функции) могут приводить к утечке памяти и даже к ошибкам переполнения стека.
Интерпретатор ZIO обеспечивает безопасность стека рекурсий, переключая кадры вызовов со стека JVM на стек, размещенный в куче. Так мы избегаем ошибок переполнения стека, но вероятность утечки памяти все равно сохраняется.
Рассмотрим упрощенный пример кода нашей реализации опроса и обработки сообщений потребителей в Kafka.
Сначала мы попытались использовать рекурсивный вызов (pollLoop
) для постоянного опроса. Но несмотря на то, что вызов самой рекурсивной функции идет последним, такая реализация все равно была небезопасна для кучи. И действительно, в ходе стресс-тестов мы выявили утечку сотен мегабайтов памяти за короткий промежуток времени.
def pollLoop(running: Ref[Boolean],
consumer: Consumer
): UIO[Unit] =
running.get.flatMap {
case true => for {
_ <- pollAndHandle(consumer)
result <- pollLoop(running, consumer)
} yield result
case false => ZIO.unit
}
Проблема заключалась в том, что мы использовали for-выражение, которое было реализовано способом, исключающим хвостовую рекурсию. For-выражение по сути является последовательностью flatMap
, за которыми следует итоговый вызов map
. Ниже приведен код, где показанное выше for-выражение развернуто в такую последовательность вызовов. Обратите внимание, что последний вызов функции — тождественное отображение map
— не является рекурсивным.
def pollLoop(running: Ref[Boolean],
consumer: Consumer
): UIO[Unit] =
running.get.flatMap {
case true =>
pollAndHandle(consumer)
.flatMap(_ => pollLoop(running, consumer))
.map(result => result)
case false => ZIO.unit
}
Мы решили полностью отказаться от использования рекурсивных методов в нашей кодовой базе Greyhound и вместо них применять оператор ZIO doWhile
, который гарантированно обеспечивает хвостовую рекурсию эффекта, безопасную для кучи. Для вышеприведенного случая мы изменили рекурсивный метод, чтобы он ограничивался только одной операцией опроса (pollOnce
), а рекурсией управлял оператор doWhile
:
pollOnce(running, consumer).doWhile(_ == true).forkDaemon
Таким образом, данная реализация pollOnce
должна возвращать значение UIO[boolean]
, от которого зависит, будет ли рекурсия выполняться дальше:
def pollOnce(running: Ref[Boolean],
consumer: Consumer
): UIO[Unit] =
running.get.flatMap {
case true => for {
_ <- pollAndHandle(consumer)
} yield true
case false => UIO(false)
}
И действительно, последующие стресс-тесты показали, что утечка памяти исчезла.
2. Использование побочного эффекта одновременно с повторяющимся эффектом
Смешивание «ленивого» и «жадного» кода внутри метода всегда вызывает проблемы (см. урок 2 в части 2), особенно в тех случаях, когда целью является повторение эффекта. Рассмотрим следующий пример кода:
def repeatedlyPublishQuote(stock: Stock) = {
publishQuote(stock).repeat(Schedule.fixed(1.second))
}
def publishQuote(stock: Stock) = {
println(s"getLatestQuote for $stock")
for {
quote <- grpcClient.getLatestQuote(stock)
_ <- sendToWebsocket(quote)
} yield ()
}
Любой побочный эффект не из библиотеки ZIO, выполняемый в методе publishQuote
, не будет повторен оператором repeat в repeatedlyPublishQuote
. Оператор repeat повторяет только функциональные эффекты.
Функциональные эффекты — это структуры данных, которые могут интерпретироваться и выполняться в среде выполнения ZIO. Однако с побочными эффектами, которые обычно выполняют некоторые операции ввода-вывода и не предоставляют никакой информации для среды выполнения, дела обстоят иначе.
В нашем примере, если вы хотите, чтобы повторяемый эффект постоянно делал записи в журнал, воспользуйтесь функциональным эффектом, таким как console.putStrLn
, и поместите его внутрь for-выражения, как показано ниже:
def publishQuote(stock: Stock) = {
val sendQuote = for {
_ <- console.putStrLn(s"getLatestQuote for $stock")
quote <- grpcClient.getLatestQuote(stock)
_ <- sendToWebsocket(quote)
} yield ()
sendQuote.catchAll(_ => UIO.unit)
}
Если вам нужно обеспечить повторение эффекта, добавьте оператор catchAll после for-выражения, чтобы гарантировать отсутствие сбоев. В противном случае эффект перестанет повторяться при первой же ошибке.
3. Неумышленное использование TestClock в коде периодической assert-проверки
Допустим, мы хотим протестировать функцию под названием «потребитель сообщения с задержкой», суть которой заключается в том, что потребитель обрабатывает сообщения только после определенной задержки. В нашем тесте будут принимать участие потребитель consumer
, для которого установлена задержка в 1 секунду, и производитель, публикующий сообщение.
Обработчик-заглушка messageHandler
снабжен счетчиком, который позволяет тесту убедиться (assert), что обработка сообщения действительно произошла. После того как сообщение отправлено, мы с заданной периодичностью проверяем счетчик до тех пор, пока его значение не будет удовлетворять условию.
Примечание. Этот пример написан с использованием specs2, однако тот же принцип работает и для ScalaTest.
Для сокращения времени выполнения теста мы можем воспользоваться великолепной функцией TestClock
, которую нам предлагает ZIO, чтобы вручную переводить время на секунду вперед. Однако использовать TestClock
следует с особой осторожностью, поскольку последствия могут быть непредсказуемыми.
Рассмотрим реализацию eventuallyZ
:
def eventuallyZ[T](f: UIO[T])(predicate: T => Boolean): ZIO[Clock, Throwable, Unit] = {
f.repeat(Schedule.spaced(100.milliseconds) && Schedule.doUntil(predicate))
.timeoutFail(new RuntimeException)(4.seconds)
.unit
}
С помощью Schedule
из библиотеки ZIO заданное условие вызывается каждые 100 миллисекунд до тех пор, пока оно не станет истинным или не наступит тайм-аут спустя 4 секунды.
Но здесь и кроется проблема. eventuallyZ
использует часы Clock
в среде, но без уточнения, какие именно. Необходимо использовать Live Clock, но в нашем примере, поскольку в тесте используется TestClock
, eventuallyZ
также будет использовать TestClock
, а это означает, что фактически циклическая проверка происходить не будет, так как TestClock.adjust
не вызывается.
Решением данной проблемы будет указание правильных часов с помощью оператора ZIO provideSomeLayer
:
def eventuallyZ[T](f: UIO[T])(predicate: T => Boolean): ZIO[Clock, Throwable, Unit] = {
f.repeat(Schedule.spaced(100.milliseconds) && Schedule.doUntil(predicate))
.timeoutFail(new RuntimeException)(4.seconds)
.provideSomeLayer(Clock.live)
.unit
}
Теперь эффекты ZIO, выполняемые в области eventuallyZ
, используют Live Clock и корректно проверяют условия каждые 100 миллисекунд. Это не влияет на остальной код теста, в котором можно и далее использовать TestClock
.
Полный фрагмент кода этого примера можно посмотреть здесь.
4. Не забываем связывать assert-проверки ZIO Test между собой
К вопросу о тестах: у меня также была возможность поработать с замечательной библиотекой ZIO Test. Рассмотрим простой пример: проверка, является ли число положительным и четным:
object NumbersTest extends DefaultRunnableSpec {
override def spec =
testM("positive and even") {
checkAll(Gen.fromIterable(Seq(0, 2, 4, 6))) { number =>
assert(number)(isPositive)
assert(number % 2 == 0)(Assertion.isTrue)
}
}
}
Как видите, ее можно легко провести на генерируемом потоке значений, который, разумеется, может быть потоком случайных значений для тестирования свойств (например, положительных целых чисел: Gen.anyInt.filter(_ > 0)
). Однако в приведенном выше коде есть небольшая проблема. В действительности будет выполняться только проверка isEven, поскольку первый assert не связан со вторым. Таким образом, тест будет пройден, несмотря на то что 0 не является положительным числом.
+ positive and even after additionRan 1 test in 660 ms: 1 succeeded, 0 ignored, 0 failed
Чтобы это исправить, достаточно связать две assert-проверки с помощью оператора &&
:
assert(number)(isPositive) && assert(number % 2 == 0 (Assertion.isTrue)
Теперь тест завершается неудачей:
Ran 1 test in 685 ms: 0 succeeded, 0 ignored, 1 failed
- positive and even after addition
Test failed after 1 iteration with input: 0
0 did not satisfy isGreaterThan(0)
5. Прерывание волокна, исходящего из области Managed#Acquire
Волокна (fibers) ZIO — это своего рода строительные блоки, из которых создаются функции для одновременного/асинхронного выполнения, предлагаемые ZIO.
Обычно мы создаем волокна с помощью встроенных операторов, таких как foreachPar
.
Но иногда возникает потребность в ручном создании волокон. Для этого предназначены fork
и forkDaemon
(отделение от родительского волокна).В других же случаях нам нужно прервать созданные нами волокна, когда они перестают выполнять свою функцию (обычно волокно предполагает выполнение повторяющегося эффекта).
И вот здесь важно помнить, что иногда волокна нельзя прервать! Примерами тому являются области ZManaged
acquire
и release
(необходимо контролировать, что ресурсы приобретаются и высвобождаются безопасным образом), а также те случаи, когда вы указываете свойство uninterruptible
:
criticalEffect.uninterruptible
В примере ниже область действия Managed.acquire
используется для создания нового волокна, которое периодически посылает сигнальное сообщение «heart-beat» некому серверу в целях поддержания связи. Область Managed.release
используется для прерывания (по завершении работы приложения).
object Server extends zio.ManagedApp {
def sendHeartBeat(): URIO[Console with Clock, Unit] =
console.putStrLn("heart-beat").repeat(Schedule.fixed(1.second)).ignore
override def run(args: List[String]): ZManaged[zio.ZEnv, Nothing, ExitCode] =
Managed
.make(sendHeartBeat().fork)(_.interrupt)
.map(_ => ExitCode(0))
}
Проблема заключается в том, что данный fork нельзя прервать, поэтому приложение никогда не остановится и будет продолжать посылать сигнальные сообщения!
Решить эту проблему можно, явно указав интерпретатору ZIO, что это волокно должно быть прерываемым:
Managed.make(sendHeartBeat().fork.interruptible)(.interrupt)
Сокращенная запись для этого кода выглядит следующим образом:
sendHeartBeat().toManaged.fork
toManaged_
создает прерываемое волокно, в отличие от Managed.make
.
Спасибо за внимание!
Предыдущие посты на эту тему:
5 ловушек, с которыми сталкиваются новички в начале работы с ZIO
Еще 5 ловушек, с которыми сталкиваются новички в начале работы с ZIO
Если вы хотите быть в курсе всех моих перипетий работы с ZIO, подписывайтесь на меня в Twitter и Medium. Если что-то показалось непонятным или у вас есть замечания к написанному, можете оставить комментарий ниже.