
Недавно мне довелось поработать над приложением, которое должно было контролировать скорость своих исходящих подключений. Например, подключаясь к одному URL приложение должно было ограничить себя, скажем, 200KiB/sec. А подключаясь к другому URL — всего 30KiB/sec.
Самым интересным моментом здесь оказалось тестирование этих самых ограничений. Мне потребовался HTTP-сервер, который бы отдавал трафик с какой-то заданной скоростью, например, 512KiB/sec. Тогда бы я мог видеть, действительно ли приложение выдерживает скорость 200KiB/sec или же оно срывается на более высокие скорости.
Но где взять такой HTTP-сервер?
Поскольку я имею некоторое отношение к встраиваемому в С++ приложения HTTP-серверу RESTinio, то не придумал ничего лучше, чем быстренько набросать на коленке простой тестовый HTTP-сервер, который способен отдавать клиенту длинный поток исходящих данных.
О том, насколько это было просто и хотелось бы рассказать в статье. Заодно узнать в комментариях, действительно ли это просто или же я сам себя обманываю. В принципе, данную статью можно рассматривать как продолжение предыдущей статьи про RESTinio под названием "RESTinio — это асинхронный HTTP-сервер. Асинхронный". Посему, если кому-то интересно прочитать о реальном, пусть и не очень серьезном применении RESTinio, то милости прошу под кат.
Общая идея
Общая идея упомянутого выше тестового сервера очень проста: когда клиент подключается к серверу и выполняет HTTP GET запрос, то взводится таймер, срабатывающий раз в секунду. Когда таймер срабатывает, то клиенту отсылается очередной блок данных заданного размера.
Но все несколько сложнее
Если клиент вычитывает данные с меньшим темпом, нежели отсылает сервер, то просто отсылать по N килобайт раз в секунду не есть хорошая идея. Поскольку данные начнут скапливаться в сокете и ни к чему хорошему это не приведет.
Поэтому при отсылке данных желательно на стороне HTTP-сервера контролировать готовность сокета к записи. Пока сокет готов (т.е. в нем не скопилось еще слишком много данных), то новую порцию отсылать можно. А вот если не готов, то нужно подождать пока сокет не перейдет в состояние готовности к записи.
Звучит разумно, но ведь операции ввода-вывода скрыты в потрохах RESTinio… Как тут узнать, можно ли записывать следующую порцию данных или нет?
Из данной ситуации можно выйти, если использовать after-write нотификаторы, которые есть в RESTinio. Например, мы можем написать так:
void request_handler(restinio::request_handle_t req) { req->create_response() // Начинаем формировать ответ. ... // Наполняем ответ содержимым. .done([](const auto & ec) { ... // Вот этот код будет вызван когда запись ответа закончится. }); }
Лямбда, переданная в метод done() будет вызвана когда RESTinio завершит запись исходящих данных. Соответственно, если сокет какое-то время был не готов к записи, то лямбда будет вызвана не сразу, а после того, как сокет придет в должное состояние и примет все исходящие данные.
За счет использования after-write нотификаторов логика работы тестового сервера будет такой:
- отсылаем очередную порцию данных, вычисляем время, когда нам нужно было бы отослать следующую порцию при нормальном развитии событий;
- вешаем after-write нотификатор на очередную порцию данных;
- когда after-write нотификатор вызывается, мы проверяем, наступило ли время отсылки следующей порции. Если наступило, то сразу же инициируем отсылку следующей порции. Если не наступило, то взводим таймер.
В результате получится, что как только запись начнет притормаживать, отсылка новых данных приостановится. И возобновиться когда сокет будет готов принимать новые исходящие данные.
И еще немного сложного: chunked_output
RESTinio поддерживает три способа формирования ответа на HTTP-запрос. Самый простой способ, который применяется по умолчанию, в данном случае не подходит, т.к. мне требуется практически бесконечный поток исходящих данных. И такой поток, естественно, нельзя отдать в единственный вызов метода set_body.
Поэтому в описываемом тестовом сервере используется т.н. chunked_output. Т.е. при создании ответа я указываю RESTinio, что ответ будет формироваться частями. После чего просто периодически вызываю методы append_chunk для добавления к ответу очередной части и flush для записи накопленных частей в сокет.
А давайте уже посмотрим в код!
Пожалуй, достаточно уже вступительных слов и пора перейти к рассмотрению самого кода, который можно найти в этом репозитории. Начнем с функции request_processor, которая вызывается для обработки каждого корректного HTTP-запроса. При этом углубимся в те функции, которые из request_processor вызываются. Ну а затем уже посмотрим, как именно request_processor ставится в соответствие тому или иному входящему HTTP-запросу.
Функция request_processor и её подручные
Функция request_processor вызывается для обработки нужных мне HTTP GET запросов. Ей в качестве аргументов передаются:
- Asio-шный io_context, на котором ведется вся работа (он потребуется, например, для взведения таймеров);
- размер одной части ответа. Т.е. если мне нужно отдавать исходящий поток с темпом в 512KiB/sec, то в качестве этого параметра будет передано значение 512KiB;
- количество частей в ответе. На случай, если поток должен иметь какую-то ограниченную длину. Например, если нужно отдавать поток с темпом 512KiB/sec в течении 5 минут, то в качестве этого параметра будет передано значение 300 (60 блоков в минуту в течении 5 минут);
- ну и сам входящий запрос для обработки.
Внутри request_processor создается объект с информацией о запросе и параметрах его обработки, после чего эта самая обработка и начинается:
void request_processor( asio_ns::io_context & ctx, std::size_t chunk_size, std::size_t count, restinio::request_handle_t req) { auto data = std::make_shared<response_data>( ctx, chunk_size, req->create_response<output_t>(), count); data->response_ .append_header(restinio::http_field::server, "RESTinio") .append_header_date_field() .append_header( restinio::http_field::content_type, "text/plain; charset=utf-8") .flush(); send_next_portion(data); }
Тип response_data, содержащий все относящиеся к запросу параметры, выглядит следующим образом:
struct response_data { asio_ns::io_context & io_ctx_; std::size_t chunk_size_; response_t response_; std::size_t counter_; response_data( asio_ns::io_context & io_ctx, std::size_t chunk_size, response_t response, std::size_t counter) : io_ctx_{io_ctx} , chunk_size_{chunk_size} , response_{std::move(response)} , counter_{counter} {} };
Тут нужно заметить, что одна из причин появления структуры response_data состоит в том, что объект типа restinio::response_builder_t<restinio::chunked_output_t> (а именно этот тип спрятан за коротким псевдонимом response_t) является moveable-, но не copyable-типом (по аналогии с std::unique_ptr). Поэтому этот объект нельзя просто так захватить в лямбда-функции, которая затем оборачивается в std::function. Но если объект-response поместить в динамически созданный экземпляр response_data, то умный указатель на экземпляр reponse_data уже можно без проблем захватывать в лямбда-функции с последующим сохранением этой лямбды в std::function.
Функция send_next_portion
Функция send_next_portion вызывается каждый раз, когда требуется отослать клиенту очередную часть ответа. Ничего сложного в ней не происходит, поэтому выглядит она достаточно просто и лаконично:
void send_next_portion(response_data_shptr data) { data->response_.append_chunk(make_buffer(data->chunk_size_)); if(1u == data->counter_) { data->response_.flush(); data->response_.done(); } else { data->counter_ -= 1u; data->response_.flush(make_done_handler(data)); } }
Т.е. отсылаем очередную часть. И, если эта часть была последней, то завершаем обработку запроса. А если не последняя, то в метод flush передается after-write нотификатор, который создается, пожалуй, наиболее сложной функцией данного примера.
Функция make_done_handler
Функция make_done_handler отвечает за создание лямбды, которая будет передана в RESTinio в качестве after-write нотификатора. Этот нотификатор должен проверить, завершилась ли запись очередной части ответа успешно. Если да, то нужно разобраться, следует ли следующую часть отослать сразу же (т.е. были "тормоза" в сокете и темп отсылки выдерживать не получается), либо же после некоторой паузы. Если нужна пауза, то она обеспечивается через взведение таймера.
В общем-то, несложные действия, но в коде получается лямбда внутри лямбды, что может смутить людей, не привыкших к "современному" С++. Которому не так уж и мало лет чтобы называться современным ;)
auto make_done_handler(response_data_shptr data) { const auto next_timepoint = steady_clock::now() + 1s; return [=](const auto & ec) { if(!ec) { const auto now = steady_clock::now(); if(now < next_timepoint) { auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_); timer->expires_after(next_timepoint - now); timer->async_wait([timer, data](const auto & ec) { if(!ec) send_next_portion(data); }); } else data->io_ctx_.post([data] { send_next_portion(data); }); } }; }
На мой взгляд, основная сложность в этом коде проистекает из-за особенностей создания и "взвода" таймеров в Asio. По-моему, получается как-то слишком уж многословно. Но тут уж что есть, то есть. Зато не нужно никаких дополнительных библиотек привлекать.
Подключение express-like роутера
Показанные выше request_processor, send_next_portion и make_done_handler в общем-то и составляли самую первую версию моего тестового сервера, написанного буквально за 15 или 20 минут.
Но через пару дней использования этого тестового сервера оказалось, что в нем есть серьезный недостаток: он всегда отдает ответный поток с одинаковой скоростью. Скомпилировал со скоростью 512KiB/sec — отдает всем 512KiB/sec. Перекомпилировал со скоростью 20KiB/sec — будет отдавать всем 20KiB/sec и никак иначе. Что было неудобно, т.к. стало нужно иметь возможность получать ответы разной "толщины".
Тогда и появилась идея: а что, если скорость отдачи будет запрашиваться прямо в URL? Например, сделали запрос на localhost:8080/ и получили ответ с заранее заданной скоростью. А если сделали запрос на localhost:8080/128K, то стали получать ответ со скоростью 128KiB/sec.
Потом мысль пошла еще дальше: в URL также можно задавать и количество отдельных частей в ответе. Т.е. запрос localhost:8080/128K/3000 приведет к выдаче потока из 3000 частей со скоростью 128KiB/sec.
Нет проблем. В RESTinio есть возможность использовать маршрутизатор запросов, сделанный под влиянием ExpressJS. В итоге появилась вот такая функция описания обработчиков входящих HTTP-запросов:
auto make_router(asio_ns::io_context & ctx) { auto router = std::make_unique<router_t>(); router->http_get("/", [&ctx](auto req, auto) { request_processor(ctx, 100u*1024u, 10000u, std::move(req)); return restinio::request_accepted(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); if(0u != chunk_size) { request_processor(ctx, chunk_size, 10000u, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); const auto count = restinio::cast_to<std::size_t>(params["count"]); if(0u != chunk_size && 0u != count) { request_processor(ctx, chunk_size, count, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); return router; }
Здесь формируются обработчики HTTP GET запросов для URL трех типов:
- вида
http://localhost/; - вида
http://localhost/<speed>[<U>]/; - вида
http://localhost/<speed>[<U>]/<count>/
Где speed — это число, определяющее скорость, а U — это опциональный мультипликатор, который указывает, в каких единицах задана скорость. Так 128 или 128b означает скорость в 128 байт в секунду. А 128k — 128 килобайт в секунду.
На каждый URL вешается своя лямбда функция, которая разбирается с полученными параметрами, если все нормально, вызывает уже показанную выше функцию request_processor.
Вспомогательная функция extract_chunk_size выглядит следующим образом:
std::size_t extract_chunk_size(const restinio::router::route_params_t & params) { const auto multiplier = [](const auto sv) noexcept -> std::size_t { if(sv.empty() || "B" == sv || "b" == sv) return 1u; else if("K" == sv || "k" == sv) return 1024u; else return 1024u*1024u; }; return restinio::cast_to<std::size_t>(params["value"]) * multiplier(params["multiplier"]); }
Здесь C++ная лямбда используется для эмуляции локальных функций из других языков программирования.
Функция main
Осталось посмотреть, как все это запускается в функции main:
using router_t = restinio::router::express_router_t<>; ... int main() { struct traits_t : public restinio::default_single_thread_traits_t { using logger_t = restinio::single_threaded_ostream_logger_t; using request_handler_t = router_t; }; asio_ns::io_context io_ctx; restinio::run( io_ctx, restinio::on_this_thread<traits_t>() .port(8080) .address("localhost") .write_http_response_timelimit(60s) .request_handler(make_router(io_ctx))); return 0; }
Что здесь происходит:
- Поскольку мне нужен не обычный штатный роутер запросов (который вообще ничего делать сам не может и перекладывает всю работу на плечи программиста), то я определяю новые свойства для своего HTTP-сервера. Для этого беру штатные свойства однопоточного HTTP-сервера (тип
restinio::default_single_thread_traits_t) и указываю, что в качестве обработчика запросов будет использоваться экземпляр express-like роутера. Заодно, чтобы контролировать, что происходит внутри, указываю, чтобы HTTP-сервер использовал настоящий логгер (по умолчанию используетсяnull_logger_tкоторый вообще ничего не логирует). - Поскольку мне нужно взводить таймеры внутри after-write нотификаторов, то мне нужен экземпляр io_context, с которым я смог бы работать. Поэтому я его создаю сам. Это дает мне возможность передать ссылку на мой io_context в функцию
make_router. - Остается только запустить HTTP-сервер в однопоточном варианте на ранее созданном мной io_context-е. Функция
restinio::runвернет управление только когда HTTP-сервер завершит свою работу.
Заключение
В статье не был показан полный код моего тестового сервера, только его основные моменты. Полный код, которого чуть-чуть больше из-за дополнительных typedef-ов и вспомогательных функций, несколько подлиннее. Увидеть его можно здесь. На момент написания статьи это 185 строк, включая пустые строки и комментарии. Ну и написаны эти 185 строк за пару-тройку подходов суммарной длительностью вряд ли более часа.
Мне такой результат понравился и задача оказалась интересной. В практическом плане быстро был получен нужный мне вспомогательный инструмент. И в плане дальнейшего развития RESTinio появились кое-какие мысли.
В общем, если кто-то еще не пробовал RESTinio, то я приглашаю попробовать. Сам проект живет на GitHub. Задать вопрос или высказать свои предложения можно в Google-группе или прямо здесь, в комментариях.
