Ox, библиотека Scala для безопасного параллелизма и отказоустойчивости в императивном стиле (direct‑style) на JVM, получила новую реализацию параллельной потоковой обработки данных (concurrent streaming). Она позволяет определять конвейеры обработки данных с помощью функционального API, императивного API или сразу обоих вариантов одновременно.
Потоковая обработка данных в Ox была и раньше: предыдущая реализация была основана исключительно на каналах (channels). Хоть она и работала, но все‑таки имела свои недостатки: каждый этап преобразования вводил асинхронную границу. В некоторых ситуациях это может быть неэффективно: если вы оперируете всего лишь несколькими неблокирующими и не требующими больших затрат CPU этапами, такими как .filter
, .mapStateful
или .interleave
, асинхронные границы просто не нужны. Следовательно, такой подход приводил к избыточному параллелизму.
Чтобы улучшить эту ситуацию, мы ввели flow API. Каналы по‑прежнему остаются основным структурным элементом, предлагающим императивный способ определения преобразований данных. Flow‑потоки («flow» — букв. «поток», но чтобы не возникало путаницы с thread»ами и stream»ами, мы будем использовать эту форму) поверх каналов предлагают функциональный API с рядом высокоуровневых комбинаторов, хорошо известных по другим потоковым библиотекам, таким как Akka Streams или fs2.
Давайте же разберемся, как flow‑потоки работают и взаимодействуют с каналами!
Так же как и каналы в Ox были вдохновлены алгоритмом из Kotlin, асинхронные flow‑потоки тоже уже были там реализованы.
Пример flow-потока
Вот небольшой пример flow API в действии, запускаемый с помощью Scala CLI:
//> using dep "com.softwaremill.ox::core:0.5.0"
import ox.flow.Flow
import java.net.URI
@main def capitals(): Unit =
Flow
.fromValues("Poland", "Argentina", "Italy", "Germany",
"Republic of India", "France", "Japan", "United Kingdom",
"Australia", "USA")
.map(_.toLowerCase().replace(" ", "%20"))
.mapPar(3)(country =>
val getCapital = new URI(
s"https://restcountries.com/v3.1/name/$country?fields=capital")
Flow
.fromInputStream(getCapital.toURL().openStream())
.linesUtf8
.take(1)
.runToList()
)
.runForeach(println)
Первая особенность flow‑потоков заключается в том, что они вычисляются лениво. Вызывая методы .map
, .filter
и т. д., мы лишь описываем, как должны быть преобразованы данные. Ничего не будет происходить до фактического запуска flow‑потока.
Второй важный момент заключается в том, что параллелизм скрыт. Вы описываете то, что вам нужно, с помощью предоставленных высокоуровневых методов — нет необходимости использовать каналы напрямую, не говоря уже о других примитивах параллелизма.
Наконец, мы можем использовать эффективные блокирующие операции без ущерба для выразительности и производительности. Это стало возможным благодаря виртуальным потокам (virtual threads) и структурированному параллелизму Java, реализованным в Java 21+ и Ox.
Flow-поток под микроскопом
Чтобы лучше понять, как работают flow‑потоки, давайте рассмотрим комбинаторы (combinators), используемые в примере выше. Во‑первых, у нас есть Flow.fromValues(...)
. Он создает Flow[String]:
описание flow‑потока, которое при запуске выдает некоторое количество строк. В данном случае это конечный список стран, но потенциально flow‑потоки могут быть и бесконечными.
Flow
.fromValues(...)
.map(country => ...)
Затем мы создаем.map для этого flow‑потока, приводя к нижнему регистру имена и делая простенькое URL‑кодирование (не делайте так в продакшене;) ). И снова никакие данные не обрабатываются, поскольку мы еще не запустили flow‑поток. Мы просто получили еще один экземпляр Flow[String]
, который содержит два этапа: один производит значения, а другой преобразует их. Если бы в этот момент был запущен flow‑поток, оба этапа выполнялись бы в вызывающем thread‑потоке без какого‑либо параллелизма.
Flow
.fromValues(...)
.map(country => ...)
.mapPar(3)(country =>
...
)
Все становится еще интереснее, когда мы доходим до .mapPar
. Этот комбинатор производит преобразование (mapping) каждого элемент входящего flow‑потока, выполняя до 3 одновременных вызовов функции преобразования (mapping function). Это полезно, если маппинг‑функция производит какие‑либо эффекты или, в противном случае, блокирует. Так и здесь: мы выполняем HTTP GET‑запрос для каждой страны, чтобы преобразовать название страны в название столицы.
При запуске .mapPar
запускает определенный ранее конвейер (который выдает столицы) в фоновом форке (fork). Любые элементы, выдаваемые конвейером, отправляются в канал, который по умолчанию имеет емкость в 16 элементов (ее можно настроить). Другой фоновый форк получает элементы из этого канала (названия столиц в нижнем регистре) и создает дополнительные форки, которые выполняют HTTP‑запросы. Наконец, в вызывающем thread‑потоке мы собираем ответы, следя за сохранением первоначального порядка.
Весь этот процесс (к счастью) скрыт от конечного пользователя, и в нем используются форки и concurrency scopes Ox. Это означает, что если что‑то пойдет не так, например, HTTP‑запрос выбросит IOException, все будет очищено, и только после этого произойдет распространение исключения. «Очистка» в данном контексте означает прерывание и ожидание завершения всех форков, которые еще выполняются.
Flow
.fromValues(...)
.map(_.toLowerCase().replace(" ", "%20"))
.mapPar(3)(country =>
...
)
.runForeach(println)
Через пару мгновений мы вернемся к реализации функции преобразования, а пока давайте быстро рассмотрим последнюю операцию основного flow‑потока: .runForeach(println)
. Этот метод фактически запускает flow‑поток, блокируя вызывающий thread‑поток до тех пор, пока не будут обработаны все элементы. Только после вызова этого метода начинается обработка данных и происходят любые эффекты, являющиеся частью потока (stream).
Наконец, давайте посмотрим на реализацию функции преобразования стран в столицы:
val getCapital = new URI(s"https://restcountries.com/v3.1/name/$country?fields=capital")
Flow
.fromInputStream(getCapital.toURL().openStream())
.linesUtf8
.take(1)
.runToList()
Скорее для демонстрации, нежели по реальной необходимости, в ней также используется Flow. Сначала мы открываем InputStream
(используя URL Java — опять же, не делайте этого в продакшене, используйте вместо этого sttp‑клиент). Он используется для создания Flow[Chunk[Byte]]
с помощью Flow.fromInputStream
. Этот метод выдает фрагменты байтов, считанные из входного потока, и гарантирует, что InputStream
будет закрыт, независимо от того, завершился ли flow‑поток успешно или с ошибкой.
Затем мы используем вспомогательный метод .linesUtf8
, который разбирает байтовые блоки на строки. Нас интересует только первая строка (так как в ней обычно одна столица), и мы запускаем flow‑поток, собирая все выданные элементы в список с помощью .runToList()
. Здесь не хватает парсинга JSON, так как это стандартный формат ответов, но мы оставим это как упражнение для читателя :).
Доступные комбинаторы
Flow API содержит ряд комбинаторов, хорошо известных любому пользователю библиотеки коллекций Scala — преобразование, фильтрация, сбор и т. д. Есть также некоторые специфические для потока комбинаторы, такие как троттлинг, чередование/перемешивание (interleaving/interspersing), группировка элементов на основе количества элементов или времени, преобразование с учетом состояния и другие.
Аналогично, объект‑компаньон Flow содержит ряд способов удобного создания flow‑потоков, таких как итерация по предоставленным значениям, развертка по функциям или чтение из InputStream
, как описано выше. Если этих способов недостаточно, то для гибкого определения потоков может пригодиться метод .usingEmit
:
import ox.flow.Flow
def isNoon(): Boolean = ???
val intFlow = Flow.usingEmit: emit =>
emit(1)
for i <- 4 to 50 do emit(i)
if isNoon() then emit(42)
Также можно ввести явные асинхронные границы с помощью .async()
. Это может быть полезно, если создание следующего элемента и потребление предыдущего должны выполняться параллельно, или если время обработки потребителя меняется, и производитель должен буферизировать элементы. Под капотом здесь используются каналы.
Императивный API
Как уже говорилось в начале, функциональный API — это только одна сторона поддержки потоковой передачи в Ox. Каналы предлагают императивный API, который обеспечивает максимальную гибкость, цена которой заключается в больше подробности описания. При таком подходе вы можете использовать блокирующие операции .send()
и .receive()
для получения, преобразования и передачи данных на последующие этапы конвейера.
Flow‑потоки и каналы полностью совместимы: вы можете создать flow‑поток из канала и запустить flow‑поток в канал. Как было описано выше, параллелизм внутри этапов flow‑потока реализуется с помощью каналов.
В качестве небольшого примера приведем метод, преобразующий канал строк в канал целых чисел:
import ox.Ox
import ox.channels.Source
import ox.flow.Flow
def transformChannel(ch: Source[String])(using Ox): Source[Int] =
Flow.fromSource(ch)
.mapConcat(_.split(" "))
.mapConcat(_.toIntOption)
.filter(_ % 2 == 0)
.runToChannel()
Обратите внимание, что мы должны запускать этот метод в рамках concurrency scope (используя Ox), поскольку запуск flow‑потока на канал создает фоновый форк, который запускает flow‑поток и отправляет все испускаемые элементы на возвращенный канал. Эта concurrency scope также является областью, в которой обрабатываются ошибки.
Расширение flow API
Вы можете создавать собственные этапы обработки flow‑потока, реализуя пользовательский FlowStage:
trait FlowStage[+T]:
def run(emit: FlowEmit[T]): Unit
trait FlowEmit[-T]:
/** Передает значение для последующей обработки. */
def apply(t: T): Unit
Метод run должен реализовать логику обработки данных с помощью ранее определенного конвейера (доступного в виде экземпляра FlowStage
). Как будет выполняться предыдущий конвейер, синхронно или асинхронно, полностью зависит от реализации этапа flow‑потока.
Ox содержит одно расширение flow API, ориентированное на производителей и потребителей Kafka, доступное как часть модуля kafka. Мы надеемся увидеть гораздо больше комбинаторов, доступных как в рамках основного API, так и в рамках интеграций внутри и за пределами проекта Ox! Если вы хотите принять участие, форум сообщества — отличное отправная точка.
Попробуйте Ox уже сегодня!
Ox лицензирован под Apache2. Исходники доступны на Maven Central. Все, что вам нужно, чтобы начать экспериментировать со структурированным параллелизмом и блокированием потоков данных, — это добавить следующую зависимость в вашу сборку:
// зависимость для sbt
“com.softwaremill.ox”%% “core”% “0.5.0”
// зависимость для scala‑cli
//> using dep com.softwaremill.ox::core:0.5.0
Мы будем благодарны за любые ваши отзывы — как о функционале потоковой передачи, так и о наборе функций Ox в целом. Чего вам не хватает для написания приложений на Scala в императивном стиле? Форум сообщества и проблемы на GitHub — отличное место, чтобы оставлять свои комментарии/багрепорты.
Ближайшие открытые уроки по Scala, которые пройдут в рамках курса Otus:
16 декабря — Эффекты в Scala. Обсудим, что такое эффекты и какие они бывают; узнаем о понятии функционального эффекта и какие задачи они могут решать; реализуем свой функциональный эффект. Записаться
23 декабря — Основы и особенности языка Scala. Попрактикуемся в написании функций и методов, создании иммутабельных конструкций, получим представление о возможностях композиции. Записаться