Comments 14
val javaParallelTime = measureTimeMillis {
val res = (1..10).toList()
.parallelStream()
.map { getDataAsync("URL/$it") }
res.forEach { println(it) }
}
Если async, то как же дождаться результата?
Именно, суффикс асинк подразумевает что метод вернул какую-нибудь CompletableFuture, но нету вообще намека на то что это так. Код стал бы куда сложнее для чтения и понимания.
А если там синхронный клиент, то очень легко сделать так чтобы полностью асинхронная версия с корутинами на большем колличестве запросов вырвалась вперед (опять же зависит как клиент настроить, может там в клиенте 1 req/host установлен и тогда хоть 1000 тредов создай, 0 пользы)
В общем тут у автора надо спросить.
Мне кажется быстрее чем async await не выйдет сделать. Или можно потоков кучу создать с синхронным клиентом?
Окей, запрос асинхронно но в итоге тред из CommonPool из ForkJoinPool блочится тогда. Добавляем еще запросов и оп, магический parallel stream перестал скейлиться.
Так что тесты так себе, как и идея сравнивать производительность относительно 3rd-party сервиса в интрнете.
Лучше локально поднять сервис и ввести исскуственную latency
Не, можно конечно указать отдельно пул потоков, но будет ли толк?
А какое тогда более лучшее решение по параллельному получению данных есть?
val javaParallelTime = measureTimeMillis {
val res = (1..10).toList()
.parallelStream()
.map { getDataAsync("$URL/$it") }
res.forEach { println(it) }
}
println("Java parallelSrtream время $javaParallelTime мс")
А как это может работать без блокирования треда?) Ну кроме loom, но тут речи про него вообще не было
Но из-за спортивного интереса да, можно на локальном потестить. Думаю что как будет свободное время — поковыряюсь с этим.
Я просто никак не могу быть согласным что стримы быстрее пусть и на 100мс.
Вот я накидал кейс когда они будут намного медленнее:
Server.kt
Undertow.builder()
.setWorkerThreads(1000)
.addHttpListener(8080, "127.0.0.1", BlockingHandler { exchange ->
Thread.sleep(10000)
exchange.responseSender.send("Hello")
}).build().start()
Client1.kt
val syncClient = HttpClients.custom().setMaxConnTotal(500).setMaxConnPerRoute(500).build()
val time = measureTimeMillis {
(1..1000).toList()
.parallelStream()
.map { syncClient.execute(HttpGet("http://localhost:8080")).close() }
.toList()
}
println(time)
syncClient.close()
Client2.kt
val asyncClient = HttpAsyncClients.custom().setMaxConnTotal(500).setMaxConnPerRoute(500).build().also {
it.start()
}
val time = measureTimeMillis {
coroutineScope {
(1..1000).map { async { asyncClient.execute(HttpGet("http://localhost:8080")) } }
}.awaitAll()
}
println(time)
asyncClient.close()
Корутины (client2) выполнятся за ожидаемые 20секунд(10секунд, 1000запросов, 500 одновременных запросов ограничение клиента, у меня получилось 20730мс если точно), они упираются только в сервер и колличество сокетов.
Java вариант будет выполняться вечность (1000 запросов / 8 тредов * 10с = 20 минут, я даже подожду чтобы получить реальную цифру, подождал – 20 минут), он упрется в CommonPool (параллелизм в моем случае 8, по числу тредов), потому что не нужно использовать Stream для задач с IO, ну не нужно и все. Даже в вашем тесте попробуйте увеличить размер дата-сета который обрабатываете, и Dispatchers.IO в разы лучше справится чем стримы. Стримы в джаве это фреймворк для математических задач, но из-за отсутсвия нормальных коллекций стал использоваться как замена адекватным коллекциям.
К сожалению проблема с выбранным подходом в том что сама загрузка у вас получилось блокирующей, так как в функции вы делаете join()
. Отсюда все приключения с Dispatcher.IO (который нужен только для того чтобы в корутинах выполнять код который может быть заблочен, например на операциях ввода-вывода).
Еще стоит отметить, что создавать http клиент на каждый запрос не самая хорошая идея. Обычно такие вещи шарят чтобы переиспользовать пул соединений.
С учетом вышесказанного, правильный подход был бы таким.
suspend fun getData(httpClient: HttpClient, url: String): String? {
val httpRequest = HttpRequest.newBuilder().uri(URI.create(url)).build()
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
.await().body()
}
val httpClient:HttpClient = HttpClient.newBuilder().build()
val result = measureTimeMillis {
runBlocking {
(1..10).toList().map { async { getData(httpClient, "$URL/$it") } }.awaitAll()
}
}
println("Time for requests: $result")
Может я что-то не так делаю?
Насчет клиента — согласен. Спасибо за подсказку. Насчет глобального подхода — я решил свою задачу, потому как до этого не знал как одновременно получать пачку ответов.
Подключите https://github.com/Kotlin/kotlinx.coroutines/tree/master/integration/kotlinx-coroutines-jdk8. Экстеншен для CompletionStage живёт там
val httpClient:HttpClient = HttpClient.newBuilder().build()
val result = measureTimeMillis {
runBlocking {
val res =(1..10).toList()
.map { async { getMyData(httpClient, "$URL/$it") } }
.awaitAll()
println(res)
}
}
println("Time for requests: $result")
suspend fun getMyData(httpClient: HttpClient, url: String): String? =
suspendCoroutine {
val httpRequest = HttpRequest.newBuilder().uri(URI.create(url)).build()
httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
.thenApply { obj -> it.resume(obj.body()) }
}
Так конечно чуть быстрее. Спасибо за наводку. Не знал что join блокирует. Я джаву почти не знаю, там как то получить можно string из CompletableFuture — сделал через thenApply -> body()
Параллельные запросы в Kotlin для автоматизации сборки данных