Планирование задач в сервере при помощи boost.task

    Недавно на профильном ресурсе один программист задал вопрос: «Что использовать в сервере ММО для работы с потоками?». Программист склонялся к Intel TBB, но даже не к базовым примитивам, а к кастомному планированию задач (task scheduling). Ну нравится TBB — ну и ладно. А немного позже я увидел исходники сервера ММО другого программиста, который недавно начал переписываться его с нуля для улучшения архитектуры. И там было очень много велосипедов, которые писались самим программистом вместо того что бы использовать сторонние компоненты такие как boost (к примеру класы обертки над pthread-ом, и это в 2010 году, когда boost.thread уже почти в стандарте). Была там реализована и поддержка пула потоков с планировщиком задач. Тема эта мне очень интересна и я начал копать информацию о готовых решениях планировки задач (как в TBB) и нашел boost.task, про что и решил написать.


    Определение


    Задача (task) — это логически объедененный набор действий. Планировщик задач (task scheduler) асинхронино выполняет задачи руководствуясь определенными стратегиями по выбору кто должен выполняться в данный момент в каком потоке.
    Задачи позволяют абстрагировться от обычных потоков и оперировать на более высоком уровне.

    Зачем нужен планировщик задач?


    Как работает сферический сервер в вакууме? Очень просто:
    1. Приходит запрос от клиента
    2. Он обрабатывается!
    3. Отсылается ответ

    Ну кроме того в сервере могут происходить какие то процессы, которые выполняются и без запроса клиента. Например рассылка уведомлений по всей базе пользователей, очистка базы от устаревших данных (крончик), обработка дневной статистики и тд.
    Сейчас загвоздка именно в том, как обрабатывается запрос. Надо разобраться как его обрабатывать.
    Возьмем к примеру memcached-подобный сервер: у нас есть hash_map с данными, есть запросы чтения, есть запросы записи, которые делают простой лукап по хеш-мапе и возвращают данные либо записывают их в хеш-мап. Пока всё происходит в одном потоке, но что делать, если нам надо задействовать все процессоры системы?
    Создаем столько потоков, сколько ядер. В каждом потоке обрабатываем пользователей, которых при создании соеденения раскидываем по принципу round-robin. При обращении к контейнеру используем rwlock-и (boost::shared_mutex). Отлично. А как нам быть с удалением элементов из контейнера? Создаем поток, который раз в N секунд просыпается и чистит контейнер.
    Это был простой пример, а теперь более сложный пример: сервис, который может в зависимости от запроса пользователя сделать запрос в базу данных, сделать http запрос на какой то сайт. Что будет если сделать серрвер по предидущей модели (все запросы к другим компонентам будут выполняться синхронно)? Ну база данных находится на той же площадке, что и сервер, ответ будет в приделах пары миллисекунд. Отослать email — тоже не проблема — ставим sendmail на ту же машину, отдаём ему данные, а он сам разберется как отослать письмо.
    Отлично. Хотя не совсем. А что делать с http-запросом? Он же может занять очень долго — всё зависит от сайта который находится где то далеко и не известно сколько будет обрабатывать запрос. В таком случае поток будет бездействовать, хотя в очереди есть много запросов, которые могут выполниться, но они ждут пока освободится этот поток.
    Такой запрос необходимо выполнять асинхронно. Реализовать можно так:
    class LongRequestHandler
    {
    public:
        void Handle()
        {
            // read client request parameters
            // mysql request 1
            // mysql request 2
            HttpRequestExecutor::GetInstance()->Execute(
                "example.com?x=1",
                boost::bind(this, &LongRequestHandler::HandleStage2)
            );
        }
        void HandleStage2(const std::string & http_request_result)
        {
            // mysql request 3
            // write response to client
        }
    };


    * This source code was highlighted with Source Code Highlighter.

    HttpRequestExecutor принемает url запроса и колбек, который надо вызвать по завершению запроса (тип колбека — boost::function).
    И такой подход в работает, правда не слишком красиво.
    В блоге Thinking Asynchronously in C++ показана интерестная реализация выполнения асинхронных задач. Выглядит результат следующим образом:
    template <typename Handler>void async_echo(
     tcp::socket& socket,
     mutable_buffer working_buffer,
     Handler handler,
     // coroutine state:
     coroutine coro = coroutine(),
     error_code ec = error_code(),
     size_t length = 0)
    {
     reenter (coro)
     {
     entry:
      while (!ec)
      {
       yield socket.async_read_some(
         buffer(working_buffer),
         bind(&async_echo<handler>,
          ref(socket), working_buffer,
          box(handler), coro, _1, _2));
       if (ec) break;
       yield async_write(socket,
         buffer(working_buffer, length),
         bind(&async_echo<handler>,
          ref(socket), working_buffer,
          box(handler), coro, _1, _2));
      }
      handler(ec);
     }
    }


    * This source code was highlighted with Source Code Highlighter.
    Coroutine и yield в С++ смотрятся необычно;) Реализовано это на дефайнах, в блоге можно почитать как это удалось автору.
    Постепенно логика усложняется, добавляются новые элементы, которые надо обрабатывать асинхронно, реализация тоже усложняется. В дальнейшем задачу
    mysql request 1
    mysql request 2
    http request 1
    mysql request 3
    http request 2
    mysql request 4
    mysql request 5
    

    И выполняя её последовательно с остановками в http запросах мы видим, что запросы
    mysql request 2
    http request 1
    

    и
    mysql request 3
    http request 2
    mysql request 4
    

    можно выполнять паралельно и если мы захотим это сделать, то прийдется ещё сильнее усложнять логику. А хотелось бы написать простой код, например:
    mysql request 1
    x = run(func1)
    y = run(func2)
    wait(x, y)
    mysql request 5
    
    func1:
      mysql request 2
      http request 1
    
    func2:
      mysql request 3
      http request 2
      mysql request 4
    

    Вот тут и пригодиться планировщик задач.

    Реализации


    Про поддержку планировщика задач в новом стандарте 0x можно почитать тут.
    • just::thread — реализация библиотеки потоков стандарта C++0x от отца boost::thread
    • Parallel Patterns Library (PPL) — реализцаия от Microsoft
    • Asynchronous Agents Library — и ещё одна от Microsoft
    • Intel Threading Building Blocks — очень мощная библиотека для паралельного программирования от Intel. Включает в себя и планировщик задач.
    • boost::task — — реализация от Oliver Kowalke, не принятая ещё в boost

    Мне наиболее понравился boost.task. Дальше его детальное рассмотрение.

    Описание boost.task


    boost.task — реализация предложения в стандарт C++0x. Она поддерживает задание стратегий выполнения задач, создание под-задач, прерывание задач.
    Библиотека зависит от:

    boost.task и boost.fiber компилируемые библиотеки (boost.atomic и boost.move — header-only) — так что прийдется их собирать. Что бы было удобнее эксперементировать собрал все зависимости в одном месте, приправил cmake-ом и залил поект на github. Работает на linux-е, для сборки под windows — потребуется 2-3 строчки добавить в cmake файлы.

    Пример использования


    API библиотеки достаточно простой, реализовать обработчик запроса, который описывался выше не совтавит труда. Приведу его ещё раз:
    mysql request 1
    
      mysql request 2
      http request 1
    
      mysql request 3
      http request 2
      mysql request 4
    
    mysql request 5
    

    В качестве эмуляции запроса к mysql будет использован обычный sleep на случайное время:
    boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));
    


    В качестве внешнего http-запроса будет использован асинхронный таймер из boost::asio.
    Итак:
    Request — класс запроса.
    class Request
    {
    public:
      Request(const std::string & data);
      const std::string & Read() const;
      void Write(const std::string & answer);
    };


    * This source code was highlighted with Source Code Highlighter.

    А RequestHandler — класс обработчика запроса.
    class RequestHandler
    {
    public:
      RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
      void Process() const;
    };


    * This source code was highlighted with Source Code Highlighter.

    io_service — передается для того, что бы можно было выполнить внешний вызов (использовать таймер boost::asio::deadline_timer).Итак начнем. Определяем пул потоков, для обработки наших задач:
    boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );

    * This source code was highlighted with Source Code Highlighter.

    boost.task поддерживает два основных вида стратегий планировки задач:
    • ограниченные (bounded) — имеют порог количества выполняемых задач, при достижении которого добавление новой задачи блокирует поток, который выполняет это действие. Основная задача — избежать исчерпания ресурсов (resource exhaustion) когда скорость добавления задач превышает скорость их выполнения
    • неограниченные (unbounded) — позволяют добавлять бесконечное число задач в очередь

    Также есть возможность задания стратегии обработки задач внутри очереди:
    • fifo — первая добавленная задача выполняется первой
    • priority — у задачи есть приоритет, для выполнения выбираются задачи с высшим приоритетом
    • smart — очередь такого типа возможно сильно кастомизировать передавая параметры в шаблон. по умолчанию есть возможность индексировать задачи по любому ключу и заменять старую задачу на новую, если она сущесвует

    Соответственно описанная строчка кода создает пул из 5 потоков с неограниченной очередью типа fifo.
    Теперь нам понадобится создать io_service и пул из 3-х потоков для обработки внешних запросов.
    boost::asio::io_service io_service;

    * This source code was highlighted with Source Code Highlighter.

    Если вызвать io_service::run в момент когда в нем нету задач, метод сразу завершится, а для нормальной работы нам необходимы работающие потоки. Обычно это достигается тем, что в io_service добавлен accept-ор порта, на который подключаются клиенты, а в данном случае можно занять io_service ожиданием исполнения таймера:
    boost::asio::deadline_timer dummy_timer(io_service);
    dummy_timer.expires_from_now(boost::posix_time::seconds(10));
    // void dummy_handler(const boost::system::error_code&) {}
    dummy_timer.async_wait(&dummy_handler);


    * This source code was highlighted with Source Code Highlighter.

    После этого можно создать пул потоков:
    boost::thread_group io_service_thread_pool;
    for(int i = 0; i < 3; ++i)
      io_service_thread_pool.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service)
      );


    * This source code was highlighted with Source Code Highlighter.
    Далее создаём запрос:
    RequestPtr request(new Request("some data"));
    RequestHandlerPtr handler(new RequestHandler(io_service, request));

    * This source code was highlighted with Source Code Highlighter.

    Все готово, можно выполнять задачу:
    boost::tasks::handle< void > request_processing(
      boost::tasks::async(
        boost::tasks::make_task( &RequestHandler::Process, handler ),
        pool));


    * This source code was highlighted with Source Code Highlighter.
    boost::tasks::make_task( &RequestHandler::Process, handler ) — создает задачу вызова Process у объекта handler, которую можно будет выполнить. boost::tasks::async инициирует асинхронное выполнение задачи. boost::tasks::handle объект, по которому можно отслеживать статус завершения задачи, получить результат если он есть.
    boost::tasks::async поддерживает 4 алгоритма выполнения задачи:
    • own_thread — синхронное выполнение в том же потоке
    • new_thread — для задачи создается поток, в котором она будет выполнена, после чего поток будет завершен
    • as_sub_task — если текущая задача выполняется в пуле — добавляет новую задачу в него, иначе создает новый поток, как new_thread. Это поведение по умолчанию
    • static_pool — выполнить задачу в пуле потоков

    Далее подождем пока задача выполнится:
    request_processing.wait();

    * This source code was highlighted with Source Code Highlighter.

    И остановим io_service:
    io_service.stop();
    io_service_thread_pool.join_all();


    * This source code was highlighted with Source Code Highlighter.

    Функция Process получилась на удивление очень простой
    void Subtask1() const
    {
      Request("query2");
      ExternalRequest("extquery1");
    }

    void Subtask2() const
    {
      Request("query3");
      ExternalRequest("extquery2");
      Request("query4");
    }

    void Process() const
    {
      std::string data = request_->Read();

      Request("query1");

      boost::tasks::handle< void > subtask1(
        boost::tasks::async(
          boost::tasks::make_task( &RequestHandler::Subtask1, this )));
      boost::tasks::handle< void > subtask2(
        boost::tasks::async(
          boost::tasks::make_task( &RequestHandler::Subtask2, this )));

      boost::tasks::waitfor_all( subtask1, subtask2);

      Request("query5");

      request_->Write("some answer");
    }


    * This source code was highlighted with Source Code Highlighter.

    Подзадачи выполняются при помощи boost::tasks::async без указания policy на запуск и автоматически выбирается as_sub_task алгоритм, который выполнит задачи в том же пуле потоков, что и родительская задача. Реализация функций подзадач тоже тривиальная.
    RequestHandler::Request — вызывает boost::this_thread::sleep, а с ExternalRequest все немного сложнее:
    void ExternalRequest(const std::string & what) const
    {
      ExternalRequestHandler external_handler(io_service_);
      boost::tasks::spin::auto_reset_event ev;
      external_handler.PerformExternalReqeust(what, &ev);
      ev.wait();
    }

    * This source code was highlighted with Source Code Highlighter.
    Создается хендлер, а так же событие с автоматическим сбросом — boost::tasks::spin::auto_reset_event. Это событие передается обработчику внешнего запроса и по его завершению будет вызвано ev.set(), а до тех пор ev.wait() блокирует задачу.
    В противовес обычным потокам и примитивам синхронизации (boost::condition) ev.wait() не блокирует поток, а блокирует задачу (вызывает в цикле this_task::yield()). А это значит, что ресурсы процессора будут использованы другими задачами.
    Файл целиком может быть найден тут.

    Выводы


    boost.task вполне удобная библиотека для планирования задач. Она позволяет посмотреть как будет выглядить поддержка асинхронного выполнения кода в новом стандарте C++0x, и её можно использовать уже прямо сейчас не дожидаясь пока будет выпущен стандарт.
    Код с использованием boost.task становится меньше и намного понятнее, чем при обычном использовании потоков.
    Есть канечно и недостатки: код ещё не оптимизирован, что может вызвать проблемы в редких случаях; библиотека ещё не принята в boost (вместе с её зависимостями).

    Что почитать по теме?


    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 29

      0
      Спасибо
        –9
        >И там было очень много велосипедов, которые писались самим программистом вместо того что бы использовать сторонние компоненты такие как boost

        А можно было писать на winapi и не использовать велосипеды типа boost.
          +2
          ПО пишется для linux-а;)
          В любом случае если использовать boost — то портировать под любую другую платформу будет намного легче (так как он кроссплатформенный), чем если бы использовать конкретное api.
            –2
            >ПО пишется для linux-а;)

            Сервер ММО под линуксом? Корейцы негодуют.

            >В любом случае если использовать boost — то портировать под любую другую платформу будет намного легче

            Это понятно. Вопрос в том, встанет ли вопрос о портировании вообще.
              0
              > Сервер ММО под линуксом? Корейцы негодуют.

              Как бы да. Вы считаете что Windows Server лучше подходит под такие задачи?

              > Это понятно. Вопрос в том, встанет ли вопрос о портировании вообще.

              Ну это скорее приятный бонус, который дается бесплатно. Если есть выбор из двух библиотек: кроссплатформенной и нет — то я бы остановился на первой при прочих равных.
                –2
                >Вы считаете что Windows Server лучше подходит под такие задачи?

                Все забугорные реализации серверов под это дело, которые я видел, были заточены под винду. Видел две наших — с+фря и ява+линукс. У забугорных реализаций производительность была на порядок выше (использование IOCP вместо epoll/libevent, например).
                  0
                  Ну все зависит от разработчика — можно и на linux сделать неоптимальный сервер, ровно как и высоконагруженую систему (нету точной информации, но возможно, что EveOnline использует Windows, MS SQL они используют).

                  Ещё вопрос почему большая часть высоконагруженых систем все-таки использует linux, а не windows?

                  Ну и пару примеров Linux-серверов MMO:
                  Second Life использует Debian Linux
                  Perfect World — Linux.
                  Из движков:
                  Big World Tech — Linux
                  Hero Engine — поддерживает Linux и Windows.
                  Smart Fox Server — Linux, Windows, MacOSX
                    +1
                    >Ещё вопрос почему большая часть высоконагруженых систем все-таки использует linux, а не windows?

                    Каких систем?

                    >Ну и пару примеров Linux-серверов MMO:

                    Для примера lineage 2 виндовый, knights online — тоже виндовый. Про еву не знаю — она мне была интересна только своими изысканиями в сторону stackless python. Кстати, интересно было бы узнать про ВоВ.

                    >Big World Tech — Linux

                    Точно? Я знавал одну контору в МСК, которая этот бигворлд пару лет назад лицензировать хотела, там вроде совсем не линукс был.
                      0
                      > Каких систем?

                      Тех, которые обслуживают большое число пользователей. Ну к примеру google, twitter, livejournal, digg, flickr, youtube, и тд.

                      > Для примера lineage 2 виндовый, knights online — тоже виндовый. Про еву не знаю — она мне была интересна только своими изысканиями в сторону stackless python.

                      Я ж и не спорю, под windows тоже достаточно много хороших MMO серверов.

                      > Кстати, интересно было бы узнать про ВоВ.

                      Тоже очень интерестно, но такой информации в сети не видел.

                      Есть, кстати, open-source сервер WoW-а — MaNGOS — он кроссплатформенный.

                      > Точно? Я знавал одну контору в МСК, которая этот бигворлд пару лет назад лицензировать хотела, там вроде совсем не линукс был.

                      Тут и тут пишут, что linux.
                        0
                        > Тех, которые обслуживают большое число пользователей. Ну к примеру google, twitter, livejournal, digg, flickr, youtube, и тд.

                        В их случае наличие исходников может быть плюсом, как вариант(заточить ведро под проект). И, кстати, вышеперечисленные конторы решают абсолютно разные задачи. Но это так, к слову.

                        >под windows тоже достаточно много хороших MMO серверов

                        Мне казалось, их там подавляющее большинство. Впрочем, я этим вопросом уже года два как не интересуюсь.

                        >Есть, кстати, open-source сервер WoW-а

                        А есть опенсорсный сервер lineage2, то же кросплатформенный. На яве. Я даже когда-то в команде его разработчиков состоял.
                          +1
                          насчет wow я сам долго искал и ничего толком не нашел про серверную архитектуру, единственное что могу предположить это то что сервера на линуксе. вот здесь (конкретную ссылку дать не получилось) в вакансиях на разработчика сервера было сказано про необоходимость опыта разработки модулей ядра линукса и опыта работы с oracle.
              +1
              Boost позволяет скрыть то чтобы вы бы наваяли используя winapi, больше того он позволяет в плане потоков забыть о конкретной платформе…
              Ваш К.О.
                –1
                Плохой, негодный К.О. В первом абзаце поста речь шла о разработке сервера ммошки, где эта кросплатформенность никуда не вперлась. Собственно, мой комментарий относился к контексту «приложение под задачу blah <=> целесообразность использования boost», а не к использованию boost вообще.
                  –1
                  Да, К.О. вас не понял ) Но, почему использование boost может быть нецелесообразно?.. Это замечательная библиотека, написанная профессионалами, стабильная — неужели вы считаете что в короткий срок сможете сделать достойную замену реализации которая предложена в boost? Почему же не использовать?..
                    –1
                    >Но, почему использование boost может быть нецелесообразно?..

                    Я видел полоумного чудака, который цепанул boost::format к приложению на голом posix api, потому как ему лень было написать обёртку над sprintf. И если за это не убивать, то за что тогда убивать вообще?

                    > неужели вы считаете что в короткий срок сможете сделать достойную замену реализации которая предложена в boost?

                    Реализации чего? В бусте 100500 батареек. Конкретно реализации тасков — я бы не взялся.

                    > Почему же не использовать?

                    Использовать-то можно, а местами буст незаменим. Главное, чтоб использующий с головой дружил.
                      –1
                      >Конкретно реализации тасков — я бы не взялся.
                      >Использовать-то можно, а местами буст незаменим. Главное, чтоб использующий с головой дружил.

                      Так вот мы приходим к выводу, что в данном посте описано целесообразное применение boost ;-)
                      0
                      Ну в подтверждение сказаного про буст — ещё один пример: там были реализованы и умные указатели (shared и weak). Реализованы с использованием мютексов (для изменения счетчика ссылок). boost.smart_ptr — уже давно использует атомарные операции и в нет необходимости блокировок, ну и ещё много бонусов дает по сравнению с самописными реализациями (boost::enable_shared_from_this к примеру).
                      И это не первый раз когда я вижу, что кто то изобретает свои умные указатели. В универе понятное дело — можно эксперементировать, но на работе зачем?
                        +2
                        >И это не первый раз когда я вижу, что кто то изобретает свои умные указатели. В универе понятное дело — можно эксперементировать, но на работе зачем?

                        Каждый программист обязан написать свой smart_ptr, контейнер и сортировку. А вообще, эксперименты — дело полезное, 6 месяцев ресёрча дают офигический результат на выходе.
                          +1
                          А еще каждый программист обязан написать свой компилятор.

                          Считаю глупо писать свой смарт поинтер, контейнер и сортировку. Нужно просто прочитать книгу и понять как это работает.
                            0
                            > Нужно просто прочитать книгу и понять как это работает.

                            Чтение книг без написания кода — это фигня, потому как программисты-теоретики никому не нужны. Написание того же смартпоинтера научит обходить десяток-другой граблей при работе с указателями.

                            >А еще каждый программист обязан написать свой компилятор.

                            Да без проблем, если задача потребует.
                  +1
                  толсто :)
                    0
                    >А можно было писать на winapi и не использовать велосипеды типа boost.

                    boost это расширение стандартной библиотеки шаблонов С++ (stl), а не «велосипед».
                    0
                    just::thread чет не порадовала платность библиотеки в 100$.

                    В boost::task не сильно понравилось, что в случае нехватки потоков, вызывающий поток блокируется, вместо того чтобы закинуть в очередь.
                      0
                      just::thread — платная, да. Но на их сайте много информации полезной;)

                      А boost::task имеет 2 стратегии — то про что вы говорите — bounded очередь, которая специально так и сделана. Но можно использовать unbounded — тогда задача будет добавляться сразу.

                      Попробуйте следующий код:

                      void SleepTask(int i)
                      {
                        std::cout<<" * task"<<i<<" begin\n";
                        boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));
                        std::cout<<" * task"<<i<<" end\n";
                      }

                      boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );

                      std::vector<boost::tasks::handle< void > > task_list;
                      for(int i = 0; i < 100; ++i)
                      {
                        task_list.push_back(boost::tasks::async(
                          boost::tasks::make_task( &SleepTask, i ),
                          pool));
                      }
                      std::cout<<" * all tasks scheduled\n";
                      boost::tasks::waitfor_all(task_list.begin(), task_list.end());

                      * This source code was highlighted with Source Code Highlighter.
                        0
                        rand()%100 + 10

                        Я понимаю, что топику уже 2 недели и что rand() это оффтопик, но все равно: How can I get random integers in a certain range?.
                          0
                          Я в курсе, что это не правельный рандом, но в данном случае и небыло потребности в исключительно правелном;)
                          Ну раз уж топик про буст — там есть библиотека генерации случайный чисел.
                          0
                          А за статью спасибо.
                            0
                            Рад, что вам понравилось.
                        0
                        Эмм…
                        а мне нужен планировщик расписания задач, но дело в том что по английски расписание — это scheduler и планировщик — это scheduler, и вот я нагуглил Вашу статью. Тема конечно интересная, но не совсем то, что мне в данный момент нужно (а тут речь скорее идет о пуле задач и потоков или о задаче «Round-robin» https://ru.wikipedia.org/wiki/Round-robin_(%D0%B0%D0%BB%D0%B3%D0%BE%D1%80%D0%B8%D1%82%D0%BC) )

                        А мне нужно такое:
                        добавлять задачу и время её выполнения (или периодичность — не суть) Ну и компонента уже сама выполняет добавленные задачи когда наступает время X. Своеобразный крон такой.

                        Есть ли готовые решения в бусте? Ну или как с минимальным велосипедизмом люди делают?

                        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                        Самое читаемое