Как стать автором
Обновить

Увеличиваем throughput приложения в 2 раза или неблокирующая работа с Elasticsearch с использованием Kotlin coroutines

Время на прочтение6 мин
Количество просмотров3.4K

Elasticsearch - мощный поисковый движок и распределенная система хранения документов. При правильной конфигурации, всю магию поиска выполняет именно он, а клиентскому приложению остается лишь сгенерировать запрос в виде Query DSL и подождать ответа.

Но что если наши поисковые стратегии довольно сложны, а сущности, среди которых осуществляется поиск, имеют развесистую структуру? Прибавив к этому индексы на десяток миллионов документов и большой RPS, получаем нетривиальную задачу обеспечения стабильного времени ответа и хорошей пропускной способности.

Если вам интересно узнать, как это можно сделать, то добро пожаловать под кат!

Настройка окружения

Для начала нам необходимо поднять тестовый инстанс Elasticsearch. Для этого я использовал официальный Docker-образ и нижеприведенную команду.

docker run --name elastic -p 9200:9200 --env-file ./docker.env -d docker.elastic.co/elasticsearch/elasticsearch:7.12.0

В файле docker.env я указал 2 параметра - тип single-node и максимальный размер хипа в 1 гб.

В качестве тестовых данных будем использовать документы с единственным полем name:

{
    "_index": "record",
    "_type": "_doc",
    "_id": "_Erb8HkByb6IPnpUYTbS",
    "_score": 1.0,
    "_source": {
        "name": "Bob"
    }
}

Далее соберем клиентское Spring Boot приложение. Весь код приводить не буду, ввиду его тривиальности, ограничусь лишь списком зависимостей:

plugins {
    id("org.springframework.boot") version "2.4.3"
    id ("io.spring.dependency-management") version "1.0.11.RELEASE"
    id("org.jetbrains.kotlin.plugin.spring") version "1.4.20"
    kotlin("jvm") version "1.4.20"
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")

    // зависимости для работы с elasticsearch
    implementation("org.elasticsearch:elasticsearch:$esVersion")
    implementation("org.elasticsearch.client:elasticsearch-rest-client:$esVersion")
    implementation("org.elasticsearch.client:elasticsearch-rest-high-level-client:$esVersion")
}

и кодом вызова эластика через RestHighLevelClient

fun search(term: String): List<String> {
    val sourceBuilder = SearchSourceBuilder().apply {
         // Поиск по ID
         query(QueryBuilders.idsQuery().addIds(term))
    }
    val request = SearchRequest(indexNames, sourceBuilder).apply {
        requestCache(false) // выключаем кэш для пресечения мгновенных ответов
    }
    
    return client.search(request, RequestOptions.DEFAULT)
        .let { mapResults(it) }
}

Вызывать наш код будем через REST контроллер с параметром запроса query.

Запускаем приложение и в консоли видим привычные строки:

INFO 22720 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)

В стартере spring-boot-starter-web под капотом используется сервер Tomcat, который по умолчанию создает 1 поток на каждый web-запрос. Это пригодится нам в будущем для понимания, что происходит при большой загрузке на наше приложение.

Тестируем приложение под нагрузкой

Для эмуляции большого RPS я использовал jmeter - утилиту для проведения нагрузочного тестирования с удобной настройкой типа запросов и их интенсивности. Для более точной конфигурации стресс-теста я использовал плагин Concurrency Thread Group. Он позволяет буквально нарисовать ту нагрузку на наш веб-сервис, которую мы хотим сделать.

Настройки я использовал следующие:

  • Target Concurrency: 250

  • Ramp Up Time: 30 секунд

  • Ramp-Up Steps Count: 250

Это означает, что мы хотим выполнять вызовы один за другим в течение 30 секунд, постепенно повышая количество параллельных сессий с 0 до 250. Всего будет сделано несколько десятков тысяч реквестов.

Во время выполнения стресс-теста я также буду мониторить поведение потоков приложения с помощью вкладки Threads приложения VusualVM.

При тестировании нагрузки я натыкался на следующую ошибку: Address already in use: connect. Причина кроется в лимите открываемых портов для приложения на стороне Windows - это мешает jmeter`у запускать слишком много одновременных сессий. В решении этой проблемы мне помогла эта статья.

Запускаем нагрузочный тест и смотрим на потоки:

Потоки в Tomcat
Потоки в Tomcat

По изображению можно заметить - на каком-то моменте сервер понимает, что дефолтные http-nio-8080-exec-* (0-9) потоки не справляются с получаемой нагрузкой и он начинает создавать вспомогательные треды с номерами с 10 до 199. Это происходит из-за того, что в Томкате по дефолту настройка max-threads равна 200.

Тем временем наш тест закончился, давайте посмотрим на результаты. Для более удобного отображения в jmeter есть возможность построить зависимость времени ответа сервера от времени - привожу его ниже.

Время ответа в Tomcat
Время ответа в Tomcat

На графике можно заметить, что существует несколько скачков во времени ответа сервера, что скорее всего связано с накладными расходами на запуск дополнительных потоков.

Наконец, посмотрим на Summary Report, построенный jmeter`ом:

На нём видно, что всего мы успели выполнить почти 22 тысячи запросов. Самый быстрый из них отработал за 7 миллисекунд, а самый долгий - за 618. В среднем, пропускная способность приложения составила 717 запросов в секунду.

Миграция на корутины.

Итак, мы получили довольно бодрый перформанс от нашего приложения, но что если мы хотим большего? Если подумать, то наше апи делает следующие действия:

  1. Формирует поисковую query и запрос к эластику

  2. Делает вызов

  3. Ожидает ответа

  4. Получает ответ

  5. Парсит его и отдает клиенту

Но что если бы мы могли бы сказать приложению: "Послушай, тебе не нужно больше ждать, пока работа делается на стороне эластика. Почему бы тебе в это время не заняться другими полезными делами? Как только ответ придет мы тебе обязательно сообщим!". По сути мы хотим реализовать давно известный метод построения асинхронного взаимодействия, основанный на коллбеках. Учитывая то, что у elasticsearch-client есть асинхронный клиент и в нем явно говорится об интеграции с корутинами, давайте сделаем это!

Для начала вспомним, что такое корутины. Корутины - маленькие сопрограммы, которые умеют приостанавливаться и возобновляться. Они построены на принципе CPS (Continuation-passing style), или по-простому, на коллбеках. Более подробно об этом можно узнать из доклада Roman Elizarov - Deep dive into Kotlin coroutines.

Итак, какие изменения мы привнесем в наш код:

  • Заменим классический spring-boot-starter-web на spring-boot-starter-webflux

  • Добавим зависимости для использования корутин. Это будут зависимости org.jetbrains.kotlinx:kotlinx-coroutines-core, kotlinx-coroutines-reactor и kotlinx-coroutines-reactive

  • Переделаем вызов эластика - теперь мы не должны напрямую вызывать его, а будем делать это через специальную функцию suspendCoroutine. Её описание и сигнатура может смутить тех, кто не привык к API корутин, но, по сути, она лишь предоставляет нам объект Continuation, который мы можем использовать для того, чтобы возобновить выполнение нашего кода. Это будет выглядеть так:

suspendCoroutine<SearchResponse> { continuation -> // объект континуации
    // создаем коллбек, который будет вызван при получении ответа от эластика
    val callback = AsyncSearchResponseActionListener(continuation)

    // вызовем асинхронный клиент и предоставим коллбек
    client.asyncSearch().submitAsync(request, RequestOptions.DEFAULT, callback)
}
class AsyncSearchResponseActionListener(private val continuation: Continuation<SearchResponse>) :
    ActionListener<AsyncSearchResponse> {
    // При успешном ответе возобновляем Continuatuion
    override fun onResponse(response: AsyncSearchResponse) = continuation.resume(response.searchResponse)

    // При получении ошибки тоже возобновляем, но передаем причину ошибки
    override fun onFailure(ex: Exception) = continuation.resumeWithException(ex)
}
  • Так как теперь мы вызываем приостанавливаемую функцию suspendCoroutine, заменим цепочку функций из контроллера до сервиса - сделаем их приостанавливаемыми, добавив ключевое слово suspend.

Запустим приложение и увидим следующую запись в логе:

INFO 9868 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080

Заменив обычный web-стартер на стартер webflux, мы заменили сервер, на котором будет работать наше приложение. Теперь оно поддерживает неблокирующий код и мы можем использовать не только корутины, но и реализации реактивного подхода, такие как Reactor.

Повторное нагрузочное тестирование приложения.

Запустим уже существующий стресс тест. Снова обратим внимание на вкладку threads:

Потоки в Netty
Потоки в Netty

При просмотре диаграммы потоков, можно заметить, что их создается не так много, как это делается в Томкате. К тому же, внимательный читатель заметит, что при работе Томката все http-потоки были по большей части в состоянии Wait, а здесь видим, что они находятся в состоянии Running.

Если же посмотреть на график времени выполнения:

Время ответа в Netty
Время ответа в Netty

И суммарный отчет:

То можно сделать вывод, что время выполнения запросов уменьшилось, а суммарное количество выполненных задач и пропускная способность приложения, наоборот, увеличилось. Теперь, если верить jmeter`у, количество запросов за тот же промежуток времени возросло почти в 2 раза - с 717 до 1259 в сек!

Итог

Пропускная способность приложения играет немаловажную роль в высоконагруженных сервисах. Для того чтобы максимально утилизировать использование потоков и, как следствие повысить throughput, можно мигрировать на неблокирующую стратегию в написании кода. Это возможно сделать с использованием альтернативных серверов приложений вкупе с реализаций асинхронного взаимодействия в виде Kotlin coroutines.

Теги:
Хабы:
+4
Комментарии3

Публикации

Истории

Работа

Java разработчик
358 вакансий

Ближайшие события

Weekend Offer в AliExpress
Дата20 – 21 апреля
Время10:00 – 20:00
Место
Онлайн
Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн