Всем доброго времени суток. Хочу немного рассказать о своём проекте qt-async, возможно кому-то он покажется интересным или даже полезным.
Асинхронность и многопоточность уже давно и серьезно входит в обиход разработчиков. Многие современные языки и библиотеки разрабатываются с прицелом на асинхронное использование. Язык С++ тоже потихоньку движется в этом направлении — появились std::thread, std::promise/future, должны вот-вот завезти корутины и networking. Библиотека Qt тоже не отстает, предлагая свои аналоги QThread, QRunnable, QThreadPool, QFuture и т.п. При этом виджетов для отображения асинхронных действий в Qt я не нашел (возможно плохо искал, поправьте, если я ошибаюсь).
Поэтому я решил восполнить недостаток и попробовать реализовать такой виджет самому. Многопоточная разработка — дело сложное, но интересное.
Прежде, чем приступить к реализации виджета, нужно описать модель, которую он будет представлять пользователю в виде окна. В самом общем виде работа виджета мне представляется следующим образом: в некоторый момент времени пользователь или система стартует асинхронную операцию. В этот момент виджет показывает прогресс операции или просто индикацию выполнения операции. Опционально пользователь имеет возможность отменить операцию. Далее асинхронная операция завершается двумя исходами: либо произошла ошибка и наш виджет показывает её, либо виджет показывает результат успешного выполнения операции.
Таким образом наша модель может находиться в одном из трех состояний:
В каждом из состояний модель должна хранить соответствующие данные, поэтому я назвал модель AsyncValue. Важно заметить, что сама асинхронная операция не является частью нашей модели, она лишь переключает её состояние. Получается, что AsyncValue можно использовать с любой асинхронной библиотекой, соблюдая простой шаблон использования:
Вот схематичный пример с использованием QRunnable:
Такая же схема для работы с std::thread:
Таким образом первая версия нашего класса могла бы выглядеть примерно так:
Каждый, кто сталкивался с классами, которые поддерживают многопоточность, знает, что интерфейс таких классов отличается от однопоточных аналогов. Например функция size() бесполезна и опасна в многопоточном векторе. Её результат сразу же может стать невалидным, так как вектор может модифицироваться в данный момент в другом потоке.
Пользователи класса AsyncValue должны иметь возможность получить доступ к данным класса. Выдавать копию данных может быть накладно, любой из типов ValueType/ErrorType/ProgressType могут быть тяжеловесными. Выдавать ссылку на внутренние данные опасно — в любой момент они могут стать невалидными. Предлагается следующее решение:
1. Доступ к данным давать через функции accessValue/accessError/accessProgress, в которые передаются лямбды, принимающие соответствующие данные. Например:
Таким образом, доступ к внутреннему значению осуществляется по ссылке и находится под локом на чтение. То есть ссылка в момент доступа не станет невалидной.
2. Пользователь AsyncValue в функции accessValue может запомнить ссылку на внутренние данные, при условии, что он подписан на сигнал stateChanged и после обработки сигнала обязан больше не пользоваться этой ссылкой, т.к. она станет невалидной.
При таких условиях потребитель AsyncValue всегда гарантированно имеет валидный и удобный доступ к данным. У данного решения несколько последствий, влияющих на реализацию класса AsyncValue.
Во-первых, наш класс должен посылать сигнал при изменении состояния, но при этом он шаблонный. Придется добавить базовый Qt класс, где определить сигнал, по которому виджет будет обновлять свое содержимое, а все заинтересованные обновлять ссылки на внутренние данные.
Во-вторых, момент рассылки сигнала должен быть заблокирован на чтение (чтобы AsyncValue нельзя было менять, пока все не обработают сигнал) и, самое важное, в этот момент должны быть валидные ссылки на новые и старые данные. Потому что в процессе рассылки сигнала часть потребителей AsyncValue еще пользуются старыми ссылками, а те, кто обработал сигнал — новыми.
Получается, что std::variant нам не подходит и придется хранить данные в динамической памяти, чтобы адреса новых и старых данных были неизменны.
Небольшое отступление.
Можно рассмотреть другие реализации класса AsyncValue, которые не требуют динамических аллокаций:
Получается следующая схематичная реализация функции setValue:
Как видно, нам необходимо повышать блокировку m_lock на запись и возвращать обратно на чтение. К сожалению такой поддержки в классе QReadWriteLock нет. Достичь нужного функционала можно парой QMutex/QReadWriteLock. Вот реализация класса AsyncValue, приближенная к реальной:
Для тех, кто не устал и не потерялся, продолжим.
Как можно заметить у нас есть функции accessXXX, которые не ждут пока AsyncValue перейдет в соответствующее состояние, а просто возвращают false. Иногда полезно синхронно дождаться пока в AsyncValue появится или значение или ошибка. По сути нам нужен аналог std::future::get. Вот сигнатура функции:
Для работы этой функции нам понадобится condition variable — объект синхронизации, который можно ждать в одном потоке и активизировать в другом. В функции wait нам следует ждать, а при смене состояния AsyncValue c Progress на Value или Error мы должны оповестить ждущих.
Добавление еще одного поля в класс AsyncValue, которое нужно в редких случаях, когда используют функцию wait, навела меня на размышления — можно ли сделать это поле опциональным? Ответ очевиден, конечно можно, если хранить std::unique_ptr и создавать его при необходимости. Возник второй вопрос — можно ли сделать это поле опциональным и не делать динамических аллокаций. Кому интересно, прошу посмотреть на следующий код. Основная идея в следующем, первый вызов wait создает на стеке структуру с QWaitCondition и записывает ее указатель в AsyncValue, последующие вызовы wait просто проверяют, если указатель не пустой, по пользуются структурой по этому указателю, если указатель пустой — см. выше про первый вызов wait.
Как уже упоминалось, AsyncValue не имеет метода для асинхронных вычислений чтобы не привязываться к конкретной библиотеке. Вместо этого используются свободные функции, которые реализуют асинхронность тем или иным способом. Ниже приведён пример вычисления AsyncValue на пуле потоков:
В библиотеке реализовано ещё две схожих функции: asyncValueRunNetwork для обработки сетевых запросов и asyncValueRunThread, который выполняет операцию на вновь созданом потоке. Пользователи библиотеки могут легко создать свои собственные функции и использовать там использовать те асинхронные средства, которые они используют в других местах.
Для повышения безопасности класс AsyncValue был расширен еще одним шаблонным классом AsyncTrackErrorsPolicy, который позволяет реагировать на неправильное использование AsyncValue. Например, вот реализация по умолчанию функции AsyncTrackErrorsPolicy::inProgressWhileDestruct, которая вызовется, если AsyncValue будет разрушаться во время работы асинхронной операции:
Что касается виджетов, то их реализация достаточно проста и лаконична. AsyncWidget — это контейнер, которые содержит в себе виджет для показа ошибки, либо прогресса, либо значения в зависимости в каком состоянии сейчас находится AsyncValue.
Пользователь обязан переопределить лишь первую функцию, для отображения value, две остальных имеют реализации по умолчанию.
Библиотека qt-async получилась компактной, но в то же время достаточно полезной. Использование AsyncValue/AsyncWidget, там где раньше были синхронные функции и статический GUI, позволит вашим приложениям стать современными и более отзывчивыми.
Для тех, кто прочитал до конца бонус — видео работы демо приложения
Асинхронность и многопоточность уже давно и серьезно входит в обиход разработчиков. Многие современные языки и библиотеки разрабатываются с прицелом на асинхронное использование. Язык С++ тоже потихоньку движется в этом направлении — появились std::thread, std::promise/future, должны вот-вот завезти корутины и networking. Библиотека Qt тоже не отстает, предлагая свои аналоги QThread, QRunnable, QThreadPool, QFuture и т.п. При этом виджетов для отображения асинхронных действий в Qt я не нашел (возможно плохо искал, поправьте, если я ошибаюсь).
Поэтому я решил восполнить недостаток и попробовать реализовать такой виджет самому. Многопоточная разработка — дело сложное, но интересное.
Прежде, чем приступить к реализации виджета, нужно описать модель, которую он будет представлять пользователю в виде окна. В самом общем виде работа виджета мне представляется следующим образом: в некоторый момент времени пользователь или система стартует асинхронную операцию. В этот момент виджет показывает прогресс операции или просто индикацию выполнения операции. Опционально пользователь имеет возможность отменить операцию. Далее асинхронная операция завершается двумя исходами: либо произошла ошибка и наш виджет показывает её, либо виджет показывает результат успешного выполнения операции.
Таким образом наша модель может находиться в одном из трех состояний:
- Progress — асинхронная операция выполняется
- Error — асинхронная операция завершилась ошибкой
- Value — асинхронная операция завершилась успешно
В каждом из состояний модель должна хранить соответствующие данные, поэтому я назвал модель AsyncValue. Важно заметить, что сама асинхронная операция не является частью нашей модели, она лишь переключает её состояние. Получается, что AsyncValue можно использовать с любой асинхронной библиотекой, соблюдая простой шаблон использования:
- В начале асинхронной операции перевести AsuncValue в состояние Progress
- В конце — либо в Error, либо в Value в зависимости от успешности операции
- Опционально в процессе выполнения операции можно обновлять Progress данные и слушать Stop флаг, если пользователь имеет возможность остановить операцию.
Вот схематичный пример с использованием QRunnable:
class MyRunnable : public QRunnable
{
public:
MyRunnable(AsyncValue& value)
: m_value(value)
{}
void run() final
{
m_value.setProgress(...);
// do calculation
if (success)
m_value.setValue(...);
else
m_value.setError(...);
}
private:
AsyncValue& m_value;
}
Такая же схема для работы с std::thread:
AsyncValue value;
std::thread thread([&value] () {
value.setProgress(...);
// do calculation
if (success)
value.setValue(...);
else
value.setError(...);
});
Таким образом первая версия нашего класса могла бы выглядеть примерно так:
template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t>
class AsyncValue
{
public:
using ValueType = ValueType_t;
using ErrorType = ErrorType_t;
using ProgressType = ProgressType_t;
// public API
private:
QReadWriteLock m_lock;
std::variant<ValueType, ErrorType, ProgressType> m_value;
};
Каждый, кто сталкивался с классами, которые поддерживают многопоточность, знает, что интерфейс таких классов отличается от однопоточных аналогов. Например функция size() бесполезна и опасна в многопоточном векторе. Её результат сразу же может стать невалидным, так как вектор может модифицироваться в данный момент в другом потоке.
Пользователи класса AsyncValue должны иметь возможность получить доступ к данным класса. Выдавать копию данных может быть накладно, любой из типов ValueType/ErrorType/ProgressType могут быть тяжеловесными. Выдавать ссылку на внутренние данные опасно — в любой момент они могут стать невалидными. Предлагается следующее решение:
1. Доступ к данным давать через функции accessValue/accessError/accessProgress, в которые передаются лямбды, принимающие соответствующие данные. Например:
template <typename Pred>
bool accessValue(Pred valuePred)
{
QReadLocker locker(&m_lock);
if (m_value.index() != 0)
return false;
valuePred(std::get<0>(m_value));
return true;
}
Таким образом, доступ к внутреннему значению осуществляется по ссылке и находится под локом на чтение. То есть ссылка в момент доступа не станет невалидной.
2. Пользователь AsyncValue в функции accessValue может запомнить ссылку на внутренние данные, при условии, что он подписан на сигнал stateChanged и после обработки сигнала обязан больше не пользоваться этой ссылкой, т.к. она станет невалидной.
При таких условиях потребитель AsyncValue всегда гарантированно имеет валидный и удобный доступ к данным. У данного решения несколько последствий, влияющих на реализацию класса AsyncValue.
Во-первых, наш класс должен посылать сигнал при изменении состояния, но при этом он шаблонный. Придется добавить базовый Qt класс, где определить сигнал, по которому виджет будет обновлять свое содержимое, а все заинтересованные обновлять ссылки на внутренние данные.
сlass AsyncValueBase : public QObject
{
Q_OBJECT
Q_DISABLE_COPY(AsyncValueBase)
signals:
void stateChanged();
};
Во-вторых, момент рассылки сигнала должен быть заблокирован на чтение (чтобы AsyncValue нельзя было менять, пока все не обработают сигнал) и, самое важное, в этот момент должны быть валидные ссылки на новые и старые данные. Потому что в процессе рассылки сигнала часть потребителей AsyncValue еще пользуются старыми ссылками, а те, кто обработал сигнал — новыми.
Получается, что std::variant нам не подходит и придется хранить данные в динамической памяти, чтобы адреса новых и старых данных были неизменны.
Небольшое отступление.
Можно рассмотреть другие реализации класса AsyncValue, которые не требуют динамических аллокаций:
- Выдавать потребителям только копии внутренних данных AsyncValue. Как я писал раннее, такое решение может быть более неоптимально, если данные большие.
- Определить два сигнала вместо одного: stateWillChange/stateDidChange. Обязать потребителей по первому сигналу избавляться от старых ссылок и по второму сигналу получать новые ссылки. Данная схема, мне кажется, чрезмерно усложняет потребителей AsyncValue, т.к. у них появляются промежутки времени когда доступ к AsyncValue запрещен.
Получается следующая схематичная реализация функции setValue:
void AsyncValue::setValue(...)
{
заблокировать m_lock на чтение
скопировать указатели на старые внутренние данные в локальную переменную
{
повысить блокировку m_lock на запись
поменять указатели на новые данные
вернуть блокировку m_lock на чтение
}
разослать stateChanged сигнал
удалить старые данные
снять блокировку m_lock на чтение
};
Как видно, нам необходимо повышать блокировку m_lock на запись и возвращать обратно на чтение. К сожалению такой поддержки в классе QReadWriteLock нет. Достичь нужного функционала можно парой QMutex/QReadWriteLock. Вот реализация класса AsyncValue, приближенная к реальной:
// возможные состояния AsyncValue
enum class ASYNC_VALUE_STATE
{
VALUE,
ERROR,
PROGRESS
};
Q_DECLARE_METATYPE(ASYNC_VALUE_STATE);
// базовый класс с сигналом и нешаблонными данными
class AsyncValueBase : public QObject
{
Q_OBJECT
Q_DISABLE_COPY(AsyncValueBase)
signals:
void stateChanged(ASYNC_VALUE_STATE state);
protected:
explicit AsyncValueBase(ASYNC_VALUE_STATE state, QObject* parent = nullptr);
// пара локов для имитации PromoteToWriteLock/DemoteToReadLock
QMutex m_writeLock;
QReadWriteLock m_contentLock;
// текущее состояние
ASYNC_VALUE_STATE m_state;
};
template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t>
class AsyncValueTemplate : public AsyncValueBase
{
// данные
struct Content
{
std::unique_ptr<ValueType_t> value;
std::unique_ptr<ErrorType_t> error;
std::unique_ptr<ProgressType+t> progress;
};
Content m_content;
public:
using ValueType = ValueType_t;
using ErrorType = ErrorType_t;
using ProgressType = ProgressType_t;
// создать новое значение
template <typename... Args>
void emplaceValue(Args&& ...arguments)
{
moveValue(std::make_unique<ValueType>(std::forward<Args>(arguments)...));
}
// положить новое значение
void moveValue(std::unique_ptr<ValueType> value)
{
// локальная переменная для сохранения старых данных
Content oldContent;
// блокируем все emplaceXXX/moveXXX функции между собой
QMutexLocker writeLocker(&m_writeLock);
{
// блокируем доступ к данным на запись
QWriteLocker locker(&m_contentLock);
// перемещаем указатели на старые данные
oldContent = std::move(m_content);
// выставляем новые данные
m_content.value = std::move(value);
// меняем состояние объекта
m_state = ASYNC_VALUE_STATE::VALUE;
// разблокируем доступ к данным
}
// рассылаем сигнал
emitStateChanged();
// снимаем блокировку для emplaceXXX/moveXXX функций
// удаляем старые данные
}
// реализация аналогична value
void emplaceError(Args&& ...arguments);
void moveError(std::unique_ptr<ErrorType> error);
void emplaceProgress(Args&& ...arguments);
void moveProgress(std::unique_ptr<ProgressType> progress);
template <typename Pred>
bool accessValue(Pred valuePred)
{
// блокируем данные на чтение
QReadLocker locker(&m_contentLock);
// проверяем состояние объекта
if (m_state != ASYNC_VALUE_STATE::VALUE)
return false;
// даем доступ к внутренним данным
valuePred(*m_content.value);
// снимаем блокировку с данных
return true;
}
// аналогично accessValue
bool accessError(Pred errorPred)
bool accessProgress(Pred progressPred)
};
Для тех, кто не устал и не потерялся, продолжим.
Как можно заметить у нас есть функции accessXXX, которые не ждут пока AsyncValue перейдет в соответствующее состояние, а просто возвращают false. Иногда полезно синхронно дождаться пока в AsyncValue появится или значение или ошибка. По сути нам нужен аналог std::future::get. Вот сигнатура функции:
template <typename ValuePred, typename ErrorPred>
void wait(ValuePred valuePred, ErrorPred errorPred);
Для работы этой функции нам понадобится condition variable — объект синхронизации, который можно ждать в одном потоке и активизировать в другом. В функции wait нам следует ждать, а при смене состояния AsyncValue c Progress на Value или Error мы должны оповестить ждущих.
Добавление еще одного поля в класс AsyncValue, которое нужно в редких случаях, когда используют функцию wait, навела меня на размышления — можно ли сделать это поле опциональным? Ответ очевиден, конечно можно, если хранить std::unique_ptr и создавать его при необходимости. Возник второй вопрос — можно ли сделать это поле опциональным и не делать динамических аллокаций. Кому интересно, прошу посмотреть на следующий код. Основная идея в следующем, первый вызов wait создает на стеке структуру с QWaitCondition и записывает ее указатель в AsyncValue, последующие вызовы wait просто проверяют, если указатель не пустой, по пользуются структурой по этому указателю, если указатель пустой — см. выше про первый вызов wait.
class AsyncValueBase : public QObject
{
...
struct Waiter
{
// основная переменная для ожидания
QWaitCondition waitValue;
// счетчик вторичных wait
quint16 subWaiters = 0;
// первый wait должен ждать всех последующих
QWaitCondition waitSubWaiters;
};
// указатель на структуру
Waiter* m_waiter = nullptr;
};
template <typename ValuePred, typename ErrorPred>
void wait(ValuePred valuePred, ErrorPred errorPred)
{
// простой случай - мы получили значение или ошибку
if (access(valuePred, errorPred))
return;
// блокируем AsyncValue от изменений
QMutexLocker writeLocker(&m_writeLock);
// проверяем простой случай снова
if (access(valuePred, errorPred))
return;
// если данный вызов wait первый
if (!m_waiter)
{
// создаем Waiter на стеке
Waiter theWaiter;
// выполняем этот код при выходе из if
SCOPE_EXIT {
// если есть последующие вызовы wait,
// то они используют theWaiter
if (m_waiter->subWaiters > 0)
{
// поэтому ждем пока subWaiters не обнулится
do
{
m_waiter->waitSubWaiters.wait(&m_writeLock);
} while (m_waiter->subWaiters != 0);
}
// больше никаких wait не осталось,
// можно занулять поле и разрушать временный Waiter
m_waiter = nullptr;
};
// выставляем адрес локального Waiter в AsyncValue
// чтобы последующие вызовы wait использовали его
m_waiter = &theWaiter;
// ждем пока AsyncValue не перейдет в состояние Value или Error
// и оповестит нас
do
{
m_waiter->waitValue.wait(&m_writeLock);
} while (!access(valuePred, errorPred));
}
// случай когда wait не первый
else
{
// выполняем этот код при выходе из else
SCOPE_EXIT {
// убираем себя из счетчика ожидающих
m_waiter->subWaiters -= 1;
// если никого не осталось -> оповещаем первый wait
if (m_waiter->subWaiters == 0)
m_waiter->waitSubWaiters.wakeAll();
};
// добавляем себя в счетчик ожидающих
m_waiter->subWaiters += 1;
// ждем пока AsyncValue не перейдет в состояние Value или Error
// и оповестит нас
do
{
m_waiter->waitValue.wait(&m_writeLock);
} while (!access(valuePred, errorPred));
}
}
Как уже упоминалось, AsyncValue не имеет метода для асинхронных вычислений чтобы не привязываться к конкретной библиотеке. Вместо этого используются свободные функции, которые реализуют асинхронность тем или иным способом. Ниже приведён пример вычисления AsyncValue на пуле потоков:
template <typename AsyncValueType, typename Func, typename... ProgressArgs>
bool asyncValueRunThreadPool(QThreadPool *pool, AsyncValueType& value, Func&& func, ProgressArgs&& ...progressArgs)
{
// создаём объект прогресс
auto progress = std::make_unique<typename AsyncValueType::ProgressType>(std::forward<ProgressArgs>(progressArgs)...);
// запоминаем его адрес
auto progressPtr = progress.get();
// перемещаем прогресс в AsyncValue
if (!value.startProgress(std::move(progress)))
return false;
QtConcurrent::run(pool, [&value, progressPtr, func = std::forward<Func>(func)](){
SCOPE_EXIT {
// в конце функции сообщаем AsyncValue, что прогресс завершён
value.completeProgress(progressPtr);
};
// вычисляем AsyncValue
func(*progressPtr, value);
});
return true;
}
В библиотеке реализовано ещё две схожих функции: asyncValueRunNetwork для обработки сетевых запросов и asyncValueRunThread, который выполняет операцию на вновь созданом потоке. Пользователи библиотеки могут легко создать свои собственные функции и использовать там использовать те асинхронные средства, которые они используют в других местах.
Для повышения безопасности класс AsyncValue был расширен еще одним шаблонным классом AsyncTrackErrorsPolicy, который позволяет реагировать на неправильное использование AsyncValue. Например, вот реализация по умолчанию функции AsyncTrackErrorsPolicy::inProgressWhileDestruct, которая вызовется, если AsyncValue будет разрушаться во время работы асинхронной операции:
void inProgressWhileDestruct() const
{
Q_ASSERT(false && "Destructing value while it's in progress");
}
Что касается виджетов, то их реализация достаточно проста и лаконична. AsyncWidget — это контейнер, которые содержит в себе виджет для показа ошибки, либо прогресса, либо значения в зависимости в каком состоянии сейчас находится AsyncValue.
virtual QWidget* createValueWidgetImpl(ValueType& value, QWidget* parent);
virtual QWidget* createErrorWidgetImpl(ErrorType& error, QWidget* parent);
virtual QWidget* createProgressWidgetImpl(ProgressType& progress, QWidget* parent);
Пользователь обязан переопределить лишь первую функцию, для отображения value, две остальных имеют реализации по умолчанию.
Библиотека qt-async получилась компактной, но в то же время достаточно полезной. Использование AsyncValue/AsyncWidget, там где раньше были синхронные функции и статический GUI, позволит вашим приложениям стать современными и более отзывчивыми.
Для тех, кто прочитал до конца бонус — видео работы демо приложения