Pull to refresh

Современные возможности C++ и проверенные паттерны: активный объект, внешний полиморфизм и корутины

Reading time14 min
Views16K

Краткое содержание

В этой статье я покажу, как внешний полиморфизм (реализация полиморфного поведения вне объекта, для которого такое поведение требуется) помогает писать красивые и чистые программы, и расскажу о некоторых базовых и продвинутых способах реализации. Примером будет служить старый добрый паттерн многопоточного программирования “активный объект”. В конце я покажу, как просто реализовать этот паттерн с помощью корутин из стандарта C++20, и как вы можете использовать их, чтобы сделать активный объект еще лучше, добавив в него настоящие асинхронные функции.

Постановка задачи

В качестве примера я буду использовать сценарий из области телекома. Представьте, что у нас есть установленный VoIP вызов между пользовательским устройством и нашим сервером бизнес-логики. Пользователь находится в голосовом меню и слушает подсказки, которые воспроизводит для него сервер. Пользователь может нажимать цифровые кнопки для выбора опций в меню или может повесить трубку и завершить звонок. Для обработки этих действий мы должны получать и анализировать сообщения протокола SIP и запускать соответствующую бизнес-логику. Детали SIP-коммуникации выходят за рамки этого примера, мы сосредоточимся на бизнес-логике. Нужно иметь в виду, что мы должны анализировать сообщения как можно быстрее, но логика получения следующей подсказки и ее воспроизведения может быть медленной и не должна мешать обработке SIP-связи. Вот где нам пригодится активный объект.

Структура паттерна “активный объект”

Даже если читатель не знаком с вышеупомянутым паттерном, его довольно легко понять. Вот самая простая реализация:

class VoiceMenuHandler {
public:
    // Эти методы возвращаются сразу, отправляя задачу в очередь
    void receiveInput(const MenuInput& data);
    void receiveHangup(const HangUp& data);

private:

    std::string fetchMenuSectionPrompt(
        char digit, const std::string& callId);
    void playVoiceMenuPrompt(
        const std::string& callId, const std::string& prompt);
    // Этот метод выполняет обработку нажатия кнопки, вызывая
    // два метода выше.
    void processInput(const MenuInput& data);

    void cleanupCallData(const std::string& callId);
    // Этот метод выполняет обработку завершения вызова,
    // используя метод выше.
    void processHangup(const HangUp& data);

    // Поток, где будут выполняться задачи.
    Worker worker_;
};

Объект Worker объединяет рабочий поток и очередь задач, защищенную мьютексом. Для краткости я не буду приводить полную реализацию, но вы можете найти ее здесь при желании - GitHub.

Остаётся важный вопрос, что именно мы будем помещать в очередь задач? Как видите, у нашего VoiceMenuHandler есть две «задачи»: processInput и processHangup. У них разные сигнатуры, то есть, они имеют разные типы, соответственно, в один контейнер их не поместить. В классической реализации мы бы создали виртуальный базовый объект и унаследовали от него два объекта, по одному для каждой задачи. Это было бы довольно громоздко, и, конечно, в современном C++ мы можем сделать лучше.

Начиная с C++11 у нас есть обертка со стиранием типов, подходящая для любого вызываемого объекта — std::function. Это, кстати, пример внешнего полиморфизма, где полиморфным поведением является вызов функции или функтора. Используя её, мы могли бы написать что-то вроде этого:

// Объявление очереди внутри класса Worker
std::queue<std::function<void()>> queue;

// Публичный метод для добавления задач в очередь
void Worker::addTask(std::function<void()> task) {
    // Синхронизация не показана
    queue_.emplace(std::move(task));
}

// И затем в определении VoiceMenuHandler -
void VoiceMenuHandler::receiveInput(const MenuInput& data) {
    // Лямбда захватывает указатель на обработчик и аргументы,
    // и мы кладём её в очередь, обернув в std::function
    worker_.addTask( [this, data]() { processInput(data); } );
}

// То же самое в обработчике завершения звонка
void VoiceMenuHandler::receiveHangup(const HangUp& data) {
    // Сигнатура этой лямбды такая же, как у предыдущей,
    // поэтому она без проблем помещается в очередь.
    worker_.addTask([this, data]() { processHangup(data); });
}

Проблема решена, верно? Да, конечно, но std::function — наименее эффективный способ реализации решения. Для поддержки всех возможных типов и размеров вызываемых объектов std::function должен выполнять выделение памяти при создании, и это медленная операция. Измерения производительности я покажу чуть позже, а пока поверьте на слово, можно сделать в два-три раза быстрее, приложив совсем немного усилий.

Реализация внешнего полиморфизма

Итак, как написать свою собственную обертку со стиранием типа? Начнем с наивной реализации без него.

template<typename Handler, typename Data>
class SimpleTask {
public:
    SimpleTask(Handler* handler, Data data)
        : handler_{ handler }, data_{ data }
    {}

    void operator()() {
   	 handler_->process(data_);
    }

private:
    Handler* handler_;
    Data data_;
};

Это тривиальный шаблонный класс, который содержит указатель на экземпляр обработчика сообщений и аргументы. Если бы у нас был только один тип задач, этого было бы достаточно. Но у нашего VoiceMenuHandler есть две задачи, и инстанциация этого шаблона для каждой из них создаст два разных типа, которые мы не можем хранить в одной очереди. Так что нам действительно нужно стирание типа. Теоретические основы и варианты реализации этого паттерна отлично изложена в выступлениях Шона Парента (видео) и Сай Бренд (видео). Моя реализация основана на втором из них, а в первом есть замечательные примеры, какую пользу внешний полиморфизм может принести для структуры программы. Перейдём к коду.

// В первую очередь, посмотрите как красиво это выглядит. Так же просто,
// как std::function. Не используя наследования или какой-то специальной
// реализации мы получаем полиморфное поведение там, где нам нужно.
// А когда оно не нужно, оно ничего не нам стоит.

auto functorWrapper = TaskWrapper{ MyFunctorObject };
auto lambdaWrapper = TaskWrapper{ [](){ std::cout << "lambda!\n"; } };
std::queue<TaskWrapper> queue;
queue.emplace(std::move(functorWrapper));
queue.emplace(std::move(lambdaWrapper));
queue.front()(); // вызов MyFunctorObject.operator()
queue.pop();


// Реализация класса-обёртки

namespace _detail {

    // Определение самодельной виртуальной таблицы.
    // Внутри только несколько указателей на функции.
    struct vtable {
   	    // Главная часть логики, это будет вызов вложенного объекта.
   	    void (*run)(void* ptr);
   	    // Эти функции потребуются для корректного копирования,
   	    // перемещения и разрушения вложенного объекта.
   	    void (*destroy)(void* ptr);
   	    void (*clone)(void* storage, const void* ptr);
   	    void (*move_clone)(void* storage, void* ptr);
    };

    // Шаблонная переменная из C++17. Инстанциируем vtable, инициализируя
    // указатели лямбдами, которые деградируют до указателей на функции,
    // так как они не ничего не захватывают.
    template<typename Callable>
    constexpr vtable vtable_for{
   	    [](void* ptr) {
   		    // Внутри каждой лямбды мы восстанавливаем тип вложенного
   		    // объекта, используя информацию из параметров шаблона.
   		    static_cast<Callable*>(ptr)->operator()();
   	    },

   	    // Деструктор
   	    [](void* ptr) {
   		    std::destroy_at(static_cast<Callable*>(ptr));
   	    },
   	    // Конструктор копирования
   	    [](void* storage, const void* ptr) {
   		    new (storage) Callable {
   		    *static_cast<const Callable*>(ptr)};
   	    },
   	    // Конструктор перемещения
   	    [](void* storage, void* ptr) {
   		    new (storage) Callable {
   			    std::move(*static_cast<Callable*>(ptr))};
   	    }
    };

}; // namespace _detail

class TaskWrapper {
public:
    TaskWrapper() : vtable_{ nullptr }
    {}

    // Необходимо реализовать полный набор конструкторов
    // и операторов присваивания.
    TaskWrapper(const TaskWrapper& other) {
   	    other.vtable_->clone(&buf_, &other.buf_);
   	    vtable_ = other.vtable_;
    }

    TaskWrapper(TaskWrapper&& other) noexcept {
   	    other.vtable_->move_clone(&buf_, &other.buf_);
   	    vtable_ = other.vtable_;
    }

    ~TaskWrapper() {
   	    if (vtable_) {
   		    vtable_->destroy(&buf_);
   	    }
    }

    TaskWrapper& operator=(const TaskWrapper& other) {
   	    if (vtable_) {
   		    vtable_->destroy(&buf_);
   	    }
   	    if (other.vtable_) {
   		    other.vtable_->clone(&buf_, &other.buf_);
   	    }
   	    vtable_ = other.vtable_;
   	    return *this;
    }

    TaskWrapper& operator=(TaskWrapper&& other) noexcept {
   	    if (vtable_) {
   		    vtable_->destroy(&buf_);
   	    }
   	    if (other.vtable_) {
   		    other.vtable_->move_clone(&buf_, &other.buf_);
   	    }
   	    vtable_ = other.vtable_;
   	    return *this;
    }

    // Здесь происходит вся магия, мы создаём экземпляр виртуальной таблицы,
    // в которой сохраняется информация о типе, в то время как вложенный объект
    // с помощью размещающего new сохраняется в буфере в виде простого
    // набора байтов, это и есть “стирание типа”.
    // Фактически мы реализуем SBO (Small Buffer Optimization).
    // Вместо выделения памяти на куче, мы сохраняем данные на стеке.
    // Таким образом достигается выигрыш в производительности.
    template<typename Callable>
    TaskWrapper(Callable c)
   	    : vtable_{ &_detail::vtable_for<Callable> }
    {
   	    static_assert(sizeof(Callable) < sizeof(buf_),
   		    "Wrapper buffer is too small.");
   	    new(&buf_) Callable{ std::move(c) };
    }

    // Здесь мы и вызываем вложенный объект/функтор.
    void operator()() {
   	    if (vtable_) {
   		    vtable_->run(&buf_);
   	    }
    }

private:
    // Использование aligned_storage обеспечивает правильное выравнивание.
    std::aligned_storage_t<64> buf_;
    const _detail::vtable* vtable_;
};

У этой реализации есть одно ограничение, она поддерживает только одну конкретную сигнатуру вызываемого объекта — возвращаемый тип void и пустой список аргументов, так сделано для простоты. Немного более сложную параметризованную реализацию, где можно настроить сигнатуру вызова,  можно найти здесь - GitHub.

Может показаться, что требуется довольно большой объем вспомогательного кода. Однако его нужно написать только один раз, и использовать потом везде, где требуется. Реализация бизнес-логики в VoiceMenuHandler теперь довольно проста.

void VoiceMenuHandler::receiveInput(const MenuInput& data) {
    // Вместо синхронной обработки, мы помещаем задачу в очередь,
    // и возвращаем управление немедленно, не блокируя вызывающий код.
    worker_.addTask(TaskWrapper{[this, data]() { processInput(data); } });
}

void VoiceMenuHandler::receiveHangup(const HangUp& data) {
    worker_.addTask(TaskWrapper{[this, data]() { processHangup(data); } });
}

std::string VoiceMenuHandler::fetchMenuSectionPrompt(
    char digit,
    const std::string& callId
) {
    // Условное получение следующей голосовой подсказки.
    std::cout << "in call [" << callId << "] menu item '"
   	    << digit << "' selected.\n";
    return callId + "_prompt_" + digit;
}

void VoiceMenuHandler::playVoiceMenuPrompt(
    const std::string& callId,
    const std::string& prompt
) {
    // Условная команда на воспроизведение подсказки.
    std::cout << "play prompt [" << prompt << "]\n";
}

void VoiceMenuHandler::processInput(const MenuInput& data) {
    const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
    playVoiceMenuPrompt(data.callId, prompt);
}

void VoiceMenuHandler::cleanupCallData(const std::string& callId) {
    // Освобождение ресурсов, использованных для обслуживания звонка.
    std::cout << "call [" << callId << "] ended.\n";
}

void VoiceMenuHandler::processHangup(const HangUp& data) {
    cleanupCallData(data.callId);
}

Примерно так выглядит клиентский код, тестовый в данном случае.

VoiceMenuHandler menuHandler;
std::thread sender([&menuHandler]() {
    menuHandler.receiveInput(MenuInput{ '2', "call_1@ip_addr" });
    menuHandler.receiveInput(MenuInput{ '1', "call_2@ip_addr" });
    menuHandler.receiveHangup(HangUp{ "call_1@ip_addr" });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
sender.join();

Будет выведено следующее:

in the call [call_1@ip_addr] menu item '2' selected.
play prompt [call_1@ip_addr_prompt_2]
in the call [call_2@ip_addr] menu item '1' selected.
play prompt [call_2@ip_addr_prompt_1]
call [call_1@ip_addr] ended.

Стоит ли результат потраченных усилий?

Посмотрим, как покажет себя приведённая реализация, в сравнении с std::function. Я написал небольшой бенчмарк, в котором измерил время, необходимое для создания обёртки, помещения ее в очередь, извлечения и вызова сохраненного объекта. Вычисления, выполняемые вызываемым объектом, тривиальны, поэтому измеряются в основном дополнительные затраты на саму обёртку.

Как видите, наша первоначальная наивная реализация оставляет все остальные далеко позади. Так что, если вам достаточно простейшего решения, непременно используйте его. Наша обёртка добавляет некоторые накладные расходы для косвенных вызовов, но она по-прежнему работает в два раза быстрее, чем std::function, потому что она не производит динамических выделений памяти. В последнем столбце показана производительность корутин, которые будут рассмотрены далее.

Достаточно ли асинхронности в активном объекте?

Давайте посмотрим повнимательнее на этот метод.

void VoiceMenuHandler::processInput(const MenuInput& data) {
    const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
    playVoiceMenuPrompt(data.callId, prompt);
}

fetchMenuSectionPrompt звучит как вызов удалённого сервиса, не так ли? Поскольку у нас есть только один рабочий поток, в случае, если этот вызов займет много времени, вся очередь застрянет. Было бы здорово сделать этот вызов асинхронным. Один из способов реализовать асинхронность — разделить этот метод на два. Один отправит запрос, и когда мы получим ответ, мы положим в очередь другую задачу, чтобы воспроизвести подсказку. Такой подход сработает, но бизнес-логика будет разбросана по двум разным функциям, что затруднит чтение кода. А если у нас не два, а пять или десять таких вызовов? Разделить логику на десять частей? До C++20 особого выбора не было, но теперь мы можем реализовать асинхронные вызовы с помощью корутин. На первый взгляд они могут показаться немного пугающими из-за огромного количества точек настройки, которые предоставляет стандарт. Но на самом деле реализовать корутину на удивление легко. Кстати, если вы хотите ознакомиться с теорией, есть хорошее выступление Андреаса Фертига (видео). Также документация по корутинам на cppreference.com действительно хороша.

Первый шаг: превращаем функцию в корутину

Чтобы функция стала корутиной, нужны две вещи: использовать одно из ключевых слов co_await, co_yield или co_return и вернуть дескриптор корутины (или, в случае компилятора Microsoft, объект-обёртку, содержащий дескриптор). Вот так выглядит тип, который должны вернуть функция.

struct CoroutineTask {
    // Необходимый элемент для работы корутины - тип с именем 'promise_type'.
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
   	    CoroutineTask get_return_object() {
   		    return { handle_type::from_promise(*this) };
   	    }
   	    // Мы хотим, чтобы корутина была приостановлена при создании,
   	    // мы возобновим её позже в контексте рабочего потока.
   	    std::suspend_always initial_suspend() noexcept { return {}; }
   	    // Когда выполнение корутины дойдёт до co_return, не требуется
        // её приостанавливать.
   	    std::suspend_never final_suspend() noexcept { return {}; }
   	    // Наша корутина не будет ничего возвращать, поэтому promise_type
   	    // определяет метод 'return_void', иначе необходимо было бы 
        // определить метод 'return_value'.
   	    void return_void() {}
   	    void unhandled_exception() {}
    };

    // Этот дескриптор будет использован для возобновления корутины.
    handle_type h_;
};

Все подробности о жизненном цикле объектов этого типа можно найти на cppreference.com.

Как только у нас есть нужный тип, пара модификаций превратит наш метод в корутину. Обратите внимание на тип возвращаемого значения и ключевое слово co_return.

CoroutineTask VoiceMenuHandlerCoroutines::processInput(const MenuInput data) {
    const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
    playVoiceMenuPrompt(data.callId, prompt);
    co_return;
    // После этой строки корутина будет разрушена.
}

// Публичный интерфейс VoiceMenuHandler тоже нужно чуть-чуть поменять.

void VoiceMenuHandlerCoroutines::receiveInput(const MenuInput& data) {
    // Поскольку CoroutineTask::promise_type реализует метод 'initial_suspend'
    // так, что он возвращает 'std::suspend_always', корутина будет
    // приостановлена после создания, и её тело выполнится только
    // после явного возобновления. Это случится в рабочем потоке.
    worker_.addTask( processInput(data).h_ );
}

void VoiceMenuHandlerCoroutines::receiveHangup(const HangUp& data) {
    // Используем одну очередь и поток для выполнения обычных задач
    // вместе с корутинами.
    worker_.addTask(TaskWrapper{ [this, data]() { processHangup(data); } });
}

Хочу обратить ваше внимание, что второй метод, receiveHangup, остался без изменений, и мы кладём в очередь старые задачи вместе с новыми корутинами с помощью той же обёртки, которая ничего не знает о типе coroutine_handle. И этот тип тоже ничего не знает об обёртке, и тем не менее они без проблем работают вместе. Это и есть преимущество внешнего полиморфизма. Полную реализацию нового VoiceMenuHandler можно найти здесь - GitHub.

Второй шаг: реализуем асинхронную операцию

Сама по себе переделка в корутину не решает нашу проблему. Нам нужно вставить ключевое слово co_await перед вызовом fetchMenuSectionPrompt. Но для того, чтобы это заработало, fetchMenuSectionPrompt необходимо сначала подправить, чтобы он возвращал “ожидающий объект” (awaiter object), который выглядит следующим образом:

// Для совместимости с co_await функция должна вернуть
// awaiter, который реализует три специальных метода.
struct AwaitablePrompt {
    std::string callId;
    char digit;
    // Внешний объект, выполняющий ввод-вывод
    PromptFetcher& fetcher_;
    std::string prompt_;

    // Вот эти специальные методы
    bool await_ready();
    void await_suspend(std::coroutine_handle<> h);
    std::string await_resume();
};

// Этот метод будет вызван первым, когда компилятор встретит выражение с co_await.
bool AwaitablePrompt::await_ready() {
    // Если бы PromptFetcher имел кэш, можно было бы проверить его здесь и
    // избежать приостановки корутины, вернув 'true'.
    return false;
}

// Этот метод вызывается вторым, если await_ready вернл false.
void VoiceMenuHandlerAsync::AwaitablePrompt::await_suspend(
    std::coroutine_handle<> h
) {
    // Когда этот метод вызван, корутина уже приостановлена, поэтому мы можем
    // безопасно передавать её дескриптор наружу (лямбда захватывает его).
    fetcher_.fetch(
        callId,
   	    digit,
   	    [this, h](
            const std::string& prompt,
   	        PromptFetcher::worker_type& worker
   	    ) {
   		    // co_await возобновит выполнение прямо перед вызовом 'await_resume',
   		    // так что присваивать значение prompt_ здесь безопасно, так как
   		    // жизнь ожидающего объекта длится до возврата 'await_resume'.
   		    prompt_ = prompt;
            // Поток для выполнения корутины будет передан снаружи.
   		    // Мы также могли бы отдать дескриптор корутины наружу,
   		    // передавая обязанность возобновить её.
   		    worker.addTask(h);
   		    // С этого места ожидающий объект может быть разрушен в любой момент,
   		    // и мы больше не должны использовать указатель this. То есть,
            // присваивать prompt_ здесь было бы неопределённым поведением
   	    }
    );
}

std::string AwaitablePrompt::await_resume() {
    // Возвращаемое значение станет результатом всего выражения co_await.
    return prompt_;
}

Вот так нужно будет поменять метод fetchMenuSectionPrompt, обратите внимание на его возвращаемое значение.

AwaitablePrompt VoiceMenuHandlerAsync::fetchMenuSectionPrompt(
    char digit,
    const std::string& callId
) {
    std::cout << "!Coroutine! - in call [" << callId
   	 << "] menu item '" << digit << "' selected.\n";
    return AwaitablePrompt{ callId, digit, fetcher_ };
}

Если этот короткий кусок не даёт полного представления о происходящем, смотрите весь код тут - GitHub.

Наконец, мы можем сделать проблемный метод по-настоящему асинхронным.

CoroutineTask VoiceMenuHandlerAsync::processInput(const MenuInput data) {
    // co_await будет преобразован компилятором примерно в такой код:
    // if(!AwaitablePrompt::await_ready()) {
    //   AwaitablePrompt::await_suspend(current_coro_handle);
    //   // в зависимости от возвращаемого значения await_suspend
    //   // продолжение будет различаться.
    // }
    // AwaitablePrompt::await_resume()
    // то, что вернёт await_resume станет результатом co_await
    const auto prompt = co_await fetchMenuSectionPrompt(data.digit, data.callId);
    playVoiceMenuPrompt(data.callId, prompt);
    co_return;
    // После этой точки корутина будет разрушена.
}

Итак, теперь, как только будет вызвана fetchMenuSectionPrompt, метод processInput будет приостановлен, и рабочий поток возьмёт на выполнение следующий элемент из очереди. А когда мы получим ответ, мы можем запланировать возобновление нашей корутины processInput в любом потоке! В моей реализации fetcher делает это, просто чтобы продемонстрировать такую возможность. См. код - GitHub.

Работа с асинхронной версией:

PromptFetcher fetcher;
VoiceMenuHandlerAsync menuHandlerAsync{ fetcher };
std::thread senderToAsync([&menuHandlerAsync, &fetcher]() {
    menuHandlerAsync.receiveInput(MenuInput{ '7', "call_async_9@ip_addr" });
    menuHandlerAsync.receiveInput(MenuInput{ '8', "call_async_8@ip_addr" });
    // "Получение" ответа и продолжение корутины в потоке fetcher-а.
    fetcher.processResponse("call_async_8@ip_addr", "prompt_AAA");
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    // "Получение" ответа и продолжение корутины в потоке fetcher-а.
    fetcher.processResponse("call_async_9@ip_addr", "prompt_BBB");
    menuHandlerAsync.receiveHangup(HangUp{ "call_async_8@ip_addr" });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
senderToAsync.join();

Будет выведено следующее:

!Coroutine! - in the call [call_async_9@ip_addr] menu item '7' selected.
fetch request sent callId [call_async_9@ip_addr], input = 7.
received response for [call_async_9@ip_addr]
play prompt [prompt_BBB]
!Coroutine! - in the call [call_async_8@ip_addr] menu item '8' selected.
fetch request sent callId [call_async_8@ip_addr], input = 8.

Мы видим, что оба запроса были отправлены до того, как мы получили какие-либо ответы, и что ответ на второй запрос пришел первым, но все сработало! Таким образом, корутины из C++20 предоставляют очень мощный функционал, и они по-прежнему быстрее, чем std::function :)

Итак, это был большой объём информации для усвоения. Спасибо за внимание, и я надеюсь, что вам это будет полезным. Весь код из этой статьи можно найти здесь - GitHub.

Tags:
Hubs:
Total votes 11: ↑10 and ↓1+11
Comments22

Articles