Senders/Receivers в C++26: от теории к практике
Практически каждый день мы сталкиваемся с необходимостью писать параллельный и асинхронный код: загрузка сетевых данных, взаимодействие с БД, UI-обработка, фоновые вычисления. Традиционные инструменты — std::thread, std::async, std::future — порождают boilerplate, разбросанные try/catch, сложную отмену и «callback hell».
Взамен этого C++26 стандартизирует декларативную модель Senders/Receivers в заголовке <execution>
, дополняемую совместимостью с корутинам и расширением параллельных алгоритмов через policy_aware_scheduler
. В результате мы получаем:
Единый API для асинхронности и параллелизма, устраняющий «callback-hell» и громоздкость
std::future/std::async
Композиционные конвейеры (
| then | when_all | upon_error
), позволяющие строить гибкие DAG(Directed Acyclic Graph) задачиКонтроль отмены и обработку ошибок как неотъемлемую часть пайплайна (
set_stopped, upon_error
)Интеграцию корутин (
co_await
sender) и передачу scheduler в параллельные алгоритмы (std::for_each
)
Чтобы оценить и разобраться с предстоящими нововведениями, в этой статье:
Разберём три ключевые абстракции: Scheduler, Sender, Receiver.
Поговорим об инспекции кода (
completion_signatures_of_t
).Реализуем профилирование через
tag_invoke
.Объединим всё это в небольших примерах с NVIDIA stdexec.
Почему понадобился новый и единый подход?
Попытки реализовать асинхронность в C++ предпринимались давно — и не всегда удачно. До появления Senders/Receivers С++ опирался на такие инструменты, как std::async
, std::future
, Boost.Asio и корутины co_await
. Они помогали решать задачи, но по мере роста сложности приложений обнаружились ограничения, мешающие их эффективному применению в реальных сценариях.
Появившись в C++11, std::async
и std::future
стали первым шагом к удобной асинхронности на уровне стандарта. Однако эти инструменты быстро показали свою несостоятельность в более сложных случаях:
Нельзя выразить последовательность задач декларативно, приходится вручную вызывать
get()
и обрабатывать состояния;Eсли задача уже запущена — остановить её невозможно, поскольку нет предусмотренного механизма отмены;
Отсутствие интеграции с планировщиками и I/O: нельзя задать, где именно выполнять задачу, или привязать к событиям ввода-вывода;
Нет полноценной обработки ошибок, исключения пробрасываются через
future::get()
, что создаёт хаос в контроле потока исполнения;Результат одной задачи нельзя передать другой без вложенных вызовов и бойлерплейта.
И, наверняка, можно продолжать еще долго, постепенно углубляясь в реализации...
Тот же Boost.Asio обеспечил высочайшую производительность в I/O, но расплатой за это стала крайне низкая читаемость. Композиция асинхронных операций строится через вложенные лямбды — отсюда и "callback hell". Это затрудняет не только поддержку, но и любую рефакторизацию:
async_read(..., [&](...) {
async_write(..., [&](...) {
async_timer(..., [&](...) {
// Довольно больно
});
});
});
Кроме того, управление состоянием задачи (ошибки, отмена, сбои) перекладывается на плечи разработчика (надежно, как указатель в C++) .
С введением co_await
в C++20 появилась надежда — императивный стиль вернулся в асинхронный код. Но оказалось, что корутины: не умеют напрямую "понимать" std::future
или asio::awaitable
; не композируются между собой без дополнительной инфраструктуры; не интегрируются с scheduler
или I/O без написания собственных адаптеров; имеют неочевидные правила lifetime и сложные семантики отмены.
Поэтому Senders/Receivers - это системный ответ на фрагментированность и разобщённость подходов к асинхронному программированию в C++, который интегрирует их в единую архитектуру, способную масштабироваться от однопоточного embedded-кода до реактивных микросервисов и GPU-обработки данных.
Архитектура Senders/Receivers (с чего все начинается)
Новая модель базируется на трёх абстракциях:
1) Scheduler описывает контекст исполнения (thread-pool, inline, I/O-реактор) без жёсткой привязки к конкретной реализации. Наглядные реализации:
inline_scheduler
— выполняет задачи синхронно сразу,static_thread_pool::scheduler
из NVIDIA stdexec,reactor_scheduler
(I/O наio_uring
),по хабам разбросаны и другие пользовательские GPU/GUI schedulers.
2) Sender — ленивое описание асинхронной операции. Он не начинает работу, пока не будет подписан или не передан адаптеру:
Single-value senders:
sync_wait
,just
,then
,co_await sender
.Multi-value senders:
when_all
,when_any
,when_all_range
.
Как это работает? За кулисами Senders/Receivers определены два низкоуровневых CPO (равно как и в coroutine interop ("P3109R0")):
std::execution::connect(Sender, Receiver)
— связывает sender и receiver, создавая «operation state» (состояние операции) без запуска задачиstd::execution::start(OperationState&)
— запускает уже сконструированную операцию, переводя её в активный режим исполнения
Поэтому мы можем смело отложить запуск задачи до полной компоновки графа задач, агрегировать аллокации (единственная память для всех operation state) и настроить отмену (через get_stop_token
) до фактического старта.
Если бы всё запускалось сразу, мы потеряли бы контроль над инициализацией контекста, отменой и объединением нескольких senders в DAG.
3) Receiver — это коллбэк с тремя каналами:
set_value(v…)
при результате,set_error(exception_ptr)
при исключении,set_stopped()
при отмене.
И, куда же без них, разные адаптеры:
just(v…)
создаёт sender, который сразу отдаёт значение при подписке.read_env(get_scheduler)
извлекаетscheduler
из окружения.then(sender, func)
— принимает sender (можно и от предыдущего адаптера) и возвращает обычное готовое значение (или tuple значений), и запаковывает этот результат в новый sender (если звать подряд - будет sender<sender<int>> и т.д. :auto s = just( 10 ) | then( []( int x ) { return x * 2; } ); - простой трансформер
let_value(sender, func)
— асинхронное продолжение, которое также возвращает sender, но не углубляя вложенность. Требует, чтобыfunc
возвращалsender
(другую асинхронную работу), и «раскрывает» его результаты напрямую, объединяя два шага в один DAG-узел.when_all(s1,s2,…)
— ждёт всех senders, отдаётset_value(v1,v2,…)
.when_any(s1,s2,…)
— завершается первым.when_all_range(container)
— принимает run-time коллекцию senders.sync_wait
- блокирующий вызов, возвращающийoptional<tuple<...>>
(пустой при ошибке или остановке)
По ходу статьи я буду приводить примеры, попутно задействуя большую часть данных инструментов. Для экспериментов буду использовать референсную реализацию NVIDIA stdexec
, пока <execution>
не появится в стандартной библиотеке.
Начнем знакомство с малого, и переведем слова в верхний регистр:
#include <iostream>
#include <stdexec/execution.hpp>
#include <string>
namespace ex = stdexec;
int main()
{
// Создаем sender посредством just, который сразу отправляет строку "Hello"
auto sender = ex::just( std::string( "Hello" ) ) |
ex::then(
[]( auto&& s )
{
for( auto& c : s )
c = std::toupper( c );
return s;
// возвращает sender<std::string>
} ) |
ex::let_value(
[]( std::string s )
{
return ex::just( s + "!" );
// возвращает sender<std::string>
} );
auto result = ex::sync_wait( sender );
// sync_wait автоматически connect+start
if( result )
{
auto [ str ] = result.value(); // Распаковка кортежа
std::cout << "Результат: " << str << std::endl;
}
}
1) ex::just("Hello")
сразу передаёт дальше по графу задач строку "Hello"
, облеченную в sender -> (sender<std::string>).
2) Первый then
берёт эту строку через forwarding auto&&
, модифицирует её и возвращает просто строку. В результате после второго шага у нас по-прежнему sender<std::string>
.
3) let_value
видит, что возвращен sender
, распаковывает вложенный sender и делает так, чтобы он стал частью общей цепочки. В итоге sender
выдаёт непосредственно "HELLO!"
через set_value
.
Если бы вместо let_value
использовался then
, то лямбда [](std::string s){ return ex::just(s + "!"); }
вернула бы sender<std::string>
, и then
упаковал бы его как значение, результатом стало бы sender<sender<std::string>>
, что не то, чего мы хотим.
Следующим примером пойдем через пул потоков считать количество слов в файлах, добавим обработку исключений и суммарный подсчет слов.
// NVIDIA-пул потоков для примера
#include <exec/static_thread_pool.hpp>
#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>
namespace ex = stdexec;
int main()
{
// Создаём NVIDIA-пул из 3 потоков и получаем его scheduler:
exec::static_thread_pool pool{ 3 };
auto sched = pool.get_scheduler();
// Мини-фабрика sender-ов, каждый считает слова в своём файле:
auto make_count = [ &sched ]( std::string path )
{ // переключаемся в pool
return ex::schedule( sched )
// передаём путь
| ex::then( [ p = std::move( path ) ] { return p; } )
| ex::then(
[]( std::string&& p ) -> size_t
{
// читаем и считаем слова
std::ifstream in( p );
if( !in ) // выбросим исключение, обработчик ниже
throw std::runtime_error( "Cannot open: " + p );
size_t cnt = 0;
std::string w;
while( in >> w )
++cnt;
std::cout << "Count in " << p << " = " << cnt << std::endl;
return cnt;
} ) |
ex::upon_error( // наш обработчик по всем исключениям
[]( std::exception_ptr ep ) -> size_t
{
// Если в любом шаге произошло исключение
// upon_error вернёт 0, и sync_wait отработает корректно.
try
{
if( ep )
std::rethrow_exception( ep );
}
catch( const std::exception& e )
{
std::cerr << e.what() << std::endl;
}
return 0;
} );
};
// Строим три sender-а — по одному на файл:
auto s1 = make_count( "file1.txt" );
auto s2 = make_count( "file2.txt" );
auto s3 = make_count( "file3.txt" );
// Собираем в один pipeline:
// - when_all запускает все три параллельно,
// - первый then упаковывает результаты в вектор,
// - второй then суммирует вектор.
auto pipeline =
ex::when_all( std::move( s1 ), std::move( s2 ), std::move( s3 ) ) |
ex::then(
[]( size_t a, size_t b, size_t c ) {
return std::vector< size_t >{ a, b, c };
} ) |
ex::then( []( std::vector< size_t > v ) { return std::accumulate( v.begin(), v.end(), size_t{ 0 } ); } );
// Запуск и синхронное ожидание:
auto result = ex::sync_wait( std::move( pipeline ) );
if( result )
{
auto [ total ] = result.value();
std::cout << "Total words = " << total << "\n";
}
}
Такой подход даёт чистый и композиционный стиль: каждая операция — отдельный этап пайплайна, всё легко читать и масштабировать.
Но на этом потенциальная польза не заканчивается — Sender/Receiver-подход открывает дверь к множеству дополнительных, ранее сложных или неявных возможностей.
Во-первых, статическая инспекция: completion_signatures_of_t
Благодаря типобезопасности и метапрограммным трюкам (value_types_of_t
, completion_signatures_of_t
), вы можете на этапе компиляции проверить, что конкретный sender порождает именно те значения или ошибки, с которыми вы готовы работать. Это исключает целый класс рантайм-ошибок — пайплайн можно “просмотреть” как тип, а не как черный ящик.
Фактически мы говорим про статическую, декларативную инспекцию возможностей асинхронного звена в конвейере задач.
Каждый sender декларирует, какие каналы завершения он поддерживает.
#include <stdexec/execution.hpp>
#include <string>
#include <type_traits>
namespace ex = stdexec;
int main()
{
// Тип самого sender-а:
using Sender = decltype( ex::just( std::string{} ) );
// Окружение: нам интересен get_scheduler, но для just() оно не влияет на значения
using Env = ex::env< ex::get_scheduler_t >;
// Вычисляем фактические сигнатуры:
using actual_sigs = ex::completion_signatures_of_t< Sender, Env >;
// Задаём ТОЛЬКО ожидаемый набор: just() выдаёт ровно set_value(string)
using expected_sigs = ex::completion_signatures< ex::set_value_t( std::string ) >;
static_assert( std::is_same_v< actual_sigs, expected_sigs >,
"completion_signatures_of_t<just(string)> must be <set_value(string)>" );
return 0;
}
В модели Senders/Receivers частью сигнатур адаптера является environment — совокупность дополнительных параметров (scheduler, allocator, stop_token). Мы выбираем минимальное окружение, содержащее только get_scheduler_t
. Для senders, не использующих scheduler, это окружение «нейтрально», но его обязательно указывать для метапрограммной проверки.
completion_signatures_of_t
- это метафункциональный CPO, который извлекает из sender-а набор допустимых «каналов» завершения — именно set_value
, set_error
, set_stopped
вместе с типами их аргументов. По сути, это чисто статическая информация, доступная на этапе компиляции.
completion_signatures
- это шаблон для явного конструирования ожидаемого набора каналов. Здесь мы утверждаем, что just(string)
исключительно выдаёт set_value(string)
и не поддерживает error
или stopped
. Любое отличие считается ошибкой.
Такой подход позволяет провести статический анализ, static_assert
согласованный API и покрытие тестов для всех ветвей DAG:
#include <exec/static_thread_pool.hpp>
#include <iostream>
#include <stdexec/execution.hpp>
#include <type_traits>
namespace ex = stdexec;
int main()
{
// Создаём пул потоков и получаем scheduler
exec::static_thread_pool pool{ 2 };
auto sched = pool.get_scheduler();
using Env = ex::env< ex::get_scheduler_t >;
// schedule(sched) возвращает sender, который при старте вызовет set_value()
// (без аргументов), то есть его «value»-канал имеет тип void.
auto stage1 = ex::schedule( sched )
// then перехватывает результат void() и превращает его в std::vector<int>
| ex::then(
[]() {
return std::vector< int >{ 1, 2, 3, 4, 5 };
} );
// Какие каналы completion здесь?
// - set_error(exception_ptr) — если при scheduling или в лямбде выбросилось
// - set_stopped() — если отменили задачу до или во время выполнения
// - set_value(vector<int>) — успешный результат с нашим вектором
using stage1_sigs = ex::completion_signatures_of_t< decltype( stage1 ), Env >;
using expected_stage1 = ex::completion_signatures<
ex::set_error_t( std::exception_ptr ), ex::set_stopped_t(),
ex::set_value_t( std::vector< int > ) >;
static_assert( std::is_same_v< stage1_sigs, expected_stage1 >,
"Stage1: unexpected signatures" );
// Здесь всё ок: void→vector<int> после then, плюс стандартные error/stopped.
// === Pipeline: then(vector→int) + upon_error → int ===
auto pipeline = stage1
// then ─ берёт на вход vector<int> из stage1
// и "заменяет" канал set_value(vector<int>) на новый канал set_value(int).
| ex::then( []( const std::vector< int >& v ) { return std::accumulate( v.begin(), v.end(), 0 ); } )
// ex::upon_error ─ перехватывает любой exception_ptr из предыдущих senders
| ex::upon_error([]( std::exception_ptr ep )
{
try
{
if( ep )
std::rethrow_exception( ep );
}
catch( const std::exception& e )
{
std::cerr << "[Error] " << e.what() << "\n";
}
return -1;
} );
// Инспекция completion_sigs для pipeline:
// 1) set_error(exception_ptr) — любая ошибка на любом этапе
// 2) set_stopped() — кооперативная отмена (schedule + then поддерживают stopped)
// 3) set_value(int) — единственный value'канал после второго then/upon_error
using pipeline_sigs = ex::completion_signatures_of_t< decltype( pipeline ), Env >;
using expected_pipeline =
ex::completion_signatures< ex::set_error_t( std::exception_ptr ),
ex::set_stopped_t(), ex::set_value_t( int ) >;
static_assert( std::is_same_v< pipeline_sigs, expected_pipeline >,
"Pipeline: unexpected signatures" );
// Здесь нет set_value(vector<int>) от предыдущего then
auto result = ex::sync_wait( std::move( pipeline ) );
if( result )
{
auto [ r ] = result.value();
std::cout << "Result = " << r << "\n"; // Вывод: Result = 15
}
}
Данный способ наглядно позволяет формально описывать поведение асинхронных операций без запуска, в чисто типовом виде и выявлять потенциальные ошибки и несогласованности в адаптерах, особенно при сложных преобразованиях потоков данных и исключений. Может, немного, но это может быть удобнее.
Во-вторых, профилирование через tag_invoke
В <execution>
для каждого «core operation» (schedule
, connect
, start
и т.д.) определён соответствующий customization-point object (CPO), реализованный через вызов tag_invoke
.
При вызове
ex::schedule(some_scheduler);
стандартная реализация ищет свободную функцию
tag_invoke(ex::schedule_t, /* ваш тип scheduler */ )
и если она найдена — вызывает именно её.
Мы оборачиваем schedule
для собственного profiling_scheduler
, чтобы автоматически замерять производительность:
// Profiling-scheduler: обёртка над реальным scheduler-ом
struct profiling_scheduler
{
exec::static_thread_pool::scheduler base;
};
Ниже по коду мы определяем свой кастомный tag_invoke:
// Перехват schedule(ps) через tag_invoke
inline auto tag_invoke( ex::schedule_t, profiling_scheduler ps )
{
// Оригинальный sender<void> для переноса в пул
auto s = ex::schedule( ps.base );
// Вешаем then-лямбду, чтобы замерить
return s | ex::then(
[ start = std::chrono::steady_clock::now() ]()
{
auto dur = std::chrono::steady_clock::now() - start;
std::cout << "[Profiling] schedule→execution: "
<< dur.count() << std::endl;
} );
}
В результате каждый schedule(ps)
выводит задержку переводa задачи в пул без правки бизнес-логики, иначе говоря, замеряет время от вызова schedule(...)
до реального начала выполнения задачи в пуле.
Способов применения масса, а вот как выглядит доработанный пример:
#include <exec/static_thread_pool.hpp>
#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>
namespace ex = stdexec;
// Profiling-scheduler: обёртка над реальным scheduler-ом
struct profiling_scheduler
{
exec::static_thread_pool::scheduler base;
};
// Перехват schedule(ps) через tag_invoke
inline auto tag_invoke( ex::schedule_t, profiling_scheduler ps )
{
// Оригинальный sender<void> для переноса в пул
auto s = ex::schedule( ps.base );
// Вешаем then-лямбду, чтобы замерить
return s | ex::then(
[ start = std::chrono::steady_clock::now() ]()
{
auto dur = std::chrono::steady_clock::now() - start;
std::cout << "[Profiling] schedule→execution: "
<< dur.count() << std::endl;
} );
}
int main()
{
// Готовим пул и profiling_scheduler
exec::static_thread_pool pool{ 3 };
profiling_scheduler ps{ pool.get_scheduler() };
auto sched = ps;
std::vector< std::string > files = { "file1.txt", "file2.txt", "file3.txt" };
// 6. Фабрика senders для подсчёта слов
auto make_count = [ & ]( std::string path )
{
return ex::schedule( sched ) | ex::then( [ p = std::move( path ) ]
{ return p; } ) |
ex::then(
[ & ]( std::string p ) -> size_t
{
std::ifstream in( p );
if( !in )
throw std::runtime_error( "Cannot open " + p );
size_t cnt = 0;
std::string w;
while( in >> w )
{
++cnt;
}
std::cout << "Count in " << p << " = " << cnt << std::endl;
return cnt;
} ) |
ex::upon_error(
[]( std::exception_ptr ep ) -> size_t
{
try
{
if( ep )
std::rethrow_exception( ep );
}
catch( const std::exception& e )
{
std::cerr << e.what() << "\n";
}
return 0;
} );
};
auto s1 = make_count( files[ 0 ] );
auto s2 = make_count( files[ 1 ] );
auto s3 = make_count( files[ 2 ] );
// Композиция: parallel → vector → sum
auto pipeline =
ex::when_all( std::move( s1 ), std::move( s2 ), std::move( s3 ) ) |
ex::then(
[]( size_t a, size_t b, size_t c ) {
return std::vector< size_t >{ a, b, c };
} ) |
ex::then( []( std::vector< size_t > v )
{ return std::accumulate( v.begin(), v.end(), size_t{ 0 } ); } );
auto result = ex::sync_wait( std::move( pipeline ) );
if( result )
{
auto [ r ] = result.value();
std::cout << "[Main] Total words = " << r << "\n";
}
}
В третьих, добавляется удобная обработка ошибок и кооперативная отмена
Адаптор upon_error
позволяет унести try/catch
из main
, сделав логику ошибок отдельной частью кода. Это перехват и обработка ошибки без изменения пайплайна. Он не заменяет ошибку, а просто добавляет "побочный эффект". Это действительно удобно, когда хочешь залогировать ошибку или напечатать сообщение в консоль или метрику в мониторинг.
| ex::upon_error([]( std::exception_ptr ep )
{
try
{
if( ep )
std::rethrow_exception( ep );
}
catch( const std::exception& e )
{
std::cerr << "[Error] " << e.what() << "\n";
}
return -1;
} );
Адаптор let_error
- это замена ошибки на новый sender, когда перехватывается выброшенный set_error
, и подменяет на новый sender. Позволяет превратить ошибку в set_value
, set_stopped
или даже другую ошибку. Например, перевести ошибку в остановку - just_stopped()
или перезапустить логику с другим путём.
#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>
namespace ex = stdexec;
int main()
{
// Построим самый простой sender, который "читает" файл
auto sender = ex::just( std::string{ "missing.txt" } )
// Пытаемся открыть файл и, если не можем — бросаем исключение
| ex::then(
[]( std::string path ) -> size_t
{
std::ifstream in( path );
if( !in )
throw std::runtime_error( "File not found: " + path );
// Тут может быть ваша реализация чтения/обработки...
return 0;
} )
// 4) let_error перехватывает любое исключение и вместо ошибки
// возвращает sender, который сразу отправит set_stopped()
| ex::let_error(
[]( std::exception_ptr )
{
// just_stopped() — sender, вызывающий set_stopped()
return ex::just_stopped();
} );
auto result = ex::sync_wait( std::move( sender ) );
if( result )
{
// Если value пришло, значит файл нашёлся
auto [ r ] = result.value();
std::cout << "File opened successfully, result = " << r << "\n";
}
else
{
// Пустой optional — либо ошибка, либо остановка через set_stopped
std::cout << "Operation was cancelled or file not found\n";
}
}
Из оставшегося, но не менее приятного:
Корутинная интеграция ("P3109R0"). C++26 позволит напрямую co_await любой single-value sender: co_await pipeline.
При необходимости можно реализовать task<T>
как корутину-sender, чтобы иметь полный контроль над жизненным циклом.
Parallel Algorithms и policy_aware_scheduler (P2079R5)
Перегруженные параллельные алгоритмы в C++26 примут scheduler
вместо execution_policy
:
std::for_each( sched, v.begin(), v.end(), []( auto& x ) { x *= 2; } );
В рамках текущих инструментов можно будет несколько расширить паттерны и реактивный подход С++, как стартовые идеи:
Backpressure - можно вызывать set_stopped при переполнении буфера, downstream возобновляет работу через set_value по событию освобождения.
Fan-out / Fan-in - динамическое порождение senders и агрегирование через when_all_range для адаптивных задач.
Split / Multicast - один источник данных — и несколько потребителей должны получить одинаковый результат, без повторного вычисления.
Retry с экспоненциальным backoff - если задачу бросила ошибка (например, сеть упала), хотим повторить её несколько раз с увеличивающейся задержкой
Batching, Circuit-breaker, throttling, debounce и т.д. и т.д. и т.д.
Не без ложки дёгтя, а именно, проблемы и ограничения (2025)
Сложность API и крутая кривая вхождения. Принципиальная сила модели строится на шаблонах, концептах и множествах CPO (
then
,let_value
,upon_error
,when_all
и т. д.). На первый взгляд выглядит, как «лес из метапрограммирования».Отсутствие стандартного
thread_pool
в<execution>
— придётся выбирать сторонний: NVIDIA stdexec, Boost, TBB, Folly, libunifex или ваша собственная реализация.Пока что это ручная отмена, посредством проверки
stop_requested()
внутриthen
. На 2025 год в модели P2300 отмена передаётся черезstop_token
вenv
, но более удобное средство — адаптерlet_stopped
— ещё не стало общепринятым. Поэтому чтобы действительно прерывать сложные циклы внутриthen
, нужно вручную проверять флаг отмены или бросать своё исключениеIDE пока не умеют визуализировать DAG senders, но это пока...
Заключение и ресурсы
Senders/Receivers в C++26 меняют парадигму асинхронности с низкоуровневого кодирования на декларативное описание графа задач:
Вы больше не смешиваете логику работы с потоками, обработку ошибок и отмену в своих бизнес-функциях. Каждый этап конвейера/пайплайна — свой sender, свой адаптер (
then
,upon_error
,let_error
и т. д.) и свой receiver.Благодаря static-inspection (
completion_signatures_of_t
) и строгим концептам вы сразу видите, какие значения, ошибки или остановки могут пройти по вашему DAG-у. Поменялись ожидания — поменялся и подход к тестированию.Абстракция scheduler освобождает вас от навязчивых реализаций thread-pool или event-loop. Параллельные алгоритмы принимают scheduler прямо в
<algorithm>
/<ranges>
, а кастомные schedulers (GPU, GUI, реактор) легко встраиваются.tag_invoke
позволяет «приварить» профилирование, трассировку, backpressure-механику, retry-политику и любую другую логику на уровне CPO, не портишь чистоту конвейера.co_await sender
иtask<T>
превращают корутины в полноправных участников DAG-а, устраняя разрозненностьstd::future
, Asio иco_await
. Всё едино и взаимосвязано.
Остаётся надеяться, что однажды C++ перестанет быть «эпическим испытанием», и станет языком, на котором просто хочется писать.
Полезные источники:
P2300R10 (std::execution) wg21.link
P2079R5 (policy_aware_scheduler) wg21.link
P3109R0 (co_await sender ) isocpp
NVIDIA эталонная реализация P2300 GitHub
stdexec Dev Documentation — документация по
stdexec
(User’s Guide и Reference)Zenn.dev: WG21/P2300 std::execution
Косинцев Артём
Инженер-программист