
Всем привет! С вами Сергей из команды тестирования производительности. Мы завершаем цикл статей о Gatling. Ранее мы уже рассмотрели работу с HTTP, JDBC, gRPC и Kafka, напоследок расскажу про AMQP.
Немного определений
Википедия дает такое определение протокола: Advanced Message Queuing Protocol — открытый протокол для передачи сообщений между компонентами системы. Основная идея в том, что отдельные подсистемы или независимые приложения могут обмениваться произвольным образом сообщениями через AMQP-брокер, который маршрутизирует, обеспечивает гарантированную доставку и распределяет потоки данных или подписку на нужные типы сообщений.
AMQP основан на трех понятиях:
Сообщение, message. Единица передаваемых данных, содержание которой никак не интерпретируется сервером. К сообщению могут быть присоединены структурированные заголовки.
Точка обмена, exchange. В нее отправляются сообщения. Точка обмена распределяет сообщения в одну или несколько очередей. При этом в точке обмена сообщения не хранятся.
Очередь, queue. Здесь хранятся сообщения до тех пор, пока их не заберет клиент. Он всегда забирает сообщения из одной или нескольких очередей.
Producer — клиентское приложение, которое публикует сообщения в exchange.
Consumer — клиентское приложение, которое получает сообщения из очереди сообщений.
Тестовый сервис RabbitMQ
Для разработки скрипта Gatling нам необходим сервер с AMQP-брокером. В этой статье используем RabbitMQ как наиболее распространенный. Чтобы развернуть тестовый сервис, необходим установленный docker. С помощью docker-compose поднимем сервис:
version: "3.9" services: rabbitmq: image: rabbitmq:3.9.7-management-alpine container_name: rabbitmq environment: - RABBITMQ_DEFAULT_USER=rabbitmq - RABBITMQ_DEFAULT_PASS=rabbitmq ports: - '5672:5672' - '15672:15672'
Запускаем:
docker-compose up
Теперь по адресу http://localhost:15672/ доступна консоль администратора. Логин и пароль для входа — rabbitmq.
Для скрипта публикации сообщений создаем очередь, в которую будем отправлять сообщения. Для этого нужно зайти в консоль, перейти на вкладку Queues, указать имя очереди test_queue и нажать AddQueue. Для скрипта публикации и чтения сообщений нужно по аналогии создать очередь test_queue_out.
Разработка скрипта публикации сообщений
Мы не будем разрабатывать проект с нуля, а используем уже готовый шаблон и с его помощью создадим проект myRabbitmq. Процесс создания мы описывали в первой статье цикла.
В результате получим готовый проект, но по умолчанию он для HTTP-протокола, поэтому его нужно переписать для AMQP.
Шаг 1. Обновим зависимости. В файле project/Dependencies.scala добавим (вместо <current version> подставить актуальную версию).
lazy val amqpPlugin: Seq[ModuleID] = Seq( "ru.tinkoff" %% "gatling-amqp-plugin", ).map(_ % "<current version>" % Test)
В файле build.sbt добавим новую зависимость, чтобы после загрузки обновлений sbt можно было использовать AMQP-плагин.
libraryDependencies ++= amqpPlugin,
Загрузим новые зависимости в проект, запустив команду в консоли:
sbt update
Шаг 2. Введем переменные сервиса. В файле src/test/resources/simulation.conf хранятся дефолтные переменные для запуска. Добавим в него переменные, которые определяют наш RabbitMQ.
amqpHost: "localhost" amqpLogin: "rabbitmq" amqpPassword: "rabbitmq" amqpPort: 5672
Шаг 3. Создадим Action для публикации сообщений. В директории сases создадим новый файл для объекта AmqpActions. В нем создадим действие, которое описывает публикацию сообщения в нашу очередь test_queue. Содержимое сообщения не статично и может изменяться в ходе теста. Параметр #{messageId} позволяет использовать значения, получаемые из Feeder.
package ru.tinkoff.load.myRabbitmq.cases import io.gatling.core.Predef._ import ru.tinkoff.gatling.amqp.Predef._ import ru.tinkoff.gatling.amqp.request.{PublishDslBuilder, RequestReplyDslBuilder} object AmqpActions { val publishMessageToQueue: PublishDslBuilder = amqp("Publish to exchange").publish .queueExchange("test_queue") // имя очереди, в которую отправляем сообщения .textMessage("Hello message - #{messageId}") // текст сообщения .messageId("#{messageId}") // настройки сообщения .priority(0) }
Шаг 4. Используем Feeders. Больше узнать о Feeders можно из вводной статьи про Gatling. В нашем примере используем фидеры из подключаемой библиотеки gatling-picatinny.
Возьмем базовый фидер Gatling, который читает данные из CSV-файла. Создадим в resources директорию pools и в ней файл messageIds.csv. Запишем в этот файл первой строкой заголовок messageId и далее в каждой строке значения от 1 до 10. В результате файл messageIds.csv должен выглядеть следующим образом, значения от 1 до 10:
messageId 1 2 … 10
В директории myRabbitmq создадим новую директорию feeders, а в ней object Feeders.
package ru.tinkoff.load.myRabbitmq.feeders import io.gatling.core.Predef._ import io.gatling.core.feeder.BatchableFeederBuilder object Feeders { val idFeeder: BatchableFeederBuilder[String] = csv("pools/messageIds.csv").random }
Шаг 5. Напишем сценарий теста. В CommonScenario описываем класс CommonScenario, в котором создаем сценарий — порядок выполнения определенных действий.
package ru.tinkoff.load.myRabbitmq.scenarios import io.gatling.core.Predef._ import io.gatling.core.structure.ScenarioBuilder import ru.tinkoff.gatling.amqp.Predef._ import ru.tinkoff.load.myRabbitmq.cases.AmqpActions import ru.tinkoff.load.myRabbitmq.feeders.Feeders.idFeeder object CommonScenario { def apply(): ScenarioBuilder = new CommonScenario().scn } class CommonScenario { val scn: ScenarioBuilder = scenario("Common Scenario") .feed(idFeeder) // вызов Feeder .exec(AmqpActions.publishMessageToQueue) // выполнение публикации сообщения в очередь }
Шаг 6. Опишем AMQP-протокол. В файле myRabbitmq.scala опишем протокол:
package ru.tinkoff.load import io.gatling.core.Predef._ import ru.tinkoff.gatling.amqp.Predef._ import ru.tinkoff.gatling.amqp.protocol.AmqpProtocolBuilder import ru.tinkoff.gatling.config.SimulationConfig.{getIntParam, getStringParam} package object myRabbitmq { val amqpHost: String = getStringParam("amqpHost") val amqpPort: Int = getIntParam("amqpPort") val amqpLogin: String = getStringParam("amqpLogin") val amqpPassword: String = getStringParam("amqpPassword") val amqpConf: AmqpProtocolBuilder = amqp .connectionFactory( rabbitmq .host(amqpHost) .port(amqpPort) .username(amqpLogin) .password(amqpPassword) .vhost("/"), ) .replyTimeout(60000) .consumerThreadsCount(8) .usePersistentDeliveryMode }
Шаг 7. Добавим нагрузочные тесты. В файле Debug.scala опишем тест для дебага, который запустит одну итерацию сценария.
package ru.tinkoff.load.myRabbitmq import io.gatling.core.Predef._ import ru.tinkoff.gatling.amqp.Predef._ import ru.tinkoff.gatling.config.SimulationConfig._ import ru.tinkoff.load.myRabbitmq.scenarios.CommonScenario class Debug extends Simulation { setUp( CommonScenario() // запускаем наш сценарий .inject(atOnceUsers(1)), // запускать будет один пользователь - одну итерацию ).protocols( amqpConf, // работа будет проходить по протоколу, который описан в конфигурации amqpConf ).maxDuration(testDuration) // максимальное время теста равно testDuration,если тест не завершится за меньшее время, он будет остановлен автоматически }
Шаг 8. Запустим Debug-тест. Для запуска теста возьмем GatlingRunner или способ, указанный в шаге 8. Запуск Debug-теста. После выполнения скрипта можно посмотреть консоль RabbitMQ — количество сообщений в очереди увеличилось на 1.
package ru.tinkoff.load.myRabbitmq import io.gatling.app.Gatling import io.gatling.core.config.GatlingPropertiesBuilder object GatlingRunner { def main(args: Array[String]): Unit = { // this is where you specify the class you want to run val simulationClass = classOf[Debug].getName // указывает имя теста Debug, либо какой-то другой, например, MaxPerformance val props = new GatlingPropertiesBuilder props.simulationClass(simulationClass) Gatling.fromMap(props.build) } }
Шаг 9. Настроим публикацию и чтение сообщений. На этом этапе дополнительно добавим чтение сообщений из очереди. В файл AmqpActions запишем новое действие:
val publishAndReply: RequestReplyDslBuilder = amqp("Request Reply exchange test").requestReply .queueExchange("test_queue_out") .replyExchange("test_queue_out") .textMessage("""{"msg": "Hello message - #{messageId}"}""") .messageId("#{messageId}") .priority(0) .contentType("application/json") .headers("test" -> "performance", "extra-test" -> "34-#{messageId}") .check( bodyString.exists, )
Запишем вызов действия, которое публикует и читает сообщения, в сценарий CommonScenario.
val scn: ScenarioBuilder = scenario("Common Scenario") .feed(idFeeder) // вызов Feeder .exec(AmqpActions.publishMessageToQueue) // выполнение публикации сообщения в очередь .exec(AmqpActions.publishAndReply) // публикация в Exchange и чтение из очереди
Заключение
Это заключительная статья в серии про Gatling. Весь цикл мы рассказывали шаг за шагом, как создать базовые скрипты на Gatling. Надеемся, получилось показать, что на самом деле Gatling — это не так сложно, как кажется на первый взгляд. Если интересно дальше изучать Gatling, рекомендуем Gatling Academy.
Спасибо всем, кто читает наши статьи!
