Асинхронные HTTP-запросы на C++: входящие через RESTinio, исходящие через libcurl. Часть 2

    В предыдущей статье мы начали рассказывать о том, как можно реализовать асинхронную обработку входящих HTTP-запросов, внутри которой нужно выполнять асинхронные исходящие HTTP-запросы. Мы рассмотрели реализованную на C++ и RESTinio имитацию стороннего сервера, который долго отвечает на HTTP-запросы. Сейчас же мы поговорим о том, как можно реализовать выдачу асинхронных исходящих HTTP-запросов к этому серверу посредством curl_multi_perform.

    Несколько слов о том, как можно использовать curl_multi


    Библиотека libcurl широко известна в мире C и C++. Но, вероятно, наиболее широко она известна в виде т.н. curl_easy. Использовать curl_easy просто: сперва вызываем curl_easy_init, затем несколько раз вызываем curl_easy_setopt, затем один раз curl_easy_perform. И, в общем-то, все.

    В контексте нашего рассказа с curl_easy плохо то, что это синхронный интерфейс. Т.е. каждый вызов curl_easy_perform блокирует вызвавшую его рабочую нить до завершения выполнения запроса. Что нам категорически не подходит, т.к. мы не хотим блокировать свои рабочие нити на то время, пока медленный сторонний сервер соизволит нам ответить. От libcurl-а нам нужна асинхронная работа с HTTP-запросами.

    И libcurl позволяет работать с HTTP-запросами асинхронно через т.н. curl_multi. При использовании curl_multi программист все так же вызывает curl_easy_init и curl_easy_setopt для подготовки каждого своего HTTP-запроса. Но не делает вызов curl_easy_perform. Вместо этого пользователь создает экземпляр curl_multi через вызов curl_multi_init. Затем добавляет в этот curl_multi-экземпляр подготовленные curl_easy-экземпляры через curl_multi_add_handle и…

    А вот дальше curl_multi предоставляет программисту выбор:


    с последующим обращением к curl_multi_info_read для того, чтобы определить завершившиеся запросы.

    Мы покажем использование обоих подходов. В этой статье речь пойдет о работе с curl_multi_perform, а в заключительной статье серии — о работе с curl_multi_socket_action.

    О чем речь пойдет сегодня?


    В прошлой статье мы в общих чертах описали небольшую демонстрацию, состоящую из delay_server-а и нескольких bridge_server-ов, а так же подробно разобрали реализацию delay_server-а. Сегодня мы расскажем о bridge_server_1, который выполняет запросы к delay_server посредством curl_multi_perform.

    bridge_server_1


    Что делает bridge_server_1?


    bridge_server_1 принимает HTTP GET-запросы на URL вида /data?year=YYYY&month=MM&day=DD. Каждый принятый запрос трансформируется в HTTP GET-запрос к delay_server. Когда от delay_server-а приходит ответ, то этот ответ соответствующим образом трансформируется в ответ на исходный HTTP GET-запрос.

    Если сперва запустить delay_server:

    delay_server -p 4040

    затем запустить bridge_server_1:

    bridge_server_1 -p 8080 -P 4040

    и затем выполнить запрос к bridge_server_1, то можно получить следующее:

    curl -4 -v "http://localhost:8080/data?year=2018&month=02&day=25"
    *   Trying 127.0.0.1...
    * TCP_NODELAY set
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /data?year=2018&month=02&day=25 HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.58.0
    > Accept: */*
    >
    < HTTP/1.1 200 OK
    < Connection: keep-alive
    < Content-Length: 111
    < Server: RESTinio hello world server
    < Date: Sat, 24 Feb 2018 10:15:41 GMT
    < Content-Type: text/plain; charset=utf-8
    <
    Request processed.
    Path: /data
    Query: year=2018&month=02&day=25
    Response:
    ===
    Hello world!
    Pause: 4376ms.
    
    ===
    * Connection #0 to host localhost left intact

    bridge_server_1 берет значения параметров year, month и day из URL и в неизменном виде передает их в delay_server. Поэтому если значение какого-то из параметров задать неправильно, то bridge_server_1 передаст это неправильное значение в delay_server и последствия будут видны в ответе на первоначальный запрос:

    curl -4 -v "http://localhost:8080/data?year=2018&month=Feb&day=25"
    *   Trying 127.0.0.1...
    * TCP_NODELAY set
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /data?year=2018&month=Feb&day=25 HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.58.0
    > Accept: */*
    >
    < HTTP/1.1 200 OK
    < Connection: keep-alive
    < Content-Length: 81
    < Server: RESTinio hello world server
    < Date: Sat, 24 Feb 2018 10:19:55 GMT
    < Content-Type: text/plain; charset=utf-8
    <
    Request failed.
    Path: /data
    Query: year=2018&month=Feb&day=25
    Response code: 404
    * Connection #0 to host localhost left intact

    bridge_server_1 принимает только HTTP GET запросы и только на URL /data. Все остальные запросы bridge_server_1 отвергает.

    Как работает bridge_server_1?


    bridge_server_1 представляет из себя C++ приложение, работа в котором выполняется на двух нитях. На главном потоке работает RESTinio (т.е. на главном потоке запускается встраиваемый HTTP-сервер). А на второй нити, которая запускается из функции main(), выполняются манипуляции с curl_multi (эту нить далее будем называть curl-нитью). Передача информации от главной нити к рабочей curl-нити осуществляется через простой самодельный thread-safe контейнер.

    Когда RESTinio принимает новый HTTP-запрос, этот запрос передается в заданный при старте RESTinio callback. Там проверяется URL запроса и, если это интересующий нас запрос, то создается объект с описанием принятого запроса. Созданный объект заталкивается в thread-safe контейнер, из которого этот объект будет извлечен уже рабочей curl-нитью.

    Рабочая curl-нить периодически извлекает объекты с описаниями принятых запросов из thread-safe контейнера. Для каждого принятого запроса на этой рабочей нити создается соответствующий curl_easy-экземпляр. Этот экземпляр регистрируется в экземпляре curl_multi.

    Рабочая curl-нить выполняет обработку посредством периодических вызовов curl_multi_perform,
    curl_multi_wait и curl_multi_info_read, но подробнее об этом речь пойдет ниже. Когда curl-нить обнаруживает, что очередной запрос обработан (т.е. получен ответ от delay_server-а), то тут же формируется ответ и на исходный входящий HTTP-запрос. Т.е. получается, что входящий HTTP-запрос принимается на главной нити приложения, затем он передается в curl-нить, где и формируется ответ на принятый входящий HTTP-запрос.

    Разбор кода bridge_server_1


    Разбор кода bride_server_1 будет производится следующим образом:

    • сперва будет показана функция main() с необходимыми пояснениями;
    • затем будет показан код нескольких функций, которые имеют отношение к RESTinio;
    • затем уже будет произведен разбор того кода, который работает с curl_multi.

    Ряд моментов, например, связанных с RESTinio или разбором аргументов командной строки, затрагивались в предыдущей статье, поэтому подробно мы на них здесь останавливаться не будем.

    Функция main()


    Вот весь код функции main() для bridge_server_1:

    int main(int argc, char ** argv) {
      try {
        const auto cfg = parse_cmd_line_args(argc, argv);
        if(cfg.help_requested_)
          return 1;
    
        // Нам потребуется контейнер для передачи информации между
        // рабочими нитями.
        request_info_queue_t queue;
    
        // Актуальный обработчик входящих HTTP-запросов.
        auto actual_handler = [&cfg, &queue](auto req) {
            return handler(cfg.config_, queue, std::move(req));
          };
    
        // Запускаем отдельную рабочую нить, на которой будут выполняться
        // запросы к удаленному серверу посредством curl_multi_perform.
        std::thread curl_thread{[&queue]{ curl_multi_work_thread(queue); }};
        // Защищаемся от выхода из скоупа без предварительного останова
        // этой отдельной рабочей нити.
        auto curl_thread_stopper = cpp_util_3::at_scope_exit([&] {
            queue.close();
            curl_thread.join();
          });
    
        // Теперь можно запустить основной HTTP-сервер.
    
        // Если должна использоваться трассировка запросов, то должен
        // запускаться один тип сервера.
        if(cfg.config_.tracing_) {
          // Для того, чтобы сервер трассировал запросы, нужно определить
          // свой класс свойств для сервера.
          struct traceable_server_traits_t : public restinio::default_single_thread_traits_t {
            // Определяем нужный нам тип логгера.
            using logger_t = restinio::single_threaded_ostream_logger_t;
          };
          // Теперь используем этот новый класс свойств для запуска сервера.
          run_server<traceable_server_traits_t>(
              cfg.config_, std::move(actual_handler));
        }
        else {
          // Трассировка не нужна, поэтому запускаем обычный штатный сервер.
          run_server<restinio::default_single_thread_traits_t>(
              cfg.config_, std::move(actual_handler));
        }
    
        // Все, теперь ждем завершения работы сервера.
      }
      catch( const std::exception & ex ) {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 2;
      }
    
      return 0;
    }

    Значительная часть main()-а повторяет main() из описанного в предыдущей статье delay_server. Такой же разбор аргументов командной строки. Такая же переменная actual_handler для хранения лямбда-функции с вызовом реального обработчика HTTP-запросов. Такой же вызов run_server с выбором конкретного типа Traits в зависимости от того, должна ли использоваться трассировка HTTP-сервера или нет.

    Но есть и несколько отличий.

    Во-первых, нам потребуется thread-safe контейнер для передачи информации о принятых запросах от главной нити на curl-нить. В качестве этого контейнера будет использована переменная queue, имеющая тип request_info_queue_t. Подробнее реализацию контейнера мы рассмотрим ниже.

    Во-вторых, нам нужно запустить дополнительную рабочую нить, на которой мы будем работать с curl_multi. И так же нам нужно эту дополнительную рабочую нить остановить, когда мы будем из main()-а выходить. Все это происходит вот в этих строчках:

    // Запускаем отдельную рабочую нить, на которой будут выполняться
    // запросы к удаленному серверу посредством curl_multi_perform.
    std::thread curl_thread{[&queue]{ curl_multi_work_thread(queue); }};
    // Защищаемся от выхода из скоупа без предварительного останова
    // этой отдельной рабочей нити.
    auto curl_thread_stopper = cpp_util_3::at_scope_exit([&] {
        queue.close();
        curl_thread.join();
      });

    Надеемся, что код для запуска нити не вызывает вопросов. А для завершения рабочей нити нам нужно выполнить два действия:

    1. Дать сигнал рабочей нити завершить свою работу. Это выполняется за счет операции queue.close().
    2. Дождаться завершения рабочей нити. Это происходит за счет curl_thread.join().

    Оба эти действия в виде лямбды передаются во вспомогательную функцию at_scope_exit() из нашей утилитарной библиотеки. Этот at_scope_exit() — это всего лишь несложный аналог таких известных вещей, как BOOST_SCOPE_EXIT из Boost-а, defer из языка Go и scope(exit) из языка D. Благодаря at_scope_exit() мы автоматически завершаем curl-нить вне зависимости от того, по какой причине мы выходим из main().

    Конфигурация и разбор аргументов командной строки


    Если кому-то интересно, то ниже можно посмотреть, как представляется конфигурация для bridge_server_1. И как эта конфигурация образуется в результате разбора аргументов командной строки. Все очень похоже на то, как мы это делали в delay_server, поэтому детали упрятаны под спойлер дабы не отвлекать внимание.

    Структура config_t и функция parse_cmd_line_args()
    // Конфигурация, которая потребуется серверу.
    struct config_t {
      // Адрес, на котором нужно слушать новые входящие запросы.
      std::string address_{"localhost"};
      // Порт, на котором нужно слушать.
      std::uint16_t port_{8080};
    
      // Адрес, на который нужно адресовать собственные запросы.
      std::string target_address_{"localhost"};
      // Порт, на который нужно адресовать собственные запросы.
      std::uint16_t target_port_{8090};
    
      // Нужно ли включать трассировку?
      bool tracing_{false};
    };
    
    // Разбор аргументов командной строки.
    // В случае неудачи порождается исключение.
    auto parse_cmd_line_args(int argc, char ** argv) {
      struct result_t {
        bool help_requested_{false};
        config_t config_;
      };
      result_t result;
    
      // Подготавливаем парсер аргументов командной строки.
      using namespace clara;
    
      auto cli = Opt(result.config_.address_, "address")["-a"]["--address"]
            ("address to listen (default: localhost)")
        | Opt(result.config_.port_, "port")["-p"]["--port"]
            (fmt::format("port to listen (default: {})", result.config_.port_))
        | Opt(result.config_.target_address_, "target address")["-T"]["--target-address"]
            (fmt::format("target address (default: {})", result.config_.target_address_))
        | Opt(result.config_.target_port_, "target port")["-P"]["--target-port"]
            (fmt::format("target port (default: {})", result.config_.target_port_))
        | Opt(result.config_.tracing_)["-t"]["--tracing"]
            ("turn server tracing ON (default: OFF)")
        | Help(result.help_requested_);
    
      // Выполняем парсинг...
      auto parse_result = cli.parse(Args(argc, argv));
      // ...и бросаем исключение если столкнулись с ошибкой.
      if(!parse_result)
        throw std::runtime_error("Invalid command line: "
            + parse_result.errorMessage());
    
      if(result.help_requested_)
        std::cout << cli << std::endl;
    
      return result;
    }


    Детали взаимодействия между RESTinio- и curl-частями


    Информация о принятом входящем HTTP-запросе передается от RESTinio-части bridge_server_1 в curl-часть посредством экземпляров вот такой структуры:

    // Сообщение, которое будет передаваться на рабочую нить с curl_multi_perform
    // для того, чтобы выполнить запрос к удаленному серверу.
    struct request_info_t {
      // URL, на который нужно выполнить обращение.
      const std::string url_;
    
      // Запрос, в рамках которого нужно сделать обращение к удаленному серверу.
      restinio::request_handle_t original_req_;
    
      // Код ошибки от самого curl-а.
      CURLcode curl_code_{CURLE_OK};
    
      // Код ответа удаленного сервера.
      // Имеет актуальное значение только если сервер ответил.
      long response_code_{0};
    
      // Ответные данные, которые будут получены от удаленного сервера.
      std::string reply_data_;
    
      request_info_t(std::string url, restinio::request_handle_t req)
        : url_{std::move(url)}, original_req_{std::move(req)}
        {}
    };

    Первоначально в ней заполняются всего два поля: url_ и req_. Но после того, как запрос будет обработан curl-нитью, будут заполнены и остальные поля. В первую очередь это поле curl_code_. Если в нем окажется CURLE_OK, то свои значения получат и поля response_code_ и reply_data_.

    Для того, чтобы передавать экземпляры request_info_t между рабочими нитями используется следующий самодельный thread-safe контейнер:

    // Примитивная реализация thread-safe контейнера для обмена информацией
    // между разными рабочими нитями.
    // Позволяет только поместить новый элемент в контейнер и попробовать взять
    // элемент из контейнера. Никакого ожидания на попытке извлечения элемента
    // из пустого контейнера нет.
    template<typename T>
    class thread_safe_queue_t {
      using unique_ptr_t = std::unique_ptr<T>;
    
      std::mutex lock_;
      std::queue<unique_ptr_t> content_;
    
      bool closed_{false};
    public:
      enum class status_t {
        extracted,
        empty_queue,
        closed
      };
    
      void push(unique_ptr_t what) {
        std::lock_guard<std::mutex> l{lock_};
        content_.emplace(std::move(what));
      }
    
      // Метод pop получает лямбда-функцию, в которую будут поочередно
      // переданы все элементы из контейнера, если контейнер не пуст.
      // Передача будет осуществляться при захваченном mutex-е, что означает,
      // что новые элементы не могут быть помещенны в очередь, пока pop()
      // не завершит свою работу.
      template<typename Acceptor>
      status_t pop(Acceptor && acceptor) {
        std::lock_guard<std::mutex> l{lock_};
        if(closed_) {
          return status_t::closed;
        }
        else if(content_.empty()) {
          return status_t::empty_queue;
        }
        else {
          while(!content_.empty()) {
            acceptor(std::move(content_.front()));
            content_.pop();
          }
          return status_t::extracted;
        }
      }
    
      void close() {
        std::lock_guard<std::mutex> l{lock_};
        closed_ = true;
      }
    };
    
    // Тип контейнера для обмена информацией между рабочими нитями.
    using request_info_queue_t = thread_safe_queue_t<request_info_t>;

    В принципе, thread_safe_queue_t не обязательно было делать шаблоном. Но так получилось, что сперва был сделан класс-шаблон thread_safe_queue_t, а уже после выяснилось, что он будет использоваться только с типом request_info_t. Но переделывать реализацию с шаблона на обычный класс мы уже не стали.

    RESTinio-часть bridge_server_1


    В коде bridge_server_1 есть всего три функции, которые взаимодействуют с RESTinio. Во-первых, это функция-шаблон run_server(), которая отвечает за запуск HTTP-сервера на контексте главной нити приложения:

    // Вспомогательная функция, которая отвечает за запуск сервера нужного типа.
    template<typename Server_Traits, typename Handler>
    void run_server(
        const config_t & config,
        Handler && handler) {
      restinio::run(
          restinio::on_this_thread<Server_Traits>()
            .address(config.address_)
            .port(config.port_)
            .request_handler(std::forward<Handler>(handler)));
    }

    В bridge_server_1 она даже более простая, чем в delay_server. И, вообще говоря, без нее можно было бы обойтись. Можно было бы просто вызывать restinio::run() прямо в main()-е. Но лучше все-таки иметь отдельный run_server(), чтобы при необходимости поменять настройки запускаемого HTTP-сервера менять их пришлось бы всего в одном месте.

    Во-вторых, это функция handler(), которая и является обработчиком HTTP-запросов. Она чуть сложнее, чем ее аналог в delay_server, но так же вряд ли вызовет сложности с пониманием:

    // Реализация обработчика запросов.
    restinio::request_handling_status_t handler(
        const config_t & config,
        request_info_queue_t & queue,
        restinio::request_handle_t req) {
      if(restinio::http_method_get() == req->header().method()
          && "/data" == req->header().path()) {
        // Разберем дополнительные параметры запроса.
        const auto qp = restinio::parse_query(req->header().query());
    
        // Нужно оформить объект с информацией о запросе и передать
        // его на обработку в нить curl_multi.
        auto url = fmt::format("http://{}:{}/{}/{}/{}",
            config.target_address_,
            config.target_port_,
            qp["year"], qp["month"], qp["day"]);
    
        auto info = std::make_unique<request_info_t>(
            std::move(url), std::move(req));
    
        queue.push(std::move(info));
    
        // Подтверждаем, что мы приняли запрос к обработке и что когда-то
        // мы ответ сгенерируем.
        return restinio::request_accepted();
      }
    
      // Все остальные запросы нашим демонстрационным сервером отвергаются.
      return restinio::request_rejected();
    }

    Здесь мы сперва вручную проверяем тип пришедшего запроса и URL из него. Если это не HTTP GET для /data, то запрос мы обрабатывать отказываемся. В bridge_server_1 нам приходится делать эту проверку вручную, тогда как в delay_server из-за использования Express router-а надобности в этом не было.

    Далее, если это ожидаемый нами запрос, то мы разбираем query string на составляющие и формируем URL на delay_server для собственного исходящего запроса. После чего создаем объект request_info_t в который сохраняем сформированный URL и умную ссылку на принятый входящий запрос. И передаем этот request_info_t на обработку curl-нити (путем сохранения его в thread-safe контейнере).

    Ну и, в-третьих, функция complete_request_processing(), в которой мы отвечаем на принятый входящий HTTP-запрос:

    // Финальная стадия обработки запроса к удаленному серверу.
    // curl_multi свою часть работы сделал. Осталось создать http-response,
    // который будет отослан в ответ на входящий http-request.
    void complete_request_processing(request_info_t & info) {
      auto response = info.original_req_->create_response();
    
      response.append_header(restinio::http_field::server,
          "RESTinio hello world server");
      response.append_header_date_field();
      response.append_header(restinio::http_field::content_type,
          "text/plain; charset=utf-8");
    
      if(CURLE_OK == info.curl_code_) {
        if(200 == info.response_code_)
          response.set_body(
            fmt::format("Request processed.\nPath: {}\nQuery: {}\n"
                "Response:\n===\n{}\n===\n",
              info.original_req_->header().path(),
              info.original_req_->header().query(),
              info.reply_data_));
        else
          response.set_body(
            fmt::format("Request failed.\nPath: {}\nQuery: {}\n"
                "Response code: {}\n",
              info.original_req_->header().path(),
              info.original_req_->header().query(),
              info.response_code_));
      }
      else
        response.set_body("Target service unavailable\n");
    
      response.done();
    }

    Здесь мы используем оригинальный входящий запрос, который был сохранен в поле request_info_t::original_req_. Метод restinio::request_t::create_response() возвращает объект, который должен использоваться для формирования HTTP-ответа. Мы сохраняем этот объект в переменную response. То, что тип этой переменной не записан явно не случайно. Дело в том, что create_response() может возвращать разные типы объектов (подробности можно найти здесь). И в данном случае нам не важно, что именно возвращает самая простая форма create_response().

    Далее мы наполняем HTTP-ответ в зависимости от того, чем завершился наш HTTP-запрос к delay_server-у. И когда HTTP-ответ полностью сформирован, мы предписываем RESTinio отослать ответ HTTP-клиенту вызвав response.done().

    Касательно функции complete_request_processing() нужно подчеркнуть одну очень важную вещь: она вызывается на контексте curl-нити. Но когда мы вызываем response.done(), то доставка сформированного ответа автоматически делегируется главной нити приложения, на которой и запущен HTTP-сервер.

    curl-часть bridge_server_1


    В curl-часть bridge_server_1 входит несколько функций, в которых выполняется работа с curl_multi и curl_easy. Начнем разбор этой части с главной ее функции, curl_multi_work_thread(), а затем рассмотрим остальные функции, прямо или косвенно вызываемые из curl_multi_work_thread().

    Но сначала небольшое пояснение по поводу того, почему мы в своей демонстрации использовали «голый» libcurl без применения каких-либо C++ных оберток вокруг него. Причина более чем прозаическая: что тут думать, трясти нужно не хотелось тратить время на поиск подходящей обертки и разбирательство с тем, что и как эта обертка делает. При том, что в свое время у нас был опыт работы с libcurl-ом, как с ним взаимодействовать на уровне его родного C-шного API мы себе представляли. Нужен нам тут был лишь минимальный набор фич libcurl. И при этом хотелось держать все под своим полным контролем. Поэтому никаких сторонних C++ных надстроек над libcurl решили не задействовать.

    И еще один важный дисклаймер нужно сделать перед разбором curl-ового кода: дабы максимально упростить и сократить код демонстрационных приложений мы вообще не делали никакого контроля за ошибками. Если бы мы должным образом контролировали коды возврата curl-овых функций, то код распух бы раза в три, существенно потеряв при этом в понятности, но не выиграв ничего в функциональности. Поэтому мы в своей демонстрации рассчитываем на то, что вызовы libcurl-а всегда будут завершаться успешно. Это наше осознанное решение для данного конкретного эксперимента, но мы бы так никогда не сделали в реальном продакшен-коде.

    Ну а теперь, после всех необходимых пояснений, давайте перейдем к рассмотрению того, как curl_multi_perform позволил нам организовать работу с асинхронными исходящими HTTP-запросами.

    Функция curl_multi_work_thread()


    Вот код основной функции, которая работает на отдельной curl-нити в bridge_server_1:

    // Реализация рабочей нити, на которой будут выполняться операции
    // curl_multi_perform.
    void curl_multi_work_thread(request_info_queue_t & queue) {
      using namespace cpp_util_3;
    
      // Инциализируем сам curl.
      curl_global_init(CURL_GLOBAL_ALL);
      auto curl_global_deinitializer =
          at_scope_exit([]{ curl_global_cleanup(); });
    
      // Создаем экземпляр curl_multi, который нам потребуется для выполнения
      // запросов к удаленному серверу.
      auto curlm = curl_multi_init();
      auto curlm_destroyer = at_scope_exit([&]{ curl_multi_cleanup(curlm); });
    
      // Количество активных операций.
      int still_running{ 0 };
    
      while(true) {
        // Сперва пытаемся взять новые заявки. Делаем это до тех пор,
        // пока очередь не будет опустошена.
        auto status = try_extract_new_requests(queue, curlm);
        if(request_info_queue_t::status_t::closed == status)
          // Работу нужно завершать.
          // Запросы, которые остались необработанными оставляем как есть.
          return;
    
        // Если удалось что-то извлечь или если есть незавершенные операции,
        // то вызываем curl_multi_perform.
        if(0 != still_running ||
            request_info_queue_t::status_t::extracted == status) {
          curl_multi_perform(curlm, &still_running);
          // Пытаемся проверить, закончились ли какие-нибудь операции.
          check_curl_op_completion(curlm);
        }
    
        // Если есть незаврешенные операции, то вызываем curl_multi_wait,
        // чтобы подождать событий ввода-вывода.
        if(0 != still_running) {
          curl_multi_wait(curlm, nullptr, 0, 50 /*ms*/, nullptr);
        }
        else {
          // Никаких активностей нет, поэтому просто заснем, чтобы чуть позже
          // проверить, не появились ли новые запросы.
          std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
      }
    }

    Ее можно разделить на две части: в первой части происходит необходимая инициализация libcurl и создание экземпляра curl_multi, а во второй части выполняется основной цикл по обслуживанию исходящих HTTP-запросов.

    Первая часть совсем простая. Для инициализации libcurl-а нужно вызвать curl_global_init(), а затем, в самом конце работы — curl_global_cleanup(). Что мы и делаем с использованием уже описанного выше фокуса с at_scope_exit. Похожий прием применяем и для создания/удаления экземпляра curl_multi. Надеемся, этот код не вызывает затруднений.

    А вот вторая часть посложнее. Идея такая:

    • мы крутим цикл обслуживания HTTP-запросов до тех пор, пока нам не дадут команду на завершение работы (в функции main() для этого делают вызов queue.close());
    • на каждой итерации цикла сперва пытаемся взять новые HTTP-запросы из thread-safe контейнера. Если новые запросы там есть, то каждый из них преобразуется в curl_easy-экземпляр, который добавляется в curl_multi-экземпляр;
    • после этого мы вызываем curl_multi_perform() для того, чтобы попытаться обслужить те запросы, которые уже есть в работе и/или новые запросы, которые могли быть только добавлены в curl_multi-экземпляр. И после вызова curl_multi_perform() сразу же пытаемся вызвать curl_multi_info_read() для того, чтобы обнаружить те HTTP-запросы, обработка которых была завершена libcurl-ом (все это выполняется внутри check_curl_op_completion());
    • затем мы либо вызываем curl_multi_wait() чтобы дождаться готовности IO-операций, если какие-то HTTP-запросы в данный момент обслуживаются, либо же просто засыпаем на 50ms, если ничего в обработке сейчас нет.

    Грубо говоря, curl-нить работает на потактовой основе. В начале каждого такта извлекаются новые запросы и проверяются результаты активных запросов. После чего curl-нить засыпает либо до готовности IO-операций, либо до истечения 50-миллисекундной паузы. При этом ожидание готовности IO-операций так же ограничивается 50-миллисекундным интервалом.

    Схема очень простая. Но имеющая пару недостатков. В зависимости от ситуации эти недостатки могут быть фатальными, а могут и вовсе не быть недостатками:

    1. Функция curl_multi_info_read() вызывается после каждого обращения к curl_multi_perform(). Хотя, в принципе, curl_multi_perform возвращает количество запросов, которые сейчас находится в обработке. И на основании изменения этого значения можно определять момент, когда количество запросов уменьшается, и только после этого вызывать curl_multi_info_read. Однако, мы используем самый примитивный вариант работы дабы не заморачиваться на ситуации, когда один запрос завершился, один новый добавился, при этом общее количество выполняющихся запросов осталось прежним.

    2. Увеличивается латентность обработки очередного запроса. Так, если в данный момент нет никаких активных запросов и поступает новый входящий HTTP-запрос, то curl-нить получит информацию о нем только после выхода из очередного вызова this_thread::sleep_for(). При размере такта работы curl_multi_work_thread() в 50 миллисекунд это означает +50ms к латентности обработки запроса (в худшем случае). В bridge_server_1 нас это не волнует. Но в реализации bridge_server_1_pipe мы постарались избавиться от этого недостатка, за счет использования дополнительного pipe с нотификациями для curl-нити. Разбирать детально bridge_server_1_pipe мы изначально не планировали, но если у кого-то есть желание увидеть такой разбор, то отпишитесь в комментариях, пожалуйста. При наличии таких пожеланий мы сделаем дополнительную статью с разбором.

    Вот так, в общих словах, работает curl-нить в примере bridge_server_1. Если у вас остались вопросы, то задавайте их в комментариях, мы постараемся ответить. А пока же перейдем к разбору оставшихся функций, относящихся к curl-части bridge_server_1.

    Функции для приема новых входящих HTTP-запросов


    В начале каждого итерации основного цикла внутри curl_multi_work_thread() выполняется попытка забрать все новые входящие HTTP-запросы из thread-safe контейнера, преобразовать их в curl_easy-экземпляры и добавить эти новые curl_easy-экземпляры в curl_multi-экземпляр. Выполняется это все с помощью нескольких вспомогательных функций.

    Во-первых это функция try_extract_new_requests():

    // Попытка извлечения всех запросов, которые ждут в очереди.
    // Если возвращается status_t::closed, значит работа должна быть
    // остановлена.
    auto try_extract_new_requests(request_info_queue_t & queue, CURLM * curlm) {
      return queue.pop([curlm](auto info) {
          introduce_new_request_to_curl_multi(curlm, std::move(info));
        });
    }

    Фактически ее работа состоит в том, чтобы вызвать метод pop() нашего thread-safe контейнера и передать в pop() нужную лямбда-функцию. По большому счету это все можно было бы записать прямо внутри curl_multi_work_thread(), но изначально try_extract_new_requests() была объемнее. Да и ее наличие упрощает код curl_multi_work_thread().

    Во-вторых, это функция introduce_new_request_to_curl_multi(), в которой, фактически, и выполняется вся основная работа. А именно:

    // Создать curl_easy для нового исходящего запроса, заполнить все нужные
    // для него значения и передать этот новый curl_easy в curl_multi.
    void introduce_new_request_to_curl_multi(
        CURLM * curlm,
        std::unique_ptr<request_info_t> info) {
      // Создаем и подготавливаем curl_easy экземпляр для нового запроса.
      CURL * h = curl_easy_init();
      curl_easy_setopt(h, CURLOPT_URL, info->url_.c_str());
      curl_easy_setopt(h, CURLOPT_PRIVATE, info.get());
    
      curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, write_callback);
      curl_easy_setopt(h, CURLOPT_WRITEDATA, info.get());
    
      // Новый curl_easy подготовлен, можно отдать его в curl_multi.
      curl_multi_add_handle(curlm, h);
    
      // unique_ptr не должен больше нести ответственность за объект.
      // Мы его сами удалим когда обработка запроса завершится.
      info.release();
    }

    Если вы работали с curl_easy, то ничего нового вы здесь для себя не увидите. Разве что за исключением вызова curl_multi_add_handle(). Именно таким образом и выполняется передача контроля за выполнением отдельного HTTP-запроса экземпляру curl_multi. Если же вы с curl_easy раньше не работали, то вам нужно будет ознакомиться с официальной документаций, чтобы разобраться с тем, для чего вызываются curl_easy_setopt() и какой эффект это дает.

    Ключевой же момент в introduce_new_request_to_curl_multi() связан с управлением времени жизни экземпляра request_info_t. Дело в том, что request_info_t передается между рабочими нитями посредством unique_ptr-а. И в introduce_new_request_to_curl_multi() он приходит так же в виде unique_ptr-а. Значит, если не принять каких-то специальных действий, экземпляр request_info_t будет уничтожен при выходе из introduce_new_request_to_curl_multi(). Но нам нужно сохранить request_info_t до завершения обработки этого запроса libcurl-ом.

    Поэтому мы сохраняем указатель на request_info_t как приватные данные внутри curl_easy-экземпляра. И вызываем release() у unique_ptr-а для того, чтобы unique_ptr перестал контролировать время жизни нашего объекта. Когда обработка запроса будет завершена, мы вручную достанем приватные данные из curl_easy-экземпляра и сами уничтожим request_info_t объект (это можно будет увидеть внутри функции check_curl_op_completion(), которая разбирается ниже).

    С этим, кстати говоря, связан еще один момент, на который мы не стали отвлекаться в своем демо-приложении, но которому придется уделить время при написании продакшен-кода: когда приложение завершает свою работу, объекты request_info_t, указатели на которые были сохранены внутри curl_easy-экземпляров, не удаляются. Т.е. когда мы выходим из основного цикла в curl_multi_work_thread(), то мы не проходимся по оставшимся живым экземплярам curl_easy и не подчищаем request_info_t за собой. По хорошему, это следовало бы делать.

    Ну и, в-третьих, за подготовку HTTP-запросов отвечает функция write_callback, указатель на которую мы сохраняем в curl_easy-экземпляре:

    // Эту функцию будет вызывать curl когда начнут приходить данные
    // от удаленного сервера. Указатель на нее будет задан через
    // CURLOPT_WRITEFUNCTION.
    std::size_t write_callback(
        char *ptr, size_t size, size_t nmemb, void *userdata) {
      auto info = reinterpret_cast<request_info_t *>(userdata);
      const auto total_size = size * nmemb;
      info->reply_data_.append(ptr, total_size);
    
      return total_size;
    }

    Эта функция вызывается libcurl-ом когда удаленный сервер присылает какие-то данные в ответ на наш исходящий запрос. Эти данные мы накапливаем в поле request_info_t::reply_data_. Здесь так же используется тот факт, что указатель на экземпляр request_info_t сохранен как приватные данные внутри curl_easy-экземпляра.

    Функция check_curl_op_completion()


    Напоследок рассмотрим одну из основных функций curl-части bridge_server_1, которая отвечает за то, чтобы найти выполненные HTTP-запросы и завершить их обработку.

    Суть в том, что внутри curl_multi-экземпляра есть очередь неких сообщений, формируемых libcurl-ом в процессе работы curl_multi. Когда curl_multi завершает обработку очередного запроса внутри curl_multi_perform, в эту очередь сообщений ставится сообщение со специальным статусом CURLMSG_DONE. Это сообщение содержит информацию об обработанном запросе. Наша задача заключается в том, чтобы пробежаться по данной очереди и обработать все найденные в ней сообщения CURLMSG_DONE.

    Выглядит это следующим образом:

    // Попытка обработать все сообщения, которые на данный момент существуют
    // в curl_multi.
    void check_curl_op_completion(CURLM * curlm) {
      CURLMsg * msg;
      int messages_left{0};
    
      // В цикле извлекаем все сообщения от curl_multi и обрабатываем
      // только сообщения CURLMSG_DONE.
      while(nullptr != (msg = curl_multi_info_read(curlm, &messages_left))) {
        if(CURLMSG_DONE == msg->msg) {
          // Нашли операцию, которая реально завершилась.
          // Сразу забераем ее под unique_ptr, дабы не забыть вызвать
          // curl_easy_cleanup.
          std::unique_ptr<CURL, decltype(&curl_easy_cleanup)> easy_handle{
              msg->easy_handle,
              &curl_easy_cleanup};
    
          // Эта операция в curl_multi больше участвовать не должна.
          curl_multi_remove_handle(curlm, easy_handle.get());
    
          // Разбираемся с оригинальным запросом, с которым эта операция
          // была связана.
          request_info_t * info_raw_ptr{nullptr};
          curl_easy_getinfo(easy_handle.get(), CURLINFO_PRIVATE, &info_raw_ptr);
          // Сразу оборачиваем в unique_ptr, чтобы удалить объект.
          std::unique_ptr<request_info_t> info{info_raw_ptr};
    
          info->curl_code_ = msg->data.result;
          if(CURLE_OK == info->curl_code_) {
            // Нужно достать код, с которым нам ответил сервер.
            curl_easy_getinfo(
                easy_handle.get(),
                CURLINFO_RESPONSE_CODE,
                &info->response_code_);
          }
    
          // Теперь уже можно завершить обработку.
          complete_request_processing(*info);
        }
      }
    }

    Мы просто в цикле дергаем curl_multi_info_read() до тех пор, пока в очереди есть хоть что-нибудь. Если извлекаем сообщение типа CURLMSG_DONE, то берем из сообщения экземпляр curl_easy и:

    • изымаем его из curl_multi-экземпляра, т.к. там он больше не нужен;
    • достаем из curl_easy указатель на request_info_t и берем на себя управление временем его жизни;
    • разбираемся с результатом обработки запроса (т.е. достаем из curl_easy результат исходящего запроса);
    • формируем ответ на исходный входящий запрос (функция complete_request_processing разбиралась выше);
    • удаляем все, что больше не нужно (посредством unique_ptr-ов).

    И так для всех запросов, которые уже завершились к этому моменту.

    Заключение второй части


    В этой части рассказа мы рассмотрели, как можно на одной нити получать входящие HTTP-запросы и передавать их обработку второй рабочей нити, на которой посредством curl_multi_perform выполняются исходящие HTTP-запросы. Мы постарались осветить основные моменты в тексте статьи. Но, если что-то осталось непонятным, то задавайте вопросы, постараемся ответить на них в комментариях.

    Так же, если кому-то интересно почитать разбор реализации bridge_server_1_pipe, в котором используется нотификационный pipe, то дайте нам знать. Мы тогда сделаем статью на эту тему.

    Ну и еще осталось рассмотреть bridge_server_2, где используется более хитрый механизм curl_multi_socket_action. Там все гораздо веселее. По крайней мере так казалось пока мы разбирались с этим самым curl_multi_socket_action :)

    Продолжение следует…
    • +16
    • 3,1k
    • 6
    Поделиться публикацией

    Комментарии 6

      +2
      блокирует вызвавшую его рабочую нить

      Вечный вопрос, насколько дословно нужно переводить всем известные под оригинальным именем понятия…
        +1
        В русском профессиональном жаргоне для термина thread есть два устоявшихся русскоязычных термина: поток и нить. Оба используются в статье как взаимозаменяемые.

        Или вы о чем-то другом?
          +3
          я о потоке, нить (кмк) это из серии «дословного перевода»
            –1
            В контексте C++ слово «поток» оказывается не вполне однозначным, т.к. iostreams — это тоже потоки, но совсем другие, нежели threads. Поэтому в мире C++ термин «нить» вполне себе уместен, как более однозначный.
        +1
        > Увеличивается латентность обработки очередного запроса. Так, если в данный момент нет никаких активных запросов и поступает новый входящий HTTP-запрос, то curl-нить получит информацию о нем только после выхода из очередного вызова this_thread::sleep_for().

        Если в request_info_queue_t добавить condition variable, то от этой задержки можно избавиться.
        Вот например такой псевдокод
        std::condition_variable cond_;
        void push(...) {
            std::lock_guard<mutex> lock(mut);
            // push
            cond_.notify_one();
        }
        
        void pop(Acceptor && acceptor, function<bool()> func) {
            std::unique_lock<mutex> lock(mut);
            while (true) {
                if (cond.wait_for(lock, milliseconds(50), [](){return !queue_.empty()})) {
                    break;
               } else {
                    if (func()) { return; }
               }
            }
           // pop elements
        }
        
        // Вызов функции
        pop(accessor, [](){
            int numfds;
            curl_multi_wait(..., 0, &numfds);
            return numfds != 0;
        }
        
          0
          В принципе, да. Но там есть такой фокус, что curl_multi_wait() может возвращать 0 в numfds, даже если сейчас есть выполняющиеся запросы. Поэтому curl_multi_wait() нужно вызывать если still_running != 0, вне зависимости от того, пуста ли очередь или нет. Так что условие для засыпания на condition_variable в pop() будет посложнее.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое