Как стать автором
Обновить

Senders/Receivers в C++26: от теории к практике

Уровень сложностиСредний
Время на прочтение15 мин
Количество просмотров4.2K

Практически каждый день мы сталкиваемся с необходимостью писать параллельный и асинхронный код: загрузка сетевых данных, взаимодействие с БД, UI-обработка, фоновые вычисления. Традиционные инструменты — std::thread, std::async, std::future — порождают boilerplate, разбросанные try/catch, сложную отмену и «callback hell».

Взамен этого C++26 стандартизирует декларативную модель Senders/Receivers в заголовке <execution>, дополняемую совместимостью с корутинам и расширением параллельных алгоритмов через policy_aware_scheduler. В результате мы получаем:

  • Единый API для асинхронности и параллелизма, устраняющий «callback-hell» и громоздкость std::future/std::async

  • Композиционные конвейеры (| then | when_all | upon_error), позволяющие строить гибкие DAG(Directed Acyclic Graph) задачи

  • Контроль отмены и обработку ошибок как неотъемлемую часть пайплайна (set_stopped, upon_error)

  • Интеграцию корутин (co_await sender) и передачу scheduler в параллельные алгоритмы (std::for_each)

Чтобы оценить и разобраться с предстоящими нововведениями, в этой статье:

  • Разберём три ключевые абстракции: Scheduler, Sender, Receiver.

  • Поговорим об инспекции кода (completion_signatures_of_t).

  • Реализуем профилирование через tag_invoke.

  • Объединим всё это в небольших примерах с NVIDIA stdexec.


Почему понадобился новый и единый подход?

Попытки реализовать асинхронность в C++ предпринимались давно — и не всегда удачно. До появления Senders/Receivers С++ опирался на такие инструменты, как std::async, std::future, Boost.Asio и корутины co_await. Они помогали решать задачи, но по мере роста сложности приложений обнаружились ограничения, мешающие их эффективному применению в реальных сценариях.

Появившись в C++11, std::async и std::future стали первым шагом к удобной асинхронности на уровне стандарта. Однако эти инструменты быстро показали свою несостоятельность в более сложных случаях:

  • Нельзя выразить последовательность задач декларативно, приходится вручную вызывать get() и обрабатывать состояния;

  • Eсли задача уже запущена — остановить её невозможно, поскольку нет предусмотренного механизма отмены;

  • Отсутствие интеграции с планировщиками и I/O: нельзя задать, где именно выполнять задачу, или привязать к событиям ввода-вывода;

  • Нет полноценной обработки ошибок, исключения пробрасываются через future::get(), что создаёт хаос в контроле потока исполнения;

  • Результат одной задачи нельзя передать другой без вложенных вызовов и бойлерплейта.

  • И, наверняка, можно продолжать еще долго, постепенно углубляясь в реализации...

Тот же Boost.Asio обеспечил высочайшую производительность в I/O, но расплатой за это стала крайне низкая читаемость. Композиция асинхронных операций строится через вложенные лямбды — отсюда и "callback hell". Это затрудняет не только поддержку, но и любую рефакторизацию:

async_read(..., [&](...) {
  async_write(..., [&](...) {
    async_timer(..., [&](...) {
      // Довольно больно
    });
  });
});

Кроме того, управление состоянием задачи (ошибки, отмена, сбои) перекладывается на плечи разработчика (надежно, как указатель в C++) .

С введением co_await в C++20 появилась надежда — императивный стиль вернулся в асинхронный код. Но оказалось, что корутины: не умеют напрямую "понимать" std::future или asio::awaitable; не композируются между собой без дополнительной инфраструктуры; не интегрируются с scheduler или I/O без написания собственных адаптеров; имеют неочевидные правила lifetime и сложные семантики отмены.

Поэтому Senders/Receivers - это системный ответ на фрагментированность и разобщённость подходов к асинхронному программированию в C++, который интегрирует их в единую архитектуру, способную масштабироваться от однопоточного embedded-кода до реактивных микросервисов и GPU-обработки данных.


Архитектура Senders/Receivers (с чего все начинается)

Новая модель базируется на трёх абстракциях:

1) Scheduler описывает контекст исполнения (thread-pool, inline, I/O-реактор) без жёсткой привязки к конкретной реализации. Наглядные реализации:

  • inline_scheduler — выполняет задачи синхронно сразу,

  • static_thread_pool::scheduler из NVIDIA stdexec,

  • reactor_scheduler (I/O на io_uring),

  • по хабам разбросаны и другие пользовательские GPU/GUI schedulers.

2) Sender — ленивое описание асинхронной операции. Он не начинает работу, пока не будет подписан или не передан адаптеру:

  • Single-value senders: sync_wait, just, then, co_await sender.

  • Multi-value senders: when_all, when_any, when_all_range.

Как это работает? За кулисами Senders/Receivers определены два низкоуровневых CPO (равно как и в coroutine interop ("P3109R0")):

  • std::execution::connect(Sender, Receiver) — связывает sender и receiver, создавая «operation state» (состояние операции) без запуска задачи

  • std::execution::start(OperationState&) — запускает уже сконструированную операцию, переводя её в активный режим исполнения

Поэтому мы можем смело отложить запуск задачи до полной компоновки графа задач, агрегировать аллокации (единственная память для всех operation state) и настроить отмену (через get_stop_token) до фактического старта.

Если бы всё запускалось сразу, мы потеряли бы контроль над инициализацией контекста, отменой и объединением нескольких senders в DAG.

3) Receiver — это коллбэк с тремя каналами:

  • set_value(v…) при результате,

  • set_error(exception_ptr) при исключении,

  • set_stopped() при отмене.

И, куда же без них, разные адаптеры:

  • just(v…) создаёт sender, который сразу отдаёт значение при подписке.

  • read_env(get_scheduler) извлекает scheduler из окружения.

  • then(sender, func) — принимает sender (можно и от предыдущего адаптера) и возвращает обычное готовое значение (или tuple значений), и запаковывает этот результат в новый sender (если звать подряд - будет sender<sender<int>> и т.д. :

    auto s = just( 10 ) | then( []( int x ) { return x * 2; } ); - простой трансформер

  • let_value(sender, func) — асинхронное продолжение, которое также возвращает sender, но не углубляя вложенность. Требует, чтобы func возвращал sender(другую асинхронную работу), и «раскрывает» его результаты напрямую, объединяя два шага в один DAG-узел.

  • when_all(s1,s2,…) — ждёт всех senders, отдаёт set_value(v1,v2,…).

  • when_any(s1,s2,…) — завершается первым.

  • when_all_range(container) — принимает run-time коллекцию senders.

  • sync_wait - блокирующий вызов, возвращающий optional<tuple<...>> (пустой при ошибке или остановке)

По ходу статьи я буду приводить примеры, попутно задействуя большую часть данных инструментов. Для экспериментов буду использовать референсную реализацию NVIDIA stdexec, пока <execution> не появится в стандартной библиотеке.

Начнем знакомство с малого, и переведем слова в верхний регистр:

#include <iostream>
#include <stdexec/execution.hpp>
#include <string>

namespace ex = stdexec;

int main()
{
   // Создаем sender посредством just, который сразу отправляет строку "Hello"
   auto sender = ex::just( std::string( "Hello" ) ) |
                 ex::then(
                    []( auto&& s )
                    {
                       for( auto& c : s )
                          c = std::toupper( c );
                       return s; 
                       // возвращает sender<std::string>
                    } ) |
                 ex::let_value(
                    []( std::string s )
                    {
                       return ex::just( s + "!" ); 
                       // возвращает sender<std::string>
                    } );

   auto result = ex::sync_wait( sender ); 
   // sync_wait автоматически connect+start

   if( result )
   {
      auto [ str ] = result.value(); // Распаковка кортежа
      std::cout << "Результат: " << str << std::endl;
   }
}

1) ex::just("Hello") сразу передаёт дальше по графу задач строку "Hello", облеченную в sender -> (sender<std::string>).

2) Первый then берёт эту строку через forwarding auto&&, модифицирует её и возвращает просто строку. В результате после второго шага у нас по-прежнему sender<std::string>.

3) let_value видит, что возвращен sender, распаковывает вложенный sender и делает так, чтобы он стал частью общей цепочки. В итоге sender выдаёт непосредственно "HELLO!" через set_value.

Если бы вместо let_value использовался then, то лямбда [](std::string s){ return ex::just(s + "!"); } вернула бы sender<std::string>, и then упаковал бы его как значение, результатом стало бы sender<sender<std::string>>, что не то, чего мы хотим.

Следующим примером пойдем через пул потоков считать количество слов в файлах, добавим обработку исключений и суммарный подсчет слов.

// NVIDIA-пул потоков для примера
#include <exec/static_thread_pool.hpp> 
#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>

namespace ex = stdexec;

int main()
{
   // Создаём NVIDIA-пул из 3 потоков и получаем его scheduler:
   exec::static_thread_pool pool{ 3 };
   auto sched = pool.get_scheduler();

   // Мини-фабрика sender-ов, каждый считает слова в своём файле:
   auto make_count = [ &sched ]( std::string path )
   {  //     переключаемся в pool
      return ex::schedule( sched ) 
             // передаём путь
             | ex::then( [ p = std::move( path ) ] { return p; } )
             | ex::then(
                  []( std::string&& p ) -> size_t
                  {
                     // читаем и считаем слова
                     std::ifstream in( p );

                     if( !in ) // выбросим исключение, обработчик ниже
                        throw std::runtime_error( "Cannot open: " + p );

                     size_t cnt = 0;
                     std::string w;
                     while( in >> w )
                        ++cnt;
                     std::cout << "Count in " << p << " = " << cnt << std::endl;

                     return cnt;
                  } ) | 
             ex::upon_error( // наш обработчик по всем исключениям
                []( std::exception_ptr ep ) -> size_t
                {
                  // Если в любом шаге произошло исключение 
                  // upon_error вернёт 0, и sync_wait отработает корректно.
                   try
                   {
                      if( ep )
                         std::rethrow_exception( ep );
                   }
                   catch( const std::exception& e )
                   {
                      std::cerr << e.what() << std::endl;
                   }
                   return 0;
                } );
   };

   // Строим три sender-а — по одному на файл:
   auto s1 = make_count( "file1.txt" );
   auto s2 = make_count( "file2.txt" );
   auto s3 = make_count( "file3.txt" );

   // Собираем в один pipeline:
   // - when_all запускает все три параллельно,
   // - первый then упаковывает результаты в вектор,
   // - второй then суммирует вектор.
   auto pipeline =
      ex::when_all( std::move( s1 ), std::move( s2 ), std::move( s3 ) ) |
      ex::then(
         []( size_t a, size_t b, size_t c ) {
            return std::vector< size_t >{ a, b, c };
         } ) |
      ex::then( []( std::vector< size_t > v ) { return std::accumulate( v.begin(), v.end(), size_t{ 0 } ); } );

   // Запуск и синхронное ожидание:
   auto result = ex::sync_wait( std::move( pipeline ) );

   if( result )
   {
      auto [ total ] = result.value();
      std::cout << "Total words = " << total << "\n";
   }
}

Такой подход даёт чистый и композиционный стиль: каждая операция — отдельный этап пайплайна, всё легко читать и масштабировать.

Но на этом потенциальная польза не заканчивается — Sender/Receiver-подход открывает дверь к множеству дополнительных, ранее сложных или неявных возможностей.


Во-первых, статическая инспекция: completion_signatures_of_t

Благодаря типобезопасности и метапрограммным трюкам (value_types_of_t, completion_signatures_of_t), вы можете на этапе компиляции проверить, что конкретный sender порождает именно те значения или ошибки, с которыми вы готовы работать. Это исключает целый класс рантайм-ошибок — пайплайн можно “просмотреть” как тип, а не как черный ящик.

Фактически мы говорим про статическую, декларативную инспекцию возможностей асинхронного звена в конвейере задач.

Каждый sender декларирует, какие каналы завершения он поддерживает.

#include <stdexec/execution.hpp>
#include <string>
#include <type_traits>

namespace ex = stdexec;

int main()
{
   // Тип самого sender-а:
   using Sender = decltype( ex::just( std::string{} ) );
   // Окружение: нам интересен get_scheduler, но для just() оно не влияет на значения
   using Env = ex::env< ex::get_scheduler_t >;

   // Вычисляем фактические сигнатуры:
   using actual_sigs = ex::completion_signatures_of_t< Sender, Env >;
   // Задаём ТОЛЬКО ожидаемый набор: just() выдаёт ровно set_value(string)
   using expected_sigs = ex::completion_signatures< ex::set_value_t( std::string ) >;

   static_assert( std::is_same_v< actual_sigs, expected_sigs >,
                  "completion_signatures_of_t<just(string)> must be <set_value(string)>" );

   return 0;
}

В модели Senders/Receivers частью сигнатур адаптера является environment — совокупность дополнительных параметров (scheduler, allocator, stop_token). Мы выбираем минимальное окружение, содержащее только get_scheduler_t. Для senders, не использующих scheduler, это окружение «нейтрально», но его обязательно указывать для метапрограммной проверки.

completion_signatures_of_t - это метафункциональный CPO, который извлекает из sender-а набор допустимых «каналов» завершения — именно set_value, set_error, set_stopped вместе с типами их аргументов. По сути, это чисто статическая информация, доступная на этапе компиляции.

completion_signatures - это шаблон для явного конструирования ожидаемого набора каналов. Здесь мы утверждаем, что just(string) исключительно выдаёт set_value(string) и не поддерживает error или stopped. Любое отличие считается ошибкой.

Такой подход позволяет провести статический анализ, static_assert согласованный API и покрытие тестов для всех ветвей DAG:

#include <exec/static_thread_pool.hpp>
#include <iostream>
#include <stdexec/execution.hpp>
#include <type_traits>

namespace ex = stdexec;

int main()
{
   // Создаём пул потоков и получаем scheduler
   exec::static_thread_pool pool{ 2 };
   auto sched = pool.get_scheduler();
   using Env = ex::env< ex::get_scheduler_t >;

   // schedule(sched) возвращает sender, который при старте вызовет set_value()
   // (без аргументов), то есть его «value»-канал имеет тип void.
   auto stage1 = ex::schedule( sched )
      // then перехватывает результат void() и превращает его в std::vector<int>
      | ex::then(
           []() {
              return std::vector< int >{ 1, 2, 3, 4, 5 };
           } );

   // Какие каналы completion здесь?
   //  - set_error(exception_ptr) — если при scheduling или в лямбде выбросилось
   //  - set_stopped()           — если отменили задачу до или во время выполнения
   //  - set_value(vector<int>)  — успешный результат с нашим вектором

   using stage1_sigs = ex::completion_signatures_of_t< decltype( stage1 ), Env >;
   using expected_stage1 = ex::completion_signatures< 
              ex::set_error_t( std::exception_ptr ), ex::set_stopped_t(),
              ex::set_value_t( std::vector< int > ) >;
  
   static_assert( std::is_same_v< stage1_sigs, expected_stage1 >, 
                 "Stage1: unexpected signatures" );
   // Здесь всё ок: void→vector<int> после then, плюс стандартные error/stopped.

   // === Pipeline: then(vector→int) + upon_error → int ===
   auto pipeline = stage1
   // then ─ берёт на вход vector<int> из stage1
   // и "заменяет" канал set_value(vector<int>) на новый канал set_value(int).
   | ex::then( []( const std::vector< int >& v ) { return std::accumulate( v.begin(), v.end(), 0 ); } )
   //   ex::upon_error ─ перехватывает любой exception_ptr из предыдущих senders
   | ex::upon_error([]( std::exception_ptr ep )
                    {
                       try
                       {
                          if( ep )
                             std::rethrow_exception( ep );
                       }
                       catch( const std::exception& e )
                       {
                          std::cerr << "[Error] " << e.what() << "\n";
                       }
                       return -1;
                    } );

   // Инспекция completion_sigs для pipeline:
   // 1) set_error(exception_ptr)  — любая ошибка на любом этапе
   // 2) set_stopped() — кооперативная отмена (schedule + then поддерживают stopped)
   // 3) set_value(int) — единственный value'канал после второго then/upon_error
   using pipeline_sigs = ex::completion_signatures_of_t< decltype( pipeline ), Env >;
   
   using expected_pipeline =
      ex::completion_signatures< ex::set_error_t( std::exception_ptr ), 
      ex::set_stopped_t(), ex::set_value_t( int ) >;
  
   static_assert( std::is_same_v< pipeline_sigs, expected_pipeline >, 
                 "Pipeline: unexpected signatures" );
   // Здесь нет set_value(vector<int>) от предыдущего then

   auto result = ex::sync_wait( std::move( pipeline ) );
   if( result )
   {
      auto [ r ] = result.value();
      std::cout << "Result = " << r << "\n"; // Вывод: Result = 15
   }
}

Данный способ наглядно позволяет формально описывать поведение асинхронных операций без запуска, в чисто типовом виде и выявлять потенциальные ошибки и несогласованности в адаптерах, особенно при сложных преобразованиях потоков данных и исключений. Может, немного, но это может быть удобнее.


Во-вторых, профилирование через tag_invoke

В <execution> для каждого «core operation» (schedule, connect, start и т.д.) определён соответствующий customization-point object (CPO), реализованный через вызов tag_invoke.

При вызове

ex::schedule(some_scheduler);

стандартная реализация ищет свободную функцию

tag_invoke(ex::schedule_t, /* ваш тип scheduler */ )

и если она найдена — вызывает именно её.

Мы оборачиваем schedule для собственного profiling_scheduler, чтобы автоматически замерять производительность:

// Profiling-scheduler: обёртка над реальным scheduler-ом
 struct profiling_scheduler
 {
   exec::static_thread_pool::scheduler base;
 };

Ниже по коду мы определяем свой кастомный tag_invoke:

// Перехват schedule(ps) через tag_invoke
inline auto tag_invoke( ex::schedule_t, profiling_scheduler ps )
{
   // Оригинальный sender<void> для переноса в пул
   auto s = ex::schedule( ps.base );
   // Вешаем then-лямбду, чтобы замерить
   return s | ex::then(
                 [ start = std::chrono::steady_clock::now() ]()
                 {
                    auto dur = std::chrono::steady_clock::now() - start;
                    std::cout << "[Profiling] schedule→execution: " 
                              << dur.count() << std::endl;
                 } );
}

В результате каждый schedule(ps) выводит задержку переводa задачи в пул без правки бизнес-логики, иначе говоря, замеряет время от вызова schedule(...) до реального начала выполнения задачи в пуле.

Способов применения масса, а вот как выглядит доработанный пример:

#include <exec/static_thread_pool.hpp>
#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>

namespace ex = stdexec;

// Profiling-scheduler: обёртка над реальным scheduler-ом
struct profiling_scheduler
{
   exec::static_thread_pool::scheduler base;
};

// Перехват schedule(ps) через tag_invoke
inline auto tag_invoke( ex::schedule_t, profiling_scheduler ps )
{
   // Оригинальный sender<void> для переноса в пул
   auto s = ex::schedule( ps.base );
   // Вешаем then-лямбду, чтобы замерить
   return s | ex::then(
                 [ start = std::chrono::steady_clock::now() ]()
                 {
                    auto dur = std::chrono::steady_clock::now() - start;
                    std::cout << "[Profiling] schedule→execution: " 
                              << dur.count() << std::endl;
                 } );
}

int main()
{
   // Готовим пул и profiling_scheduler
   exec::static_thread_pool pool{ 3 };
   profiling_scheduler ps{ pool.get_scheduler() };
   auto sched = ps;

   std::vector< std::string > files = { "file1.txt", "file2.txt", "file3.txt" };

   // 6. Фабрика senders для подсчёта слов
   auto make_count = [ & ]( std::string path )
   {
      return ex::schedule( sched ) | ex::then( [ p = std::move( path ) ] 
                                               { return p; } ) |
             ex::then(
                [ & ]( std::string p ) -> size_t
                {
                   std::ifstream in( p );
                   if( !in )
                      throw std::runtime_error( "Cannot open " + p );

                   size_t cnt = 0;
                   std::string w;
                   while( in >> w )
                   {

                      ++cnt;
                   }
                   std::cout << "Count in " << p << " = " << cnt << std::endl;

                   return cnt;
                } ) |
             ex::upon_error(
                []( std::exception_ptr ep ) -> size_t
                {
                   try
                   {
                      if( ep )
                         std::rethrow_exception( ep );
                   }
                   catch( const std::exception& e )
                   {
                      std::cerr << e.what() << "\n";
                   }
                   return 0;
                } );
   };

   auto s1 = make_count( files[ 0 ] );
   auto s2 = make_count( files[ 1 ] );
   auto s3 = make_count( files[ 2 ] );

   // Композиция: parallel → vector → sum
   auto pipeline =
      ex::when_all( std::move( s1 ), std::move( s2 ), std::move( s3 ) ) |
      ex::then(
         []( size_t a, size_t b, size_t c ) {
            return std::vector< size_t >{ a, b, c };
         } ) |
      ex::then( []( std::vector< size_t > v ) 
               { return std::accumulate( v.begin(), v.end(), size_t{ 0 } ); } );

   auto result = ex::sync_wait( std::move( pipeline ) );
   if( result )
   {
      auto [ r ] = result.value();
      std::cout << "[Main] Total words = " << r << "\n";
   }
}

В третьих, добавляется удобная обработка ошибок и кооперативная отмена

Адаптор upon_error позволяет унести try/catch из main, сделав логику ошибок отдельной частью кода. Это перехват и обработка ошибки без изменения пайплайна. Он не заменяет ошибку, а просто добавляет "побочный эффект". Это действительно удобно, когда хочешь залогировать ошибку или напечатать сообщение в консоль или метрику в мониторинг.

| ex::upon_error([]( std::exception_ptr ep )
                    {
                       try
                       {
                          if( ep )
                             std::rethrow_exception( ep );
                       }
                       catch( const std::exception& e )
                       {
                          std::cerr << "[Error] " << e.what() << "\n";
                       }
                       return -1;
                    } );

Адаптор let_error - это замена ошибки на новый sender, когда перехватывается выброшенный set_error, и подменяет на новый sender. Позволяет превратить ошибку в set_value, set_stopped или даже другую ошибку. Например, перевести ошибку в остановку - just_stopped() или перезапустить логику с другим путём.

#include <fstream>
#include <iostream>
#include <stdexec/execution.hpp>

namespace ex = stdexec;

int main()
{
   // Построим самый простой sender, который "читает" файл
   auto sender = ex::just( std::string{ "missing.txt" } )
                 // Пытаемся открыть файл и, если не можем — бросаем исключение
                 | ex::then(
                      []( std::string path ) -> size_t
                      {
                         std::ifstream in( path );
                         if( !in )
                            throw std::runtime_error( "File not found: " + path );
                         // Тут может быть ваша реализация чтения/обработки...
                         return 0;
                      } )
                 // 4) let_error перехватывает любое исключение и вместо ошибки
                 // возвращает sender, который сразу отправит set_stopped()
                 | ex::let_error(
                      []( std::exception_ptr )
                      {
                         // just_stopped() — sender, вызывающий set_stopped()
                         return ex::just_stopped();
                      } );

   auto result = ex::sync_wait( std::move( sender ) );

   if( result )
   {
      // Если value пришло, значит файл нашёлся
      auto [ r ] = result.value();
      std::cout << "File opened successfully, result = " << r << "\n";
   }
   else
   {
      // Пустой optional — либо ошибка, либо остановка через set_stopped
      std::cout << "Operation was cancelled or file not found\n";
   }
}

Из оставшегося, но не менее приятного:

Корутинная интеграция ("P3109R0"). C++26 позволит напрямую co_await любой single-value sender: co_await pipeline.

При необходимости можно реализовать task<T> как корутину-sender, чтобы иметь полный контроль над жизненным циклом.

Parallel Algorithms и policy_aware_scheduler (P2079R5)

Перегруженные параллельные алгоритмы в C++26 примут scheduler вместо execution_policy:

std::for_each( sched, v.begin(), v.end(), []( auto& x ) { x *= 2; } );

В рамках текущих инструментов можно будет несколько расширить паттерны и реактивный подход С++, как стартовые идеи:

Backpressure - можно вызывать set_stopped при переполнении буфера, downstream возобновляет работу через set_value по событию освобождения.

Fan-out / Fan-in - динамическое порождение senders и агрегирование через when_all_range для адаптивных задач.

Split / Multicast - один источник данных — и несколько потребителей должны получить одинаковый результат, без повторного вычисления.

Retry с экспоненциальным backoff - если задачу бросила ошибка (например, сеть упала), хотим повторить её несколько раз с увеличивающейся задержкой

Batching, Circuit-breaker, throttling, debounce и т.д. и т.д. и т.д.


Не без ложки дёгтя, а именно, проблемы и ограничения (2025)

  • Сложность API и крутая кривая вхождения. Принципиальная сила модели строится на шаблонах, концептах и множествах CPO (then, let_value, upon_error, when_all и т. д.). На первый взгляд выглядит, как «лес из метапрограммирования».

  • Отсутствие стандартного thread_pool в <execution> — придётся выбирать сторонний: NVIDIA stdexec, Boost, TBB, Folly, libunifex или ваша собственная реализация.

  • Пока что это ручная отмена, посредством проверки stop_requested() внутри then. На 2025 год в модели P2300 отмена передаётся через stop_token в env, но более удобное средство — адаптер let_stopped — ещё не стало общепринятым. Поэтому чтобы действительно прерывать сложные циклы внутри then, нужно вручную проверять флаг отмены или бросать своё исключение

  • IDE пока не умеют визуализировать DAG senders, но это пока...


Заключение и ресурсы

Senders/Receivers в C++26 меняют парадигму асинхронности с низкоуровневого кодирования на декларативное описание графа задач:

  • Вы больше не смешиваете логику работы с потоками, обработку ошибок и отмену в своих бизнес-функциях. Каждый этап конвейера/пайплайна — свой sender, свой адаптер (then, upon_error, let_error и т. д.) и свой receiver.

  • Благодаря static-inspection (completion_signatures_of_t) и строгим концептам вы сразу видите, какие значения, ошибки или остановки могут пройти по вашему DAG-у. Поменялись ожидания — поменялся и подход к тестированию.

  • Абстракция scheduler освобождает вас от навязчивых реализаций thread-pool или event-loop. Параллельные алгоритмы принимают scheduler прямо в <algorithm>/<ranges>, а кастомные schedulers (GPU, GUI, реактор) легко встраиваются.

  • tag_invoke позволяет «приварить» профилирование, трассировку, backpressure-механику, retry-политику и любую другую логику на уровне CPO, не портишь чистоту конвейера.

  • co_await sender и task<T> превращают корутины в полноправных участников DAG-а, устраняя разрозненность std::future, Asio и co_await. Всё едино и взаимосвязано.

Остаётся надеяться, что однажды C++ перестанет быть «эпическим испытанием», и станет языком, на котором просто хочется писать.

Полезные источники:

Косинцев Артём

Инженер-программист

Теги:
Хабы:
+29
Комментарии18

Публикации

Работа

QT разработчик
5 вакансий
Программист C++
96 вакансий

Ближайшие события