Pull to refresh

Comments 14

val javaParallelTime = measureTimeMillis { 
    val res = (1..10).toList()
        .parallelStream()
        .map { getDataAsync("URL/$it") }
    res.forEach { println(it) }
}

Если async, то как же дождаться результата?

А причем тут async? Или вы про — getDataAsync у автора?

Именно, суффикс асинк подразумевает что метод вернул какую-нибудь CompletableFuture, но нету вообще намека на то что это так. Код стал бы куда сложнее для чтения и понимания.


А если там синхронный клиент, то очень легко сделать так чтобы полностью асинхронная версия с корутинами на большем колличестве запросов вырвалась вперед (опять же зависит как клиент настроить, может там в клиенте 1 req/host установлен и тогда хоть 1000 тредов создай, 0 пользы)

Если я верно понял из приведенного кода, то там HttpClient из 11 джавы, и он запрос посылает асинхронно — httpClient.sendAsync. Может он просто назвал так метод, потому что там в клиенте — sendAsync, а не просто send?
В общем тут у автора надо спросить.
Мне кажется быстрее чем async await не выйдет сделать. Или можно потоков кучу создать с синхронным клиентом?

Окей, запрос асинхронно но в итоге тред из CommonPool из ForkJoinPool блочится тогда. Добавляем еще запросов и оп, магический parallel stream перестал скейлиться.


Так что тесты так себе, как и идея сравнивать производительность относительно 3rd-party сервиса в интрнете.


Лучше локально поднять сервис и ввести исскуственную latency

А почему тред из CommonPool из ForkJoinPool блочится? Разве при указании Dispatchers.IO тредами не рулит корутины, выделяя столько, сколько надо?
Не, можно конечно указать отдельно пул потоков, но будет ли толк?

А какое тогда более лучшее решение по параллельному получению данных есть?
val javaParallelTime = measureTimeMillis { 
    val res = (1..10).toList()
        .parallelStream()
        .map { getDataAsync("$URL/$it") }
    res.forEach { println(it) }
}
println("Java parallelSrtream время $javaParallelTime мс")

А как это может работать без блокирования треда?) Ну кроме loom, но тут речи про него вообще не было

Я конечно могу поднять локальный сервер, например на ktor, но зачем? Мои проблемы код решил, и работает все относительно быстро в сравнении с первым решением или каким-нибудь HttpUrlConnection и без корутин. Да я не разработчик в принципе.

Но из-за спортивного интереса да, можно на локальном потестить. Думаю что как будет свободное время — поковыряюсь с этим.

Я просто никак не могу быть согласным что стримы быстрее пусть и на 100мс.


Вот я накидал кейс когда они будут намного медленнее:


Server.kt


Undertow.builder()
    .setWorkerThreads(1000)
    .addHttpListener(8080, "127.0.0.1", BlockingHandler { exchange ->
        Thread.sleep(10000)
        exchange.responseSender.send("Hello")
    }).build().start()

Client1.kt


val syncClient = HttpClients.custom().setMaxConnTotal(500).setMaxConnPerRoute(500).build()
val time = measureTimeMillis {
    (1..1000).toList()
        .parallelStream()
        .map { syncClient.execute(HttpGet("http://localhost:8080")).close() }
        .toList()
}
println(time)
syncClient.close()

Client2.kt


val asyncClient = HttpAsyncClients.custom().setMaxConnTotal(500).setMaxConnPerRoute(500).build().also {
    it.start()
}
val time = measureTimeMillis {
    coroutineScope {
        (1..1000).map { async { asyncClient.execute(HttpGet("http://localhost:8080")) } }
    }.awaitAll()
}
println(time)
asyncClient.close()

Корутины (client2) выполнятся за ожидаемые 20секунд(10секунд, 1000запросов, 500 одновременных запросов ограничение клиента, у меня получилось 20730мс если точно), они упираются только в сервер и колличество сокетов.


Java вариант будет выполняться вечность (1000 запросов / 8 тредов * 10с = 20 минут, я даже подожду чтобы получить реальную цифру, подождал – 20 минут), он упрется в CommonPool (параллелизм в моем случае 8, по числу тредов), потому что не нужно использовать Stream для задач с IO, ну не нужно и все. Даже в вашем тесте попробуйте увеличить размер дата-сета который обрабатываете, и Dispatchers.IO в разы лучше справится чем стримы. Стримы в джаве это фреймворк для математических задач, но из-за отсутсвия нормальных коллекций стал использоваться как замена адекватным коллекциям.

Так я и не спорю — остановился на async await же. ParallelStream привел для сравнения и своего интереса ради. Кстати, иногда он был таким же по скорости, иногда больше на 30-70 мс. Это простите меня «фигня» по сравнению с удобством обработки ошибок корутин.

К сожалению проблема с выбранным подходом в том что сама загрузка у вас получилось блокирующей, так как в функции вы делаете join(). Отсюда все приключения с Dispatcher.IO (который нужен только для того чтобы в корутинах выполнять код который может быть заблочен, например на операциях ввода-вывода).


Еще стоит отметить, что создавать http клиент на каждый запрос не самая хорошая идея. Обычно такие вещи шарят чтобы переиспользовать пул соединений.


С учетом вышесказанного, правильный подход был бы таким.


suspend fun getData(httpClient: HttpClient, url: String): String? {
    val httpRequest = HttpRequest.newBuilder().uri(URI.create(url)).build()
    return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
           .await().body()
}

val httpClient:HttpClient = HttpClient.newBuilder().build()
val result = measureTimeMillis {
    runBlocking {
        (1..10).toList().map { async { getData(httpClient, "$URL/$it") } }.awaitAll()
    }
}
println("Time for requests: $result")
Взял Ваш код. .await().body() — нет await метода там. По крайней мере IDEA показывает ошибку — prnt.sc/z52n9b
Может я что-то не так делаю?

Насчет клиента — согласен. Спасибо за подсказку. Насчет глобального подхода — я решил свою задачу, потому как до этого не знал как одновременно получать пачку ответов.
Да, я понял о чем Вы. Сделал по-другому просто для теста, через suspend Coroutine

val httpClient:HttpClient = HttpClient.newBuilder().build()
val result = measureTimeMillis {
    runBlocking {
        val res =(1..10).toList()
            .map { async { getMyData(httpClient, "$URL/$it") } }
            .awaitAll()
        println(res)
    }
}
println("Time for requests: $result")

suspend fun getMyData(httpClient: HttpClient, url: String): String? = 
    suspendCoroutine {
        val httpRequest = HttpRequest.newBuilder().uri(URI.create(url)).build()
        httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
            .thenApply { obj -> it.resume(obj.body()) }
}


Так конечно чуть быстрее. Спасибо за наводку. Не знал что join блокирует. Я джаву почти не знаю, там как то получить можно string из CompletableFuture — сделал через thenApply -> body()
Sign up to leave a comment.

Articles