Предисловие
Итак, начнём мы с постановки задачи: мы хотим упростить вызов некой функции, которая является методом класса в отдельном потоке и при этом иметь инструментарий для её остановки / проверки статуса работоспособности / запуска и прочее. Разберём классический вариант класса, который будет содержать такой метод:
using namespace std;
class Object // наш некий объект, владелец метода
{
public:
Object() : // конструктор, ничего интересного
FuncThread(nullptr),
FuncStatus(false)
{}
~Object() // деструктор
{
StopFunc(); // обязательно останавливаем функцию, чтобы не получить утечку памяти
}
void Func() // сама функция, которую мы хотим отправить в другой поток
{
while (FuncStatus)
{
//Do something
}
}
bool StartFunc() // метод для её запуска
{
if (FuncThread || FuncStatus)
return false;
FuncThread = new thread(&Object::Func, this);
FuncStatus = true;
return true;
}
bool StopFunc() // метод для её остановки
{
if (!FuncThread && !FuncStatus)
return false;
FuncStatus = false;
if (FuncThread && FuncThread->joinable()) FuncThread->join();
if (FuncThread) delete FuncThread;
FuncThread = nullptr;
return true;
}
bool Status() { return FuncStatus; } // метод для проверки её состояния
private:
thread* FuncThread; // указатель на поток, в котором будет крутиться функция
mutex FuncMutex; // блокировка для обмена данными с потоком
atomic<bool> FuncStatus; // статус функции, он же управляющий флаг
};
Надо сказать, что mutex
добавлен исключительно, чтобы показать надобность наличие блокировки, для обмена данными между текущим потоком и потоком функции, а FuncStatus
может быть любым базовым численным типом для передачи более подробного статуса работы.
Ну а теперь представим, что нам потребуются две-три такие функции в одном классе. А что если мы захотим переместить владение функцией другому экземпляру объекта? Честно говоря, звучит как большая головная боль.
Мною и моим коллегой было принято решение создать некую абстрактную обёртку для подобных случаев и мы перешли к написанию технического задания:
относительно универсальное объявление
простые методы управления
возможность управления состоянием функции прямо из неё же
вывод о её состоянии в статус
конструктор перемещения
обработка результата работы основной функции
В итоге было принято решение писать обёртку сразу с поддержкой как обычных функций, объявленных глобально, так и для методов классов.
Подготовка
Начнём с заголовка type_traits
, это будет наш главный помощник в создании обёртки, поскольку в нём содержатся мощнейшие и, в то же время, простые методы для определения всех обстоятельств переданной нам функции, класса и прочего.
// Function
template<typename Callable>
struct callable_decay
{
using return_type = std::invoke_result_t<Callable>;
using owner_type = void;
};
// Function with args
template<typename Return, typename... Args>
struct callable_decay<Return(Args...)>
{
using return_type = Return;
using owner_type = void;
};
// Member pointer function
template<typename Return, typename Class, typename... Args>
struct callable_decay<Return(Class ::*)(Args...)>
{
using return_type = Return;
using owner_type = Class;
};
// Const member pointer function
template<typename Return, typename Class, typename... Args >
struct callable_decay<Return(Class::*)(Args...) const>
{
using return_type = Return;
using owner_type = Class;
};
template<typename Callable>
using callable_return_type_t = typename callable_decay<Callable>::return_type;
template<typename Callable>
using callable_owner_type_t = typename callable_decay<Callable>::owner_type;
Мы создали структуру callable_decay
, которая поможет нам определить тип возвращаемого значения функции и тип "владельца" функции, которым будет выступать класс, если мы пытаемся отправить в обёртку метод. Также объявим callable_return_type_t
и callable_owner_type_t
для того, чтобы наш код не выглядел так страшно и нагружено.
// Threaded func possible states
enum ThreadedFuncStatus_T
{
no_process,
is_starting,
is_running,
is_stopping
};
Далее объявим перечисляемый тип для описания всех возможных состояний функции:
ничего не происходит
стартуем
выполняемся
останавливаемся
Основной класс
Объявление
// function_type - тип функции, которую мы передаём обёртке
// function_owner_type - тип "владельца" функции (класса)
// handler_type - тип функции обработчика результата работы основной функции
template<typename function_type,
typename function_owner_type = std::nullptr_t,
typename handler_type = std::nullptr_t>
class ThreadedFunc
{
public:
// Выясняем какой тип возвращает основная функция
using function_return_type = callable_return_type_t<function_type>;
// А вот тут проверяем передал ли программист нам именно тот же тип "владельца" функции, что и говорит нам сама функция
static_assert(!std::is_member_function_pointer_v<function_type> ||
std::is_same_v<callable_owner_type_t<function_type>, function_owner_type>,
"Invalid function owner type");
// Классический конструктор
ThreadedFunc(function_type function, function_owner_type* owner = nullptr, handler_type handler_pred = nullptr);
// Конструктор перемещения с указанием нового владельца в случае его наличия
ThreadedFunc(ThreadedFunc&& other, function_owner_type* new_owner = nullptr);
// А вот конструктор копирования мы удаляем
ThreadedFunc(const ThreadedFunc&) = delete;
// Классический деструктор, ничего интересного
~ThreadedFunc();
// Также логично удаляем оператор присвоения
ThreadedFunc& operator=(const ThreadedFunc&) = delete;
// Методы запуска, остановки и получения статуса
inline bool Run();
inline bool Stop();
inline bool StopAsync();
inline void WaitUntilStop() const;
inline bool Status() const;
private:
// Хитрость для асинхронной остановки
inline void StopLoop();
// Метод для удобства вызова "хранимой" функции
inline function_return_type CallStoredFunction();
// Ну и конечно же главный цикл для потока
void MainLoop();
function_type StoredFunction; // Хранимая функция
function_owner_type* const StoredObjectPtr; // Хранимая ссылка на "владельца"
handler_type StoredHandler; // Хранимый обработчик
std::atomic<ThreadedFuncStatus_T> TFStatus; // Наш статус
std::thread MainThread; // Ну и конечно же сам поток
};
При наличие комментариев, всё таки поясню пару неочевидных моментов.
Во-первых, static_assert
крайне важен, поскольку предохраняет нас самих же на этапе компиляции от неправильной передачи функции-метода класса в обёртку, чтобы в последующем не искать где и как мы оплошали.
Во-вторых, обращу внимание на методы StopAsync()
и WaitUntilStop()
, которые были добавлены именно для того, чтобы мы могли асинхронно остановить работу обёртки, даже из самой же функции обёртки! Ну и конечно же отследить точный момент остановки.
В-третьих, стоит вспомнить про некий упомянутый обработчик, о нём мы поговорим подробнее ниже, а пока стоит отметить, что его наличие также необязательно и мы не завязаны на его тип.
Ну и последнее, что я замечу - это удалённый конструктор копирования и оператор присвоения, поскольку мы так и не смогли прийти к единому мнению по поводу поведения обёртки в этих сценариях, однако это не мешает читателю самостоятельно для себя реализовать требуемое поведение.
Реализация
CallStoredFunction()
Начнём с самого загадочного и интересного - приватного метода вызова хранимой функции
inline function_return_type CallStoredFunction()
{
if constexpr (std::is_member_function_pointer_v<function_type>)
return (StoredObjectPtr->*StoredFunction)();
else
return (*StoredFunction)();
}
Как видно, ничего сложного нет, однако constexpr
здесь не случайно и разгружает машину от лишних размышлений о типе вызова функции ещё на этапе компиляции, ведь функция-член не может внезапно стать независимой и наоборот.
MainLoop()
Когда мы уже умеем вызывать функцию нужным способом, пора бы описать и работу самого потока
void MainLoop()
{
TFStatus = ThreadedFuncStatus_T::is_running;
bool handler_error = false;
while (true)
{
if (TFStatus == ThreadedFuncStatus_T::is_stopping || handler_error)
break;
if constexpr (std::is_same_v<function_return_type, void>)
// no return type
CallStoredFunction();
else if constexpr (std::is_same_v<handler_type, bool(*)(function_return_type)>)
// a handler is presented
handler_error = (*StoredHandler)(CallStoredFunction());
else
// no handler - treating return type as bool
handler_error = static_cast<bool>(CallStoredFunction());
}
}
Тут мы с тем же ключевым словом constexpr
рассматриваем три сценария использования обёртки:
функция ничего не возвращает и мы просто её вызываем
результат функции отправляется в обработчик (при его наличии) и делается вывод о продолжении работы исходя уже из ответа обработчика
результат функции сам сообщает нам, продолжать ли работу без обработчика
При всём этом мы также обязаны проверять флаг статуса для отслеживания требуется ли дальше крутиться или пора остановиться из-за команды извне.
Пару слов об обработчике
Стоит всё таки наконец сказать пару слов об обработчике. До сих пор мы с коллегой так и не смогли прийти к единому мнению об обязательности этого функционала, но всё же решили, что при наличии некого стандартизированного результата функций в виде ошибок (к примеру boost::system::error_code
и им подобным), было бы здорово иметь функцию обработчик/переводчик таких вот ошибок к простому выводу: стоит ли продолжать работу или мы столкнулись действительно с критичной ошибкой.
Да, конечно этот функционал может быть заложен сразу в основной передаваемой функции в обёртку, но разве так было бы интереснее?
StopLoop()
Та самая уловка, про которую упоминалось выше ... впрочем совсем не хитрая. Мы банально ожидаем слияния потоков и ставим статус в "не работаем".
inline void StopLoop()
{
if (MainThread.joinable())
MainThread.join();
TFStatus = ThreadedFuncStatus_T::no_process;
}
Но хитростью является то, что запускать этот приватный метод мы будем в отдельном потоке, для отсутствия блокировки в методе StopAsync()
.
StopAsync()
Не отходя от кассы сразу, на него и посмотрим, а заодно дальше и на классический Stop()
.
inline bool StopAsync()
{
ThreadedFuncStatus_T _Expected = ThreadedFuncStatus_T::is_running;
const bool ret = TFStatus.compare_exchange_strong(_Expected, is_stopping);
if (ret) {
std::thread thread{ &ThreadedFunc::StopLoop, this };
thread.detach();
}
return ret;
}
Здесь мы выполняем запуск того самого отдельного потока обработки остановки основного потока, предварительно проверяя статус.
Заметим использование compare_exchange_strong()
у атомарной переменной, которая сравнивает значение в самой переменной с ожидаемым и заменяет на желаемое, при всём этом сообщает нам была ли выполнена замена значения (или другими словами "сравнить и обменять").
Stop()
inline bool Stop()
{
ThreadedFuncStatus_T _Expected = ThreadedFuncStatus_T::is_running;
const bool ret = TFStatus.compare_exchange_strong(_Expected, is_stopping);
if (ret) StopLoop();
return ret;
}
Всё то же самое, но в основном потоке программы.
Run()
Забавно, что останавливать мы научились, а запускать нет, так что ...
inline bool Run()
{
assert(StoredFunction != nullptr && "No pointer to the function is presented or this object was moved");
ThreadedFuncStatus_T _Expected = ThreadedFuncStatus_T::no_process;
const bool ret = TFStatus.compare_exchange_strong(_Expected, is_starting);
if (ret)
MainThread = std::thread{ &ThreadedFunc::MainLoop, this };
return ret;
}
Тут мы проверяем не перенесли ли этот объект и храним ли мы до сих пор функцию, а затем ставим статус и, конечно же, запускаем поток.
Status()
inline bool Status() const { return TFStatus != ThreadedFuncStatus_T::no_process; }
Тут стоит заметить, что мы логичным образом считаем, что любое действие обёртки признаётся активным статусом и только при полной остановке действий статус вернёт no_process
, то есть false
.
WaitUntilStop()
Ну и последний доступный метод - это метод ожидания:
inline void WaitUntilStop() const
{
while (this->Status())
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
Конструкторы
ThreadedFunc(function_type function, function_owner_type* owner = nullptr, handler_type handler_pred = nullptr) :
StoredFunction (function),
StoredObjectPtr(owner),
StoredHandler (handler_pred)
{
if constexpr (!std::is_same_v<function_owner_type, std::nullptr_t>)
assert(owner != nullptr);
}
Тут у нас всё просто, но опять добавляем assert
для быстрого отлавливания неправильного использования обёртки и сокращения времени отладки.
ThreadedFunc(ThreadedFunc&& other, function_owner_type* new_owner = nullptr) :
StoredFunction (other.StoredFunction),
StoredObjectPtr(new_owner),
StoredHandler (other.StoredHandler)
{
if constexpr (!std::is_same_v<function_owner_type, std::nullptr_t>)
assert(new_owner != nullptr);
if (other.Status())
{
other.Stop();
this->Run();
}
other.StoredFunction = nullptr;
}
Если обёртка была активна на момент перемещения, мы останавливаем цикл вызова, производим замену класса владельца и снова запускаем цикл.
Деструктор
~ThreadedFunc() { if (!Stop()) WaitUntilStop(); }
Такой финт ушами делается на случай, если Stop()
будет вызван из одного потока, а деструктор - из другого.
Примеры использования
void Func1() {}
bool Func2() {}
ThreadedFunc<void(*)(), nullptr_t> TF1(Func1);
ThreadedFunc<bool(*)(), nullptr_t> TF2(Func2);
// Теперь объявим некий класс
class Obj
{
public:
// Да, мы вынуждены строго объявлять конструкторы обёртки,
// потому что отсутствует конструктор по умолчанию
Obj() :
TF3(&Obj::Func1, this),
TF4(&Obj::Func2, this, &Obj::Hander) // Вариант с обработчиком
{}
// Пример обработчика/переводчика результата функции в bool ответ
bool Hander(int res) { return bool(res); }
// Функции для примера, но уже как методы
void Func1() {}
int Func2() {}
private:
// Вот так объявляем наши обёртки
ThreadedFunc<void(Obj::*)(), Obj> TF3;
ThreadedFunc<int (Obj::*)(), Obj, bool(Obj::*)(int)> TF4; // С обработчиком
}Obj1; // Создадим сразу один экземпляр, чтобы показать обёртки далее
// А вот тут пример объявления глобальных обёрток для функций-методов
ThreadedFunc<void(Obj::*)(), Obj> TF5(&Obj::Func1, &Obj1);
ThreadedFunc<int (Obj::*)(), Obj, bool(Obj::*)(int)> TF6(&Obj::Func2, &Obj1);
Ну а дальше конечно же пользуемся Run()
, Stop()
, StopAsync()
и так далее и ... радуемся компактности кода!
Ещё немного упрощения
Как заметно по примерам кода выше, пока что мы не достигли идеального дзена в объявлении нашей обёртки, всё ещё выглядит страшно. Так что я предложил моему коллеге поломать мозг. И после пары часов жонглирования он изобрёл ЭТО:
//Для объявления в составе класса
template<typename ClassT, typename ReturnT = std::nullptr_t>
using ClassThreadedFunc = ThreadedFunc<
std::conditional_t<std::is_same_v<ReturnT, std::nullptr_t>, void(ClassT::*)(), ReturnT(ClassT::*)()>,
ClassT,
std::conditional_t<std::is_same_v<ReturnT, std::nullptr_t>, std::nullptr_t, bool(ClassT::*)(ReturnT)>>;
// Для независимой функции
template<typename ReturnT = std::nullptr_t>
using FuncThreadedFunc = ThreadedFunc<
std::conditional_t<std::is_same_v<ReturnT, std::nullptr_t>, void(*)(), ReturnT(*)()>,
std::nullptr_t,
std::conditional_t<std::is_same_v<ReturnT, std::nullptr_t>, std::nullptr_t, bool(*)(ReturnT)>>;
Весь этот ужас сводит объявление к лаконичным строчкам по типу:
void Func1() {}
bool Func2() {}
FuncThreadedFunc< > TF1(Func1);
FuncThreadedFunc<bool> TF2(Func2);
// Теперь объявим некий класс
class Obj
{
public:
// Да, мы вынуждены строго объявлять конструкторы обёртки,
// потому что отсутствует конструктор по умолчанию
Obj() :
TF3(&Obj::Func1, this)
{}
// Функции для примера, но уже как методы
void Func1() {}
private:
// Вот так объявляем наши обёртки
ClassThreadedFunc<Obj> TF3;
}Obj1; // Создадим сразу один экземпляр, чтобы показать обёртки далее
// А вот тут пример объявления глобальных обёрток для функций-методов
ClassThreadedFunc<Obj> TF4(&Obj::Func1, &Obj1);
Послесловие
Может быть мы и не получили супер отлаженный и дзеновский код, но вся работа значительно заставила нас напрячь мозги и наконец познакомиться с type_traits
.
На наш скромный взгляд мы получили неплохой и удобный функционал для многопоточного программирования. Из нюансов, которые не нравятся мне самому - это появление третьего потока в случае асинхронного вызова Stop()
через метод StopAsync()
, однако на момент написания статьи мы не придумали, как красиво избавиться от этого.
Надеюсь эта статья удивила Вас или хотя бы дала неплохую жвачку для мозга, спасибо за внимание!