Императивный подход к реактивным данным на примере Jetbrains KTor и R2DBC

Статья об использовании реактивного доступа к базам данных из корутин. Spring все упрощает, но это плохо сказывается на понимании реальных процессов работы приложения. Для демонстрации был выбран фреймворк KTor (просто потому, что мне нравится смотреть на то, что делает JetBrains), который интенсивно использует корутины — чтобы задача сочетания Reactive Streams и этих самых корутин добавила интереса. В процессе работы пришло понимание, что происходящее — явный пример преобразования непонятного многим реактивного потока в понятное императивное программирование, на котором мы собаку съели. Я люблю реактивные цепочки, но почему бы не порадовать тех, кто любит армейский порядок?


Реактивные приложения завоевали сердца и посадили нервы многих разработчиков, причем эти множества заметно пересекаются. Посадили бы еще больше, если бы не усилия сообществ, адаптирующих чистый поток разума от создателей спецификаций в удобоваримые библиотеки. Так произошло со спецификацией R2DBC и фреймворком Spring (Boot) — разработчику виден уже привычный Spring Data API с уже привычными реактивными типами данных. Однако есть причины не использовать Spring: не хочется Spring и хочется чего-то нового. Ну, есть еще унаследованный код, но в этом случае вряд ли придется столкнуться с реактивным доступом к данным.


В этом случае придется посмотреть на R2DBC без прикрас. И он будет ожидаемо сильно отличаться от того, что нам предлагают в готовом фреймворке — так же, как JDBC отличается от Spring Data JPA. Плюс реактивность. И реактивность по спецификации Reactive Streams. А у нас на слуху корутины. Которые вроде как будущее и все равно под них переписывать.


Корутины запускать можно и вручную из main метода, но попробуем представить, что параллельность и конкурентость нам реально нужна — то есть high load (скажем, один запрос в час!) и мы совершенно серьезно решили писать это без Spring. А вот у нас, оказывается, есть легковесный аналог, целиком написанный на Kotlin, да еще от создателей самого языка, да еще и на корутинах, о которых мы грезим.


Готовим проект


По аналогии со всеми знакомым проектом, заготовку для сборки можно сгенерировать с веб-страницы (или через плагин для IntelliJ IDEA).


Идем на https://start.ktor.io и выбираем модули:


  • Call Logging: просто потому, что я люблю логи входящих запросов смотреть
  • Routing: для функционального определения точек входе (endpoint URI)
  • Gson: чтобы сериализовывать ответы сервиса.

Видно, что готовых модулей для загрузки довольно много и они интересные — всякие OAuth, JWT, LDAP для ограничения доступа, например.


Загружаем проект и открываем для редактирования. Неудивительно, что я открыл его в IntelliJ IDEA Community Edition.


Подключаем зависимости


Я буду использовать встраиваемую базу данных H2, реактивные драйверы подключим в build.gradle как


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"

реактивный пул соединений как


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

для облегчения работы с чистым Reactive Streams, интерфейс которого предоставляет R2DBC драйвер, воспользуемся Reactor и его расширениями для Kotlin, так как пул соединений все равно его требует:


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

Косметическая деталь: заставим компилироваться проект в байткод JVM 1.8. Может быть, когда-то это сделают в native и можно будет бросаться в докер контейнер без Java. Но не сейчас и не об этом.


подключим в build.gradle последней инструкцией


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}

Реализация


Запускаемый файл — Application.kt, там и будем писать код. Обнаруживаем код, который я разбавил своими метками и комментариями, но не добавлял никакой логики:


fun Application.module(testing: Boolean = false) {
    // Сюда запишем логику инициализации перед готовностью сервиса
initDatabase@

    // и тут закончим

applicationStarted@
    // тут разместим служебный код для прогрева пула

boringStandardInitialization@
    // стандартный сгенерированный кусок, который не трогаем
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            // тут будем выводить список таблиц в базе
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            // тут будем показывать статистику пула соединений
            call.respond(mapOf("hello" to "world"))
        }
    }
}

Чтобы обеспечить горячую подмену классов после перекомпиляции, найдем файл настройки application.conf и в раздел ktor.deployment сразу после строк port добавим путь к классам. Из официальной документации неочевидно, но это именно путь:


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

Интересный момент — приложение перегружает классы не сразу после компиляции, как devtools, а только после следующего обращения к нему. Я сначала подумал, что перегрузка не работает и перезапускал приложение.


Настроим соединение с базой данных


На метке initDatabase опишем соединение с базой (в памяти) и создадим пул для нее.


initDatabase@
    "" // украшение, чтобы компилятор не ругался на метку перед объявлением
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) // тут подскажут добавить аннотацию @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

Надо еще добавить аннотацию @ExperimentalTime на модуль — я использовал экспериментальный API.


Все хорошо, но хочется (хотя и нет смысла) прогреть пул соединений после старта. Логичнее было бы инициализировать саму базу, но это не так важно сейчас. Добавим обработчик события старта:


applicationStarted@
    // тут разместим служебный код для прогрева пула
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             // напугаем мнимой блокировкой
            log.debug("Pool is hot, welcome!") // логирование бесплатно с фреймворком
        }
    }

Вот наша первая корутина — launch запустит ее. Пул реактивный, все операции реактивные, по классике — нельзя ждать, надо подписываться. Не тут-то было! Если мы "ждем" (awaitFirst из расширений корутин Kotlin), то поток не блокируется на время ожидания. Вот и ответ на важный вопрос: как же сцепить Reactor-типы Mono и Flux с результатами работы с основанной на другом принципе корутиной? Подождать реактивно. Чем это отличается от "подождать не-реактивно"? Прелесть в том, что в коде — не отличается. Ждем — а мы все равно реактивные. Это почти прямо прописывается в руководствах по корутинам, однако для разработчика, пишущего на Reactor или RxJava это звучит как прямое нарушение принципов реактивного программирования. Так что мы видим два взаимоисключающих утверждения — "жди!" и "не вздумай ждать!". Эта дилемма решается простым приоритетом корутин, которые пришли позже и уже знали про такой вопрос, поэтому и решили его где-то в своих внутреностях. Так что еще раз — для RxJava и Reactor профи — ждем!


Теперь приложение можно запустить и увидеть, как прогревается пул и даже идут запросы к базе. Для чистоты надо добавить еще обработчик на остановку приложения для закрытия пула, но это я опущу в статье ради читабельности.


Возвращаем объекты из базы


Никаких CRUD делать не будем, чтобы не захламлять статью, а вернем данные из системной таблицы, которая в H2 уже есть: TABLES. Сделаем дата-класс для строк в том же файле:


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

Я добавил typealias в начало файла, чтобы IDE не сходила с ума на конфликтующих именах


typealias R2DBCResult = io.r2dbc.spi.Result

И теперь готов вытаскивать данные из базы. Вот он, почти незамутненный R2DBC:


get("/tables") {
            showTables@
            // тут будем выводить список таблиц в базе
            ""
            val connection = pool.create().awaitSingle() // мы в корутине - ждать можно
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  // тут вернётся реактивный поток
                    .toFlux()   // который мы преобразуем в удобный Reactor Flux
                    .collectList()  // который умеет собирать поток событий с данными в одно событие со списком данных
                    .awaitFirst()   // подождем, пока все соберется - мы же в корутине.
   convertData@
                ""
                TODO() // этот кусок допишем позже, тут вернем список объектов
            } finally {
                connection
                    .close()    // реактивно закроем соединение
                    .awaitFirstOrNull() // и подождем null - мы же в корутине.
            }

            call.respond(list)
        }

Тут хорошо видно, что знакомый по Reactor реактивный поток с цепочками map/flatMap уступает место императивному подходу. Можно было бы закрутить все на map/flatMap по привычке, кто уже прочувствовал реактивщину, однако это усложнит код. Посмотрите на код finally — закрытие соединения (а оно обязательно) тоже является реактивной операцией и выглядеть это в операторе doFinally будет ужасно. Кстати, в официальной документации README по пулу соединений закрытие показано как conn.close(), что не работает ввиду отсутствия подписчика. В общем — можете поэкспериментировать, я — уже. Добавлю, что как только вы переходите в тело map вы теряете возможность сделать await, так как область корутины не видна.


На закрытие вызываем awaitFirstOrNull() так как возвращается Mono, никогда не посылающий onNext() .

Собираем все результаты во список, так как корутины не умеют собирать цепочку событий — это же, грубо говоря, функции с одним возвращаемым значением.


Вытаскиваем объекты из данных


R2DBC требует от нас функцию-отображение (map) для перевода данных в объекты. Я выделил ее в companion object своего дата-класса:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 // Числа только через промежуточные Java типы, иначе будут проблемы с декодированием
        )
    }

Осталось вспомнить, как жутко было работать с чистым JDBC и применить опыт к реактивной инкарнации.


Тут мы увидим map который на самом деле не часть Reactor, ReactiveStream или даже Java Stream, а независимая функция R2DBC.


   convertData@
                result.flatMap {// один результат может породить несколько записей. Как - это дело драйвера, мы только принимаем факт
                    it
                        .map(Tables.DB::ofRow)  // преобразуем данные в поток объектов
                        .toFlux()               // который мы преобразуем в удобный Reactor Flux 
                        .collectList()          // который умеет собирать поток событий с данными в одно событие со списком данных
                        .awaitFirst()          // подождем, пока все соберется - мы же в корутине.
                }

Все хорошо, можно запустить запрос :


curl http://localhost:8080/tables

Если JSON некрасивый, то он настраивается в секции gson приложения (setPrettyPrinting).


Отдаем статистику пула


Ну и для красоты (раз уж мы не подключали стандартный модуль метрик) добавим точку для просмотра статистики пула. Движки шаблонов нам излишни, раз у нас средства языка позволяют:


        get("/pool") {
            showPool@
            // тут будем показывать статистику пула соединений
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Конечно, при желании это можно отдать через HTML DSL.


Выводы


Подружить корутины с реактиными потоками можно, но необходимо переключаться между реактивным и императивным стилем — желательно пореже и стараться придерживаться одного стиля.


Не весной лишь единой!


Реактивный доступ к базам данных не такой красивый, как после макияжа Spring Data JPA, но пользоваться можно.

Комментарии 0

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

Самое читаемое