Как стать автором
Обновить

Корутины C++20 и многозадачность на примере контроллеров stm32

Время на прочтение12 мин
Количество просмотров16K

Никого не хотел обидеть КДПВ (в первую очередь @Saalur), действительно далеко не с первого раза становится понятно.

Введение

Одним из наиболее ярких нововведений, которые получил язык в стандарте C++20, является поддержка сопрограмм (или корутин). Разработчики ПО для микроконтроллеров сразу могут заметить, что корутина похожа на задачу в операционной системе. На хабре уже присутствуют материалы, посвященные этой теме, например, "Использование coroutines из С++20 в связке с NRF52832 и GTest" от @Firthermant и "CoroOS: концепт операционной системы для микроконтролеров на корутинах С++20" от @Saalur В то же время не могу не отметить, что сходу разобраться в представленных материалах и исходниках нелегко, особенно для тех программистов, которые пока еще не достаточно хорошо познакомились с сопрограммами в C++. В своём материале я постараюсь на более простом уровне разобрать вопросы применения нового стандарта языка при разработке планировщика заданий. В некотором смысле эту статью можно считать подготовительной перед прочтением указанных выше.

Итак, давайте разберем несколько несложных вариантов планирования задач от самого примитивного до чего-то отдалённо напоминающего операционную систему.

Простейшая кооперативная многозадачность

Кооперативная многозадачность подразумевает "добровольную" передачу управления от одной задачи другой. В терминах операционной системы, как правило, над всеми задачами расположена "суперзадача" – планировщик, принимающий управление от приостановленного кода и передающий очередной задаче. Пример функционирования наиболее простой системы из двух задач показан на рисунке 1 (зелёными стрелками выделен каждый второй круг цикла).

Предложенная система состоит из двух задач, первая из которых содержит три оператора и готова вернуть управление (перейти в состояние ожидания) после выполнения оператора 2, а вторая задача состоит из четырех операторов и также готова перейти в состояние ожидания после выполнения оператора 2.

Таким образом, многозадачность реализуется следующей последовательностью выполнения операторов:

Порядок выполнения операторов
  1. Задача 1. Оператор 1.

  2. Задача 1. Оператор 2.

  3. Задача 2. Оператор 1.

  4. Задача 2. Оператор 2.

  5. Задача 1. Оператор 3.

  6. Задача 2. Оператор 3.

  7. Задача 2. Оператор 4.

  8. Перейти к пункту 1.

Рисунок 1. Вариант кооперации двух задач.
Рисунок 1. Вариант кооперации двух задач.

Такой порядок выполнения легко переложить на механизм корутин:

  1. Объектами корутин владеет планировщик, что позволяет ему в порядке очереди передавать управление задачам.

  2. Продолжение выполнения задачи планировщиком – операция resume для очередной корутины-задачи.

  3. Приостановка выполнения задачи и передача управления планировщику – операция suspend для текущей корутины.

Таким образом, код планировщика в подобной системе выглядит приблизительно следующим образом:

auto tasks = { Task1(), Task2() }; // Создание задач

for(;;)
{
  // Последовательное продолжение выполнения задач
  for (auto& t : tasks)
    t.resume();
}

А сама задача так:

task TaskX()
{
  // Начальная инициализация
  for(;;)
  {
    // Операторы 1 .. N1
    co_await std::suspend_always(); // приостановка выполнения
    // Операторы N1+1 .. N2
    co_await std::suspend_always(); // приостановка выполнения
  }
}

В самом примитивном случае (а именно такой мы пока рассматриваем) объектом ожидания корутины (аргументом оператора co_await) является стандартный std::suspend_always(), подразумевающий просто передачу управления обратно вызывающей стороне – планировщику.

Тип task в этом случае также простейший, его исходный код приведен ниже. Стоит только отметить, что неизбежно придётся переопределить оператор new, однако от менеджера памяти требуется лишь однократного выделения области памяти в буфере, поскольку данная операция будет выполнена единожды для каждой задачи. Освобождение же памяти не подразумевается вовсе, поскольку классическая задача-корутина содержит бесконечный цикл. И вообще, отдельный менеджер памяти, в принципе, не нужен вовсе, можно объявить буфер в самом типе task или task::promise_type, однако такой подход чреват излишним расходованием памяти, так как для разных задач может потребоваться различный размер, в таком случае размер буфера придется определить как максимальный среди всех задач.

Из русскоязычных материалов по сопрограммам мне больше всего понравилась лекция Константина Владимирова, советую просмотреть обе части, так как весь дальнейший код старался делать максимально приближенным к тому, как материал излагал лектор.

Исходный код структуры task
struct task {
  struct promise_type {
    using coro_handle = std::coroutine_handle<promise_type>;

    auto get_return_object() {
      return coro_handle::from_promise(*this);
    }

    auto initial_suspend() {
      // Изначально задача остановлена
      // Хотя можно вернуть suspend_never, чтобы дать возможность проинициализировать всё необходимое
      return std::suspend_always();
    }

    auto final_suspend() noexcept {
      // Задача будет содержать бесконечный цикл, поэтому в этот метод попадать не должны
      return std::suspend_never();
    }

    void unhandled_exception() {
      // В этот метод тоже
    }

    void* operator new(std::size_t size) {
      return MemoryManager::Allocate(size);
    }

    void operator delete(void* ptr) {
      // По той же самой причине (бесконечное выполнение задач) в этот метод попасть не должны
      MemoryManager::Deallocate(ptr);
    }
  };

  using coro_handle = promise_type::coro_handle;

  task(coro_handle handle) : _handle(handle) {}

  // Деструктор можно не определять, так как он ни разу не будет вызван  

  void resume() const {
    _handle.resume();
  }
private:
  coro_handle _handle;
};

Код всей программы целиком (для работы с периферией использовал свою библиотеку) представлен под катом. Для упрощения в задачах оставлен только один оператор co_await, а полезная нагрузка представляет собой ожидание байта от UART и запись ответа. Работоспособность проверена на контроллере stm32f103c8t6.

Полный код программы
#include <iopins.h>
#include <usart.h>

#include <coroutine>

class MemoryManager {
  static const uint32_t Capacity = 512;

public:
  static void* Allocate(std::size_t size) {
    _size += size;
    return &_data[_size - size];
  }

  static void Deallocate(void* ptr) {
    // Задачи-корутины вечные, поэтому в этот метод попасть не должны
  }

private:
  static uint8_t _data[Capacity];
  static uint16_t _size;
};
uint8_t MemoryManager::_data[MemoryManager::Capacity];
uint16_t MemoryManager::_size = 0;

struct task {
  struct promise_type {
    using coro_handle = std::coroutine_handle<promise_type>;

    auto get_return_object() {
      return coro_handle::from_promise(*this);
    }

    auto initial_suspend() {
      // Изначально задача остановлена
      // Хотя можно вернуть suspend_never, чтобы дать возможность проинициализировать всё необходимое
      return std::suspend_always();
    }

    auto final_suspend() noexcept {
      // Задача будет содержать бесконечный цикл, поэтому в этот метод попадать не должны
      return std::suspend_always();
    }

    void unhandled_exception() {
      // В этот метод тоже
    }

    void* operator new(std::size_t size) {
      return MemoryManager::Allocate(size);
    }

    void operator delete(void* ptr) {
      // По той же самой причине (бесконечное выполнение задач) в этот метод попасть не должны
      MemoryManager::Deallocate(ptr);
    }
  };

  using coro_handle = promise_type::coro_handle;

  task(coro_handle handle) : _handle(handle) {}

  // Деструктор можно не определять, так как он ни разу не будет вызван  

  void resume() const {
    _handle.resume();
  }
private:
  coro_handle _handle;
};

using usart1 = Zhele::Usart1;
using usart2 = Zhele::Usart2;

task Task1()
{
  usart1::Init(9600);
  usart1::SelectTxRxPins<Zhele::IO::Pa9, Zhele::IO::Pa10>();
  usart1::Write("Init task1\r\n", 12);

  for(;;) {
    if (usart1::ReadReady()) {
      auto s = usart1::Read();
      if (s == '1')
        usart1::Write("Task1: you write '1'\r\n", 22);
      if (s == '2')
        usart1::Write("Task1: you write '2'\r\n", 22);
    }
    co_await std::suspend_always();
  }
}

task Task2()
{
  usart2::Init(9600);
  usart2::SelectTxRxPins<Zhele::IO::Pa2, Zhele::IO::Pa3>();
  usart2::Write("Init task2\r\n", 12);

  for (;;) {
    if (usart2::ReadReady()) {
      auto s = usart2::Read();
      if(s == '1')
        usart2::Write("Task2: you write '1'\r\n", 22);
      if(s == '2')
        usart2::Write("Task2: you write '2'\r\n", 22);
    }
    co_await std::suspend_always();
  }
}

int main()
{
  auto tasks = {Task1(), Task2()};
  
  for(;;)
  {
    for (auto& t : tasks)
      t.resume();
  }
}

Вытесняющая многозадачность на событиях

Хотя безусловная приостановка задач не лишена смысла, чаще приостановка выполнения связана с ожиданием некоторого внешнего события, без которого дальнейшее выполнение задачи невозможно в принципе. В нашем примере этим событием является приём очередного байта по UART. В таком случае аргументом оператора co_await разумно сделать не стандартный suspend_always, а некий awaitable-объект, который мы сейчас и попробуем описать.

К объекту ожидания можно предъявить несколько требований:

  1. Должен содержать promise, подходящий для оператора co_await.

  2. Должен иметь флаг состояния (состоялось или нет ожидаемое событие, можно или нет продолжать выполнение корутины).

  3. Должен иметь внутри себя список (в нашем случае ограничимся единственным экземпляром) потребителей, которые ожидают это самое событие.

Вообще говоря, третий пункт является спорным, потому что оповещение ожидающих задач можно сделать и иначе, например, сохранить их где-то отдельно.

В данном случае тип task можно упростить, удалив из его полей дескриптор coro_handle (так как в отличие от ранее предложенного варианта, конкретного владельца у корутины не будет. Владеть ею временно будет ожидаемое событие). При наступлении события активируется соответствующий потребитель. На текущем этапе эта процедура происходит прямо в обработчике прерывания, что делает систему вытесняющей.

Код упрощенного типа task (в лекции Владимирова это тип resumable_no_own):

Код структуры task
struct task {
  struct promise_type {
    using coro_handle = std::coroutine_handle<promise_type>;

    auto get_return_object() {
      return coro_handle::from_promise(*this);
    }

    // В двух следующих двух методах возвращается suspend_never
    auto initial_suspend() {
      return std::suspend_never();
    }

    auto final_suspend() noexcept {
      // Задача будет содержать бесконечный цикл, поэтому в этот метод попадать не должны
      return std::suspend_never();
    }

    void unhandled_exception() {
      // В этот метод тоже
    }

    void* operator new(std::size_t size) {
      return MemoryManager::Allocate(size);
    }

    void operator delete(void* ptr) {
      // По той же самой причине (бесконечное выполнение задач) в этот метод попасть не должны
      MemoryManager::Deallocate(ptr);
    }
  };

  using coro_handle = promise_type::coro_handle;

  // В конструкторе ничего не происходит
  task(coro_handle handle) {}
};

Наиболее важным элементом является тип события, его код:

Код класса event
class event {
  using coro_handle = std::coroutine_handle<>;

  struct awaiter {
    event& _event;
    coro_handle _handle = nullptr;

    awaiter(event& event) noexcept : _event(event) {}

    bool await_ready() const noexcept { return _event.is_set(); }

    void await_resume() noexcept { _event.reset(); }

    // Регистрация потребителя события
    void await_suspend(coro_handle handle) {
      _handle = handle;
      _event.set_awaiter(this);
    }
  };

public:
  // Метод установки (активации) события
  void set() {
    _set = true;
    // Возобновление соответствующей задачи
    // По замечанию @mayorovp проверяем, есть ли вообще потребитель
    if (_consumer)
    	_consumer->_handle.resume();
  }

  // Проверка состояния события
  bool is_set() {
    return _set;
  }
  
  // Сброс события
  void reset() {
    _set = false;
    // По замечанию @mayorovp обнуляю потребителя
    // По-хорошему операция должна быть атомарной, но
    // пока не готов предложить качественный вариант
    _consumer = nullptr;
  }

  // Метод регистрации потребителя
  // Можно реализовать целый список потребителей
  void set_awaiter(awaiter* consumer) {
    _consumer = consumer;
    // Пока исправлял код, подумал, а что если событие возникло раньше,
    // чем на него в очередной раз подписались? В случае с принятым
    // байтом, наверно, надо сразу пробуждать потребителя?
    // Прошу совета в комментариях.
  }

  // Перегруженный оператор co_await, так как тип event не является awaitable
  // Подробное разъяснение необходимости этой перегрузки можно найти в лекции Владимирова
  awaiter operator co_await() noexcept {
    return awaiter(*this);
  }

private:
  awaiter* _consumer = nullptr;
  bool _set = false;
};

При вытесняющей активации корутины непосредственно из обработчика прерывания смысл отдельного планировщика теряется, поэтому главный цикл в функции main пустой.

Полный код программы
#include <iopins.h>
#include <usart.h>

#include <coroutine>

class MemoryManager {
  static const uint32_t Capacity = 512;

public:
  static void* Allocate(std::size_t size) {
  _size += size;
  return &_data[_size - size];
  }

  static void Deallocate(void* ptr) {
  // Задачи-корутины вечные, поэтому в этот метод попасть не должны
  }

private:
  static uint8_t _data[Capacity];
  static uint16_t _size;
};
uint8_t MemoryManager::_data[MemoryManager::Capacity];
uint16_t MemoryManager::_size;

struct task {
  struct promise_type {
  using coro_handle = std::coroutine_handle<promise_type>;

  auto get_return_object() {
    return coro_handle::from_promise(*this);
  }

  // Отличие от предыдущей реализации только в следующих двух методах
  auto initial_suspend() {
    return std::suspend_never();
  }

  auto final_suspend() noexcept {
    // Задача будет содержать бесконечный цикл, поэтому в этот метод попадать не должны
    return std::suspend_never();
  }

  void unhandled_exception() {
    // В этот метод тоже
  }

  void* operator new(std::size_t size) {
    return MemoryManager::Allocate(size);
  }

  void operator delete(void* ptr) {
    // По той же самой причине (бесконечное выполнение задач) в этот метод попасть не должны
    MemoryManager::Deallocate(ptr);
  }
  };

  using coro_handle = promise_type::coro_handle;

  // В конструкторе ничего не происходит
  task(coro_handle handle) {}
};

class event {
  using coro_handle = std::coroutine_handle<>;

  struct awaiter {
    event& _event;
    coro_handle _handle = nullptr;

    awaiter(event& event) noexcept : _event(event) {}

    bool await_ready() const noexcept { return _event.is_set(); }

    void await_resume() noexcept { _event.reset(); }

    // Регистрация потребителя события
    void await_suspend(coro_handle handle) {
      _handle = handle;
      _event.set_awaiter(this);
    }
  };

public:
  // Метод установки (активации) события
  void set() {
    _set = true;
    // Возобновление соответствующей задачи
    _consumer->_handle.resume();
  }

  // Проверка состояния события
  bool is_set() {
    return _set;
  }
  
  // Сброс события
  void reset() {
    _set = false;
  }

  // Метод регистрации потребителя
  // Можно реализовать целый список потребителей
  void set_awaiter(awaiter* consumer) {
    _consumer = consumer;
  }

  // Перегруженный оператор co_await, так как тип event не является awaitable
  // Подробное разъяснение необходимости этой перегрузки можно найти в лекции Владимирова
  awaiter operator co_await() noexcept {
    return awaiter(*this);
  }

private:
  awaiter* _consumer = nullptr;
  bool _set = false;
};

using usart1 = Zhele::Usart1;
using usart2 = Zhele::Usart2;

event usart1Rx;
event usart2Rx;

char u1, u2;

task Task1()
{
  usart1::Init(9600);
  usart1::SelectTxRxPins<Zhele::IO::Pa9, Zhele::IO::Pa10>();
  usart1::Write("Init task1\r\n", 12);
  usart1::EnableInterrupt(usart1::InterruptFlags::RxNotEmptyInt);

  for(;;) {
    co_await usart1Rx;

    if (u1 == '1')
      usart1::Write("Task1: you write '1'\r\n", 22);
    if (u1 == '2')
      usart1::Write("Task1: you write '2'\r\n", 22);
  }
}

task Task2()
{
  usart2::Init(9600);
  usart2::SelectTxRxPins<Zhele::IO::Pa2, Zhele::IO::Pa3>();
  usart2::Write("Init task2\r\n", 12);
  usart2::EnableInterrupt(usart2::InterruptFlags::RxNotEmptyInt);

  for (;;) {
    co_await usart2Rx;

    if(u2 == '1')
      usart2::Write("Task2: you write '1'\r\n", 22);
    if(u2 == '2')
      usart2::Write("Task2: you write '2'\r\n", 22);
  }
}

int main()
{
  Task1();
  Task2();
  
  for(;;) {}
}

extern "C" {
    void USART1_IRQHandler() {
        u1 = usart1::Read();

        usart1Rx.set();
        usart1::ClearInterruptFlag(usart1::InterruptFlags::RxNotEmptyInt);
    }

    void USART2_IRQHandler() {
        u2 = usart2::Read();

        usart2Rx.set();
        usart2::ClearInterruptFlag(usart2::InterruptFlags::RxNotEmptyInt);
    }
}

В коде выше предложено объективно некрасивое решение с глобальными переменными u1, u2. От них можно несложным образом избавиться, внеся результат в объект event, что позволит внутри корутины лаконично написать char s = co_await usartNRx;

Кооперативная многозадачность на событиях

Предыдущий код можно модернизировать и получить кооперативную многозадачность, основанную на событиях. В обработчике прерываний мы откажемся от немедленной установки флага события, а вместо этого добавим объект события в очередь на оповещение его потребителя. Для этого потребуется совсем немного изменений.

В программу можно добавить глобальную очередь событий:

std::queue<Event*> TasksQueue;

В главном цикле (планировщике) остается только активировать события в порядке очереди:

for(;;) {
  if (!TasksQueue.empty()) {
    TasksQueue.front()->set();
    TasksQueue.pop();
  }
}

В обработчике прерываний вместо немедленной активации события необходимо добавить событие в очередь:

void USART1_IRQHandler() {
  u1 = usart1::Read();

  TasksQueue.push(&usart1Rx);
  usart1::ClearInterruptFlag(usart1::InterruptFlags::RxNotEmptyInt);
}
// Код обработчика UART2 аналогичный

В такой модификации при наступлении события текущей задаче даётся возможность дойти до первого по ходу выполнения оператора co_await, то есть вытеснения не происходит.

Важно! Данный код не заработал с опцией -Os, что я связываю с использованием std::queue. Без оптимизации (с опцией -O0) всё работает корректно.

Кооперативная многозадачность с приоритетами

Очевидным развитием системы является приоритизация выполнения задач. Сделать это можно простым добавлением очереди (или нескольких очередей – для каждого уровня приоритета), причем почти «бесплатно» достается поддержка динамического приоритета задач, если приоритет имеет не сама задача, а ожидаемое ею событие (более подробно эту тему раскрыл @Saalur), так что рассмотрим сразу этот пример.

Для поддержки приоритетов предыдущую реализацию достаточно дополнить не одной очередью, а несколькими (своя на каждый уровень приоритета), либо предусмотреть сортировку при вставке в очередь. Для демонстрации примем, что есть два уровня: высокий и низкий, и рассмотрим изменения в исходном коде.

В первую очередь тип event обзаведется приоритетом:

// Вложенное перечисление приоритетов
enum class Priority
{
  Low,
  High
};

// Появился конструктор с параметром
event(Priority priority) : _priority(priority) {}
// А также метод, возвращающий приоритет события
Priority priority() const {return _priority;}

Глобальные переменные событий должны быть созданы с нужными приоритетами:

event usart1Rx(event::Priority::High);
event usart2Rx(event::Priority::Low);

В главном цикле (планировщике) необходимо сначала обработать все задачи с высоким приоритетом, а уже после с низким:

for(;;) {
  if (!HighPriorityTasksQueue.empty()){
    HighPriorityTasksQueue.front()->set();
    HighPriorityTasksQueue.pop();
    continue;
  }
  if (!LowPriorityTasksQueue.empty()) {
    LowPriorityTasksQueue.front()->set();
    LowPriorityTasksQueue.pop();
  }
}

А в обработчике прерываний вставка в очередь зависит от приоритета события. Текущая реализация не слишком красивая, однако код написан исключительно для демонстрации, поэтому считаю это позволительным.

void USART1_IRQHandler() {
  u1 = usart1::Read();

  (usart1Rx.priority() == event::Priority::High ? HighPriorityTasksQueue : LowPriorityTasksQueue).push(&usart1Rx);
  usart1::ClearInterruptFlag(usart1::InterruptFlags::RxNotEmptyInt);
}
// Код обработчика UART2 аналогичный

Несложно заметить, что возможности расстановки приоритетов практически безграничны. Например, можно комбинировать приоритет самой корутины и объекта ожидания и выбирать из двух наибольший (или наименьший).

Заключение

В данной статье я постарался рассмотреть вопрос применения корутин языка C++ для диспетчеризации задач, предложив наиболее примитивную систему и варианты ее последовательного развития. Дальнейшая модернизацию уже можно считать некоторой операционной системой, что замечательно раскрыл @Saalur. Также хотел бы поблагодарить уважаемого @lamerok, который проконсультировал меня по вопросу переключения контекста (очень советую к прочтению его статью), хотя я и решил не включать этот вопрос, так как к корутинам он не относится.

Надеюсь, что мне удалось хотя бы в какой-то мере продемонстрировать на простых примерах, как можно применить последний стандарт C++ для диспетчеризации задач.

Теги:
Хабы:
Всего голосов 22: ↑21 и ↓1+28
Комментарии30

Публикации

Истории

Работа

QT разработчик
6 вакансий
Программист C++
123 вакансии

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
11 сентября
Митап по BigData от Честного ЗНАКа
Санкт-ПетербургОнлайн
14 сентября
Конференция Practical ML Conf
МоскваОнлайн
19 сентября
CDI Conf 2024
Москва
20 – 22 сентября
BCI Hack Moscow
Москва
24 сентября
Конференция Fin.Bot 2024
МоскваОнлайн
25 сентября
Конференция Yandex Scale 2024
МоскваОнлайн
28 – 29 сентября
Конференция E-CODE
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
30 сентября – 1 октября
Конференция фронтенд-разработчиков FrontendConf 2024
МоскваОнлайн