Как стать автором
Обновить
95.18
Рейтинг
ISPsystem
Софт для управления IT-инфраструктурой

Каналы на корутинах С++

Блог компании ISPsystem Программирование *C++ *
Tutorial

В предыдущей статье я описал простой путь создания генераторов на корутинах С++. На мой взгляд генераторы неплохо демонстрируют работу с такими объектами как coroutine_handle и promise_type. На этот раз речь пойдет об awaitable объектах — еще одной неотъемлемой части поддержки корутин в С++. А рассматривать мы их будем на примере реализации каналов, аналогичных каналам в GoLang. Как С++ разработчик, я не в восторге от многих решений принятых в GoLang, но в их каналы влюбился с первого взгляда. Итак, приступим!

Ставим амбициозную цель

Я задался вопросом: можно ли сделать в C++ что-нибудь, что будет похоже на каналы в GoLang? Простое чтение и запись каналов — задачи тривиальные, поэтому я решил: «а не замахнуться ли нам на Вильяма нашего Шекспира», то есть на select. За образец была взята следующая функция на GoLang:

func test(out1 <-chan int, out2 <-chan string) {
	select {
	case value := <-out1:
		fmt.Println("got int ", value)
	case value := <-out2:
		fmt.Println("got string ", value)
	}
}

Первая задача, которая перед мной встала, — это вообразить, как всё это может выглядеть на C++. Итогом моих размышлений стали два варианта: через std::variant и через лямбды. Сразу оговорюсь, я не хотел использовать никаких #define.

/* Аналог на variant */
task coroutine(channel<int>::out out1, channel<std::string>::out out2) {
  auto result = co_await select(out1, out2);
  if (auto *v = std::get_if<int>(&result); v != nullptr) {
    std::cout << "got int " << *v << std::endl;
  } else if (auto *v = std::get_if<std::string>(&result); v != nullptr) {
    std::cout << "got string " << *v << std::endl;
  }
}

/* Аналог на лямбдах */
task coroutine(channel<int>::out out1, channel<std::string>::out out2) {
  co_await select(
    out1 >> [](auto value) {
      std::cout << "got int " << value << std::endl;
    },
    out2 >> [](auto value) {
      std::cout << "got string " << value << std::endl;
    }
  );
}

У каждого из этих вариантов есть свои плюсы и минусы. Огромный минус первого — большое количество «странного кода», на вроде std::get_if<std::string>(&result). Помимо его ужасного вида, он позволяет легко ошибиться в типах и не позволяет использовать совместно однотипные каналы. Конечно, вместо типов при вызове std::get_if можно использовать индексы, но это тоже не очень красивое решение — у вас возникает два источника истины: при вызове select и в условии. Стоит поменять порядок параметров в select и забыть поменять номера в if, и в лучшем случае ваш код перестанет собираться, а в худшем — вы просто сломаете логику приложения.

«Почему же автор оставил этот убогий вариант?» — спросите вы. Ведь второй выглядит заметно привлекательнее! Тут соль в особенности совместной работы корутин и лямбд в С++, которая отняла несколько дней моей жизни. Да, лямбда может быть корутиной. Но дело в том, что лямбды, будучи корутинами, не захватывают значения, попавшие в замыкание. То есть всё, что было перечислено в квадратных скобках, пропадет после первого же co_await. Но при этом такой код скомпилируется и никакой tidy вам ничего не скажет :(

К тому же, вариант с std::variant немного проще в реализации и по сути является подготовительным шагом для второго. Кто не догадался при чем тут std::variant, result — это std::variant<int, std::string>, но об этом позже.

Немного теории

Как я писал выше, в данной статье речь в основном пойдет о реализации awaitable объектов. Если в предыдущей статье мы использовали и говорили о co_yield, в этой мы будем пользоваться co_await. Давайте немного разберемся в том, как он работает.

Данного описания логики работы co_await будет достаточно для понимания рассматриваемого примера, но оно не претендует на полноту. Например, я не буду подробно описывать все возможные варианты реализации await_suspend. Помимо void он может возвращать еще bool и std::coroutine_handle<>. Но ни то ни другое мне еще ни разу не понадобилось. Интересный пример на эту тему можно посмотреть в описании std::noop_coroutine на cppreference.com.

Так же я ничего не скажу об await_transform и перегрузке оператора co_await. Более полное описание можно найти на том же cppreference.com

Оператор co_await в качестве параметра принимает awaitable объект. Awaitable — это объект (структура или класс), реализующий следующие методы:

struct awaitable {
  bool await_ready() {}
  void await_suspend(std::coroutine_handle<>) {}
  // await_resume может возвращать любой тип, включая void
  auto await_resume() {}
};

В clang 12, на момент написания статьи, coroutine_handle и другие объекты и функции для работы с корутинами находились в пространстве имен std::experimental.

Первым делом co_await вызывает метод await_ready(). Если await_ready() возвращает true, то никакого прерывания корутины не происходит, тут же вызывается await_resume() и его результат возвращается в качестве результата вызова co_await. Если же await_ready() возвращает false, вызывается метод await_suspend() и после его завершения выполнение корутины приостанавливается вплоть до вызова coroutine_handle::resume().

Если вы запустите корутину как обычную функцию, после завершения await_suspend() функция-корутина «как бы завершится» и вернет вам в нашем случае task в качестве результата. На самом деле состояние всех локальных переменных при этом будет сохранено в куче. Они будут ждать возобновления её работы.

Ответственность за вызов resume целиком лежит на совести разработчика awaitable объекта. resume возобновляет корутину, вызывает метод await_resume() вашего awaitable объекта, результат которого опять же возвращается в корутину как результат вызова оператора co_await.

Если вы используете обратные вызовы (callback), а корутины можно рассматривать как частный случай таких приложений, важно следить за тем, чтобы callback (для корутин это coroutine_handle::resume()) вызывался В ЛЮБОМ СЛУЧАЕ. Даже если произошла ошибка, callback не должен быть проигнорирован.

Разминка. Накидываем каркас

Для начала определимся, что нам надо и напишем пустые заготовки. И так, нам нужен некий класс task с promise_type для компиляции корутины, channel — это собственно наш канал и некий select, возвращающий awaitable объект.

В нашей задаче task несет лишь вспомогательную функцию — объясняет компилятору какой promise_type использовать для корутины. Детали реализации task большой роли не играют. Вы можете заменить его на свой класс, со своей логикой. Поэтому я просто приведу его код с некоторыми пояснениями. Возможно в STL уже есть что-то готовое. Подскажите — благодарен буду.

Но вот использовать generator из предыдущей статьи совместно с каналами из этой не получится. Проблема в фундаментальной разнице между co_yield и co_await. Образно говоря, co_yield — просто приостановка выполнения функции, которая может быть в любой момент возобновлена. А вот co_await — ожидание какого-то события. В общем случае вы не можете такое событие поторопить. Гипотетически попытка принудительного возобновления функции прерванной co_await должна приводить к блокировке текущего потока выполнения — он будет ждать возникновения события. В однопоточном приложении это привело бы к зависанию всего приложения.

Поэтому, если вам нужен генератор, использующий в своей логике co_await, вам придется обращаться к нему также через co_await, а не простым вызовом, как я это делал в своей статье. И это будет совсем другой генератор.

class task {
public:
  struct promise_type {
    using suspend_never = std::suspend_never;
    // мы никак не будем управлять нашими корутинами извне поэтому
    // нашему классу task не нужно ничего знать о корутине
    task get_return_object() noexcept { return {}; }
    // наши корутины не будут возвращать значений. Поэтому в момент
    // завершения корутины будет вызван метод return_void
    // соответствующего promise_type
    void return_void() noexcept { }
    // никаких лишних остановок, код корутины начинает выполняться сразу
    suspend_never initial_suspend() noexcept { return {}; }
    // никаких задержек в конце, destroy() вызовется автоматически
    suspend_never final_suspend() noexcept { return {}; }
    // мы не обрабатываем непойманные исключения. а что с ними делать?
    void unhandled_exception() { std::terminate(); }
  };
private:
  // нечего создавать такие объекты вручную
  task() noexcept = default;
};

Чтобы определиться с интерфейсом channel нам надо всё же написать те самые тривиальные запись и чтение каналов. Мне захотелось, чтобы они выглядели следующим образом:

// чтение
channel<int>::out output;
auto value = co_await output;
// запись
channel<int>::in input;
co_await input << value;
// было бы проще использовать запись co_await input.write(value),
// но мне захотелось сделать именно так

Теперь мы знаем что, во первых, channel — это шаблон. Во вторых, он имеет два вложенных класса in для записи и out для чтения. Чтение и запись — асинхронные операции, следовательно in и out должны реализовывать интерфейс awaitable. Кроме того, нам понадобится еще некий helper для реализации конструкции co_await output << value . У меня получилось следующее:

using handle = std::coroutine_handle<>;

template <typename Type>
class channel {
public:
  class in {
  public:
    using value_type = Type;
    bool await_ready() const { return false; }
    void await_suspend(handle coro) {}
    auto await_resume() {
      // co_await имеет приоритет над operator <<, поэтому
      // await_resume возвращает helper реализующий operator <<
      // мы как бы будем получать разрешение на запись в канал,
      // а саму запись будем производить вызовом оператора <<
      struct [[nodiscard]] helper {
        void operator << (value_type value) {}
      };
      return helper{};
    }
  };

  class out {
  public:
    using value_type = Type;
    bool await_ready() const { return false; }
    void await_suspend(handle coro) {}
    value_type await_resume() {
      return {};
    }
  };

  // size как и в GoLang будет определять сколько значений
  // может храниться в канале. попытка записать больше будет
  // приводить к остановке записывающей корутины
  explicit channel(size_t size) {}
  operator in() { return {}; }
  operator out() { return {}; }
};

В своих примерах я не всегда расставляю [[nodiscard]] везде, где это требует тот же tidy. Но для функций, возвращающих awaitable объекты или различные helper объекты, это палочка выручалочка, спасающая от забытых co_await или тех же операторов записи. Особенно легко ошибиться, когда co_await ничего не возвращает. Вы просто забываете написать co_await, и всё собирается, но ничего не работает.

И наконец, заготовка на select. Здесь мы используем variadic template и формируем тип того самого std::variant.

template <typename ...Channels>
auto select(Channels ...channels) {
  // маркируем awaitable как [[nodiscard]], теперь компилятор не позволит
  // нам просто вызвать select, а намекнет, что мы что-то забыли :)
  struct [[nodiscard]] awaitable {
    bool await_ready() const { return false; }
    void await_suspend(handle coro) {}
    std::variant<typename Channels::value_type...> await_resume() {
      return {};
    }
  };
  return awaitable{};
}

Поздравляю! Наш код имеет все шансы быть скомпилированным. Но не заработать :)

Наращиваем мясо

Идея проста как мычание, пока в канале есть место — записываем данные в канал, пока в канале есть данные — читаем их из него без приостановки корутин. Если места или данных нет — складываем coroutine_handle в очереди и достаем их из очередей по мере того как данные вычитываются или наоборот появляются. Чтобы не загромождать статью кодом, приведу лишь реализацию класса in, класс out полностью аналогичен только без helper.

class in {
public:
  using value_type = channel::value_type;
  // проверяем наличие места в канале, если оно есть - просто сохраняем
  // данные и не блокируем вызвавшую нас корутину
  bool await_ready() const { return m_self->m_capacity > m_self->m_data.size(); }
  // await_suspend вызывается если await_ready вернул false
  // в нем мы сохраняем handle в очередь ожидающих возможности
  // записать данные
  void await_suspend(handle coro) { m_self->m_in_queue.emplace(coro); }
  // в момент вызова await_resume место в канале гарантированно есть
  auto await_resume() {
    struct [[nodiscard]] helper {
      channel *m_self;
      void operator << (value_type value) {
        // помещаем данные в канал
        m_self->m_data.template emplace(std::move(value));
        // проверяем очередь корутин ожидающих появления данных
        m_self->resume_out();
      }
    };
    return helper{m_self};
  }
private:
  // запрещаем создавать объекты in вручную
  explicit in(channel *self) noexcept : m_self{self} {}
  channel *m_self;
  friend class channel;
};

В классе channel дописываем конструктор и свойства для хранения данных и очередей ожидающих своего часа корутин.

template <typename Type>
class channel {
public:
  using value_type = Type;
  //
  // ...
  //
  
  explicit channel(size_t size) noexcept : m_capacity{size} {}

  operator in() noexcept { return in{this}; }
  operator out() noexcept { return out{this}; }
private:
    size_t m_capacity;
    std::queue<value_type> m_data;
    std::queue<handle> m_in_queue;
    std::queue<handle> m_out_queue;

    void resume_in() {
      if (!m_in_queue.empty()) {
        pop(m_in_queue).resume();
      }
    }
    
    void resume_out() {
      if (!m_out_queue.empty()) {
        pop(m_out_queue).resume();
      }
    }
};

Вот скажите мне, почему ни в одном STL контейнере нет метода pop(), который бы не только удалял, но и возвращал значение? Функция pop в моём примере делает как раз это.

Вот мы и получили каналы, в которые можно писать и из которых можно читать при помощи co_await. Данное решение будет работать, но у него есть пара серьезных недостатков:

  • Прямой вызов resume() из resume_in/resume_out может привести к неограниченному росту стека. Поэтому лучше делать такие вызовы через boost::asio::post или любой другой планировщик

  • Вторая проблема касается helper в in::await_resume. Дело в том, что никто не гарантирует, что в промежутке между завершением этого метода и вызовом оператора << место в канале не закончится. Такой эффект можно легко получить следующей последовательностью co_await chan << co_await another_chan. Поэтому неплохо было бы знать, сколько объектов helper данного канала создано и резервировать под них место

Магия шаблонов

Вот мы и подошли к реализации select. Если channel::in и channel::out сами представляют из себя awaitable объекты, то select — это функция, которая возвращает awaitable объект.

Но, прежде чем перейти к реализации методов, нам надо как-то сохранить разнотипные каналы, полученные в качестве параметров. Ничего лучшего для этой цели чем std::tuple я не знаю, поэтому его и использовал.

template <typename ...Channels>
auto select(Channels ...channels) {
  struct [[nodiscard]] awaitable {
    std::tuple<Channels...> channels;
		// ...
  };
  // channel<>::out - по сути один указатель,
  // поэтому везде передаем его по значению
  return awaitable{std::make_tuple(channels...)};
}

Это было не сложно. Теперь давайте попробуем по очереди разобраться с методами awaitable и начнем с await_ready. В нём надо вызвать метод await_ready из всех переданных в select каналов. Тут нам на помощь приходит std::apply. Вообще, это функция для вызова некоторого функтора с передачей в него элементов std::tuple в качестве параметров. Но в нашем случае он используется чтобы развернуть std::tuple обратно в variadic template. Затем fold expressions из C++17 помогают сделать наш код лаконичным и простым.

bool await_ready() const {
  // магия шаблонов превращает std::tuple в параметры лямбды с сохранением типов
  return std::apply([](auto & ...values) {
      // нас интересует хотя бы один непустой канал, поэтому ||
      return (values.await_ready() || ...);
  }, channels);
}

Для того, чтобы реализовать await_resume, нам надо вызвать await_resume из канала, метод await_ready из которого возвращает true, и положить результат в std::variant<Channels::value_type...>. Для этого мне пришлось написать еще один шаблон и добавить в channel::out шаблонный метод.

class channel {
  // ...
  class out {
    // ...
    // при работе с std::variant, Index нельзя передать
    // параметром функции, только параметром шаблона
    template <size_t Index, typename Variant>
    bool extract(Variant &v) {
      if (await_ready()) {
        v.template emplace<Index>(await_resume());
        return true;
      }
      return false;
    }
  };
};

// чтобы что-то положить в std::variant нам надо явно указать
// позицию внутри std::variant. здесь нам на помощь приходит
// std::index_sequence<>. мы берем по очереди элементы tuple и вызываем
// из каждого канала шаблонный метод extract, передавая ему результирующий
// std::variant и номер позиции, куда необходимо записать результат.
// а оператор || обеспечит извлечение только одного значения
template<typename Variant, class Tuple, std::size_t... Is>
void tuple2variant(Variant &v, Tuple& t, std::index_sequence<Is...>) {
    (std::get<Is>(t).template extract<Is>(v) || ...);
}

template <typename ...Channels>
auto select(Channels ...channels) {
  struct [[nodiscard]] awaitable {
  	// ...
    // нам осталось только вызвать tuple2variant
    auto await_resume() {
      std::variant<typename Channels::value_type...> result;
      tuple2variant(result, channels, std::index_sequence_for<Channels...>{});
      return std::move(result);
    }
  }
}

Почти всё. Осталось реализовать await_suspend. Для этого надо положить полученный coroutine_handle во все каналы, переданные в select. А далее сделать так, чтобы только один канал смог разбудить остановленную при помощи select корутину. Для этого я воспользовался связкой std::shared_ptr/std::weak_ptr и переделал channel::m_out_queue. Теперь очередь содержит не coroutine_handle, а std::variant<coroutine_handle, std::weak_ptr<coroutine_handle>>. А чтобы эти weak_ptr не протухли раньше времени, добавляем std::shared_ptr<coroutine_handle> в свойства awaitable, возвращаемого функцией select. Когда нам надо разбудить "счастливчика", который дождался данных, мы перебираем очередь, пока не встретим живой std::weak_ptr содержащий непустой coroutine_handle

По своей сути coroutine_handle — указатели и вполне могут принимать значение nullptr

class channel {
  // ..
  class out {
    // ...
    // добавить weak_ptr в очередь ждущих корутин
    void _suspend(std::weak_ptr<handle> c) {
      m_self->m_out_queue.emplace(c);
    }
  };
  
  // ...
  std::queue<std::variant<handle, std::weak_ptr<handle>>> m_out_queue;
  
  void resume_out() {
    while (!m_out_queue.empty()) {
      auto ptr = pop(m_out_queue);
      // это обрабока чтения каналов, когда оно происходит
      // напрямую, без использования select - всё осталось как было
      if (auto coro = std::get_if<handle>(&ptr); coro != nullptr) {
        coro->resume();
        break;
        // это обработка каналов, ожидаемых через select
      } else if (auto coro = std::get<std::weak_ptr<handle>>(ptr).lock(); coro != nullptr && *coro != nullptr) {
        auto c = *coro;
        *coro = nullptr;
        c.resume();
        break;
      }
    }
  }
};
// ...
template <typename ...Channels>
auto select(Channels ...channels) {
  struct [[nodiscard]] awaitable {
    // ...
    void await_suspend(handle c) {
      coro = std::make_shared<handle>(c);
      std::apply([this](auto & ...values) {
        (values._suspend(coro), ...);
      }, channels);
    }
  }
}

Еще немного сахара

Вначале я не планировал описывать реализацию select на лямбдах. Но это оказалось довольно просто. Мне лишь понадобился дополнительный helper теперь уже для channel::out, который возвращается оператором >>.

auto operator >> (std::function<void(value_type)> callback) {
  struct helper {
    using value_type = channel::value_type;
    std::function<void(value_type)> callback;
    typename channel<Type>::out self;
  };
  return helper{std::move(callback), *this};
}

Сама функция select фактически не изменилась, только теперь наш await_resume не возвращает значение, а вызывает callback.

class channel {
  class out {
    // ...
    // нам больше не надо работать с std::variant
    bool callback(std::function<void(value_type)> &callback) {
      if (await_ready()) {
        callback(await_resume());
        return true;
      }
      return false;
    }
  };
};

template <typename ...Callbacks>
auto select(Callbacks ...callbacks) {
  struct [[nodiscard]] awaitable {
    using handle = std::coroutine_handle<>;
    std::tuple<Callbacks...> callbacks;
    std::shared_ptr<handle> coro;
    // в tuple у нас появилась дополнительная вложенность (свойство self)
    // в остальном - никаких изменений
    bool await_ready() const {
      return std::apply([](auto & ...values) {
        return (values.self.await_ready() || ...);
      }, callbacks);
    }
    void await_suspend(handle c) {
      coro = std::make_shared<handle>(c);
      std::apply([this](auto & ...values) {
        (values.self._suspend(coro), ...);
      }, callbacks);
    }
    // уже после публикации первого варианта статьи я понял, что в этом
    // случае у нас отпала необходимость во вспомогательном шаблоне
    void await_resume() {
      std::apply([](auto & ...values) {
        return (values.self.callback(values.callback) || ...);
      }, callbacks);
    }
  };
  return awaitable{std::make_tuple(std::move(callbacks)...)};
}

К моему большому удивлению, вариант на лямбдах получился даже проще. Но пришел я к нему через std::variant. Можно еще от метода callback в channel::out избавиться, написав вместо него еще одну лямбду.

void await_resume() {
  std::apply([](auto & ...values) {
    return ([] (auto &value) {
      if (value.self.await_ready()) {
        value.callback(value.self.await_resume());
        return true;
      }
      return false;
    }(values) || ...);
  }, callbacks);
}

Эпилог

Рад приветствовать тех, кто добрался до этих строк! Я описал не всё. Неплохо было бы объяснить компилятору, какой из вариантов select выбирать. Я сделал это через концепты — еще одну замечательную возможность C++20. Добавить планировщик и обернуть все resume() в конструкцию post([coro] () mutable { coro.resume(); }). Реализовать channel::close(). Впрочем, первое можно найти в моём github. А вот с close() всё не так просто однозначно, кстати как и в GoLang.

Даже решение с лямбдами получилось не столь лаконичное как в GoLang - сложно тягаться со встроенными механизмами языка. Но у него есть один существенный плюс — оно однопоточное. Следовательно, при шаринге ресурсов между корутинами вам не придется думать о всевозможных mutex. Вы будете просто писать «однопоточный код».

В заключении хочу сказать, что я прошел увлекательный квест пока писал код и эту статью. Надеюсь кому-то она помогла пройти его со мной рядом. Ставьте лайки, пишите комментарии, задавайте вопросы. Буду рад помочь или дополнить свой рассказ подробностями. За сим на сегодня всё.

Теги:
Хабы:
Всего голосов 15: ↑13 и ↓2 +11
Просмотры 6.7K
Комментарии 5
Комментарии Комментарии 5

Информация

Дата основания
Местоположение
Россия
Сайт
www.ispsystem.ru
Численность
101–200 человек
Дата регистрации
Представитель
ISPsystem