Этот пост является заключительным в моей мини-серии из трех постов о cppcoro. cppcoro — это библиотека абстракций корутин от Льюиса Бейкера (Lewis Baker). Сегодня я покажу вам пулы потоков (thread pools).
Чтобы получить максимум пользы от этого поста, вам следовало бы ознакомиться с двумя моими предыдущим постами про cppcoro.
C++20: Корутины с cppcoro: введение в cppcoro, простейшие задачи и генератор корутин.
C++20: Мощные корутины с cppcoro: более мощные корутины с потоками.
В дополнение к функции cppcoro::sync_wait
, которую можно использовать для ожидания указанный Awaitable
завершается, cppcoro предлагает довольно интересную функцию cppcoro::when_all
.
when_all
when_all
: создаетAwaitable
, который ожидает все своиInput-Awaitable
и возвращает совокупность их результатов.
Я упростил определение функции cpporo::when_all
для этой статьи. Следующий пример должен помочь вам понять что к чему.
// cppcoroWhenAll.cpp
#include <chrono>
#include <iostream>
#include <thread>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>
using namespace std::chrono_literals;
cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s); // (3)
co_return "First";
}
cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s); // (3)
co_return "Second";
}
cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s); // (3)
co_return "Third";
}
cppcoro::task<> runAll() {
// (2)
auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
std::cout << fir << " " << sec << " " << thi << std::endl;
}
int main() {
std::cout << std::endl;
auto start = std::chrono::steady_clock::now();
cppcoro::sync_wait(runAll()); // (1)
std::cout << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
std::cout << std::endl;
}
Задача верхнего уровня cppcoro::sync_wait(runAll())
(строка 1) ожидает Awaitable runAll
. runAll
ожидает Awaitable getFirst
, getSecond
и getThird
(строка 2). Awaitable runAll
, getFirst
, getSecond
и getThird
являются корутирами. Каждая из функций get
засыпает на одну секунду (строка 3). Три раза по одной секунде составляет три секунды. Это и будет временем, в течение которого вызов cppcoro::sync_wait(runAll())
ожидает корутины. Строка 4 отображает это время.
Теперь, когда мы познакомились с функцией cppcoro::when_all
, позвольте мне добавить к ней пулы потоков.
static_thread_pool
static_thead_pool
: управление работой пула потоков фиксированного размера.
cppcoro::static_thread_pool
можно вызывать с числовым параметром или без. Это число обозначает количество созданных потоков. Если вы не укажете число, то вместо него будет использована функция C++11 std::thread::hardware_concurrency(). std::thread::hardware_concurrency
предоставляет вам информацию о количестве аппаратных потоков, поддерживаемых вашей системой. Это может быть количество процессоров или ядер, которые находятся в вашем распоряжении.
Позвольте мне продемонстрировать это на практике. Следующий пример я вляется модификацией предыдущего — он одновременно выполняет корутины getFirst
, getSecond
и getThird
.
// cppcoroWhenAllOnThreadPool.cpp
#include <chrono>
#include <iostream>
#include <thread>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>
using namespace std::chrono_literals;
cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s);
co_return "First";
}
cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s);
co_return "Second";
}
cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s);
co_return "Third";
}
template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
co_await tp.schedule();
auto res = co_await func();
co_return res;
}
cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
auto[fir, sec, thi] = co_await cppcoro::when_all( // (3)
runOnThreadPool(tp, getFirst),
runOnThreadPool(tp, getSecond),
runOnThreadPool(tp, getThird));
std::cout << fir << " " << sec << " " << thi << std::endl;
}
int main() {
std::cout << std::endl;
auto start = std::chrono::steady_clock::now();
cppcoro::static_thread_pool tp; // (1)
cppcoro::sync_wait(runAll(tp)); // (2)
std::cout << std::endl;
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
std::cout << std::endl;
}
Сейчас я объясню основные отличия от предыдущей программы cppcoroWhenAll.cpp
. Я создал в строке (1) пул потоков tp
и использовал его в качестве аргумента для функции runAll(tp)
(строка 2). Функция runAll
использует пул потоков для одновременного запуска корутин. Благодаря структурным привязкам (строка 3) значения каждой корутины можно легко агрегировать и присвоить переменной. В итоге функция main выполняется за одну, а не за три секунды, как раньше.
Возможно, вы знаете, что мы получаем с защелками и барьерами C++20. Защелки (latches) и барьеры (barrier) — это механизмы синхронизации потоков, которые позволяют некоторым потокам блокироваться до тех пор, пока счетчик не станет равным нулю. cppcoro
также поддерживает защелки и барьеры.
async_latch
async_latch
: позволяет заставить корутину асинхронно ожидать, пока счетчик не станет равным нулю
Следующая программа cppcoroLatch.cpp
демонстрирует синхронизацию потоков с помощью cppcoro::async_latch
.
// cppcoroLatch.cpp
#include <chrono>
#include <iostream>
#include <future>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>
using namespace std::chrono_literals;
cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
std::cout << "Before co_await" << std::endl;
co_await latch; // (3)
std::cout << "After co_await" << std::endl;
}
int main() {
std::cout << std::endl;
cppcoro::async_latch latch(3); // (1)
// (2)
auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); });
auto counter1 = std::async([&latch] { // (2)
std::this_thread::sleep_for(2s);
std::cout << "counter1: latch.count_down() " << std::endl;
latch.count_down();
});
auto counter2 = std::async([&latch] { // (2)
std::this_thread::sleep_for(1s);
std::cout << "counter2: latch.count_down(2) " << std::endl;
latch.count_down(2);
});
waiter.get(), counter1.get(), counter2.get();
std::cout << std::endl;
}
В строке (1) я создаю cppcoro::asynch_latch
и инициализирую счетчик равным 3. На этот раз я использую std::async
(строка (2)) для одновременного запуска трех корутин. Каждый std::async
получает latch
для каждой ссылки. В строке (3) корутина waitFor
ожидает, пока счетчик не станет равным нулю. Корутина counter1
засыпает на 2 секунды, а затем отсчитывает 1 событие. Напротив, counter2
засыпает на 1 секунду и отсчитает 2 события. На скриншоте показано это чередование потоков.
Что дальше?
На данный момент я успел написать о трех из большой четверки C++20: концептах, диапазонах и корутинах. В моем путешествии по большой четверке все еще отсутствуют модули. Они и станут темой моих следующих постов.
Кстати, если кто-то хочет написать пост о возможностях C++20, о которых я тоже планирую писать в будущем, свяжитесь со мной. Я с удовольствием опубликую ее и переведу на английский/немецкий язык, если это необходимо.
Программа наставничества
Моя новая программа наставничества "Fundamentals for C++ Professionals" стартует в апреле. Получить дополнительную информацию можно здесь.
Всех желающих приглашаем на открытый урок в OTUS «Умные указатели», на котором разберем, что такое умные указатели и зачем они нужны. Проведем обзор умных указателей входящих в stl, unique_ptr, Shared_ptr, weak_ptr.