Как стать автором
Обновить

Линеаризуем асинхронный код с помощью корутин

Время на прочтение17 мин
Количество просмотров4.8K
image

Помимо использования корутин для создания генераторов, их можно попробовать использовать для линеаризации уже существующего асинхронного кода. Давайте попробуем это сделать на небольшом примере. Возьмем код, написанный на акторном фреймворке и перепишем одну функцию этого кода на корутины. Для сборки проекта будем использовать gcc из ветки coroutines.

Наша цель — получить из лапши коллбэков:

    abActor.getA(ABActor::GetACallback([this](int a) {
        abActor.getB(ABActor::GetBCallback([a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
                abActor.getA(ABActor::GetACallback([this](int a) {
                    abActor.getB(ABActor::GetBCallback([a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Что-то вроде:

const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;

Итак, приступим.

Акторы


Для начала нам нужно создать простенький акторный фреймворк. Создание полноценного акторного фреймворка — непростая и большая задача, поэтому мы реализуем лишь некое его подобие.
Для начала создадим базовый класс:

class Actor {
public:
    using Task = std::function<void()>;
public:
    virtual ~Actor();
public:
    void addTask(const Task &task);
    void tryRunTask();
private:
    std::queue<Task> queue;
    mutable std::mutex mutex;
};

Идея в принципе проста: мы помещаем задачи, являющиеся функциональными объектами, в очередь, и по вызову tryRunTask пытаемся выполнить эту задачу. Реализация класса подтверждает наши намерения:

Actor::~Actor() = default;

void Actor::addTask(const Task &task) {
    std::lock_guard lock(mutex);
    queue.push(task);
}

void Actor::tryRunTask() {
    std::unique_lock lock(mutex);
    if (queue.empty()) {
        return;
    }

    const Task task = queue.front();
    queue.pop();
    lock.unlock();

    std::invoke(task);
}

Следующий класс — это «тред», к которому будет принадлежать наши акторы:

class Actor;

class ActorThread {
public:
    ~ActorThread();
public:
    void addActor(Actor &actor);
    void run();
private:
    std::vector<std::reference_wrapper<Actor>> actors;
};

Тут тоже все просто: в самом начале программы мы «привязываем» наши акторы к треду методом addActor, а потом запускаем тред методом run.

ActorThread::~ActorThread() = default;

void ActorThread::addActor(Actor &actor) {
    actors.emplace_back(actor);
}

void ActorThread::run() {
    while (true) {
        for (Actor &actor: actors) {
            actor.tryRunTask();
        }
    }
}

При запуске «треда», мы входим в бесконечный цикл и пытаемся выполнить по одной задаче с каждого актора. Не самое оптимальное решение, но для демонстрации пойдет.

Теперь давайте рассмотрим представителя класса акторов:

class ABActor: public Actor {
public:
    using GetACallback = Callback<void(int result)>;
    using GetBCallback = Callback<void(int result)>;
    using SaveABCallback = Callback<void()>;
public:
    void getA(const GetACallback &callback);
    void getB(const GetBCallback &callback);
    void saveAB(int a, int b, const SaveABCallback &callback);
private:
    void getAProcess(const GetACallback &callback);
    void getBProcess(const GetBCallback &callback);
    void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
    int a = 10;
    int b = 20;
};

Этот класс хранит в себе 2 числа — a и b, и по запросу выдает их значения или перезаписывает.

В качестве коллбэка он принимает функциональный объект с необходимыми параметрами. Но давайте обратим внимание на то, что разные акторы могут быть запущены в разных потоках. И поэтому, если по окончании работы мы просто вызовем переданный в метод коллбэк, этот коллбэк будет вызван в текущем выполняемом треде, а не в том треде, что вызвал наш метод и создал этот коллбэк. Поэтому нам нужно создать обертку над коллбэком, которая разрулит эту ситуацию:

template<typename C>
class Callback {
public:
    template<typename Functor>
    Callback(Actor &sender, const Functor &callback)
        : sender(sender)
        , callback(callback)
    {}
public:
    template<typename ...Args>
    void operator() (Args&& ...args) const {
        sender.addTask(std::bind(callback, std::forward<Args>(args)...));
    }
private:
    Actor &sender;
    std::function<C> callback;
};

Эта обертка запоминает исходный актор и при попытке выполнить себя просто добавляет настоящий коллбэк в очередь задач исходного актора.
В результате, реализация класса ABActor выглядит так:

void ABActor::getA(const GetACallback &callback) {
    addTask(std::bind(&ABActor::getAProcess, this, callback));
}

void ABActor::getAProcess(const ABActor::GetACallback &callback) {
    std::invoke(callback, a);
}

void ABActor::getB(const GetBCallback &callback) {
    addTask(std::bind(&ABActor::getBProcess, this, callback));
}

void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
    std::invoke(callback, b);
}

void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
    addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}

void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
    this->a = a;
    this->b = b;
    std::invoke(callback);
}

В интерфейсном методе класса мы просто биндим переданные аргументы к соответствующему «слоту» класса, создавая таким образом задачу, и помещаем эту задачу в очередь задач этого класса. Когда тред задач начнет выполнять задачу, он таким образом вызовет правильный «слот», который выполнит все необходимые ему действия и вызовет коллбэк, который в свою очередь отдаст настоящий коллбэк в очередь вызвавшей задачи.

Давайте напишем актора, который будет использовать класс ABActor:

class ABActor;

class WokrerActor: public Actor {
public:
    WokrerActor(ABActor &actor)
        : abActor(actor)
    {}
public:
    void work();
private:
    void workProcess();
private:
    ABActor &abActor;
};

void WokrerActor::work() {
    addTask(std::bind(&WokrerActor::workProcess, this));
}

void WokrerActor::workProcess() {
    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        std::cout << "Result " << a << std::endl;
    }));
}

И соберем все это вместе:

int main() {
    ABActor abActor;
    WokrerActor workerActor(abActor);

    ActorThread thread;
    thread.addActor(abActor);
    thread.addActor(workerActor);

    workerActor.work();

    thread.run();
}

Давайте проследим всю цепочку работы кода.

В начале, мы создаем необходимые объекты и устанавливаем связи между ними.
Потом мы добавляем задачу workProcess в очередь задач Worker актора.
Когда тред запустится, он обнаружит в очереди нашу задачу и начнет ее выполнять.
В процессе выполнения, мы вызовем метод getA класса ABActor, тем самым положив соответствующую задачу в очередь класса ABActor, и завершим выполнение.
Дальше тред возьмет только что созданную задачу из класса ABActor, и выполнит ее, что приведет к выполнению кода getAProcess.
Этот код вызовет коллбэк, передав в него нужный аргумент — переменную a. Но так как коллбэк, которым он владеет, это обертка, то на самом деле настоящий коллбэк с заполненными параметрами положится в очередь класса Worker.
И когда на следующей итерации цикла тред вытащит и исполнит наш коллбэк из класса Worker, мы увидим вывод на экран строки «Result 10»

Акторный фреймворк — довольно удобный способ взаимодействия классов, раскиданных по разным физическим потокам, друг с другом. Особенность проектирования классов, как вы должны были в этом убедиться, в том, что внутри каждого отдельного актора все действия выполняются целиком и полностью в единственном потоке. Единственная точка синхронизации потоков вынесена в детали реализации акторного фреймворка и не видна программисту. Таким образом, программист может писать однопоточный код, не заботясь обкладыванием мьютексами и отслеживанием ситуаций гонок, deadlock-ов и прочей головной боли.

К сожалению, у такого решения есть своя цена. Так как результат выполнения другого актора доступен только из коллбэка, то рано или поздно акторный код превращается в нечто такое:

    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
                abActor.getA(ABActor::GetACallback(*this, [this](int a) {
                    abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Давайте посмотрим, сможем ли мы этого избежать, используя нововведение C++20 — корутины.

Но сначала оговорим ограничения.

Естественно, мы никоим образом не можем менять код акторного фреймворка. Также, мы не можем менять сигнатуры публичных и приватных методов экземпляров класса Actor — ABActor и WorkerActor. Посмотрим, сможем ли мы выкрутиться из этой ситуации.

Корутины. Часть 1. Awaiter


Основная идея корутин — что при создании корутин для нее создается отдельный стековый фрейм на куче, из которого мы в любой момент можем «выйти», сохранив при этом текущую позицию выполнения, регистры процессора и другую необходимую информацию. Потом мы также в любой момент можем вернуться к выполнению приостановленной корутины и выполнить ее до конца или до следующей приостановки.

За управлением этими данными отвечает объект std::coroutine_handle<>, который по сути представляет указатель на стековый фрейм (и другие необходимые данные), и у которого есть метод resume (или его аналог, оператор ()), который возвращает нас к выполнению корутины.

Давайте на основе этих данных сначала напишем функцию getAAsync, а потом попробуем обобщить.

Итак, предположим, что у нас уже есть экземпляр класса std::coroutine_handle<> coro, что нам нужно сделать?

Необходимо вызвать уже существующий метод ABActor::getA, который разрулит ситуацию как нужно, но для начала необходимо создать для метода getA коллбэк.

Давайте вспомним, в коллбэк метода getA возвращается число — результат выполнения метода getA. Причем этот коллбэк вызывается в потоке Worker треда. Таким образом, из этого коллбэка мы можем безопасно продолжить выполнять корутину, которая была создана как раз из треда Worker-а и которая продолжит выполнять свою последовательность действий. Но также мы должны куда-то сохранить результат возвращенный в коллбэке, он нам, естественно, дальше пригодится.

auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
        value = result;
        std::invoke(coro);
 });
getA(callback);

Итак, теперь нужно откуда-то взять экземпляр объекта coroutine_handle и ссылку, куда можно сохранить наш результат.

В дальнейшем мы увидим, что coroutine_handle передается к нам в результате вызова функции. Соответственно, все что мы можем с ним сделать, передать его в какую-то другую функцию. Давайте подготовим эту функцию в виде лямбды. (Ссылку на переменную, где будет храниться результат выполнения коллбэка, передадим в эту функцию за компанию).

auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
    auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
        value = result;
        std::invoke(coro);
    });
    getA(callback);
};

Эту функцию мы сохраним в следующем классе.

struct ActorAwaiterSimple {
    int value;

    std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;

    ActorAwaiterSimple(
        const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
    )
        : forwardCoroToCallback(forwardCoroToCallback)
    {}

    ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
    ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;

// ...

Помимо функционального объекта, мы также будем здесь держать память (в виде переменной value) под ожидающее нас в коллбэке значение.

Так как мы здесь держим память под значение, то вряд ли мы хотим, чтобы экземпляр этого класса куда-то скопировался или переместился. Представьте, что например кто-то скопировал этот класс, сохранил значение под переменную value в старом экземпляре класса, а потом попытался прочитать его из нового экземпляра. А его там естественно нет, так как копирование произошло раньше сохранения. Неприятно. Поэтому оградим себя от этой неприятности, запретив конструкторы и операторы копирования и перемещения.

Давайте продолжим писать этот класс. Следующий метод, который нам нужен, это:

    bool await_ready() const noexcept {
        return false;
    }

Он отвечает на вопрос, готово ли наше значение для того, чтобы быть выдано. Естественно, при первом вызове наше значение еще не готово, а в дальнейшем нас никто спрашивать об этом не будет, поэтому просто вернем false.

Экземпляр coroutine_handle нам будет передан в методе void await_suspend(std::coroutine_handle<> coro), так что давайте в нем вызовем наш подготовленный функтор, передав туда также ссылку на память под value:

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(forwardCoroToCallback, std::ref(value), coro);
    }

Результат выполнения функции в нужный момент нас попросят, вызвав метод await_resume. Не будем отказывать просящему:

    int await_resume() noexcept {
        return value;
    }

Теперь наш метод можно вызывать, используя ключевое слово co_await:

const int a = co_await actor.abActor.getAAsync(actor);

Что здесь произойдет, мы уже примерно представляем.

Сначала создастся объект типа ActorAwaiterSimple, который передастся на «вход» co_await-у. Он сначала поинтересуется (вызвав await_ready), нет ли у нас случайно уже готового результата (у нас нет), после чего вызовет await_suspend, передав в него контекст (по сути, указатель на текущий стековый фрейм корутины) и прервет выполнение.

В дальнейшем, когда актор ABActor выполнит свою работу и вызовет коллбэк с результатом, этот результат (уже в треде потока Worker) сохранится в единственный (оставшийся на стеке корутины) экземпляр ActorAwaiterSimple и запустится продолжение корутины.

Корутина продолжит выполнение, возьмет сохраненный результат, вызвав метод await_resume, и передаст этот результат в переменную a

На данный момент ограничение текущего Awaiter-а в том, что он умеет работать только с коллбеками с одним параметром типа int. Давайте попробуем расширить применение Awaiter-а:

template<typename... T>
struct ActorAwaiter {

    std::tuple<T...> values;

    std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;

    ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
        : storeHandler(storeHandler)
    {}

    ActorAwaiter(const ActorAwaiter &) = delete;
    ActorAwaiter& operator=(const ActorAwaiter &) = delete;
    ActorAwaiter(ActorAwaiter &&) = delete;
    ActorAwaiter& operator=(ActorAwaiter &&) = delete;

    bool await_ready() const noexcept {
        return false;
    }

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(storeHandler, std::ref(values), coro);
    }

    // Фиктивный параметр bool B здесь нужен,
    // так как sfinae не работает не на шаблонных функциях
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
    >
    void await_resume() noexcept {

    }

    // Фиктивный параметр bool B здесь нужен,
    // так как sfinae не работает не на шаблонных функциях
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
    >
    auto await_resume() noexcept {
        return std::get<0>(values);
    }

    // Фиктивный параметр bool B здесь нужен,
    // так как sfinae не работает не на шаблонных функциях
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
    >
    std::tuple<T...> await_resume() noexcept {
        return values;
    }
};

Здесь мы пользуемся std::tuple для того, чтобы иметь возможность сохранить сразу несколько переменных.

На метод await_resume наложен sfinae для того, чтобы можно было не возвращать во всех случаях tuple, а в зависимости от количества значений, лежащих в tuple, возвращать void, ровно 1 аргумент или tuple целиком.

Обертки для создания самого Awaiter-а теперь выглядит так:

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
    return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
        auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
            values = std::make_tuple(std::forward<ReturnArgs>(result)...);
            std::invoke(coro);
        });
        func(callback);
    };
}

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
    const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
    return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}

ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}

ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
    return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}

Теперь давайте разберемся, как воспользоваться созданным типом непосредственно в корутине.

Корутины. Часть 2. Resumable


С точки зрения C++, корутиной считается функция, которая содержит в себе слова co_await, co_yield или co_return. Но также такая функция должна возвращать определенный тип. Мы условились, что не будем менять сигнатуру функций (здесь я подразумеваю, что возвращаемый тип тоже относится к сигнатуре), поэтому придется как-то выкручиваться.

Давайте создадим лямбду-корутину и вызовем ее из нашей функции:

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

(Почему не захватить this в capture-list лямбды? Тогда весь код внутри вышел бы чуть проще. Но так получилось, что, видимо, лямбда-корутины в компиляторе пока поддерживаются не полностью, поэтому такой код работать не будет.)

Как видите, наш страшный код на коллбэках превратился теперь в довольно приятный линейный код. Все, что нам осталось, это изобрести класс ActorResumable

Давайте посмотрим на него.

struct ActorResumable {
    struct promise_type {
        using coro_handle = std::coroutine_handle<promise_type>;

        auto get_return_object() { 
            // Стандартное заклинание, чтобы создать объект ActorResumable из объекта promise_type
            return coro_handle::from_promise(*this);
        }

        auto initial_suspend() {
            // Не приостанавливать выполнение после подготовки корутины
            return std::suspend_never();
        }

        auto final_suspend() {
            // Не приостанавливать выполнение перед завершением корутины. 
            // Также, выполнить действия по очистке корутины
            return std::suspend_never();
        }

        void unhandled_exception() {
            // Для простоты считаем, что исключений изнутри корутины выбрасываться не будет
            std::terminate();
        }
    };

    ActorResumable(std::coroutine_handle<promise_type>) {}
};

Псевдокод сгенерированной корутины из нашей лямбды выглядит примерно следующим образом:

ActorResumable coro() {
    promise_type promise;
    ActorResumable retobj = promise.get_return_object();
    auto intial_suspend = promise.initial_suspend();
    if (initial_suspend == std::suspend_always)  {
          // yield
    }
    try { 
        // Наша программа.
        const int a = co_await actor.abActor.getAAsync(actor);
        std::cout << "Result " << a << std::endl;
    } catch(...) { 
        promise.unhandled_exception();
    }
final_suspend:
    auto final_suspend = promise.final_suspend();
    if (final_suspend == std::suspend_always)  {
         // yield
    } else {
         cleanup();
    }

Это всего лишь псевдокод, некоторые вещи намеренно упрощены. Давайте тем не менее посмотрим, что происходит.

Вначале мы создаем promise и ActorResumable.

После initial_suspend() мы не приостанавливаемся, а идем дальше. Начинаем выполнять основную часть программы.

Когда доходим до co_await-а, понимаем, что нужно приостановиться. Мы эту ситуацию уже разбирали в предыдущем разделе, можно вернуться к нему и пересмотреть.

После того, как мы продолжили выполнение и вывели результат на экран, выполнение корутины заканчивается. Проверяем final_suspend, и очищаем весь контекст корутины.

Корутины. Часть 3. Task


Давайте вспомним, до какого этапа мы сейчас дошли.

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

Выглядит неплохо, но несложно заметить, что код:

        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);

повторяется 2 раза. Нельзя ли отрефакторить этот момент и вынести его в отдельную функцию?

Давайте набросаем, как это может выглядеть:

CoroTask<std::pair<int, int>> WokrerActor::readAB() {
    const int a = co_await abActor.getAAsync2(*this);
    const int b = co_await abActor.getBAsync2(*this);
    co_return std::make_pair(a, b);
}

void WokrerActor::workCoroProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const auto [a, b] = co_await actor.readAB();
        co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
        const auto [newA, newB] = co_await actor.readAB();
        std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
    };

    coroutine(*this);
}

Нам осталось лишь изобрести тип CoroTask. Давайте подумаем. Во-первых, внутри функции readAB используется co_return, это значит, что CoroTask должен удовлетворять интерфейсу Resumable. Но также, объект этого класса используется на вход co_await-а другой корутины. Значит, класс CoroTask также должен удовлетворять интерфейсу Awaitable. Давайте реализуем оба этих интерфейса в классе CoroTask:

template <typename T = void>
struct CoroTask {
    struct promise_type {
        T result;
        std::coroutine_handle<> waiter;

        auto get_return_object() {
            return CoroTask{*this};
        }

        void return_value(T value) {
            result = value;
        }

        void unhandled_exception() {
            std::terminate();
        }

        std::suspend_always initial_suspend() {
            return {};
        }

        auto final_suspend() {
            struct final_awaiter {
                bool await_ready() {
                    return false;
                }
                void await_resume() {}
                auto await_suspend(std::coroutine_handle<promise_type> me) {
                    return me.promise().waiter;
                }
            };
            return final_awaiter{};
        }
    };

    CoroTask(CoroTask &&) = delete;
    CoroTask& operator=(CoroTask&&) = delete;
    CoroTask(const CoroTask&) = delete;
    CoroTask& operator=(const CoroTask&) = delete;

    ~CoroTask() {
        if (h) {
            h.destroy();
        }
    }

    explicit CoroTask(promise_type & p)
        : h(std::coroutine_handle<promise_type>::from_promise(p))
    {}

    bool await_ready() {
        return false;
    }

    T await_resume() {
        auto &result = h.promise().result;
        return result;
    }

    void await_suspend(std::coroutine_handle<> waiter) {
        h.promise().waiter = waiter;
        h.resume();
    }
private:
    std::coroutine_handle<promise_type> h;
};

(Настоятельно рекомендую открыть фоном заглавную картинку этого поста. В дальнейшем это вам сильно поможет.)

Итак, давайте разберемся, что здесь происходит.

1. Заходим в лямбду coroutine и сразу же создаем корутину WokrerActor::readAB. Но после создания этой корутины, не начинаем выполнять ее (initial_suspend == suspend_always), что вынуждает нас прерваться и вернуться к выполнению лямбды coroutine.

2. co_await лямбды проверяет, готов ли результат выполнения readAB. Результат не готов (await_ready == false), что вынуждает ее передать свой контекст в метод CoroTask::await_suspend. Этот контекст сохраняется в CoroTask, и запускается resume корутины readAB

3. После того, как корутина readAB выполнила все нужные действия, она доходит до строки:

co_return std::make_pair(a, b);

в результате чего вызывается метод CoroTask::promise_type::return_value и внутри CoroTask::promise_type сохраняется созданная пара чисел

4. Так как вызвался метод co_return, выполнение корутины подходит к концу, а значит, самое время вызвать метод CoroTask::promise_type::final_suspend. Этот метод возвращает самописную структуру (не забывайте поглядывать на картинку), которая вынуждает вызвать метод final_awaiter::await_suspend, из которого возвращает сохраненный на шаге 2 контекст лямбды coroutine.

Почему мы не могли вернуть здесь просто suspend_always? Ведь в случае initial_suspend этого класса у нас это получилось? Дело в том, что в initial_suspend у нас это получилось потому, что эту корутину вызывала наша лямбда coroutine, и мы в нее вернулись. Но в момент, когда мы дошли до вызова final_suspend, нашу корутину скорее всего продолжали уже из другого стека (конкретно, из лямбды, которая подготовила функция makeCoroCallback), и, вернув здесь suspend_always, мы вернулись бы в нее, а не в метод workCoroProcess.

5. Так как метод final_awaiter::await_suspend вернул нам контекст, то это вынуждает программу продолжить выполнение возвращенного контекста, то есть лямбды coroutine. Так как выполнение вернулось в точку:

const auto [a, b] = co_await actor.readAB();

то мы должны вычленить сохраненный результат, вызвав метод CoroTask::await_resume. Результат получен, передан в переменные a и b, и теперь экземпляр CoroTask уничтожается.

6. Экземпляр CoroTask уничтожился, но что сталось с контекстом WokrerActor::readAB? Если бы мы из CoroTask::promise_type::final_suspend вернули бы suspend_never (точнее, вернули бы то, что на вопрос await_ready вернуло бы true), то в тот момент контекст корутины почистился бы. Но так как мы этого не сделали, то обязанность очищать контекст переносится на нас. Мы же очистим этот контекст в деструкторе CoroTask, на этот момент это уже безопасно.

7. Корутина readAB выполнена, результат из нее получен, контекст очищен, продолжается выполнение лямбды coroutine…

Уф, вроде разобрались. А помните, что из методов ABActor::getAAsync() и подобных мы возвращаем самописную структуру? На самом деле, метод getAAsync также можно превратить в корутину, объединив знания, полученные из реализации классов CoroTask и ActorAwaiter, и получив что-то вроде:

CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
    co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

но это я уже оставлю для самостоятельного разбора.

Выводы


Как видите, с помощью корутин можно довольно неплохо линеаризовать асинхронный код на коллбэках. Правда, процесс написания вспомогательных типов и функций пока не выглядит слишком интуитивным.

Весь код доступен в репозитории

Также рекомендую для более полного погружения в тему посмотреть эти лекции
.
Большое количество примеров на тему корутин от тогоже автора есть здесь.
И еще можно посмотреть эту лекцию
Теги:
Хабы:
Всего голосов 8: ↑5 и ↓3+5
Комментарии96

Публикации

Истории

Работа

Программист C++
109 вакансий
QT разработчик
4 вакансии

Ближайшие события

15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань