Pull to refresh

Transparent coroutines

Reading time 20 min
Views 2.7K

Когда мне становится грустно, я пишу никому не нужные библиотеки…
Ссылка на исходный код:



В интернете полно статей про сопрограммы (coroutines) и хабр эта тема не обошла стороной. Вот например, замечательные статьи: Использование Boost.Asio с Coroutines TS, Основы Userver — фреймворка для написания асинхронных микросервисов, но все это становится бесполезно, когда в сопрограмме вам необходимо вызвать функцию из библиотеки, которая делает блокирующий ввод/вывод.


Встречайте еще одну никому ненужную библиотеку: yurco — библиотека, которая поможет встроить сопрограммы прозрачно для стороннего кода. Данная статья написана с расчетом на то, что читатель знает, что такое сопрограммы (coroutine) и хотя бы примерно представляет как они реализованы.


Для начала рассмотрим фрагмент кода из примера к библиотеке:


void process_connection(unistd::fd& fd)
    {
    try
        {
        char buf[100];
        unistd::read(fd, buf, sizeof(buf));
        /* Create a connection */
        std::unique_ptr<MYSQL, std::function<decltype(mysql_close)>> con(mysql_init(nullptr), mysql_close);
        mysql_real_connect(con.get(), "db.local", "ro", "", nullptr, 0, nullptr, 0);
        mysql_query(con.get(), "SELECT NOW(), SLEEP(10);");
        std::unique_ptr<MYSQL_RES, std::function<decltype(mysql_free_result)>> result(mysql_store_result(con.get()), mysql_free_result);
        if (!result)
            return; // silently close connection
        for (MYSQL_ROW row = mysql_fetch_row(result.get()); row; row = mysql_fetch_row(result.get()))
            {
            static char header[] = "HTTP/1.1 200 OK\r\nContent-Length: 21\r\nConnection: close\r\n\r\n";
            unistd::write_all(fd, header, strlen(header));
            const char* const answer = row[0];
            unistd::write_all(fd, answer, strlen(answer));
            unistd::write_all(fd, "\r\n", 2);
            }
        }
...

немного пояснений к коду

Исходный код примера https://github.com/yurial/yurco-examples/blob/master/042_mysql.cpp


Q: Что за unistd:*?
A: Это другая никому не нужная библиотека (https://github.com/yurial/unistd). Она реализует простые обертки над системным функциями. Эти обертки проверяют код возврата и в случае ошибки кидают std::system_error. Никакой магии тут нет, просто код становится чуть лаконичнее.


Q: А unistd::fd?
A: Простенький класс, который делает ::close() в деструкторе и ::dup() при копировании. Тоже никакой магии.


Q: А для чего SLEEP(10) в SQL запросе?
A: Это сделано специально, программа работающая с блокирующим вводом/выводом подвиснет здесь на 10 секунд и не будет обрабатывать другие запросы.


Q: А почему код так неуклюже работает с mysql, HTTP, etc?
A: Правильность кода в данном примере не важна, это увеличит объем и затруднит понимание главного:


  • используется библиотека с блокирующим вводом/выводом от сторонних разработчиков;
  • с виду выглядит, как будто код синхронный.

Может показаться, что данная функция работает синхронно, делая блокирующий ввод/вывод (как минимум при выполнении SQL запроса, ведь libmysqlclient ничего не знает про сопрограммы), но на самом деле все не так. Благодаря магии и какой-то там матери этот код выполняется в сопрограмме, бережно прерываясь на операциях ввода/вывода.




А теперь взглянем как это все устроено изнутри. Обзор будет идти от простого к сложному.


1. class Coroutine


Стоит отметить, что для нашей задачи нужны именно stackfull сопрограммы, т.к. мы хотим прерываться в момент выполнения сторонних библиотек. Существует 2 популярные реализации сопрограмм для C/C++ под linux: boost.coroutine и ::swapcontext. Для реализации идеи не принципиально какая библиотека для сопрограмм будет использоваться, в нашем случае выбор пал на ::swapcontext по религиозным соображениям.


А если еще точнее...

А если еще точнее, то функция ::swapcontext была переписана — из нее был убран вызов rt_sigprocmask. Это позволило избавиться от избыточных системных вызовов и немного ускорить переключение контекста, в остальном код остался неизменным.


class Coroutine
{
public:
        Coroutine(const Coroutine&) = delete;
template <class Func, class... Args>
        Coroutine(Stack&& stack, Func&& func, Args&&... args) noexcept;
bool    is_completed() const noexcept;
bool    is_running() const noexcept;
void    yield();
void    operator() ();
void    operator() (const std::nothrow_t&) noexcept;
void    set_exception(const std::exception_ptr& e) noexcept;
const std::exception_ptr& get_exception() const noexcept;
void    rethrow();
Stack&& take_away_stack() noexcept;
};

Конструктор класса в качестве входных аргументов получает экземпляр стека, точку входа — func и аргументы для вызова точки входа. Точка входа связывается с параметрами с помощью std::bind, стоит отметить, что точка входа может принимать необязательный дополнительный аргумент — Coroutine&, в этом случае при вызове будет дополнительно передана ссылка на текущий экземпляр класса.


А что за Stack?

Простенький класс занимающийся выделением памяти под стек сопрограммы.


class Stack
{
public:
          Stack(const size_t size, const bool protect=true);
          Stack(const Stack&) = delete;
          Stack(Stack&&) = default;

char*   data() const noexcept;
size_t  size() const noexcept;
};

Аргумент конструктора size — указывает размер выделяемой памяти в байтах. Он должен быть кратен размеру страницы памяти (sysconf(_SC_PAGE_SIZE)). Аргумент protect выделяет дополнительно 2 страницы (одна в начале диапазона, вторая — в конце) для которых устанавливается режим доступа PROT_NONE. Таким образом реализуется простая защита от переполнения стека.


Оператор void operator() (); и его noexcept версия используется для переключения в контекст сопрограммы, а метод void yield(); — для переключения обратно.


Давайте напишем простой пример:


создание проекта, подключение сабмодулей, CMakeList.txt
$ mkdir yurco-examples && cd yurco-examples
$ git init
$ git submodule add https://github.com/yurial/yurco
$ git submodule add https://github.com/yurial/unistd
$ git submodule update --init --remote
$ cat CMakeList.txt
project(yurco-examples)
cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
if ("${CMAKE_BUILD_TYPE}" STREQUAL "Release" AND NOT YURCO_TRANSPARENCY)
    set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)
endif()
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-inline")
option(USE_SINGLE_THREAD "Disable multithread locking")
if (USE_SINGLE_THREAD)
    add_definitions(-DSINGLE_THREAD)
endif()
if (YURCO_TRANSPARENCY)
    add_definitions(-DYURCO_TRANSPARENCY)
    set(CMAKE_EXE_LINKER_FLAGS "-Wl,-wrap=read,-wrap=write,-wrap=close,-wrap=connect,-wrap=accept4,-wrap=recv,-wrap=recvfrom,-wrap=recvmsg,-wrap=send,-wrap=sendto,-wrap=sendmsg ${CMAKE_EXE_LINKER_FLAGS}")
endif()

include_directories(.)
add_subdirectory(unistd)
add_subdirectory(yurco)

add_executable(011_basic "011_basic.cpp")
target_link_libraries(011_basic unistd yurco)

$ mkdir build && cd build
$ cmake .. -DCMAKE_BUILD_TYPE=Release -DUSE_SINGLE_THREAD=1 -DYURCO_TRANSPARENCY=0
$ make

$ cat 011_basic.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self)
    {
    std::cout << "At 3" << std::endl;
    self.yield();
    std::cout << "At 5" << std::endl;
    }

int main()
    {
    std::cout << "At 1" << std::endl;
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    std::cout << "At 2" << std::endl;
    coro();
    std::cout << "At 4" << std::endl;
    coro();
    std::cout << "At 6" << std::endl;
    return EXIT_SUCCESS;
    }

$ ./011_basic
At 1
At 2
At 3
At 4
At 5
At 6
По этому примеру легко отследить в каких местах происходит переключение контекста.
А теперь давайте измерим время необходимое на переключение контекста.:


$ cat 012_time.cpp
#include <yurco/all.hpp>
#include <stdlib.h>

void entry(yurco::Coroutine& self)
    {
    for (;;)
        self.yield();
    }

int main()
    {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    for (uint32_t i = 0; i < 5000000; ++i)
        coro();
    return EXIT_SUCCESS;
    }

$ time ./012_time
real 0m0.921s
user 0m0.920s
sys 0m0.000s


На каждую итерацию цикла у нас имеется 2 переключения: один — в контекст сопрограммы, второй — обратно в контекст main(). Цикл имеет 5 000 000 итераций, а значит программа делает 10 000 000 переключений контекста. На все это мы тратим чуть менее секунды, а значит на одно переключение мы тратим порядка 0.092us. Результат не является запредельным, но если в коде сопрограммы вы выполняете что-то более существенное чем ++i, то производительности хватит с запасом.


Следующая группа методов: set_exception, get_exception, rethrow — позволяют пробросить исключение как в сопрограмму, так и из нее.


$ cat 013_exceptions.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self)
    {
    std::cout << "Coroutine started" << std::endl;
    throw std::runtime_error("My Exception ;)");
    }

int main()
    {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    try
        {
        coro(std::nothrow); // noexcept version should ignore any returned exception
        std::cout << "1: No exception" << std::endl;
        }
    catch (const std::exception& e)
        {
        std::cout << "1: Exception: " << e.what() << std::endl;
        }

    try
        {
        coro.rethrow();
        std::cout << "2: No exception" << std::endl;
        }
    catch (const std::exception& e)
        {
        std::cout << "2: Exception: " << e.what() << std::endl;
        }

    try
        {
        coro();
        std::cout << "3: No exception" << std::endl;
        }
    catch (const std::exception& e)
        {
        std::cout << "3: Exception: " << e.what() << std::endl;
        }
    return EXIT_SUCCESS;
    }

$ ./013_exceptions
Coroutine started
1: No exception
2: Exception: My Exception ;)
Coroutine started
3: Exception: My Exception ;)


Как видно, мы имеем возможность получить исключение из сопрограммы мгновенно, а можем отложенно.
Этот же механизм можно использовать для прерывания работы сопрограммы:


$ cat 014_terminate.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self)
    {
    try
        {
        for (;;)
            self.yield();
        }
    catch (const yurco::terminate_exception&)
        {
        std::cout << "Coroutine should be terminated" << std::endl;
        }
    }

int main()
    {
    yurco::Coroutine coro(yurco::Stack(16*1024), entry);
    coro();
    try
        {
        coro.set_exception(std::make_exception_ptr(yurco::terminate_exception()));
        coro();
        std::cout << "No exception" << std::endl;
        }
    catch (const yurco::terminate_exception&)
        {
        std::cout << "Terminate exception was returned" << std::endl;
        }
    return EXIT_SUCCESS;
    }

Почему исключение не возвращается обратно

Для возбуждения исключения в сопрограмме, метод yield() вызывает rethrow(), который в свою очередь очищает сохраненное исключение.


$ ./014_terminate
Coroutine should be terminated
No exception returned from coroutine


Медот take_away_stack() позволяет забрать стек у экземпляра класса. Применяется для переиспользования стека, обычно нет необходимости вызывать этот метод вручную.


2. class SimpleScheduler


Как и в большинстве многозадачных систем, необходимо планировать выполнения задач. В библиотеке для этого реализован класс SimpleScheduler (более продвинутые планировщики с приоритетом задач, возможно, появятся позже). Класс SimpleScheduler имеет 3 очереди для каждого из состояний сопрограммы: ready, suspended, executing. Сопрограммы перемещаются между очередями благодаря соответствующим методам.


class SimpleScheduler
{
public:
        SimpleScheduler(std::atomic<bool>& terminate, const size_t stack_size, const bool protect_stack) noexcept;
template <class Func, class... Args>
void    coroutine(Func&& func, Args&&... args) noexcept;

bool    try_execute_one() noexcept;
void    resume_all() noexcept;
void    resume(Coroutine& coro) noexcept;
void    resume_many(std::vector<Coroutine*>& coros) noexcept;
void    suspend(Coroutine& coro) noexcept;

Метод suspend(Coroutine& coro) помечает сопрограмму как желающую приостановить свое выполнение. Важно понимать, что этот метод не останавливает выполнение сопрограммы сам, лишь когда сопрограмма возвращает управление, экземпляр ее класса перемещается в очередь suspended.
Методы resume*() позволяют переместить сопрограмму(ы) из очереди suspended в очередь ready (в реакторе этот метод вызывается при событиях на связанном сокете).
Как вы, наверное, уже догадались try_execute_one() выполняет сопрограмму из очереди ready. На время выполнения, сопрограмма перемещается в очередь executing, а по возвращении управления перемещается обратно в ready или в suspnded, если в процессе ее выполнения был вызван соответствующий метод.
Все это было бы бесполезно, если бы мы не имели возможность создать новую сопрограмму. В этом нам поможет метод coroutine(Func&& func, Args&&... args). Внимательный читатель заметит, что этот метод не требует передачи стека, за стек отвечает сам планировщик — внутри него есть пул стеков.
Размеры создаваемых стеков задаются в конструкторе: параметры stack_size и protect_stack полностью соответствуют параметрам конструктора класса Stack. А вот параметр std::atomic<bool>& terminate — особый: когда значение становится true, для всех запускаемых сопрограмм, автоматичски устанавливается исключение yurco::terminate_exception.


$ cat 021_scheduler.cpp
#include <yurco/all.hpp>
#include <iostream>
#include <stdlib.h>

void entry(yurco::Coroutine& self, yurco::SimpleScheduler& scheduler, int my_id)
    {
    for (uint32_t i=0; i < 2; ++i)
        {
        std::cout << "Coroutine " << my_id << std::endl;
        self.yield();
        }
    scheduler.suspend(self);
    for (;;)
        self.yield();
    }

int main()
    {
    std::atomic<bool> terminate_flag = false;
    yurco::SimpleScheduler scheduler(terminate_flag, 16*1024, true);
    scheduler.coroutine(entry, std::ref(scheduler), 1);
    scheduler.coroutine(entry, std::ref(scheduler), 2);
    while (scheduler.try_execute_one())
        ;
    terminate_flag = true;
    scheduler.resume_all();
    while (scheduler.try_execute_one())
        ;
    std::cout << "Coroutines was terminated via yurco::terminate_exception, cause terminate_flag set to true" << std::endl;
    return EXIT_SUCCESS;
    }

$ ./021_scheduler
Coroutine 1
Coroutine 2
Coroutine 1
Coroutine 2
Coroutines was terminated via yurco::terminate_exception, cause terminate_flag set to true


Теперь у нас есть возможность выполнять множество сопрограмм и кое-какая управляемость.
Самое время перейти к "асинхронному" вводу/выводу (на самом деле, конечно не к асинхронному, а к не блокируемому вводу/выводу с мультиплексированием).


3. class Reactor


Реактор используется для выполнения готовых (ready) сопрограмм, ожидания события на файловых дескрипторах и пробуждения приостановленных сопрограмм (suspended).


class Reactor
{
public:
        Reactor(const size_t stack_size, const bool protect_stack=true) noexcept;
        Reactor(const Reactor&) = delete;

template <class Func, class... Args>
void    coroutine(Func&& func, Args&&... args) noexcept;
template <class Func, class... Args>
void    async(Func&& func, Args&&... args) noexcept;
void    run(const size_t batch_size=16, const size_t events_at_once=1024) noexcept;
void    terminate() noexcept;
void    suspend(Coroutine& coro, const int fd, int events);
void    close(const int fd);
int     close(const std::nothrow_t&, const int fd) noexcept;
};

Параметры конструктора stack_size и protect_stack вам уже знакомы, они соответствуют параметрам класса SimpleScheduler, который мы рассматривали ранее.


Метод coroutine(...) создает сопрограмму путем вызова соотвествующего метода у SimpleSchduler, а метод async является алиасом.


Метод run(const size_t batch_size=16, const size_t events_at_once=1024) запускает цикл выполнения сопрограмм и ожидания событий на файловых дескрипторах через ::epoll(). Прервать цикл можно методом terminate() — сопрограммы получат исключение yurco::terminate_exception и после их завершения, метод run() возвратит управление.


Метод close() и его noexcept версия закроют файловый дескриптор через ::close() и почистят внутренние структуры связанные с ним.


Метод suspend(Coroutine& coro, const int fd, int events) тут самый главный, именно он позволяет прервать выполнение сопрограммы пока не наступит соответствующее событие на файловом дескрипторе.


Самое время написать простой пример программы, которая завершается при получении сигналов SIGTERM, SIGQUIT, SIGINT.


$ cat 031_signals.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>

#include <iostream>
#include <signal.h>
#include <stdlib.h>

void signal_handler(yurco::Coroutine& coro, yurco::Reactor& reactor, unistd::fd& sigfd)
    {
    reactor.suspend(coro, sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl;
    reactor.terminate();
    }

void register_signal_handler(yurco::Reactor& reactor)
    {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    reactor.async(signal_handler, std::ref(reactor), std::move(sigfd)); // use std::move to avoid ::dup() file descriptior
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
    }

int main()
    {
    const size_t stack_size = 16*1024; // size less than 16k lead to SIGSEGV cause libunwind require more space
    yurco::Reactor reactor(stack_size);
    register_signal_handler(reactor);
    reactor.run();
    return EXIT_SUCCESS;
    }

$ ./031_signals
^Cwe got a signal


По аналогии можно написать функции для "асинхронного" ввода-вывода, выглядеть это будет примерно так:


yurco::read()
size_t read(const std::nothrow_t&, Reactor& reactor, Coroutine& coro, const int fd, void* const buf, const size_t count)
    {
    for (;;)
        {
        const ssize_t nread = ::read(fd, buf, count);
        if (-1 == nread && errno == EAGAIN)
            {
            reactor.suspend(coro, fd, EPOLLIN);
            continue;
            }
        return nread;
        }
    }

Аналогично реализуются и другие функции ввода/вывода. Их исходный код можно посмотреть в https://github.com/yurial/yurco/blob/master/operations.cpp


Теперь мы можем реализовать простой echo-server:


$ cat 032_echo.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>
#include <iostream>
#include <signal.h>
#include <stdlib.h>

void process_connection(yurco::Coroutine& coro, yurco::Reactor& reactor, int& clientfd)
    {
    (void)coro;
    char buf[1024];
    try
        {
        const size_t nread = yurco::read(reactor, coro, clientfd, buf, sizeof(buf));
        if (0 == nread)
            return; // terminate coroutine, close connection
        for (size_t nwrite = 0; nwrite < nread;)
            nwrite += yurco::write(reactor, coro, clientfd, buf+nwrite, nread-nwrite);
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate connection coroutine" << std::endl;
        }
    catch (...)
        {
        std::cerr << "got unknown exception while read/write" << std::endl;
        }
    reactor.close(std::nothrow, clientfd);
    }

void listener(yurco::Coroutine& coro, yurco::Reactor& reactor, int serverfd)
    {
    try
        {
        for (;;)
            {
            for (size_t i = 0; i < 32; ++i) // sometimes we should yield() to process accepted connections
                {
                int clientfd = yurco::accept(reactor, coro, serverfd, nullptr, nullptr, SOCK_NONBLOCK);
                reactor.async(process_connection, std::ref(reactor), clientfd);
                }
            coro.yield();
            }
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate listener coroutine" << std::endl;
        }
    catch (...)
        {
        std::cerr << "unknwon exception while accept" << std::endl;
        }
    reactor.close(std::nothrow, serverfd);
    }

void signal_handler(yurco::Coroutine& coro, yurco::Reactor& reactor, int sigfd)
    {
    reactor.suspend(coro, sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl;
    // use unistd::read(sigfd) to get a signals
    reactor.terminate();
    reactor.close(std::nothrow, sigfd);
    }

void register_signal_handler(yurco::Reactor& reactor)
    {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    int sigfd = unistd::signalfd(sigmask);
    reactor.async(signal_handler, std::ref(reactor), sigfd);
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
    }

void register_listener(yurco::Reactor& reactor)
    {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    int serverfd = unistd::socket(addr.at(0), SOCK_NONBLOCK);
    unistd::setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::bind(serverfd, addr.at(0));
    unistd::listen(serverfd, 8192/*backlog*/);
    reactor.async(listener, std::ref(reactor), serverfd);
    }

int main()
    {
    const size_t stack_size = 16*1024; // size less than 16k lead to SIGSEGV cause libunwind require more space
    yurco::Reactor reactor(stack_size);
    register_listener(reactor);
    register_signal_handler(reactor);
    reactor.run();
    return EXIT_SUCCESS;
    }

console 1: $ ./032_echo
console 2: $ echo qweasd | nc -6 ::1 31337
console 2: qweasd
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


Все работает! Пришло время грязных хаков магии!


4. Магия


Современные системы позволяют делать подмену функций как для динамически подключаемых библиотек, так и для статической линковке. Для подмены функций при статической линковке, линкеру передается дополнительный аргумент -wrap=<func> позволяющий подменить функцию <func> на __wrap_<func>, при этом оригинальная функция будет доступна по имени __real_<func>. Важным условием является совпадение прототипов функций, и чтобы подменить системный ::read() на yurco::read() нам необходимо написать еще одну обертку, которая будет знать о экземплярах классов Reactor и Coroutine.


Где взять текущий Reactor и Coroutine? Библиотека pthread дает нам замечательную возможность сохранить указатель на произвольный объект, по ключу, в специальной памяти потока с помощью функции ::pthread_setspecific(), и получить позже с помощью ::pthread_getspecific(). С помощью этих функций реализуем get_reactor()/set_reactor(), get_coroutine()/set_coroutine().


__inline__  void        set_reactor(Reactor& reactor) noexcept { pthread_setspecific(reactor_key, &reactor); }
__inline__  void        set_coroutine(Coroutine& coro) noexcept { pthread_setspecific(coro_key, &coro); }
__inline__  Reactor&    get_reactor() noexcept { return *reinterpret_cast<Reactor*>(pthread_getspecific(reactor_key)); }
__inline__  Coroutine&  get_coroutine() noexcept { return *reinterpret_cast<Coroutine*>(pthread_getspecific(coro_key)); }

Вызовы yurco::set_reactor() и yurco::set_coroutine() уже встроены во внутренности классов библиотеки yurco, чтобы все заработало, необходимо лишь вызвать yurco::init() до момента использования. Для большего удобства, вызов yurco::init() так же создаст один экземпляр класса Reactor, теперь можно написать методы для работы с ним:


            void        run(const size_t batch_size=16, const size_t events_at_once=1024) noexcept;
__inline__  void        terminate() noexcept;
template <class Func, class... Args>
__inline__  void        async(Func&& func, Args&&... args) noexcept;
__inline__  void        suspend(const int fd, int events);

и еще один метод для текущей сопрограммы:


__inline__  void        yield();

Напишем ::__wrap_read() для подмены ::read()


ssize_t __wrap_read(int fd, void* buf, size_t count) { return yurco::read(std::nothrow, yurco::get_reactor(), yurco::get_coroutine(), fd, buf, count); }

и по аналогии, напишем обертки для других системных функций https://github.com/yurial/yurco/blob/master/transparency.cpp


Теперь можно существенно упростить наш echo-server.
Для включения подмены системных вызовов необходимо собрать проект с директивой YURCO_TRANSPARENCY: build$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DUSE_SINGLE_THREAD=1 -DYURCO_TRANSPARENCY=1


$ cat 041_echo.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>
#include <iostream>
#include <signal.h>
#include <stdlib.h>

void process_connection(unistd::fd& clientfd)
    {
    char buf[1024];
    try
        {
        const size_t nread = unistd::read(clientfd, buf, sizeof(buf));
        if (0 == nread)
            return; // terminate coroutine, close connection
        unistd::write_all(clientfd, buf, nread);
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate connection coroutine" << std::endl;
        }
    catch (...)
        {
        std::cerr << "got unknown exception while read/write" << std::endl;
        }
    }

void listener(unistd::fd& serverfd)
    {
    try
        {
        for (;;)
            {
            for (size_t i = 0; i < 32; ++i) // sometimes we should yield() to process accepted connections
                {
                unistd::fd clientfd = unistd::fd::nodup(unistd::accept(serverfd, SOCK_NONBLOCK));
                yurco::async(process_connection, std::move(clientfd));
                }
        yurco::yield();
            }
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate listener coroutine" << std::endl;
        }
    catch (...)
        {
        std::cerr << "unknwon exception while accept" << std::endl;
        }
    }

void signal_handler(unistd::fd& sigfd)
    {
    yurco::suspend(sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl;
    // use unistd::read(sigfd) to get a signals
    yurco::terminate();
    }

void register_signal_handler()
    {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    yurco::async(signal_handler, std::move(sigfd)); // use std::move to avoid ::dup() file descriptior
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
    }

void register_listener()
    {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    unistd::fd serverfd = unistd::fd::nodup(unistd::socket(addr.at(0), SOCK_NONBLOCK));
    unistd::setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::bind(serverfd, addr.at(0));
    unistd::listen(serverfd, 8192/*backlog*/);
    yurco::async(listener, std::move(serverfd)); // use std::move to avoid ::dup() file descriptior
    }

int main()
    {
    const size_t stack_size = 16*1024; // size less than 16k lead to SIGSEGV cause libunwind require more space
    yurco::init(stack_size);
    register_signal_handler();
    register_listener();
    yurco::run();
    return EXIT_SUCCESS;
    }

console 1: $ ./041_echo
console 2: $ echo qweasd | nc -6 ::1 31337
console 2: qweasd
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


Код изрядно похудел, из него исчезли все упоминания Reactor и Coroutine, но он все еще рабочий.


Проверим работу прозрачных сопрограмм на сторонней библиотеке, например, mysql:


$ cat 042_mysql.cpp
#include <yurco/all.hpp>
#include <unistd/signalfd.hpp>
#include <unistd/addrinfo.hpp>
#include <unistd/netdb.hpp>

#include <mysql/mysql.h>

#include <iostream>
#include <signal.h>
#include <stdlib.h>

void process_connection(unistd::fd& fd)
    {
    try
        {
        char buf[100];
        unistd::read(fd, buf, sizeof(buf));
        /* Create a connection */
        std::unique_ptr<MYSQL, std::function<decltype(mysql_close)>> con(mysql_init(nullptr), mysql_close);
        mysql_real_connect(con.get(), "db.local", "ro", "", nullptr, 0, nullptr, 0);
        mysql_query(con.get(), "SELECT NOW(), SLEEP(10);");
        std::unique_ptr<MYSQL_RES, std::function<decltype(mysql_free_result)>> result(mysql_store_result(con.get()), mysql_free_result);
        if (!result)
            return; // silently close connection
        for (MYSQL_ROW row = mysql_fetch_row(result.get()); row; row = mysql_fetch_row(result.get()))
            {
            static char header[] = "HTTP/1.1 200 OK\r\nContent-Length: 21\r\nConnection: close\r\n\r\n";
            unistd::write_all(fd, header, strlen(header));
            const char* const answer = row[0];
            unistd::write_all(fd, answer, strlen(answer));
            unistd::write_all(fd, "\r\n", 2);
            }
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate connection coroutine while write" << std::endl;
        }
    catch (...)
        {
        std::cerr << "unknown exception while write" << std::endl;
        }
    }

void listener(unistd::fd& sock)
    {
    try
        {
        for (;;)
            {
            for (size_t i = 0; i < 32; ++i) // sometimes we should yield() to processing accepted connections
                {
                unistd::fd clientfd = unistd::fd::nodup(unistd::accept(sock, SOCK_NONBLOCK));
                yurco::async(process_connection, std::move(clientfd));
                }
            yurco::yield();
            }
        }
    catch (const yurco::terminate_exception&)
        {
        std::cerr << "terminate listener coroutine" << std::endl;
        }
    catch (...)
        {
        std::cerr << "unknwon exception while accept" << std::endl;
        }
    }

void signal_handler(unistd::fd& sigfd)
    {
    yurco::suspend(sigfd, EPOLLIN);
    std::cerr << "we got a signal" << std::endl;
    // use unistd::read(sigfd) to get a signals instead of yurco::suspend
    yurco::terminate();
    }

void balast_handler()
    {
    uint64_t counter = 0;
    for (;;)
        {
        ++counter;
        yurco::yield();
        }
    }

void register_signal_handler()
    {
    sigset_t sigmask;
    sigemptyset(&sigmask);
    sigaddset(&sigmask, SIGINT);
    sigaddset(&sigmask, SIGTERM);
    sigaddset(&sigmask, SIGQUIT);
    unistd::fd sigfd = unistd::fd::nodup(unistd::signalfd(sigmask));
    yurco::async(signal_handler, std::move(sigfd)); // use std::move to avoid ::dup() file descriptior
    sigaddset(&sigmask, SIGPIPE); // SIGPIPE just ignored and not processed
    sigprocmask(SIG_BLOCK, &sigmask, nullptr);
    }

void register_listener()
    {
    const std::vector<unistd::addrinfo> addr = unistd::getaddrinfo("localhost:31337"); // or [::]:31337 or other valid variants
    unistd::fd sock = unistd::fd::nodup(unistd::socket(addr.at(0), SOCK_NONBLOCK));
    unistd::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, 3);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, 1);
    unistd::setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, 1);
    unistd::setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1); // keep-alive not required, but good practic
    unistd::bind(sock, addr.at(0));
    unistd::listen(sock, 8192/*backlog*/);
    yurco::async(listener, std::move(sock)); // use std::move to avoid ::dup() file descriptior
    }

void async_main()
    {
    register_signal_handler();
    register_listener();
    }

int main()
    {
    const size_t stack_size = 16*1024; // size less than 16k lead to SIGSEGV cause libunwind require more space
    yurco::init(stack_size);
    yurco::async(async_main);
    yurco::run();
    return EXIT_SUCCESS;
    }

console 1: $ ./042_mysql
console 2: $ curl -6 'http://[::1]:31337/' && date
console 3: $ curl -6 'http://[::1]:31337/' && date
console 2: 2020-06-08 12:20:48
console 2: Mon Jun 8 12:20:58 MSK 2020
console 3: 2020-06-08 12:20:48
console 3: Mon Jun 8 12:20:58 MSK 2020
console 1: ^Cwe got a signal
console 1: terminate listener coroutine


Вуаля! Наша программа обработала 2 запроса (сделав запросы к mysql) в одном потоке, притом SQL запросы выполнялись параллельно (это видно по времени которым ответил mysql server).


ps На данный момент в библиотеке не реализована поддержка таймаутов и дискового ввода/вывода, перехват системного вызова '::getaddrinfo()'. Как станет грустно — обязательно добавлю.


Написание патчей приветствуется!


UPD 2020-06-08 статья была существенно переработана, добавлены описания классов, примеры использования.

Only registered users can participate in poll. Log in, please.
Что добавить в статью?
28.57% Добавить больше листинга кода, прототипов функций, их реализации 4
57.14% Добавить больше примеров использования 8
71.43% Добавить больше описания внутренностей классов 10
7.14% Cпеллчекер перед публикацией ^_^ 1
14 users voted. 9 users abstained.
Tags:
Hubs:
+12
Comments 2
Comments Comments 2

Articles