Использование Boost.Asio с Coroutines TS

Введение


Использование функций обратного вызова (callback) — популярный подход к построению сетевых приложений с использованием библиотеки Boost.Asio (и не только ее). Проблемой этого подхода является ухудшение читабельности и поддерживаемости кода при усложнении логики протокола обмена данными [1].


Как альтернатива коллбекам, сопрограммы (coroutines) можно применить для написания асинхронного кода, уровень читабельности которого будет близок к читабельности синхронного кода. Boost.Asio поддерживает такой подход, предоставляя возможность использования библиотеки Boost.Coroutine для обработки коллбеков.


Boost.Coroutine реализует сопрограммы с помощью сохранения контекста выполнения текущего потока. Этот подход конкурировал за включение в следующую редакцию стандарта C++ с предложением от Microsoft, которое вводит новые ключевые слова co_return, co_yield и co_await. Предложение Microsoft получило статус Technical Specification (TS) [2] и имеет высокие шансы стать стандартом.


Статья [3] демонстрирует использование Boost.Asio с Coroutines TS и boost::future. В своей статье я хочу показать, как можно обойтись без boost::future. Мы возьмем за основу пример асинхронного TCP эхо-сервера из Boost.Asio и будем его модифицировать, используя сопрограммы из Coroutines TS.



На момент написания статьи Coroutines TS реализована в компиляторах Visual C++ 2017 и clang 5.0. Мы будем использовать clang. Необходимо установить флаги компилятора для включения экспериментальной поддержки стандарта C++ 20 (-std=c++2a) и Coroutines TS (-fcoroutines-ts). Также нужно включить заголовочный файл <experimental/coroutine>.



Сопрограмма для чтения из сокета



В оригинальном примере функция для чтения из сокета выглядит так:


void do_read() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(data_, max_length),
        [this, self](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
                do_write(length);
            }
        });
}

Мы инициируем асинхронное чтение из сокета и задаем коллбек, который будет вызван при получении данных и инициирует их отсылку обратно. Функция записи в оригинале выглядит так:


void do_write(std::size_t length) {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(data_, length),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
            if (!ec) {
                do_read();
            }
        });
}

При успешной записи данных в сокет мы снова инициируем асинхронное чтение. По сути, логика программы сводится к циклу (псевдокод):



while (!ec)
{
	ec = read(buffer);
	if (!ec)
	{
		ec = write(buffer);
	}
}

Было бы удобно закодировать это в виде явного цикла, однако в таком случае нам пришлось бы сделать чтение и запись синхронными операциями. Нам это не подходит, поскольку мы хотим обслуживать несколько клиентских сессий в одном потоке выполнения одновременно. На помощь приходят сопрограммы. Перепишем функцию do_read() в следующем виде:



void do_read() {
    auto self(shared_from_this());
    const auto[ec, length] = co_await async_read_some(
        socket_, boost::asio::buffer(data_, max_length));

    if (!ec) {
        do_write(length);
    }
}

Использование ключевого слова co_await (а также co_yield и co_return) превращает функцию в сопрограмму. Такая функция имеет несколько точек (suspension point), где ее выполнение приостанавливается (suspend) с сохранением состояния (значений локальных переменных). Позже выполнение сопрограммы может быть возобновлено (resume), начиная с последней остановки. Ключевое слово co_await в нашей функции создает suspension point: после того, как асинхронное чтение инициировано, выполнение сопрограммы do_read() будет приостановлено до завершения чтения. Возврата из функции при этом не происходит, но выполнение программы продолжается, начиная с точки вызова сопрограммы. Когда клиент подключается, вызывается session::start(), где do_read() вызывается первый раз для этой сессии. После начала асинхронного чтения продолжается выполнение функции start(), происходит возврат из нее и инициируется прием следующего соединения. Далее продолжает выполнение код из Asio, который вызвал обработчик-аргумент async_accept().


Для того, чтобы магия co_await работала, его выражение — в нашем случае функция async_read_some() — должен возвращать объект класса, который соответствует определенному контракту. Реализация async_read_some() взята из комментария к статье [3].



template <typename SyncReadStream, typename DynamicBuffer>
auto async_read_some(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            s.async_read_some(std::move(buffers),
                              [this, coro](auto ec, auto sz) mutable {
                                  this->ec = ec;
                                  this->sz = sz;
                                  coro.resume();
                              });
        }
        auto await_resume() { return std::make_pair(ec, sz); }
    };
    return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

async_read_some() возвращает объект класса Awaiter, который реализует контракт, требуемый co_await:


  • await_ready() вызывается в начале ожидания для проверки, готов ли уже результат асинхронной операции. Поскольку для получения результата нам всегда нужно дождаться, пока данные будут прочитаны, мы возвращаем false.
  • await_suspend() вызывается перед тем, как вызывающая сопрограмма будет приостановлена. Здесь мы инициируем асинхронное чтение и передаем обработчик, который сохранит результаты выполнения асинхронной операции в переменных-членах класса Awaiter и возобновит сопрограмму.
  • await_resume() — возвращаемое значение этой функции будет результатом выполнения co_await. Просто возвращаем сохраненные ранее результаты выполнения асинхронной операции.

Если теперь попытаться собрать нашу программу, то получим ошибку компиляции:



error: this function cannot be a coroutine: 'std::experimental::coroutines_v1::coroutine_traits<void, session &>' has no member named 'promise_type'
    void do_read() {
         ^

Причина в том, что компилятор требует, чтобы для сопрограммы тоже был реализован некоторый контракт. Это делается с помощью специализации шаблона std::experimental::coroutine_traits:



template <typename... Args>
struct std::experimental::coroutine_traits<void, Args...> {
    struct promise_type {
        void get_return_object() {}
        std::experimental::suspend_never initial_suspend() { return {}; }
        std::experimental::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

Мы специализировали coroutine_traits для сопрограмм с возращаемым значением типа void и любым количеством и типами параметров. Сопрограмма do_read() подходит под это описание. Специализация шаблона содержит тип promise_type с следующими функциями:


  • get_return_object() вызывается для создания объекта, который сопрограмма будет впоследствии заполнять и возвращать. В нашем случае ничего создавать не нужно, так как do_read() ничего не возвращает.
  • initial_suspend() определяет, будет ли сопрограмма приостановлена перед первым вызовом. Аналогия — запуск приостановленного потока в Windows. Нам нужно, чтобы do_read() выполнялась без начальной остановки, поэтому возвращаем suspend_never.
  • final_suspend() определяет, будет ли сопрограмма приостановлена перед возвратом значения и завершением. Возвращаем suspend_never.
  • return_void() указывает компилятору, что сопрограмма ничего не возвращает.
  • unhandled_exception() вызывается, если внутри сопрограммы было сгенерировано исключение, и оно не было обработано внутри сопрограммы. В этом случае аварийно завершаем программу.

Теперь можно запустить сервер и проверить его работоспособность, открыв несколько подключений с помощью telnet.



Сопрограмма для записи в сокет


Функция записи do_write() все еще основывается на использовании коллбека. Исправим это. Перепишем do_write() в следующем виде:



auto do_write(std::size_t length) {
    auto self(shared_from_this());
    struct Awaiter {
        std::shared_ptr<session> ssn;
        std::size_t length;
        std::error_code ec;

        bool await_ready() { return false; }
        auto await_resume() { return ec; }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            const auto[ec, sz] = co_await async_write(
                ssn->socket_, boost::asio::buffer(ssn->data_, length));
            this->ec = ec;
            coro.resume();
        }
    };
    return Awaiter{self, length};
}

Напишем awaitable-обертку для записи в сокет:



template <typename SyncReadStream, typename DynamicBuffer>
auto async_write(SyncReadStream &s, DynamicBuffer &&buffers) {
    struct Awaiter {
        SyncReadStream &s;
        DynamicBuffer buffers;

        std::error_code ec;
        size_t sz;

        bool await_ready() { return false; }
        auto await_resume() { return std::make_pair(ec, sz); }
        void await_suspend(std::experimental::coroutine_handle<> coro) {
            boost::asio::async_write(
                s, std::move(buffers), [this, coro](auto ec, auto sz) mutable {
                    this->ec = ec;
                    this->sz = sz;
                    coro.resume();
                });
        }
    };
    return Awaiter{s, std::forward<DynamicBuffer>(buffers)};
}

Последний шаг — перепишем do_read() в виде явного цикла:



void do_read() {
    auto self(shared_from_this());
    while (true) {
        const auto[ec, sz] = co_await async_read_some(
            socket_, boost::asio::buffer(data_, max_length));
        if (!ec) {
            auto ec = co_await do_write(sz);
            if (ec) {
                std::cout << "Error writing to socket: " << ec << std::endl;
                break;
            }
        } else {
            std::cout << "Error reading from socket: " << ec << std::endl;
            break;
        }
    }
}

Логика программы теперь записана в виде, близком к синхронному коду, однако она выполняется асинхронно. Ложкой дёгтя является то, что нам пришлось написать дополнительный awaitable-класс для возвращаемого значения do_write(). Это иллюстрирует один из недостатков Coroutines TS — распространение co_await вверх по стеку асинхронных вызовов [4].


Переделку функции server::do_accept() в сопрограмму оставим в качестве упражнения. Полный текст программы можно найти на GitHub.



Заключение


Мы рассмотрели использование Boost.Asio с Coroutines TS для программирования асинхронных сетевых приложений. Преимущество такого подхода — улучшение читабельности кода, поскольку он становится близок по форме к синхронному. Недостаток — необходимость в написании дополнительных оберток для поддержки модели сопрограмм, реализованной в Coroutines TS.



Ссылки


  1. Асинхронность: назад в будущее
  2. Working Draft, Technical Specification for C++ Extensions for Coroutines
  3. Using C++ Coroutines with Boost C++ Libraries
  4. Возражения против принятия Coroutines с await в C++17
Поделиться публикацией
Комментарии 16
    +1
    Спасибо за статью, пример взаимодействия с asio лучше любой синтетики.
    <holywar_mode="on"/>
    Сколько уже копий сломано об эти корутины, но мне до сих пор не понятно. В бусте уже были нормальные корутины, что мешало внести в язык что-то похожее на boost::context? С чего вдруг предлагаемая модель обёрток над промисами лучше, чем корутины со стеком? Вот теперь придумывают, как использовать asio, хотя он уже работал с бустовыми корутинами.
    <holywar_mode="off"/>
      +1
      Насколько я понял, «модель оберток» предпочли потому, что Boost.Context не переносимый и требует низкоуровневой реализации для каждой платформы. Возможно, играет роль и то, что Coroutines TS предложена Microsoft'ом.
        0
        В бусте были потоки пользовательского режима (=волокна), а не сопрограммы.
            0
            И я про них же. Хоть они и называются «сопрограммами», но механизм у них как у потоков. Даже в документации написано что один из возможных механизмов реализации, WinFiber, использует волокна из WinAPI.
          +1

          Да, по поводу преимуществ — тут все просто. Корутину компилятор способен оптимизировать в ноль — а волокно нет.

          0
          С do_write тут что-то ужасное сделали. Его нужно было делать нормальной сопрограммой, или вовсе объединить с do_read (единственный смысл разделения этих методов в старом коде я вижу лишь в уменьшении уровня вложенности).
            0
            Я намеренно не объединял, чтобы показать проблему с расползанием co_await вверх по стеку. Если сделать do_write «нормальной корутиной», тогда придется убрать co_await do_write() в do_read() и вызывать просто do_write(). В результате новое чтение будет начинаться, не дожидаясь окончания записи. У меня есть такой вариант кода, вечером выложу.
              0
              Если сделать do_write «нормальной корутиной», тогда придется убрать ее из-под co_await в do_read()

              Нет, не надо. Надо просто дать ей нормальное возвращаемое значение. Например, для примера подойдет std::experimental::future.

                0
                Я делал возвращаемое значение std:error_code. Использование std::experimental::future или boost::future — такой подход используется в blogs.msdn.microsoft.com/vcblog/2017/05/19/using-c-coroutines-with-boost-c-libraries, но я хочу этого избежать, потому что тогда будет либо синхронное ожидание (future::wait), либо then(), при использовании которого код уже менее близок к синхронному.
                  0

                  А, извиняюсь, перепутал Concurrency TS и Coroutines TS :-) Точно, из Coroutines TS же конкретные типы сто лет назад убрали...


                  Можно подключить cppcoro и использовать cppcoro::task


                  Или можно использовать вот такой тип в качестве возвращаемого значения (пишу без проверки потому что под рукой нет компилятора):


                  template <typename T = void> class lazy_task { // Для void нужна отдельная специализация
                      std::experimental::coroutine_handle<promise_type> handle;
                  
                  public:
                      bool await_ready() { return false; }
                      bool await_suspend(std::experimental::coroutine_handle<> coro) {
                          handle.resume();
                          if (handle.done()) return true;
                          handle.promise().m_continuation = coro;
                          return false;
                      }
                      T await_resume() {
                          promise_type promise = handle.promise();
                          handle.destroy();
                  
                          if (promise .m_exception)
                              std::rethrow_exception(promise.m_exception);
                          else
                              return promise.m_value;
                      }
                  
                      struct promise_type {
                          T m_value;
                          std::exception_ptr m_exception;
                          std::experimental::coroutine_handle<> m_continuation;
                  
                          lazy_task get_return_object() { 
                             return { std::experimental::coroutine_handle<promise_type>::from_promise(*this) }; 
                          }
                          std::experimental::suspend_always initial_suspend() { return {}; }
                          std::experimental::suspend_always final_suspend() { return {}; }
                          void return_value(T value) { 
                              m_value = value;
                              if (m_continuation) m_continuation.resume();
                          }
                          void unhandled_exception() { 
                              m_exception = std::current_exception(); 
                              if (m_continuation) m_continuation.resume();
                          }
                      };
                  }
                    0

                    UPD: упс, return_value же вызывается до final suspend point, нельзя в нем m_continuation.resume() вызывать! Вот так надо:


                    auto final_suspend() { 
                        struct awaitable {
                            std::experimental::coroutine_handle<> m_continuation;
                            bool await_ready() { return false; }
                            void await_suspend(std::experimental::coroutine_handle<> coro) {
                                if (m_continuation) m_continuation.resume();
                            }
                            void await_resume() { }
                        };
                        return awaitable { m_continuation };
                    }
            0

            Чистый мёд!
            Спасибо!

              0
              Как-то слишком много синтаксического мусора.

              В Asio можно намного проще корутины использовать. Неявно.

              boost::asio::spawn(io_service, [socket] (boost::asio::yield_context y)) {
                  socket.async_read(buffer, y);
                  socket.async_write(buffer, y);
              });
              
                0

                Это не те корутины которые Coroutines TS.

                  0
                  Конечно не Coroutines TS — их я критикую в своем комменте.
                  А эти я привел как пример того, что можно все сделать проще, если стоит задача асинхронный код сделать «синхронным» по форме.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

              Самое читаемое