Асинхронные HTTP-запросы на C++: входящие через RESTinio, исходящие через libcurl. Часть 3

    В предыдущей статье мы разобрали реализацию двухпоточного bridge_server-а. На одном потоке асинхронно обрабатываются входящие HTTP-запросы посредством RESTinio. На втором потоке выполняются асинхронные запросы к delay_server-у посредством libcurl в виде curl_multi с использованием функций curl_multi_perform и curl_multi_wait.

    Сегодня же мы разберем другую реализацию bridge_server-а, которая асинхронно обслуживает и входящие, и исходящие HTTP-запросы на одном и том же пуле потоков. Из libcurl-а для этих целей применяется функция curl_multi_socket_action.

    Эта реализация заняла у нас больше всего времени. Давно не приходилось выкуривать столько бамбука, сколько довелось при разбирательстве с документацией к этой функции и примерами ее использования. По началу все это вообще воспринималось как какая-то черная магия, но потом свет в конце туннеля все-таки забрезжил, а код — заработал. Как именно заработал? А вот об этом сегодня и поговорим.

    Что и как делает bridge_server_2?


    Рассматриваемый в данной статье bridge_server_2 делает все то же самое, что и рассмотренный ранее bridge_server_1. Только делает он это по другому. Вместо разделения функциональности по двум разным потокам, как это было в bridge_server_1, где на отдельном потоке работал RESTinio, а на другом потоке — curl_multi, в bridge_server_2 все операции выполняются на одном и том же пуле потоков.

    Для этого мы сами берем на себя создание сокетов, которые нужны libcurl-у, и сами ослеживаем моменты готовности этих сокетов к операциям read/write.

    Структура этой статьи


    Материал в данной статье построен следующим образом:

    • сначала речь пойдет о том, что в bridge_server_2 переиспользуется из bridge_server_1. Дабы не увеличивать объем статьи повторением того, что уже было объяснено ранее;
    • затем мы расскажем, чем RESTinio-часть bridge_server_2 отличается от RESTinio-части bridge_server_1;
    • затем мы перейдем к рассказу о том, как реализована curl-часть в bridge_server_2.

    Так что в этой части статьи не будет разбора кода bridge_server_2 от начала и до конца. Поэтому, если вы не читали предыдущую статью с разбором bridge_server_1, то может имеет смысл это сделать. Например, чтобы вы понимали, что такое функция check_curl_op_completion(), что она делает и как работает.

    Что общего между bridge_server_1 и bridge_server_2?


    В bridge_server_2 переиспользуются (методом copy&paste) следующие фрагменты:

    • конфигурация и разбор аргументов командой строки (структура config_t и функция parse_cmd_line_args());
    • структура request_info_t для передачи информации о принятом входящем запросе от RESTinio-части в curl-часть, а так же функция complete_request_processing() для отсылки ответа на принятый входящий HTTP-запрос;
    • функции write_callback() и check_curl_op_completion(). Это означает так же, что контроль за временем жизни экземпляров request_info_t в процессе обработки исходящего HTTP-запроса строится тем же самым образом: голый указатель на request_info_t сохраняется в curl_easy-экземпляре, а потом извлекается оттуда и удаляется.

    Отличия в RESTinio-части от bridge_server_1


    В RESTinio-части bridge_server_2 есть два основных отличия от bridge_server_1.

    Во-первых, в bridge_server_1 обработкой исходящих HTTP-запросов занималась отдельная рабочая нить. Поэтому для обмена информацией между нитью RESTinio и curl-нитью применялся специальный thread-safe контейнер, в который помещались экземпляры request_info_t.

    А вот в bridge_server_2 у RESTinio- и curl-частей приложения общий рабочий контекст (пул рабочих нитей). Поэтому для передачи информации между RESTinio- и curl-частями отдельный контейнер не нужен. Но curl-часть представлена объектом класса curl_multi_processor_t, которому и скармливаются все принятые входящие HTTP-запросы. Поэтому в функцию handler(), которая является реальным обработчиком входящих HTTP-запросов, теперь передается ссылка на curl_multi_processor_t:

    // Реализация обработчика запросов.
    restinio::request_handling_status_t handler(
        const config_t & config,
        curl_multi_processor_t & req_processor,
        restinio::request_handle_t req) {
      if(restinio::http_method_get() == req->header().method()
          && "/data" == req->header().path()) {
        // Разберем дополнительные параметры запроса.
        const auto qp = restinio::parse_query(req->header().query());
    
        // Нужно оформить объект с информацией о запросе и передать
        // его на обработку в нить curl_multi.
        auto url = fmt::format("http://{}:{}/{}/{}/{}",
            config.target_address_,
            config.target_port_,
            qp["year"], qp["month"], qp["day"]);
    
        auto info = std::make_unique<request_info_t>(
            std::move(url), std::move(req));
    
        req_processor.perform_request(std::move(info));
    
        // Подтверждаем, что мы приняли запрос к обработке и что когда-то
        // мы ответ сгенерируем.
        return restinio::request_accepted();
      }
    
      // Все остальные запросы нашим демонстрационным сервером отвергаются.
      return restinio::request_rejected();
    }

    Во-вторых, теперь RESTinio запускается на пуле рабочих потоков. Поэтому поменялся код в функции run_server, вместо on_this_thread там теперь используется on_thread_pool:

    // Вспомогательная функция, которая отвечает за запуск сервера нужного типа.
    template<typename Server_Traits, typename Handler>
    void run_server(
        const config_t & config,
        restinio::asio_ns::io_context & ioctx,
        Handler && handler) {
      restinio::run(
          ioctx,
          restinio::on_thread_pool<Server_Traits>(std::thread::hardware_concurrency())
            .address(config.address_)
            .port(config.port_)
            .request_handler(std::forward<Handler>(handler)));
    }

    Ну и, собственно говоря, немного поменялась функция main(). Хотя и не принципиально.

    Код функции main() из bridge_server_2
    int main(int argc, char ** argv) {
      try {
        const auto cfg = parse_cmd_line_args(argc, argv);
        if(cfg.help_requested_)
          return 1;
    
        // Инциализируем сам curl.
        curl_global_init(CURL_GLOBAL_ALL);
        auto curl_global_deinitializer =
            cpp_util_3::at_scope_exit([]{ curl_global_cleanup(); });
    
        // Сами создаем Asio-шный io_context, т.к. он будет использоваться
        // и curl_multi_processor-ом, и нашим HTTP-сервером.
        restinio::asio_ns::io_context ioctx;
    
        // Обработчик запросов к удаленному серверу.
        curl_multi_processor_t curl_multi{ioctx};
    
        // Актуальный обработчик входящих HTTP-запросов.
        auto actual_handler = [&cfg, &curl_multi](auto req) {
            return handler(cfg.config_, curl_multi, std::move(req));
          };
    
        // Теперь можно запустить основной HTTP-сервер.
    
        // Если должна использоваться трассировка запросов, то должен
        // запускаться один тип сервера.
        if(cfg.config_.tracing_) {
          // Для того, чтобы сервер трассировал запросы, нужно определить
          // свой класс свойств для сервера.
          struct traceable_server_traits_t : public restinio::default_traits_t {
            // Определяем нужный нам тип логгера.
            using logger_t = restinio::shared_ostream_logger_t;
          };
          // Теперь используем этот новый класс свойств для запуска сервера.
          run_server<traceable_server_traits_t>(
              cfg.config_, ioctx, std::move(actual_handler));
        }
        else {
          // Трассировка не нужна, поэтому запускаем обычный штатный сервер.
          run_server<restinio::default_traits_t>(
              cfg.config_, ioctx, std::move(actual_handler));
        }
    
        // Все, теперь ждем завершения работы сервера.
      }
      catch( const std::exception & ex ) {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 2;
      }
    
      return 0;
    }


    Реализация curl-части bridge_server_2


    Прежде чем мы перейдем к разбору curl-части, нужно повторить архиважный дисклаймер из предыдущей статьи, который еще более актуален в случае с bridge_server_2: дабы максимально упростить и сократить код демонстрационных приложений мы вообще не делали никакого контроля за ошибками. Если бы мы должным образом контролировали коды возврата curl-овых функций, то код распух бы раза в три, существенно потеряв при этом в понятности, но не выиграв ничего в функциональности. Поэтому мы в своей демонстрации рассчитываем на то, что вызовы libcurl-а всегда будут завершаться успешно. Это наше осознанное решение для данного конкретного эксперимента, но мы бы так никогда не сделали в реальном продакшен-коде.

    Общая идея работы curl-части


    Curl-часть bridge_server_2 построена вокруг использования функци curl_multi_socket_action. И, признаемся честно, все это выглядит мутновато и оставляет ощущение некоторой магии :(

    Возможно, виной тому не самая вменяемая документация к самой функции curl_multi_socket_action на официальном сайте. После ее прочтения остается много вопросов. Еще больше запутывает дело штатный пример из libcurl под названием asiohiper.cpp. Который должен был бы продемонстрировать как именно можно интегрировать Asio и curl_multi посредством curl_multi_socket_action. Но который написан так, что на трезвую голову не разберешься. Если же кто-то может читать исходники этого примера как открытую книгу, то мы вам откровенно завидуем :)

    Плюс к тому, мы хотели сделать свою демонстрацию как можно быстрее и с наименьшими усилиями. Поэтому тратить лишние часы на детальное разбирательство с curl_multi_socket_action просто не могли. В итоге получившийся у нас вариант вполне себе рабочий. Но мы не уверены в том, что сделали все совершенно правильно и не ошиблись ли где-то. Так что наверняка можно сделать лучше. И если кто-то будет отталкиваться от нашего решения у себя в коде, то это следует учитывать.

    Итак, вот к какому пониманию использования curl_multi_socket_action мы пришли на основании выкуривания официальной информации, примера asiohiper и отрывочных сведений из Интернета:

    • для экземпляра curl_multi нужно задать свойство CURLMOPT_TIMERFUNCTION. Это должен быть предоставленный нами callback, который libcurl будет дергать когда ему потребуется отсчитывать тайм-ауты, связанные с обработкой запросов. Соответственно, нам нужно реализовать обслуживание таймеров;
    • для экземпляра curl_multi нужно задать свойство CURLMOPT_SOCKETFUNCTION. Это будет предоставленный нами callback, который libcurl будет дергать (возможно несколько раз) внутри curl_multi_socket_action;
    • мы должны каким-то образом обеспечить слежение за статусом операций ввода-вывода,
      которые выполняются на созданных libcurl-ом сокетах. Когда какой-то сокет оказывается готов к чтению и/или записи, мы должны вызвать curl_multi_socket_action для этого сокета. Тут возникает вопрос: а как нам узнать какие именно сокеты libcurl создает при обработке наших запросов? Очень правильный вопрос и ответ на него мы обсудим ниже;
    • периодически мы должны дергать функцию curl_multi_socket_action с параметром CURL_SOCKET_TIMEOUT. Этот вызов заставляет libcurl проверить истечение тайм-аутов для просроченных запросов. В принципе, время, когда мы должны вызвать curl_multi_socket_action со значением CURL_SOCKET_TIMEOUT, нам задает сам libcurl через callback, который мы установили через свойство CURLMOPT_TIMERFUNCTION;
    • периодически мы должны дергать функцию curl_multi_info_read для того,
      чтобы определить, какие HTTP-запросы завершились (тут мы как раз переиспользуем check_curl_op_completion() из bridge_server_1).

    Звучит не очень понятно? Ну вот так оно и есть, так что пойдем дальше. Сейчас будет еще немного непонятностей. Но затем мы перейдем к разбору кода и, есть надежда, при рассмотрении кода все станет на свои места.

    Но сперва нужно закрыть важный вопрос с тем, как узнать, какие именно сокеты создает libcurl для обслуживания наших запросов. Ведь не имея этих сокетов мы не сможем контролировать их доступность для операций чтения-записи.

    Подсовывание libcurl-у собственных сокетов


    Чрезвычайно важной в контексте нашего рассказа особенностью libcurl является то, что у libcurl есть отдельно экземпляры curl_easy и есть отдельно сокеты, которые libcurl создает для обслуживания запросов, инициированных через экземпляры curl_easy. Причем curl_easy-экземпляры и сокеты отнюдь не связаны взаимоотношениями 1:1. Т.е. несколько curl_easy-экземпляров вполне могут обслуживаться одним сокетом, если все запросы идут к одному и тому же серверу (тем более, если при этом еще и указан keep-alive).

    При работе с curl_multi получается вообще интересно. Мы заталкиваем в один экземпляр curl_multi пачку экземпляров curl_easy. И curl_multi сам решает в какой момент и сколько сокетов ему нужно создать для того, чтобы обслужить находящиеся внутри экземпляра curl_multi HTTP-запросы. А нам нужно узнать о том, что эти сокеты созданы, мы должны получить их дескрипторы, и мы должны запихнуть дескрипторы этих сокетов в какой-то свой механизм event-loop-а (будь то select, epoll, kqueue, IOCP).

    В принципе, узнать дескриптор сокета несложно. Сам libcurl передает его нам в callback, который мы зарегистрировали через свойство CURLMOPT_SOCKETFUNCTION. Плюс, вызывая этот callback libcurl говорит нам, какие операции на сокете нам нужно мониторить (для этого используются значения CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE). И если бы мы сами выполняли event-loop, то нам этого было бы достаточно.

    Но проблема в том, что реальный event-loop находится внутри Asio. И нам нужно уметь как-то подружить сокеты, создаваемые libcurl-ом с нашим экземпляром io_context. И вот как это сделать?

    Решение мы подсмотрели как раз в примере asiohiper. Этот момент там был, наверное, самым понятным и очевидным. Суть в том, чтобы самим создавать сокеты в виде экземпляров asio::io::tcp::socket. Тем самым мы можем заставить io_context контролировать их готовность к чтению-записи. А в libcurl мы будет отдавать реальные дескрипторы наших сокетов для того, чтобы libcurl мог выполнять read и write.

    Делается это так: для curl_easy мы назначаем дополнительные свойства CURLOPT_OPENSOCKETFUNCTION и CURLOPT_CLOSESOCKETFUNCTION. Это два callback-а. Первый вызывается libcurl-ом когда он хочет создать новый сокет. И этот callback, соответственно, должен возвращать дескриптор нового сокета. Второй callback вызывается когда libcurl больше не нуждается в сокете и хочет его закрыть. Соответственно, мы предоставляем libcurl-у эти самые callback-и, которые создают и уничтожают объекты типа asio::ip::tcp::socket.

    Разбор кода curl-части


    Помимо уже знакомых нам функций, которые были переиспользованы из bridge_server_1, в bridge_server_2 curl-часть представлена двумя описываемыми ниже классами.

    Класс active_socket_t


    Класс active_socket_t является вспомогательным и нужен он нам потому, что мы сами создаем сокеты для libcurl. Созданные сокеты нужно где-то хранить. Как раз в этом нам и помогает active_socket_t. Вот его определение:

    // Вспомогательный класс для работы с сокетом.
    class active_socket_t final
    {
    public:
      using status_t = std::int_fast8_t;
    
      static constexpr status_t poll_in = 1u;
      static constexpr status_t poll_out = 2u;
    
    private:
      restinio::asio_ns::ip::tcp::socket socket_;
      status_t status_{0};
    
    public:
      active_socket_t(restinio::asio_ns::io_service & io_service)
        : socket_{io_service, restinio::asio_ns::ip::tcp::v4()}
        {}
    
      auto & socket() noexcept { return socket_; }
    
      auto handle() noexcept { return socket_.native_handle(); }
    
      void clear_status() noexcept { status_ = 0; }
    
      auto status() noexcept { return status_; }
    
      void update_status( status_t flag ) noexcept { status_ |= flag; }
    };

    Класс active_socket_t инкапсулирует в себя сокет и набор флагов, определяющих статус операций, которые мы хотим отслеживать для этого сокета. Например, если нам нужно ждать готовности к чтению, то в статусе будет выставлен флаг active_socket_t::poll_in. Статусы обновляются внутри callback-ов, которые вызываются в процессе работы с libcurl.

    Как раз наличие active_socket_t со статусом внутри является основным упрощением нашей реализации по сравнению с кодом штатного примера asiohiper. Там для хранения флагов для каждого сокета аллоцируется дополнительный int. Это и лишние накладные расходы, и лишняя головная боль (нужно удалять затем этот int), и лишняя сложность в реализации. Последнее особенно важно, поскольку самое сложное в asiohiper — это разобраться с тем, что является актуальным флагом для очередной операции над сокетом, а что — старыми флагами из этого дополнительного int-а.

    Класс curl_multi_processor_t


    Класс curl_multi_processor_t является основным элементом реализации работы с curl_multi. Именно в нем выполняется вся магия, связанная с обслуживанием экземпляра curl_multi и вызовами curl_multi_socket_action, а так же практически всех связанным с этим callback-ов.

    Для начала мы дадим полное определение класса, а потом более подробно пройдемся по всем его частям. Итак, вот как сам класс выглядит:

    class curl_multi_processor_t {
    public:
      curl_multi_processor_t(restinio::asio_ns::io_context & ioctx);
      ~curl_multi_processor_t();
    
      // Это не Copyable и не Moveable класс.
      curl_multi_processor_t(const curl_multi_processor_t &) = delete;
      curl_multi_processor_t(curl_multi_processor_t &&) = delete;
    
      // Единственная публичная функция, которую будут вызывать для
      // того, чтобы выполнить очередной запрос к удаленному серверу.
      void perform_request(std::unique_ptr<request_info_t> info);
    
    private:
      // Экземпляр curl_multi, который будет выполнять работу с исходящими запросами.
      CURLM * curlm_;
    
      // Asio-шный контекст, на котором будет идти работа.
      restinio::asio_ns::io_context & ioctx_;
      // Защита от одновременной диспетчеризации сразу на нескольких нитях.
      restinio::asio_ns::strand<restinio::asio_ns::executor> strand_{ioctx_.get_executor()};
    
      // Таймер, который будем использовать внутри timer_function-коллбэка.
      restinio::asio_ns::steady_timer timer_{ioctx_};
    
      // Множество еще живых сокетов, созданных для обслуживания запросов
      // к удаленному серверу.
      std::unordered_map<curl_socket_t, std::unique_ptr<active_socket_t>> active_sockets_;
    
      // Вспомогательная функция, чтобы не выписывать reinterpret_cast вручную.
      static auto cast_to(void * ptr) {
        return reinterpret_cast<curl_multi_processor_t *>(ptr);
      }
    
      // Коллбэк для CURLMOPT_SOCKETFUNCTION.
      static int socket_function(
          CURL *,
          curl_socket_t s,
          int what,
          void * userp, void *);
    
      // Коллбэк для CURLMOPT_TIMERFUNCTION.
      static int timer_function(CURLM *, long timeout_ms, void * userp);
      // Вспомогательная функция для проверки истечения таймаутов.
      void check_timeouts();
    
      // Вспомогательная функция, которая будет вызываться, когда какой-либо
      // из сокетов готов к чтению или записи.
      void event_cb(
          curl_socket_t socket,
          int what,
          const restinio::asio_ns::error_code & ec);
    
      // Коллбэк для CURLOPT_OPENSOCKETFUNCTION.
      static curl_socket_t open_socket_function(
          void * cbp,
          curlsocktype type,
          curl_sockaddr * addr);
    
      // Коллбэк для CURLOPT_CLOSESOCKETFUNCTION.
      static int close_socket_function(void * cbp, curl_socket_t socket);
    
      // Вспомогательные функции для того, чтобы заставить Asio отслеживать
      // готовность сокета к операциям чтения и записи.
      void schedule_wait_read_for(active_socket_t & act_socket);
      void schedule_wait_write_for(active_socket_t & act_socket);
    };

    И вот как все это работает…

    Данные класса curl_multi_performer_t


    Внутри curl_multi_performer_t есть несколько членов, без которых мы не сможем обслуживать запросы. Это:

    • curlm_: экземпляр curl_multi, в который будут помещаться отдельные curl_easy-экземпляры для исходящих запросов;
    • ioctx_: Asio-контекст, на котором выполняется вся работа;
    • strand_: специальный Asio-шный объект, выполняющий роль примитива синхронизации. Не позволяет Asio запускать обработчики событий класса curl_multi_performer_t параллельно на нескольких нитях;
    • timer_: Asio-шный таймер, который мы будем использовать для отсчета тайм-аутов,
      про которые нам говорит libcurl;
    • active_sockets_: словарь всех созданных объектов active_socket_t. Ключом в этом словаре является дескриптор сокета. Нам этот словарь нужен потому, что в какие-то из callback-ов libcurl передает только дескриптор сокета, а нам по дескриптору нужно найти сам объет active_socket_t. Вот мы и ищем сокеты в этом словаре.

    Конструктор и деструктор


    В конструкторе нам нужно создать экземпляр curl_multi и произвести его настройку. А в деструкторе, соответственно, нужно удалить curl_multi:

    curl_multi_processor_t::curl_multi_processor_t(
        restinio::asio_ns::io_context & ioctx)
      : curlm_{curl_multi_init()}
      , ioctx_{ioctx} {
    
      // Должным образом настраиваем curl_multi.
      
      // Коллбэк для обработки связанных с сокетом операций.
      curl_multi_setopt(curlm_, CURLMOPT_SOCKETFUNCTION,
        &curl_multi_processor_t::socket_function);
      curl_multi_setopt(curlm_, CURLMOPT_SOCKETDATA, this);
    
      // Коллбэк для обработки связанных с таймером операций.
      curl_multi_setopt(curlm_, CURLMOPT_TIMERFUNCTION,
        &curl_multi_processor_t::timer_function);
      curl_multi_setopt(curlm_, CURLMOPT_TIMERDATA, this);
    }
    
    curl_multi_processor_t::~curl_multi_processor_t() {
      curl_multi_cleanup(curlm_);
    }

    Т.к. libcurl — это чисто сишная библиотека, то в качестве callback-ов мы можем использовать только статические методы класса. А для того, чтобы из статического метода обращаться к данным curl_multi_performer_t, мы передаем this дополнительным параметром к соответствующим callback-ам. Именно для этого используются свойства CURLMOPT_SOCKETDATA и CURLMOPT_TIMERDATA.

    Единственный публичный метод perform_request


    У класса curl_multi_performer_t только один публичный метод perform_request, который предназначен для того, чтобы RESTinio-часть могла передать принятый входящий запрос на обработку в curl-часть.

    Вот его реализация:

    void curl_multi_processor_t::perform_request(
        std::unique_ptr<request_info_t> info) {
      // Для того, чтобы передать новый запрос в curl_multi используем
      // callback для Asio.
      restinio::asio_ns::post(strand_,
        [this, info = std::move(info)]() mutable {
          // Для выполнения очередного запроса нужно создать curl_easy-объект и
          // должным образом его настроить.
          auto handle = curl_easy_init();
    
          // Обычные для curl_easy настройки, вроде URL и writefunction.
          curl_easy_setopt(handle, CURLOPT_URL, info->url_.c_str());
          curl_easy_setopt(handle, CURLOPT_PRIVATE, info.get());
    
          curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_callback);
          curl_easy_setopt(handle, CURLOPT_WRITEDATA, info.get());
          // Не совсем обычные настройки.
          // Здесь мы определяем, как будет создаваться новый сокет для
          // обработки запроса.
          curl_easy_setopt(handle, CURLOPT_OPENSOCKETFUNCTION,
            &curl_multi_processor_t::open_socket_function);
          curl_easy_setopt(handle, CURLOPT_OPENSOCKETDATA, this);
    
          // А здесь определяем, как ставший ненужным сокет будет закрываться.
          curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION,
            &curl_multi_processor_t::close_socket_function);
          curl_easy_setopt(handle, CURLOPT_CLOSESOCKETDATA, this);
    
          // Новый curl_easy подготовлен, можно отдать его в curl_multi.
          curl_multi_add_handle(curlm_, handle);
    
          // unique_ptr не должен больше нести ответственность за объект.
          // Мы его сами удалим когда обработка запроса завершится.
          info.release();
        });
    }

    В общем-то этот код очень похож на то, что мы уже видели в bridge_server_1. Но есть несколько важных отличий.

    Во-первых, все действия над новым запросом собраны в лямбду, которая отсылается на обработку через asio::io_context::post(). Нужно это потому, что curl_multi не является thread-safe объектом. И мы должны избегать работы с ним из разных нитей одновременно. Как раз то, что мы post-им лямбду через strand и защищает нас. Если в данный момент какая-то часть curl_multi_performer_t уже работает на другой нити, то лямбда будет выполнена только тогда, когда другая нить завершит работу с curl_multi_performer_t.

    Во-вторых, мы здесь как раз и задаем свойства CURLOPT_OPENSOCKETFUNCTION и CURLOPT_CLOSESOCKETFUNCTION, о которых речь шла выше. Именно за счет этих свойств мы сможем создавать и подсовывать libcurl собственные сокеты.

    Первый хитрый callback: socket_function


    Вот мы и добрались до первого хитрого callback-а, который используется в bridge_server_2 — это статический метод socket_function, который мы регистрируем через свойство CURLMOPT_SOCKETFUNCTION в конструкторе curl_multi_performer_t. Этот callback вызывается (возможно несколько раз подряд) из curl_multi_socket_action для того, чтобы мы могли заставить свой event-loop контролировать готовность сокетов к той или иной операции ввода-вывода.

    Вот код socket_function:

    int curl_multi_processor_t::socket_function(
        CURL *,
        curl_socket_t s,
        int what,
        void * userp, void *) {
      auto self = cast_to(userp);
      // Сокет, над которым нужно выполнить действие, должен быть среди живых.
      // Если его там нет, то просто игнорируем операцию.
      const auto it = self->active_sockets_.find(s);
      if(it != self->active_sockets_.end()) {
        auto & act_socket = *(it->second);
    
        // Сбрасываем текущий статус для сокета. Новый статус будет выставлен
        // на основании значения флага what.
        act_socket.clear_status();
    
        // Определяем новый статус для сокета.
        if(CURL_POLL_IN == what || CURL_POLL_INOUT == what) {
          // Требуется проверка готовности к чтению данных.
          act_socket.update_status(active_socket_t::poll_in);
          self->schedule_wait_read_for(act_socket);
        }
        if(CURL_POLL_OUT == what || CURL_POLL_INOUT == what) {
          // Требуется проверка готовности к записи данных.
          act_socket.update_status(active_socket_t::poll_out);
          self->schedule_wait_write_for(act_socket);
        }
      }
    
      return 0;
    }

    Здесь нужно отметить несколько моментов.

    Момент первый. Нам передается дескриптор сокета с которым нужно что-то сделать. Но, в принципе, не факт, что этот сокет еще будет в нашем словаре активных сокетов. Поэтому мы выполняем операции над сокетом только если он нашелся в нашем словаре.

    Момент второй. Мы сперва принудительно сбрасываем все флаги для конкретного активного сокета. А потом вычисляем их новые значения и сохраняем новые флаги. Они нам затем потребуются внутри event_cb().

    Ну и момент третий: в зависимости от того, чего нам нужно дождаться (чтения, записи или чтения-записи) мы просим Asio начать следить за готовностью нашего сокета к соответствующей операции. Выполняется это за счет вызова вспомогательных методов:

    void curl_multi_processor_t::schedule_wait_read_for(
        active_socket_t & act_socket) {
      act_socket.socket().async_wait(
        restinio::asio_ns::ip::tcp::socket::wait_read,
        restinio::asio_ns::bind_executor(strand_,
          [this, s = act_socket.handle()]( const auto & ec ){
            this->event_cb(s, CURL_POLL_IN, ec);
          }));
    }
    
    void curl_multi_processor_t::schedule_wait_write_for(
        active_socket_t & act_socket) {
      act_socket.socket().async_wait(
        restinio::asio_ns::ip::tcp::socket::wait_write,
        restinio::asio_ns::bind_executor(strand_,
          [this, s = act_socket.handle()]( const auto & ec ){
            this->event_cb(s, CURL_POLL_OUT, ec);
          }));
    }

    Здесь в методах schedule_wait_*_for задаются лямбда-функции, которые Asio вызовет, когда сокет окажется готов к соответствующей операции. Здесь мы в первый раз видим обращение к event_cb. Это еще один хитрый callback, о котором речь пойдет ниже.

    Не очень хитрый callback: timer_function


    В конструкторе curl_multi_performer_t для экземпляра curl_multi устанавливается еще один callback — статический метод timer_function следующего вида:

    int curl_multi_processor_t::timer_function(
        CURLM *,
        long timeout_ms,
        void * userp) {
      auto self = cast_to(userp);
    
      if(timeout_ms < 0) {
        // Старый таймер удаляем.
        self->timer_.cancel();
      }
      else if(0 == timeout_ms) {
        // Сразу же проверяем истечение тайм-аутов для активных операций.
        self->check_timeouts();
      }
      else {
        // Нужно взводить новый таймер.
        self->timer_.cancel();
        self->timer_.expires_after(std::chrono::milliseconds{timeout_ms});
        self->timer_.async_wait(
            restinio::asio_ns::bind_executor(self->strand_,
              [self](const auto & ec) {
                if( !ec )
                  self->check_timeouts();
              }));
      }
    
      return 0;
    }

    С ним все более-менее просто. libcurl-у во время обработки запросов нужно контролировать тайм-ауты выполняемых им операций. Но т.к. libcurl в нашем случае своего event-loop-а не имеет, то он нуждается в нашей помощи. Для этого мы задаем таймерный callback, а libcurl вызывает его когда ему нужно контролировать времена своих операций.

    Выполняемые таймерным callback-ом действия полностью зависят от значения аргумента timeout_ms. Если там -1, то текущий таймер нужно отменить. Если значение 0, то нужно вызвать curl_multi_socket_action с параметром CURL_SOCK_TIMEOUT как можно быстрее (мы сразу же делаем этот вызов). Если же timeout_ms больше 0, то нужно взвести таймер на новое время. И, когда это время наступит, нужно вызвать curl_multi_socket_action. Что мы и делаем в лямбде, которую передаем в Asio-шный таймер.

    Давайте посмотрим еще и на вспомогательную функцию, которую мы вызываем при срабатывании таймера:

    void curl_multi_processor_t::check_timeouts() {
      int running_handles_count = 0;
      // Заставляем curl проверить состояние активных операций.
      curl_multi_socket_action(curlm_, CURL_SOCKET_TIMEOUT, 0, &running_handles_count);
      // После чего проверяем завершилось ли что-нибудь.
      check_curl_op_completion(curlm_);
    }

    Тут все вполне предсказуемо: подсовываем в curl_multi_socket_action значение CURL_SOCKET_TIMEOUT, а затем внутри check_curl_op_completion() проверяем, закончилась ли обработка каких-нибудь запросов.

    Самый хитрый callback: event_cb


    Вот мы и подобрались к самому хитрому callback-у нашей реализации. Это нестатический метод event_cb(), который вызывается Asio, когда Asio обнаруживает, что наш сокет готов к чтению/записи, или когда с нашим сокетом случилась какая-то ошибка.

    Давайте сперва посмотрим на код этого event_cb, а потом попробуем разобраться, что же здесь происходит:

    void curl_multi_processor_t::event_cb(
        curl_socket_t socket,
        int what,
        const restinio::asio_ns::error_code & ec) {
      // Прежде всего нужно найти сокет среди живых. Может быть, что его
      // там уже нет. В этом случае ничего делать не нужно.
      auto it = active_sockets_.find(socket);
      if(it != active_sockets_.end()) {
        if( ec )
          what = CURL_CSELECT_ERR;
    
        int running_handles_count = 0;
        // Заставляем curl проверить состояние этого сокета.
        curl_multi_socket_action(curlm_, socket, what, &running_handles_count );
        // После чего проверяем завершилось ли что-нибудь.
        check_curl_op_completion(curlm_);
    
        if(running_handles_count <= 0)
          // Больше нет активных операций. Таймер уже не нужен.
          timer_.cancel();
    
        // Еще раз ищем сокет среди живых, т.к. он мог исчезнуть внутри
        // вызовов curl_multi_socket_action и check_active_sockets.
        it = active_sockets_.find(socket);
        if(!ec && it != active_sockets_.end()) {
          // Сокет все еще жив и подлежит обработке.
          auto & act_socket = *(it->second);
    
          // Проверяем, в каких операциях сокет нуждается и инициируем
          // эти операции.
          if(CURL_POLL_IN == what &&
              0 != (active_socket_t::poll_in & act_socket.status())) {
            schedule_wait_read_for(act_socket);
          }
          if(CURL_POLL_OUT == what &&
              0 != (active_socket_t::poll_out & act_socket.status())) {
            schedule_wait_write_for(act_socket);
          }
        }
      }
    }

    Прежде всего нам нужно найти сокет, для которого вызван event_cb. Его может уже и не быть во множестве активных сокетов. Например, libcurl может закрыть его из-за истечения тайм-аутов. Поэтому дальнейшие действия выполняем только если сокет найден среди активных.

    Далее нам нужно вызвать curl_multi_socket_action. Но сперва нужно понять, какое значение должен иметь параметр what. Если ошибок с сокетом не было, то это будет исходное значение, пришедшее в event_cb (т.е. либо CURL_POLL_IN, либ CURL_POLL_OUT, только эти значения используются в schedule_wait_*_for). А вот если Asio диагностировал какую-то проблему с нашим сокетом (в этом случае параметр ec будет отличен от нуля), то в качестве аргумента what передается специальное значение CURL_CSELECT_ERR.

    Дальше мы дергаем curl_multi_socket_action, чтобы libcurl мог провести все нужные ему операции для сокета. И, после выхода из curl_multi_socket_action мы уже традиционно проверяем, завершились ли какие-нибудь запросы. Ну и еще одна штука: если после выхода из curl_multi_socket_action мы обнаруживаем, что текущих операций больше нет, то отменяем таймер. Он ведь больше не нужен. Хотя, в принципе, делать это вовсе не обязательно.

    А вот дальше начинается некая не очевидная магия… :)

    Внутри вызовов curl_multi_socket_action() и check_curl_op_completion() запрос может быть полностью обработан, curl_easy-экземпляр, связанный с этим запросом, может быть удален. А вот что с сокетом? Сокет в этом случае должен остаться. Он будет жить столько, сколько этого захочет libcurl. Если следом пойдет запрос к тому же хосту, то libcurl запросто может переиспользовать этот сокет.

    С другой стороны внутри curl_multi_socket_action() может выяснится, что запрос еще не завершен и операции с сокетом нужно продолжать. В этом случае мы должны указать Asio, что нужно определять готовность сокета к последующим операциям ввода-вывода.

    Ну и может так случится, что сокет вообще может быть закрыт. Из-за какой-либо ошибки или истечения тайм-аута. Это не суть важно.

    Важно в итоге то, что перед входом в curl_multi_socket_action() и check_curl_op_completion() у нас может быть одно состояние active_sockets_ и самого сокета, а при выходе — совсем другое. Ведь внутри curl_multi_socket_action() могут вызываться callback-и, и в этих callback-ах мы можем как обновить статус сокета, так и вообще изъять сокет из active_sockets_.

    Поэтому после выхода из curl_multi_socket_action() мы еще раз ищем сокет среди активных. Если находим, то проверяем, разрешает ли статус сокета вызывать ту или иную функцию schedule_wait_*_for. Если этого не сделать, то может получиться, что libcurl попросил нас внутри curl_multi_socket_action() проверить готовность нашего сокета к чтению (через вызов socket_function со значением CURL_PULL_IN), а мы ничего не сделали. Соответственно, libcurl не сможет вычитать ответ от удаленного сервера и не сможет завершить обработку запроса. Вот чтобы такого не было и выполняются эти не очень понятные, на первый взгляд, действия во второй части event_cb.

    Два оставшихся несложных callback-а: open_socket_function и close_socket_function


    Осталось разобрать два callback-а, которые используются для того, чтобы создавать и уничтожать сокеты для libcurl-а. Именно эти callback-и регистрируются в каждом новом curl_easy-экземпляре через свойства CURLOPT_OPENSOCKETFUNCTION и CURLOPT_CLOSESOCKETFUNCTION. Вот код этих функций:

    curl_socket_t curl_multi_processor_t::open_socket_function(
        void * cbp,
        curlsocktype type,
        curl_sockaddr * addr) {
      auto self = cast_to(cbp);
      curl_socket_t sockfd = CURL_SOCKET_BAD;
    
      // В данном примере ограничиваем себя только IPv4.
      if(CURLSOCKTYPE_IPCXN == type && AF_INET == addr->family) {
        // Создаем сокет, который затем будет использоваться для взаимодействия
        // с удаленным сервером.
        auto act_socket = std::make_unique<active_socket_t>(self->ioctx_);
        const auto native_handle = act_socket->handle();
        
        // Новый сокет должен быть сохранен во множестве живых сокетов.
        self->active_sockets_.emplace( native_handle, std::move(act_socket) );
        
        sockfd = native_handle;
      }
    
      return sockfd;
    }
    
    int curl_multi_processor_t::close_socket_function(
        void * cbp,
        curl_socket_t socket) {
      auto self = cast_to(cbp);
      // Достаточно просто изъять сокет из множества живых сокетов.
      // Закрытие произойдет автоматически в деструкторе active_socket_t.
      self->active_sockets_.erase(socket);
    
      return 0;
    }

    Надеемся, что в данном случае сложностей с пониманием не возникнет. По крайней мере в close_socket_function все более чем тривиально.

    А вот по поводу open_socket_function нужно дать пояснение. Принципиальная схема этой функции была взята из уже упоминавшегося примера asiohiper.cpp. Там было ограничение на работу только с IPv4, поэтому-то мы его у себя и оставили.

    Однако, добавить поддержку еще и IPv6 не должно быть сложно. Для этого потребуется слегка модифицировать конструктор active_socket_t:

    active_socket_t(
        restinio::asio_ns::io_service & io_service,
        restinio::asio_ns::ip::tcp tcp)
        : socket_{io_service, tcp}
        {}

    И переделать часть open_socket_function, например, таким образом:

    if(CURLSOCKTYPE_IPCXN == type &&
        (AF_INET == addr->family || AF_INET6 == addr->family)) {
      // Создаем сокет, который затем будет использоваться для взаимодействия
      // с удаленным сервером.
      auto act_socket = std::make_unique<active_socket_t>(self->ioctx_,
          AF_INET == addr->family ?
              restinio::asio_ns::ip::tcp::v4() : restinio::asio_ns::ip::tcp::v6());
    

    Так как же оно все-таки работает?


    Теперь попробуем сделать самое сложное: соединить общее описание принципов работы из начала статьи и пояснения, сделанные при разборе кода. Чтобы еще раз попытаться объяснить, каким же образом работает вся эта кухня.

    Итак, есть пул рабочих потоков. На этом пуле работает Asio-шный io_context. Этот io_context обслуживает как RESTinio, так и libcurl.

    Когда RESTinio получает входящий запрос, этот запрос преобразуется в request_info_t и отдается в curl_multi_performer_t. Там для этого request_info_t создается новый экземпляр curl_easy. И этот curl_easy добавляется в единственный экземпляр curl_multi.

    libcurl просит нас создать для curl_easy новый сокет. Мы создаем его в open_socket_function и сохраняем в словаре активных сокетов.

    Далее libcurl дергает для нового сокета socket_function, в которой он говорит, готовность какой операции он хочет дождаться для нового сокета. В этот момент в socket_function передается значение CURL_POLL_OUT или CURL_POLL_INOUT. Мы обновляем статус для сокета и заставляем Asio дождаться готовности сокета к записи.

    Когда Asio обнаруживает, что сокет готов к записи, то у нас вызывается event_cb. В котором мы дергаем curl_multi_socket_action. Там, внутри curl_multi_socket_action, libcurl может провести отправку HTTP-запроса на удаленный сервер. И вызвать socket_function для того, чтобы попросить нас дождаться готовности сокета для чтения (в этом случае в socket_function передается CURL_POLL_IN или CURL_POLL_INOUT). Мы опять обновляем статус для сокета и заставляем дождаться готовности сокета к чтению. При этом мы все еще находимся внутри curl_multi_socket_action, который вызван внутри event_cb :)

    Когда Asio обнаруживает, что сокет готов к чтению, то у нас опять вызывается event_cb. В котором мы опять дергаем curl_multi_socket_action…

    И все это продолжается до тех пор, пока мы не обнаружим, что HTTP-запрос завершен. Тогда мы формируем ответ на входящий HTTP-запрос, изымаем соответствующий curl_eary из curl_multi, после чего уничтожаем и curl_easy и request_info_t.

    А что же сокет?

    А сокет продолжает жить внутри libcurl-а. Он может быть переиспользован libcurl-ом для обслуживания других запросов. И будет удален кода-нибудь в будущем. Если это вообще потребуется.

    Заключение


    Вот, в общем-то, все, что мы хотели рассказать. Спасибо всем, кто смог осилить все три статьи. Надеемся, вы нашли для себя что-нибудь интересное.

    Описанные в данной серии демонстрационные программы можно найти в этом репозитории.

    Интеграция с чисто С-шным кодом, конечно же, доставила. Это было, можно смело сказать, подтверждение народной мудрости: хочешь писать на C без боли — пиши на C++ ;) И остается только пожалеть, что нет родного C++ного аналога libcurl-а. Ибо при всем уважении к libcurl-у нельзя не отметить, что его использование требует ну очень серьезного внимания к деталям.
    • +14
    • 1,9k
    • 7
    Поделиться публикацией
    Ой, у вас баннер убежал!

    Ну. И что?
    Реклама
    Комментарии 7
    • 0
      1) А зачем здесь используются unique_ptr-ы?
      std::unordered_map<curl_socket_t, std::unique_ptr<active_socket_t>> active_sockets_;

      2) (На правах идеи, не факт, что это приемлемо). Я бы вынес всю работу за пределы callback-ов. То есть, в callback-е только лишь заполняется очередь действий, а в конце event loop-а эта очередь разбирается и выполняются все действия
      • 0
        По п.1. Хороший вопрос. Спасибо. Наверное, можно было бы попробовать хранить active_socket_t прямо по значению внутри unordered_map. Но тогда его нужно было бы делать Moveable-типом, чтобы можно было делать такой вызов:
        active_sockets_.emplace(handle, active_socket_t{ioctx_})
        А можно ли его таковым сделать зависит от того, является ли asio::ip::tcp::socket Moveable-типом. В случае же с unique_ptr-ом для active_socket_t вообще не нужно было ни о чем думать: ни о том, является ли asio::ip::tcp::socket Moveable, ни о том, в каком контейнере затем active_socket_t будут хранится. И будут ли они вообще где-то храниться.

        По п.2. Не готов сейчас сказать, насколько бы это упростило реализацию. Тут же основной момент в том, что в event_cb нужно дергать curl_multi_socket_action, откуда будут дергать socket_function. И внутри этой socket_function нужно обновить статус и выяснить, какие операции над сокетом нас будут интересовать дальше. Можно это обновление статуса сперва закинуть в какую-то очередь, потом по ней пройтись… Но будет ли это проще — не знаю.
        • +1
          1) У map-а есть метод, который конструирует данные на месте
           m.emplace(std::piecewise_construct,
                    std::forward_as_tuple("c"),
                    std::forward_as_tuple(10, 'c'));
          • 0
            Спасибо, я про него постоянно забываю и использовать пока не приходилось.
            • 0
              Попробовал.
              От такого варианта emplace пользы в данном случае нет. Т.к. сначала нужно создать объект active_socket_t, затем извлечь из него native_handle. И только после этого делать emplace с использованием native_handle в качестве ключа.

              Чтобы это работало нужно делать active_socket_t Moveable-типом. Это возможно, т.к. asio::ip::tcp::socket так же Moveable-тип.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое