company_banner

Reactor, WebFlux, Kotlin Coroutines, или Асинхронность на простом примере

  • Tutorial


Многие сервисы в современном мире, по большей части, «ничего не делают». Их задачи сводятся к запросам к другим базам/сервисам/кешам и агрегации всех этих данных по различным правилам и разнообразной бизнес-логике. Поэтому неудивительно, что появляются такие языки, как Golang, с удобной встроенной конкурентной системой, позволяющей легко организовывать неблокирующий код.


В JVM-мире всё немного сложнее. Есть огромное количество фреймворков и библиотек, блокирующих потоки при использовании. Так и сама stdlib может делать то же самое порой. Да и в Java нет аналогичного механизма, похожего на горутины в Golang.


Тем не менее, JVM активно развивается и появляются новые интересные возможности. Есть Kotlin с корутинами, которые по своему использованию очень похожи на горутины из Golang (хоть и реализованы совершенно по-другому). Есть JEP Loom, который в будущем привнесёт fibers в JVM. Один из самых популярных веб-фреймворков — Spring — не так давно добавил возможность создавать полностью неблокирующие сервисы на Webflux. А с недавним релизом Spring boot 2.2 интеграция с Kotlin стала ещё лучше.


Предлагаю на примере небольшого сервиса по переводу денег с одной карты на другую самим написать приложение на Spring boot 2.2 и Kotlin для интеграции с несколькими внешними сервисами.


Хорошо, если вы уже знакомы с Java, Kotlin, Gradle, Spring, Spring boot 2, Reactor, Webflux, Tomcat, Netty, Kotlin Сoroutines, Gradle Kotlin DSL или даже имеете степень доктора наук. Но если нет — не беда. Код будет максимально упрощён, и даже если вы не из мира JVM, надеюсь, вам будет всё понятно.


Если вы планируете сами написать сервис, убедитесь, что всё необходимое установлено:


  • Java 8+;
  • Docker и Docker Compose;
  • cURL и желательно jq;
  • Git;
  • желательно IDE для Kotlin (Intellij Idea, Eclipse, VS, vim и т.п.). Но можно и в блокноте.

Примеры будут содержать как заготовки под реализацию в сервисе, так и уже написанную реализацию. Сначала запустим установку и сборку и подробнее рассмотрим сервисы и их API.


Сам пример сервисов и API сделан лишь для наглядности, не стоит переносить к себе в прод всё AS IS!

Сперва клонируем к себе репозиторий с сервисами, интеграцию с которыми будем делать, и переходим в директорию:


git clone https://github.com/evgzakharov/spring-demo-services && cd spring-demo-services

В отдельном терминале собираем все приложения с помощью gradle, где после успешной сборки все сервисы запустятся с помощью docker-compose.


./gradlew build && docker-compose up

Пока всё скачивается и устанавливается, рассмотрим проект с сервисами.



На вход сервиса (Demo service) будет поступать запрос с токеном, номерами карт для перевода и суммой, которую будем переводить между картами:


{
     "authToken": "auth-token1",
     "cardFrom": "55593478",
     "cardTo": "55592020",
     "amount": "10.1"
}

По токену authToken необходимо сходить в сервис AUTH и получить userId, с которым потом можно сделать запрос к USER и вытянуть всю дополнительную информацию по пользователю. AUTH также будет возвращать нам информацию о том, к каким из трёх сервисов мы можем получить доступ. Пример ответа от AUTH:


{
     "userId": 158,
     "cardAccess": true,
     "paymentAccess": true,
     "userAccess": true
}

Для перевода между картами сначала идём с каждым номером карты в CARD. В ответ на запросы мы получим cardId, дальше с ними отправляем запрос в PAYMENT и делаем перевод. И последнее — ещё раз отправляем запрос в PAYMENT с fromCardId и узнаём текущий баланс.


Чтобы эмулировать небольшую задержку в сервисах, во всех контейнерах пробрасывается значение переменной окружения TIMEOUT, в которой в миллисекундах задаётся задержка на ответ. И чтобы разнообразить ответы от AUTH, есть возможность варьировать значение SUCCESS_RATE, которое управляет вероятностью ответа true для сервиса.


Файл docker-compose.yaml:


version: '3'
services:
  service-auth:
    build: service-auth
    image: service-auth:1.0.0
    environment:
      - SUCCESS_RATE=1.0
      - TIMEOUT=100
    ports:
      - "8081:8080"
  service-card:
    build: service-card
    image: service-card:1.0.0
    environment:
      - TIMEOUT=100
    ports:
      - "8082:8080"
  service-payment:
    build: service-payment
    image: service-payment:1.0.0
    environment:
      - TIMEOUT=100
    ports:
      - "8083:8080"
  service-user:
    build: service-user
    image: service-user:1.0.0
    environment:
      - TIMEOUT=100
    ports:
      - "8084:8080"

Для всех сервисов делается проброс портов с 8081 по 8084, чтобы легко достучаться до них напрямую.


Перейдём к написанию Demo service. Сперва попробуем написать реализацию максимально «топорно», без асинхронности и параллелизма. Для этого возьмём Spring boot 2.2.1, Kotlin и заготовку для сервиса. Клонируем репозиторий и переходим в ветку spring-mvc-start:


git clone https://github.com/evgzakharov/demo-service && cd demo-service && git checkout spring-mvc-start 

Переходим в файл demo.Controller. В нём есть единственный пустой метод processRequest, реализацию для которого необходимо написать.


  @PostMapping
   fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { .. }

На вход в метод будет поступать запрос на перевод между картами.


data class ServiceRequest(
    val authToken: String,
    val cardFrom: String,
    val cardTo: String,
    val amount: BigDecimal
)

Для тех, кто не близко знаком с Spring

В Spring есть встроенный DI, который работает на основе аннотаций. DemoController помечен специальной аннотацией RestController: она, помимо регистрации бина в DI, добавляет также его обработку как контроллера. PostProcessor находит все методы, помеченные аннотацией PostMapping, и добавляет их в качестве endpoint у сервиса с методом POST.


Обработчик также создаёт proxy-класс для DemoController, в котором в метод processRequest передаются все необходимые аргументы. В нашем случае это всего один аргумент, помеченный аннотацией @RequestBody. Поэтому в proxy данный метод будет вызываться с содержимым JSON, десериализованым в класс ServiceRequest.


Чтобы было проще, все методы по интеграции с другими сервисами уже сделаны, нужно только их правильно соединить. Методов всего пять, по одному под каждое действие. Сами вызовы других сервисов реализованы на блокирующем вызове RestTemplate из Spring.


Пример метода для вызова AUTH:


private fun getAuthInfo(token: String): AuthInfo {
   log.info("getAuthInfo")

   return restTemplate.getForEntity("${demoConfig.auth}/{token}", AuthInfo::class.java, token)
       .body ?: throw RuntimeException("couldn't find user by token='$token'")
}

Перейдём к реализации метода. В комментариях отмечен порядок действий и какой ответ ожидается на выходе:


  @PostMapping
   fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response {
       //1) get auth info from service by token -> userId

       //2) find user info by userId from 1.

       //3) 4) find cards info for each card in serviceRequest

       // 5) make transaction for known cards by calling sendMoney(id1, id2, amount)

       // 6) after payment get payment info by fromCardId

       TODO("return SuccessResponse")
//        SuccessResponse(
//            amount = ,
//            userName = ,
//            userSurname = ,
//            userAge =
//        )
   }

Сначала реализуем метод максимально просто, без учёта, что AUTH может нам запретить доступ к другим сервисам. Попробуйте это сделать самостоятельно. Когда получится (или после перехода в ветку spring-mvc), вы можете проверить работу сервиса следующим образом:


реализация из ветки spring-mvc
fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response {
   val authInfo = getAuthInfo(serviceRequest.authToken)

   val userInfo = findUser(authInfo.userId)

   val cardFromInfo = findCardInfo(serviceRequest.cardFrom)
   val cardToInfo = findCardInfo(serviceRequest.cardTo)

   sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount)

   val paymentInfo = getPaymentInfo(cardFromInfo.cardId)

   return SuccessResponse(
       amount = paymentInfo.currentAmount,
       userName = userInfo.name,
       userSurname = userInfo.surname,
       userAge = userInfo.age
   )
}

Запускаем сервис (из папки demo-service):


./gradlew bootRun

Отправляем запрос на endpoint:


./demo-request.sh

В ответ получаем что-то подобное:


➜  demo-service git:(spring-mvc) ✗ ./demo-request.sh
+ curl -XPOST http://localhost:8080/ -d @demo-payment-request.json -H 'Content-Type: application/json; charset=UTF-8'
+ jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   182    0    85  100    97     20     23  0:00:04  0:00:04 --:--:--    23
{
  "amount": 989.9,
  "userName": "Vasia",
  "userSurname": "Pupkin",
  "userAge": 18,
  "status": true
}

В общей сумме нужно сделать 6 запросов, чтобы реализовать работу сервиса. И учитывая, что каждый из них отвечает с задержкой в 100 мс, общее время не может быть меньше 600 мс. В реальности получается примерно 700 мс с учётом всех накладных расходов. Пока код совсем простой, и если мы сейчас захотим добавить проверку ответа AUTH для доступа к другим сервисам, то это будет несложно сделать (как и любой другой рефакторинг).


Но давайте подумаем, как можно ускорить выполнение запросов. Если не учитывать проверки ответа от AUTH, то у нас есть 2 независимые задачи:


  • получение userId и запрос данных из USER;
  • получение cardId для каждой из карт, проведение платежа и получение итоговой суммы.

Эти задачи могут выполняться независимо друг от друга. Тогда суммарное время выполнения будет зависеть от наиболее длинной цепочки вызовов (в данном случае второй) и будет суммарно выполняться за время 300 мс + X мс на накладные расходы.


Учитывая, что сами вызовы у нас блокирующие, то единственный способ выполнить параллельные запросы — запускать их на отдельных потоках. Можно под каждый вызов создать отдельный Thread, но это будет очень накладно. Другой способ — запуск задач на ThreadPool. С первого взгляда такое решение выглядит подходящим, и время действительно уменьшится. Например, мы можем выполнять запросы на CompletableFuture. Он позволяет запускать фоновые задачи, вызывая методы с постфиксом async. И если при вызове методов не указывать конкретный ThreadPool, задачи будут запускаться на ForkJoinPool.commonPool(). Попробуйте сами написать реализацию или перейдите в ветку spring-mvc-async.


Реализация из ветки spring-mvc-async
fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response {
   val authInfoFuture = CompletableFuture.supplyAsync {  getAuthInfo(serviceRequest.authToken) }
   val userInfoFuture = authInfoFuture.thenApplyAsync { findUser(it.userId) }

   val cardFromInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardFrom) }
   val cardToInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardTo) }

   val waitAll = CompletableFuture.allOf(cardFromInfo, cardToInfo)

   val paymentInfoFuture = waitAll
       .thenApplyAsync {
           sendMoney(cardFromInfo.get().cardId, cardToInfo.get().cardId, serviceRequest.amount)
       }
       .thenApplyAsync {
           getPaymentInfo(cardFromInfo.get().cardId)
       }

   val paymentInfo = paymentInfoFuture.get()
   val userInfo = userInfoFuture.get()

   log.info("result")

   return SuccessResponse(
       amount = paymentInfo.currentAmount,
       userName = userInfo.name,
       userSurname = userInfo.surname,
       userAge = userInfo.age
   )
}

Если сейчас измерить время запроса, оно будет в районе 360 мс. По сравнению с первоначальным вариантом, суммарное время уменьшилось почти в 2 раза. Сам код немного усложнился, но пока его всё так же несложно видоизменять. И если мы тут захотим добавить проверку ответа от AUTH, то и это сделать несложно.


Но что если у нас большое количество входящих запросов на сам сервис? Скажем, около 1000 одновременных запросов? При таком подходе довольно быстро получится, что все потоки ThreadPool заняты выполнением блокирующих вызовов. И мы приходим к тому, что текущий вариант тоже не устраивает.


Остаётся только что-то сделать с самими вызовами сервисов. Можно изменить запросы и сделать их неблокирующими. Тогда методы по вызову сервисов будут возвращать CompletableFuture, Flux, Observable, Deferred, Promise или аналогичный объект, на котором можно построить цепочку ожиданий. При таком подходе нам не нужно делать вызовы на отдельных потоках — достаточно будет одного (или по крайней мере маленького отдельного пула потоков), который мы уже заняли под обработку запросов.


Сможем ли мы теперь выдерживать на сервисе большую нагрузку? Чтобы ответить на этот вопрос, внимательно посмотрим на Tomcat, который используется в Spring boot 2.2.1 в стартере org.springframework.boot:spring-boot-starter-web. Он построен так, что под каждый входящий запрос выделяется поток из ThreadPool на его обработку. И при отсутствии свободных потоков новые запросы будут становиться «в очередь» ожидания. Но сам наш сервис только лишь рассылает запросы в другие сервисы. Выделять под это целый поток и блокировать его, пока не придут ответы от всех, выглядит, мягко говоря, излишним.


К счастью, недавно в Spring появилась возможность использовать неблокирующий веб-сервер на базе Netty или Undertow. Для этого потребуется только сменить стартер spring-boot-starter-web на spring-boot-starter-webflux и немного изменить метод для обработки запросов, в котором запрос и ответ будут «обёрнуты» в Mono. Это связано с тем, что Webflux построен на основе Reactor, и поэтому теперь в методе нужно построить цепочку из преобразований Mono.

Попробуйте написать самостоятельно неблокирующую реализацию метода. Для этого перейдите в ветку spring-webflux-start. Обратите внимание, что изменился стартер для Spring Boot, где теперь используется версия с Webflux, и также изменилась реализация запросов к другим сервисам, которые переписаны на использование неблокирующего WebClient.


Пример метода для вызова AUTH:


private fun getAuthInfo(token: String): Mono<AuthInfo> {
        log.info("getAuthInfo")

        return WebClient.create().get()
            .uri("${demoConfig.auth}/$token")
            .retrieve()
            .bodyToMono(AuthInfo::class.java)
}

В содержимое метода processRequest в комментарии вставлена реализация из первого примера. Попробуйте её самостоятельно переписать на Reactor. Как и в прошлый раз, вначале сделайте версию без учёта проверок от AUTH, а потом посмотрите, насколько сложно их добавить:


fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> {
//        val authInfo = getAuthInfo(serviceRequest.authToken)
//
//        val userInfo = findUser(authInfo.userId)
//
//        val cardFromInfo = findCardInfo(serviceRequest.cardFrom)
//        val cardToInfo = findCardInfo(serviceRequest.cardTo)
//
//        sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount)
//
//        val paymentInfo = getPaymentInfo(cardFromInfo.cardId)
//
//        log.info("result")
//
//        return SuccessResponse(
//            amount = paymentInfo.currentAmount,
//            userName = userInfo.name,
//            userSurname = userInfo.surname,
//            userAge = userInfo.age
//        )

       TODO()
   }

После того как справились с этим, можете сравнить с моей реализацией из ветки spring-webflux:


Реализация из ветки spring-webflux
fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> {
   val cacheRequest = serviceRequest.cache()
       .publishOn(Schedulers.parallel())

   val userInfoMono = cacheRequest.flatMap {
       getAuthInfo(it.authToken)
   }.flatMap {
       findUser(it.userId)
   }

   val cardFromInfoMono = cacheRequest.flatMap { findCardInfo(it.cardFrom) }
   val cardToInfoMono = cacheRequest.flatMap { findCardInfo(it.cardTo) }

   val paymentInfoMono = cardFromInfoMono.zipWith(cardToInfoMono)
       .flatMap { (cardFromInfo, cardToInfo) ->
           cacheRequest.flatMap { request ->
               sendMoney(cardFromInfo.cardId, cardToInfo.cardId, request.amount).map { cardFromInfo }
           }
       }.flatMap {
           getPaymentInfo(it.cardId)
       }

   return userInfoMono.zipWith(paymentInfoMono)
       .map { (userInfo, paymentInfo) ->
           log.info("result")

           SuccessResponse(
               amount = paymentInfo.currentAmount,
               userName = userInfo.name,
               userSurname = userInfo.surname,
               userAge = userInfo.age
           )
       }
}

Согласитесь, что теперь написать реализацию (по сравнению с предыдущим блокирующим подходом) стало сложней. И если мы захотим добавить «забытые» проверки от AUTH, то это будет не так просто сделать.


В этом вся суть реактивного подхода. Он прекрасно подходит для построения неразветвлённых цепочек обработок. Но если появляется ветвление, то код становится уже не таким простым.


Помочь тут могут корутины из Kotlin, которые прекрасно дружат с любым асинхронным/реактивным кодом. К тому же существует большое количество написанных обёрток для Reactor, CompletableFuture и т.п. Но даже если вы не найдёте нужную, её всегда можно написать самостоятельно, используя специальные билдеры.


Давайте самостоятельно перепишем реализацию на корутины. Для этого перейдём в ветку spring-webflux-coroutines-start. В ней в build.gradle.kts добавляются нужные зависимости:


implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion")

И немного меняется метод processRequest:


suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope {
     //TODO()
   }

Он больше не нуждается в Mono и преобразуется просто в suspend-функцию (спасибо интеграции Spring и Kotlin). Учитывая, что в методе мы будем создавать дополнительные корутины, нам потребуется создать дочерний скоуп coroutineScope (для понимания причин создания дополнительного скоупа посмотрите пост Романа Елизарова о Structured concurrency). Обратите внимание, что другие вызовы сервисов совсем не изменились. Они возвращают всё тот же Mono, на котором можно вызвать suspend-метод awaitFirst, чтобы «дождаться» результата выполнения запроса.


Если корутины для вас ещё новая концепция, то есть замечательный гайд c подробным описанием. Попробуйте самостоятельно написать реализацию метода processRequest или перейдите в ветку spring-webflux-coroutines:


реализация из ветки spring-webflux-coroutines
suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope {
   log.info("start")

   val userInfoDeferred = async {
       val authInfo = getAuthInfo(serviceRequest.authToken).awaitFirst()
       findUser(authInfo.userId).awaitFirst()
   }

   val paymentInfoDeferred = async {
       val cardFromInfoDeferred = async { findCardInfo(serviceRequest.cardFrom).awaitFirst() }
       val cardToInfoDeferred = async { findCardInfo(serviceRequest.cardTo).awaitFirst() }

       val cardFromInfo = cardFromInfoDeferred.await()
       sendMoney(cardFromInfo.cardId, cardToInfoDeferred.await().cardId, serviceRequest.amount).awaitFirst()

       getPaymentInfo(cardFromInfo.cardId).awaitFirst()
   }

   val userInfo = userInfoDeferred.await()
   val paymentInfo = paymentInfoDeferred.await()

   log.info("result")

   SuccessResponse(
       amount = paymentInfo.currentAmount,
       userName = userInfo.name,
       userSurname = userInfo.surname,
       userAge = userInfo.age
   )
}

Можно сравнить код с реактивным подходом. С корутинами не придётся заранее продумывать все точки ветвлений. Мы можем просто в нужных местах вызывать методы await и «ответвлять» выполнение асинхронных задач в async. Код остаётся максимально похожим на первоначальный прямолинейный вариант, который совсем не сложно изменять. И немаловажным фактором является то, что корутины просто встраиваются в реактивный код.


Возможно, даже для этой задачи реактивный подход вам нравится больше, но многие из опрошенных людей находят его более сложным. В целом оба подхода решают свою задачу и можно использовать тот, что по душе. Кстати, с недавних пор в Kotlin появилась ещё и возможность создавать «холодные» корутины с Flow, которые во многом похожи на Reactor. Правда, они ещё находятся в экспериментальной стадии, но уже сейчас можно посмотреть на текущую реализацию и попробовать у себя в коде.


На этом хочу закончить и напоследок оставить полезные ссылки:



Надеюсь, вам было интересно и у вас получилось самостоятельно написать реализацию метода для всех способов. И, конечно, хочется верить, что вариант с корутинами вам нравится больше =)


Спасибо всем, кто дочитал до конца!

  • +32
  • 6,1k
  • 7
FunCorp
312,12
Разработка развлекательных сервисов
Поделиться публикацией

Похожие публикации

Комментарии 7

    +1
    5 селектов и всего один инсерт — мечта))) А обычно пять инсертов и один селект.
      0
      Очень зашли короутины котлина — пишется практически обычный императивный код, а получается асинхронный и никаких этих thenApply, map и прочего.

      Но жаль в бекенд продакшине котлина практически нет, и шанса что вы где то это кроме дома примените — крайне мал. Да и оказывается в 99% процентов случаев это бестолково, потому что в энтерпрайзе часто работают с JDBC, а он все равно блокирующий, и профит если и есть то очень небольшой.

      А вот то что меня постоянно раздражает в самом котлине:
      // обычное тело функции:
      fun someFunction(): String {
          return "Text" // здесь есть return, и без него не компилируется
      }
      
      // только что понял, что она должна быть обернуть во что-то, например тот же coroutineScope
      fun someFunction(): String = wrapper {
         "Text" // здесь нет return, и с ним не компилируется
      }
      


      В той же Scala можно и так, и так, насколько я помню.
        0

        В backend Kotlin уже полно. Даже с блокирующим JDBC иногда имеет смысл использовать корутины. Например, можно обернуть выполнение задачи на ThreadPool в методе, который возвращает CompletableFuture, а на нем уже можно использовать await из корутин, чтобы дождаться результата.
        А в скором будущем будет R2DBC, который будет возвращать реактивные типы, и на которых также можно будет вызывать методы из корутин.


        Да и том же "энтерпрайзе" все равно есть вызовы других сервисов, походы в кэш, или другие "неблокирующие" действия. Корутины для них отлично заходят.

          0
          R2DBC уже есть :)

          Можно брать и начинать пользоваться потихоньку. Релиз уровня webflux ранних версий. Кафка версии 0.8 людей не смущает, а R2DBC смущает
        0
        А вот то что меня постоянно раздражает в самом котлине:..

        Для этого есть причина. Для inline врапперов можно делать нелокальный возврат из функции.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое