Недавно на профильном ресурсе один программист задал вопрос: «Что использовать в сервере ММО для работы с потоками?». Программист склонялся к Intel TBB, но даже не к базовым примитивам, а к кастомному планированию задач (task scheduling). Ну нравится TBB — ну и ладно. А немного позже я увидел исходники сервера ММО другого программиста, который недавно начал переписываться его с нуля для улучшения архитектуры. И там было очень много велосипедов, которые писались самим программистом вместо того что бы использовать сторонние компоненты такие как boost (к примеру класы обертки над pthread-ом, и это в 2010 году, когда boost.thread уже почти в стандарте). Была там реализована и поддержка пула потоков с планировщиком задач. Тема эта мне очень интересна и я начал копать информацию о готовых решениях планировки задач (как в TBB) и нашел boost.task, про что и решил написать.
Задача (task) — это логически объедененный набор действий. Планировщик задач (task scheduler) асинхронино выполняет задачи руководствуясь определенными стратегиями по выбору кто должен выполняться в данный момент в каком потоке.
Задачи позволяют абстрагировться от обычных потоков и оперировать на более высоком уровне.
Как работает сферический сервер в вакууме? Очень просто:
Ну кроме того в сервере могут происходить какие то процессы, которые выполняются и без запроса клиента. Например рассылка уведомлений по всей базе пользователей, очистка базы от устаревших данных (крончик), обработка дневной статистики и тд.
Сейчас загвоздка именно в том, как обрабатывается запрос. Надо разобраться как его обрабатывать.
Возьмем к примеру memcached-подобный сервер: у нас есть hash_map с данными, есть запросы чтения, есть запросы записи, которые делают простой лукап по хеш-мапе и возвращают данные либо записывают их в хеш-мап. Пока всё происходит в одном потоке, но что делать, если нам надо задействовать все процессоры системы?
Создаем столько потоков, сколько ядер. В каждом потоке обрабатываем пользователей, которых при создании соеденения раскидываем по принципу round-robin. При обращении к контейнеру используем rwlock-и (boost::shared_mutex). Отлично. А как нам быть с удалением элементов из контейнера? Создаем поток, который раз в N секунд просыпается и чистит контейнер.
Это был простой пример, а теперь более сложный пример: сервис, который может в зависимости от запроса пользователя сделать запрос в базу данных, сделать http запрос на какой то сайт. Что будет если сделать серрвер по предидущей модели (все запросы к другим компонентам будут выполняться синхронно)? Ну база данных находится на той же площадке, что и сервер, ответ будет в приделах пары миллисекунд. Отослать email — тоже не проблема — ставим sendmail на ту же машину, отдаём ему данные, а он сам разберется как отослать письмо.
Отлично. Хотя не совсем. А что делать с http-запросом? Он же может занять очень долго — всё зависит от сайта который находится где то далеко и не известно сколько будет обрабатывать запрос. В таком случае поток будет бездействовать, хотя в очереди есть много запросов, которые могут выполниться, но они ждут пока освободится этот поток.
Такой запрос необходимо выполнять асинхронно. Реализовать можно так:
HttpRequestExecutor принемает url запроса и колбек, который надо вызвать по завершению запроса (тип колбека — boost::function).
И такой подход в работает, правда не слишком красиво.
В блоге Thinking Asynchronously in C++ показана интерестная реализация выполнения асинхронных задач. Выглядит результат следующим образом:
Постепенно логика усложняется, добавляются новые элементы, которые надо обрабатывать асинхронно, реализация тоже усложняется. В дальнейшем задачу
И выполняя её последовательно с остановками в http запросах мы видим, что запросы
и
можно выполнять паралельно и если мы захотим это сделать, то прийдется ещё сильнее усложнять логику. А хотелось бы написать простой код, например:
Вот тут и пригодиться планировщик задач.
Про поддержку планировщика задач в новом стандарте 0x можно почитать тут.
Мне наиболее понравился boost.task. Дальше его детальное рассмотрение.
boost.task — реализация предложения в стандарт C++0x. Она поддерживает задание стратегий выполнения задач, создание под-задач, прерывание задач.
Библиотека зависит от:
boost.task и boost.fiber компилируемые библиотеки (boost.atomic и boost.move — header-only) — так что прийдется их собирать. Что бы было удобнее эксперементировать собрал все зависимости в одном месте, приправил cmake-ом и залил поект на github. Работает на linux-е, для сборки под windows — потребуется 2-3 строчки добавить в cmake файлы.
API библиотеки достаточно простой, реализовать обработчик запроса, который описывался выше не совтавит труда. Приведу его ещё раз:
В качестве эмуляции запроса к mysql будет использован обычный sleep на случайное время:
В качестве внешнего http-запроса будет использован асинхронный таймер из boost::asio.
Итак:
Request — класс запроса.
А RequestHandler — класс обработчика запроса.
io_service — передается для того, что бы можно было выполнить внешний вызов (использовать таймер boost::asio::deadline_timer).Итак начнем. Определяем пул потоков, для обработки наших задач:
boost.task поддерживает два основных вида стратегий планировки задач:
Также есть возможность задания стратегии обработки задач внутри очереди:
Соответственно описанная строчка кода создает пул из 5 потоков с неограниченной очередью типа fifo.
Теперь нам понадобится создать io_service и пул из 3-х потоков для обработки внешних запросов.
Если вызвать io_service::run в момент когда в нем нету задач, метод сразу завершится, а для нормальной работы нам необходимы работающие потоки. Обычно это достигается тем, что в io_service добавлен accept-ор порта, на который подключаются клиенты, а в данном случае можно занять io_service ожиданием исполнения таймера:
После этого можно создать пул потоков:
Все готово, можно выполнять задачу:
boost::tasks::async поддерживает 4 алгоритма выполнения задачи:
Далее подождем пока задача выполнится:
И остановим io_service:
Функция Process получилась на удивление очень простой
Подзадачи выполняются при помощи boost::tasks::async без указания policy на запуск и автоматически выбирается as_sub_task алгоритм, который выполнит задачи в том же пуле потоков, что и родительская задача. Реализация функций подзадач тоже тривиальная.
RequestHandler::Request — вызывает boost::this_thread::sleep, а с ExternalRequest все немного сложнее:
В противовес обычным потокам и примитивам синхронизации (boost::condition) ev.wait() не блокирует поток, а блокирует задачу (вызывает в цикле this_task::yield()). А это значит, что ресурсы процессора будут использованы другими задачами.
Файл целиком может быть найден тут.
boost.task вполне удобная библиотека для планирования задач. Она позволяет посмотреть как будет выглядить поддержка асинхронного выполнения кода в новом стандарте C++0x, и её можно использовать уже прямо сейчас не дожидаясь пока будет выпущен стандарт.
Код с использованием boost.task становится меньше и намного понятнее, чем при обычном использовании потоков.
Есть канечно и недостатки: код ещё не оптимизирован, что может вызвать проблемы в редких случаях; библиотека ещё не принята в boost (вместе с её зависимостями).
Определение
Задача (task) — это логически объедененный набор действий. Планировщик задач (task scheduler) асинхронино выполняет задачи руководствуясь определенными стратегиями по выбору кто должен выполняться в данный момент в каком потоке.
Задачи позволяют абстрагировться от обычных потоков и оперировать на более высоком уровне.
Зачем нужен планировщик задач?
Как работает сферический сервер в вакууме? Очень просто:
- Приходит запрос от клиента
- Он обрабатывается!
- Отсылается ответ
Ну кроме того в сервере могут происходить какие то процессы, которые выполняются и без запроса клиента. Например рассылка уведомлений по всей базе пользователей, очистка базы от устаревших данных (крончик), обработка дневной статистики и тд.
Сейчас загвоздка именно в том, как обрабатывается запрос. Надо разобраться как его обрабатывать.
Возьмем к примеру memcached-подобный сервер: у нас есть hash_map с данными, есть запросы чтения, есть запросы записи, которые делают простой лукап по хеш-мапе и возвращают данные либо записывают их в хеш-мап. Пока всё происходит в одном потоке, но что делать, если нам надо задействовать все процессоры системы?
Создаем столько потоков, сколько ядер. В каждом потоке обрабатываем пользователей, которых при создании соеденения раскидываем по принципу round-robin. При обращении к контейнеру используем rwlock-и (boost::shared_mutex). Отлично. А как нам быть с удалением элементов из контейнера? Создаем поток, который раз в N секунд просыпается и чистит контейнер.
Это был простой пример, а теперь более сложный пример: сервис, который может в зависимости от запроса пользователя сделать запрос в базу данных, сделать http запрос на какой то сайт. Что будет если сделать серрвер по предидущей модели (все запросы к другим компонентам будут выполняться синхронно)? Ну база данных находится на той же площадке, что и сервер, ответ будет в приделах пары миллисекунд. Отослать email — тоже не проблема — ставим sendmail на ту же машину, отдаём ему данные, а он сам разберется как отослать письмо.
Отлично. Хотя не совсем. А что делать с http-запросом? Он же может занять очень долго — всё зависит от сайта который находится где то далеко и не известно сколько будет обрабатывать запрос. В таком случае поток будет бездействовать, хотя в очереди есть много запросов, которые могут выполниться, но они ждут пока освободится этот поток.
Такой запрос необходимо выполнять асинхронно. Реализовать можно так:
class LongRequestHandler
{
public:
void Handle()
{
// read client request parameters
// mysql request 1
// mysql request 2
HttpRequestExecutor::GetInstance()->Execute(
"example.com?x=1",
boost::bind(this, &LongRequestHandler::HandleStage2)
);
}
void HandleStage2(const std::string & http_request_result)
{
// mysql request 3
// write response to client
}
};
* This source code was highlighted with Source Code Highlighter.
HttpRequestExecutor принемает url запроса и колбек, который надо вызвать по завершению запроса (тип колбека — boost::function).
И такой подход в работает, правда не слишком красиво.
В блоге Thinking Asynchronously in C++ показана интерестная реализация выполнения асинхронных задач. Выглядит результат следующим образом:
template <typename Handler>void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler,
// coroutine state:
coroutine coro = coroutine(),
error_code ec = error_code(),
size_t length = 0)
{
reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer),
bind(&async_echo<handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));
if (ec) break;
yield async_write(socket,
buffer(working_buffer, length),
bind(&async_echo<handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));
}
handler(ec);
}
}
* This source code was highlighted with Source Code Highlighter.
Coroutine и yield в С++ смотрятся необычно;) Реализовано это на дефайнах, в блоге можно почитать как это удалось автору.Постепенно логика усложняется, добавляются новые элементы, которые надо обрабатывать асинхронно, реализация тоже усложняется. В дальнейшем задачу
mysql request 1 mysql request 2 http request 1 mysql request 3 http request 2 mysql request 4 mysql request 5
И выполняя её последовательно с остановками в http запросах мы видим, что запросы
mysql request 2 http request 1
и
mysql request 3 http request 2 mysql request 4
можно выполнять паралельно и если мы захотим это сделать, то прийдется ещё сильнее усложнять логику. А хотелось бы написать простой код, например:
mysql request 1 x = run(func1) y = run(func2) wait(x, y) mysql request 5 func1: mysql request 2 http request 1 func2: mysql request 3 http request 2 mysql request 4
Вот тут и пригодиться планировщик задач.
Реализации
Про поддержку планировщика задач в новом стандарте 0x можно почитать тут.
- just::thread — реализация библиотеки потоков стандарта C++0x от отца boost::thread
- Parallel Patterns Library (PPL) — реализцаия от Microsoft
- Asynchronous Agents Library — и ещё одна от Microsoft
- Intel Threading Building Blocks — очень мощная библиотека для паралельного программирования от Intel. Включает в себя и планировщик задач.
- boost::task — — реализация от Oliver Kowalke, не принятая ещё в boost
Мне наиболее понравился boost.task. Дальше его детальное рассмотрение.
Описание boost.task
boost.task — реализация предложения в стандарт C++0x. Она поддерживает задание стратегий выполнения задач, создание под-задач, прерывание задач.
Библиотека зависит от:
- boost >=1.41
- boost.atomic — реализация атомарных объектов C++0x для boost
- boost.move — реализация семантики Move для C++03 стандарта в котором ещё нет rvalue references
- boost.fiber — аналог boost.thread для легковесных потоков
boost.task и boost.fiber компилируемые библиотеки (boost.atomic и boost.move — header-only) — так что прийдется их собирать. Что бы было удобнее эксперементировать собрал все зависимости в одном месте, приправил cmake-ом и залил поект на github. Работает на linux-е, для сборки под windows — потребуется 2-3 строчки добавить в cmake файлы.
Пример использования
API библиотеки достаточно простой, реализовать обработчик запроса, который описывался выше не совтавит труда. Приведу его ещё раз:
mysql request 1 mysql request 2 http request 1 mysql request 3 http request 2 mysql request 4 mysql request 5
В качестве эмуляции запроса к mysql будет использован обычный sleep на случайное время:
boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%100 + 10));
В качестве внешнего http-запроса будет использован асинхронный таймер из boost::asio.
Итак:
Request — класс запроса.
class Request
{
public:
Request(const std::string & data);
const std::string & Read() const;
void Write(const std::string & answer);
};
* This source code was highlighted with Source Code Highlighter.
А RequestHandler — класс обработчика запроса.
class RequestHandler
{
public:
RequestHandler(boost::asio::io_service & io_service, const RequestPtr & request);
void Process() const;
};
* This source code was highlighted with Source Code Highlighter.
io_service — передается для того, что бы можно было выполнить внешний вызов (использовать таймер boost::asio::deadline_timer).Итак начнем. Определяем пул потоков, для обработки наших задач:
boost::tasks::static_pool< boost::tasks::unbounded_fifo > pool( boost::tasks::poolsize( 5) );
* This source code was highlighted with Source Code Highlighter.
boost.task поддерживает два основных вида стратегий планировки задач:
- ограниченные (bounded) — имеют порог количества выполняемых задач, при достижении которого добавление новой задачи блокирует поток, который выполняет это действие. Основная задача — избежать исчерпания ресурсов (resource exhaustion) когда скорость добавления задач превышает скорость их выполнения
- неограниченные (unbounded) — позволяют добавлять бесконечное число задач в очередь
Также есть возможность задания стратегии обработки задач внутри очереди:
- fifo — первая добавленная задача выполняется первой
- priority — у задачи есть приоритет, для выполнения выбираются задачи с высшим приоритетом
- smart — очередь такого типа возможно сильно кастомизировать передавая параметры в шаблон. по умолчанию есть возможность индексировать задачи по любому ключу и заменять старую задачу на новую, если она сущесвует
Соответственно описанная строчка кода создает пул из 5 потоков с неограниченной очередью типа fifo.
Теперь нам понадобится создать io_service и пул из 3-х потоков для обработки внешних запросов.
boost::asio::io_service io_service;
* This source code was highlighted with Source Code Highlighter.
Если вызвать io_service::run в момент когда в нем нету задач, метод сразу завершится, а для нормальной работы нам необходимы работающие потоки. Обычно это достигается тем, что в io_service добавлен accept-ор порта, на который подключаются клиенты, а в данном случае можно занять io_service ожиданием исполнения таймера:
boost::asio::deadline_timer dummy_timer(io_service);
dummy_timer.expires_from_now(boost::posix_time::seconds(10));
// void dummy_handler(const boost::system::error_code&) {}
dummy_timer.async_wait(&dummy_handler);
* This source code was highlighted with Source Code Highlighter.
После этого можно создать пул потоков:
boost::thread_group io_service_thread_pool;
for(int i = 0; i < 3; ++i)
io_service_thread_pool.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service)
);
* This source code was highlighted with Source Code Highlighter.
Далее создаём запрос:RequestPtr request(new Request("some data"));
RequestHandlerPtr handler(new RequestHandler(io_service, request));
* This source code was highlighted with Source Code Highlighter.
Все готово, можно выполнять задачу:
boost::tasks::handle< void > request_processing(
boost::tasks::async(
boost::tasks::make_task( &RequestHandler::Process, handler ),
pool));
* This source code was highlighted with Source Code Highlighter.
boost::tasks::make_task( &RequestHandler::Process, handler ) — создает задачу вызова Process у объекта handler, которую можно будет выполнить. boost::tasks::async инициирует асинхронное выполнение задачи. boost::tasks::handle объект, по которому можно отслеживать статус завершения задачи, получить результат если он есть.boost::tasks::async поддерживает 4 алгоритма выполнения задачи:
- own_thread — синхронное выполнение в том же потоке
- new_thread — для задачи создается поток, в котором она будет выполнена, после чего поток будет завершен
- as_sub_task — если текущая задача выполняется в пуле — добавляет новую задачу в него, иначе создает новый поток, как new_thread. Это поведение по умолчанию
- static_pool — выполнить задачу в пуле потоков
Далее подождем пока задача выполнится:
request_processing.wait();
* This source code was highlighted with Source Code Highlighter.
И остановим io_service:
io_service.stop();
io_service_thread_pool.join_all();
* This source code was highlighted with Source Code Highlighter.
Функция Process получилась на удивление очень простой
void Subtask1() const
{
Request("query2");
ExternalRequest("extquery1");
}
void Subtask2() const
{
Request("query3");
ExternalRequest("extquery2");
Request("query4");
}
void Process() const
{
std::string data = request_->Read();
Request("query1");
boost::tasks::handle< void > subtask1(
boost::tasks::async(
boost::tasks::make_task( &RequestHandler::Subtask1, this )));
boost::tasks::handle< void > subtask2(
boost::tasks::async(
boost::tasks::make_task( &RequestHandler::Subtask2, this )));
boost::tasks::waitfor_all( subtask1, subtask2);
Request("query5");
request_->Write("some answer");
}
* This source code was highlighted with Source Code Highlighter.
Подзадачи выполняются при помощи boost::tasks::async без указания policy на запуск и автоматически выбирается as_sub_task алгоритм, который выполнит задачи в том же пуле потоков, что и родительская задача. Реализация функций подзадач тоже тривиальная.
RequestHandler::Request — вызывает boost::this_thread::sleep, а с ExternalRequest все немного сложнее:
void ExternalRequest(const std::string & what) const
{
ExternalRequestHandler external_handler(io_service_);
boost::tasks::spin::auto_reset_event ev;
external_handler.PerformExternalReqeust(what, &ev);
ev.wait();
}
* This source code was highlighted with Source Code Highlighter.
Создается хендлер, а так же событие с автоматическим сбросом — boost::tasks::spin::auto_reset_event. Это событие передается обработчику внешнего запроса и по его завершению будет вызвано ev.set(), а до тех пор ev.wait() блокирует задачу.В противовес обычным потокам и примитивам синхронизации (boost::condition) ev.wait() не блокирует поток, а блокирует задачу (вызывает в цикле this_task::yield()). А это значит, что ресурсы процессора будут использованы другими задачами.
Файл целиком может быть найден тут.
Выводы
boost.task вполне удобная библиотека для планирования задач. Она позволяет посмотреть как будет выглядить поддержка асинхронного выполнения кода в новом стандарте C++0x, и её можно использовать уже прямо сейчас не дожидаясь пока будет выпущен стандарт.
Код с использованием boost.task становится меньше и намного понятнее, чем при обычном использовании потоков.
Есть канечно и недостатки: код ещё не оптимизирован, что может вызвать проблемы в редких случаях; библиотека ещё не принята в boost (вместе с её зависимостями).