Всем доброго времени суток. Хочу немного рассказать о своём проекте qt-async, возможно кому-то он покажется интересным или даже полезным.
Асинхронность и многопоточность уже давно и серьезно входит в обиход разработчиков. Многие современные языки и библиотеки разрабатываются с прицелом на асинхронное использование. Язык С++ тоже потихоньку движется в этом направлении — появились std::thread, std::promise/future, должны вот-вот завезти корутины и networking. Библиотека Qt тоже не отстает, предлагая свои аналоги QThread, QRunnable, QThreadPool, QFuture и т.п. При этом виджетов для отображения асинхронных действий в Qt я не нашел (возможно плохо искал, поправьте, если я ошибаюсь).
Поэтому я решил восполнить недостаток и попробовать реализовать такой виджет самому. Многопоточная разработка — дело сложное, но интересное.
Прежде, чем приступить к реализации виджета, нужно описать модель, которую он будет представлять пользователю в виде окна. В самом общем виде работа виджета мне представляется следующим образом: в некоторый момент времени пользователь или система стартует асинхронную операцию. В этот момент виджет показывает прогресс операции или просто индикацию выполнения операции. Опционально пользователь имеет возможность отменить операцию. Далее асинхронная операция завершается двумя исходами: либо произошла ошибка и наш виджет показывает её, либо виджет показывает результат успешного выполнения операции.
Таким образом наша модель может находиться в одном из трех состояний:
В каждом из состояний модель должна хранить соответствующие данные, поэтому я назвал модель AsyncValue. Важно заметить, что сама асинхронная операция не является частью нашей модели, она лишь переключает её состояние. Получается, что AsyncValue можно использовать с любой асинхронной библиотекой, соблюдая простой шаблон использования:
Вот схематичный пример с использованием QRunnable:
Такая же схема для работы с std::thread:
Таким образом первая версия нашего класса могла бы выглядеть примерно так:
Каждый, кто сталкивался с классами, которые поддерживают многопоточность, знает, что интерфейс таких классов отличается от однопоточных аналогов. Например функция size() бесполезна и опасна в многопоточном векторе. Её результат сразу же может стать невалидным, так как вектор может модифицироваться в данный момент в другом потоке.
Пользователи класса AsyncValue должны иметь возможность получить доступ к данным класса. Выдавать копию данных может быть накладно, любой из типов ValueType/ErrorType/ProgressType могут быть тяжеловесными. Выдавать ссылку на внутренние данные опасно — в любой момент они могут стать невалидными. Предлагается следующее решение:
1. Доступ к данным давать через функции accessValue/accessError/accessProgress, в которые передаются лямбды, принимающие соответствующие данные. Например:
Таким образом, доступ к внутреннему значению осуществляется по ссылке и находится под локом на чтение. То есть ссылка в момент доступа не станет невалидной.
2. Пользователь AsyncValue в функции accessValue может запомнить ссылку на внутренние данные, при условии, что он подписан на сигнал stateChanged и после обработки сигнала обязан больше не пользоваться этой ссылкой, т.к. она станет невалидной.
При таких условиях потребитель AsyncValue всегда гарантированно имеет валидный и удобный доступ к данным. У данного решения несколько последствий, влияющих на реализацию класса AsyncValue.
Во-первых, наш класс должен посылать сигнал при изменении состояния, но при этом он шаблонный. Придется добавить базовый Qt класс, где определить сигнал, по которому виджет будет обновлять свое содержимое, а все заинтересованные обновлять ссылки на внутренние данные.
Во-вторых, момент рассылки сигнала должен быть заблокирован на чтение (чтобы AsyncValue нельзя было менять, пока все не обработают сигнал) и, самое важное, в этот момент должны быть валидные ссылки на новые и старые данные. Потому что в процессе рассылки сигнала часть потребителей AsyncValue еще пользуются старыми ссылками, а те, кто обработал сигнал — новыми.
Получается, что std::variant нам не подходит и придется хранить данные в динамической памяти, чтобы адреса новых и старых данных были неизменны.
Небольшое отступление.
Можно рассмотреть другие реализации класса AsyncValue, которые не требуют динамических аллокаций:
Получается следующая схематичная реализация функции setValue:
Как видно, нам необходимо повышать блокировку m_lock на запись и возвращать обратно на чтение. К сожалению такой поддержки в классе QReadWriteLock нет. Достичь нужного функционала можно парой QMutex/QReadWriteLock. Вот реализация класса AsyncValue, приближенная к реальной:
Для тех, кто не устал и не потерялся, продолжим.
Как можно заметить у нас есть функции accessXXX, которые не ждут пока AsyncValue перейдет в соответствующее состояние, а просто возвращают false. Иногда полезно синхронно дождаться пока в AsyncValue появится или значение или ошибка. По сути нам нужен аналог std::future::get. Вот сигнатура функции:
Для работы этой функции нам понадобится condition variable — объект синхронизации, который можно ждать в одном потоке и активизировать в другом. В функции wait нам следует ждать, а при смене состояния AsyncValue c Progress на Value или Error мы должны оповестить ждущих.
Добавление еще одного поля в класс AsyncValue, которое нужно в редких случаях, когда используют функцию wait, навела меня на размышления — можно ли сделать это поле опциональным? Ответ очевиден, конечно можно, если хранить std::unique_ptr и создавать его при необходимости. Возник второй вопрос — можно ли сделать это поле опциональным и не делать динамических аллокаций. Кому интересно, прошу посмотреть на следующий код. Основная идея в следующем, первый вызов wait создает на стеке структуру с QWaitCondition и записывает ее указатель в AsyncValue, последующие вызовы wait просто проверяют, если указатель не пустой, по пользуются структурой по этому указателю, если указатель пустой — см. выше про первый вызов wait.
Как уже упоминалось, AsyncValue не имеет метода для асинхронных вычислений чтобы не привязываться к конкретной библиотеке. Вместо этого используются свободные функции, которые реализуют асинхронность тем или иным способом. Ниже приведён пример вычисления AsyncValue на пуле потоков:
В библиотеке реализовано ещё две схожих функции: asyncValueRunNetwork для обработки сетевых запросов и asyncValueRunThread, который выполняет операцию на вновь созданом потоке. Пользователи библиотеки могут легко создать свои собственные функции и использовать там использовать те асинхронные средства, которые они используют в других местах.
Для повышения безопасности класс AsyncValue был расширен еще одним шаблонным классом AsyncTrackErrorsPolicy, который позволяет реагировать на неправильное использование AsyncValue. Например, вот реализация по умолчанию функции AsyncTrackErrorsPolicy::inProgressWhileDestruct, которая вызовется, если AsyncValue будет разрушаться во время работы асинхронной операции:
Что касается виджетов, то их реализация достаточно проста и лаконична. AsyncWidget — это контейнер, которые содержит в себе виджет для показа ошибки, либо прогресса, либо значения в зависимости в каком состоянии сейчас находится AsyncValue.
Пользователь обязан переопределить лишь первую функцию, для отображения value, две остальных имеют реализации по умолчанию.
Библиотека qt-async получилась компактной, но в то же время достаточно полезной. Использование AsyncValue/AsyncWidget, там где раньше были синхронные функции и статический GUI, позволит вашим приложениям стать современными и более отзывчивыми.
Для тех, кто прочитал до конца бонус — видео работы демо приложения
Асинхронность и многопоточность уже давно и серьезно входит в обиход разработчиков. Многие современные языки и библиотеки разрабатываются с прицелом на асинхронное использование. Язык С++ тоже потихоньку движется в этом направлении — появились std::thread, std::promise/future, должны вот-вот завезти корутины и networking. Библиотека Qt тоже не отстает, предлагая свои аналоги QThread, QRunnable, QThreadPool, QFuture и т.п. При этом виджетов для отображения асинхронных действий в Qt я не нашел (возможно плохо искал, поправьте, если я ошибаюсь).
Поэтому я решил восполнить недостаток и попробовать реализовать такой виджет самому. Многопоточная разработка — дело сложное, но интересное.
Прежде, чем приступить к реализации виджета, нужно описать модель, которую он будет представлять пользователю в виде окна. В самом общем виде работа виджета мне представляется следующим образом: в некоторый момент времени пользователь или система стартует асинхронную операцию. В этот момент виджет показывает прогресс операции или просто индикацию выполнения операции. Опционально пользователь имеет возможность отменить операцию. Далее асинхронная операция завершается двумя исходами: либо произошла ошибка и наш виджет показывает её, либо виджет показывает результат успешного выполнения операции.
Таким образом наша модель может находиться в одном из трех состояний:
- Progress — асинхронная операция выполняется
- Error — асинхронная операция завершилась ошибкой
- Value — асинхронная операция завершилась успешно
В каждом из состояний модель должна хранить соответствующие данные, поэтому я назвал модель AsyncValue. Важно заметить, что сама асинхронная операция не является частью нашей модели, она лишь переключает её состояние. Получается, что AsyncValue можно использовать с любой асинхронной библиотекой, соблюдая простой шаблон использования:
- В начале асинхронной операции перевести AsuncValue в состояние Progress
- В конце — либо в Error, либо в Value в зависимости от успешности операции
- Опционально в процессе выполнения операции можно обновлять Progress данные и слушать Stop флаг, если пользователь имеет возможность остановить операцию.
Вот схематичный пример с использованием QRunnable:
class MyRunnable : public QRunnable { public: MyRunnable(AsyncValue& value) : m_value(value) {} void run() final { m_value.setProgress(...); // do calculation if (success) m_value.setValue(...); else m_value.setError(...); } private: AsyncValue& m_value; }
Такая же схема для работы с std::thread:
AsyncValue value; std::thread thread([&value] () { value.setProgress(...); // do calculation if (success) value.setValue(...); else value.setError(...); });
Таким образом первая версия нашего класса могла бы выглядеть примерно так:
template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t> class AsyncValue { public: using ValueType = ValueType_t; using ErrorType = ErrorType_t; using ProgressType = ProgressType_t; // public API private: QReadWriteLock m_lock; std::variant<ValueType, ErrorType, ProgressType> m_value; };
Каждый, кто сталкивался с классами, которые поддерживают многопоточность, знает, что интерфейс таких классов отличается от однопоточных аналогов. Например функция size() бесполезна и опасна в многопоточном векторе. Её результат сразу же может стать невалидным, так как вектор может модифицироваться в данный момент в другом потоке.
Пользователи класса AsyncValue должны иметь возможность получить доступ к данным класса. Выдавать копию данных может быть накладно, любой из типов ValueType/ErrorType/ProgressType могут быть тяжеловесными. Выдавать ссылку на внутренние данные опасно — в любой момент они могут стать невалидными. Предлагается следующее решение:
1. Доступ к данным давать через функции accessValue/accessError/accessProgress, в которые передаются лямбды, принимающие соответствующие данные. Например:
template <typename Pred> bool accessValue(Pred valuePred) { QReadLocker locker(&m_lock); if (m_value.index() != 0) return false; valuePred(std::get<0>(m_value)); return true; }
Таким образом, доступ к внутреннему значению осуществляется по ссылке и находится под локом на чтение. То есть ссылка в момент доступа не станет невалидной.
2. Пользователь AsyncValue в функции accessValue может запомнить ссылку на внутренние данные, при условии, что он подписан на сигнал stateChanged и после обработки сигнала обязан больше не пользоваться этой ссылкой, т.к. она станет невалидной.
При таких условиях потребитель AsyncValue всегда гарантированно имеет валидный и удобный доступ к данным. У данного решения несколько последствий, влияющих на реализацию класса AsyncValue.
Во-первых, наш класс должен посылать сигнал при изменении состояния, но при этом он шаблонный. Придется добавить базовый Qt класс, где определить сигнал, по которому виджет будет обновлять свое содержимое, а все заинтересованные обновлять ссылки на внутренние данные.
сlass AsyncValueBase : public QObject { Q_OBJECT Q_DISABLE_COPY(AsyncValueBase) signals: void stateChanged(); };
Во-вторых, момент рассылки сигнала должен быть заблокирован на чтение (чтобы AsyncValue нельзя было менять, пока все не обработают сигнал) и, самое важное, в этот момент должны быть валидные ссылки на новые и старые данные. Потому что в процессе рассылки сигнала часть потребителей AsyncValue еще пользуются старыми ссылками, а те, кто обработал сигнал — новыми.
Получается, что std::variant нам не подходит и придется хранить данные в динамической памяти, чтобы адреса новых и старых данных были неизменны.
Небольшое отступление.
Можно рассмотреть другие реализации класса AsyncValue, которые не требуют динамических аллокаций:
- Выдавать потребителям только копии внутренних данных AsyncValue. Как я писал раннее, такое решение может быть более неоптимально, если данные большие.
- Определить два сигнала вместо одного: stateWillChange/stateDidChange. Обязать потребителей по первому сигналу избавляться от старых ссылок и по второму сигналу получать новые ссылки. Данная схема, мне кажется, чрезмерно усложняет потребителей AsyncValue, т.к. у них появляются промежутки времени когда доступ к AsyncValue запрещен.
Получается следующая схематичная реализация функции setValue:
void AsyncValue::setValue(...) { заблокировать m_lock на чтение скопировать указатели на старые внутренние данные в локальную переменную { повысить блокировку m_lock на запись поменять указатели на новые данные вернуть блокировку m_lock на чтение } разослать stateChanged сигнал удалить старые данные снять блокировку m_lock на чтение };
Как видно, нам необходимо повышать блокировку m_lock на запись и возвращать обратно на чтение. К сожалению такой поддержки в классе QReadWriteLock нет. Достичь нужного функционала можно парой QMutex/QReadWriteLock. Вот реализация класса AsyncValue, приближенная к реальной:
// возможные состояния AsyncValue enum class ASYNC_VALUE_STATE { VALUE, ERROR, PROGRESS }; Q_DECLARE_METATYPE(ASYNC_VALUE_STATE); // базовый класс с сигналом и нешаблонными данными class AsyncValueBase : public QObject { Q_OBJECT Q_DISABLE_COPY(AsyncValueBase) signals: void stateChanged(ASYNC_VALUE_STATE state); protected: explicit AsyncValueBase(ASYNC_VALUE_STATE state, QObject* parent = nullptr); // пара локов для имитации PromoteToWriteLock/DemoteToReadLock QMutex m_writeLock; QReadWriteLock m_contentLock; // текущее состояние ASYNC_VALUE_STATE m_state; }; template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t> class AsyncValueTemplate : public AsyncValueBase { // данные struct Content { std::unique_ptr<ValueType_t> value; std::unique_ptr<ErrorType_t> error; std::unique_ptr<ProgressType+t> progress; }; Content m_content; public: using ValueType = ValueType_t; using ErrorType = ErrorType_t; using ProgressType = ProgressType_t; // создать новое значение template <typename... Args> void emplaceValue(Args&& ...arguments) { moveValue(std::make_unique<ValueType>(std::forward<Args>(arguments)...)); } // положить новое значение void moveValue(std::unique_ptr<ValueType> value) { // локальная переменная для сохранения старых данных Content oldContent; // блокируем все emplaceXXX/moveXXX функции между собой QMutexLocker writeLocker(&m_writeLock); { // блокируем доступ к данным на запись QWriteLocker locker(&m_contentLock); // перемещаем указатели на старые данные oldContent = std::move(m_content); // выставляем новые данные m_content.value = std::move(value); // меняем состояние объекта m_state = ASYNC_VALUE_STATE::VALUE; // разблокируем доступ к данным } // рассылаем сигнал emitStateChanged(); // снимаем блокировку для emplaceXXX/moveXXX функций // удаляем старые данные } // реализация аналогична value void emplaceError(Args&& ...arguments); void moveError(std::unique_ptr<ErrorType> error); void emplaceProgress(Args&& ...arguments); void moveProgress(std::unique_ptr<ProgressType> progress); template <typename Pred> bool accessValue(Pred valuePred) { // блокируем данные на чтение QReadLocker locker(&m_contentLock); // проверяем состояние объекта if (m_state != ASYNC_VALUE_STATE::VALUE) return false; // даем доступ к внутренним данным valuePred(*m_content.value); // снимаем блокировку с данных return true; } // аналогично accessValue bool accessError(Pred errorPred) bool accessProgress(Pred progressPred) };
Для тех, кто не устал и не потерялся, продолжим.
Как можно заметить у нас есть функции accessXXX, которые не ждут пока AsyncValue перейдет в соответствующее состояние, а просто возвращают false. Иногда полезно синхронно дождаться пока в AsyncValue появится или значение или ошибка. По сути нам нужен аналог std::future::get. Вот сигнатура функции:
template <typename ValuePred, typename ErrorPred> void wait(ValuePred valuePred, ErrorPred errorPred);
Для работы этой функции нам понадобится condition variable — объект синхронизации, который можно ждать в одном потоке и активизировать в другом. В функции wait нам следует ждать, а при смене состояния AsyncValue c Progress на Value или Error мы должны оповестить ждущих.
Добавление еще одного поля в класс AsyncValue, которое нужно в редких случаях, когда используют функцию wait, навела меня на размышления — можно ли сделать это поле опциональным? Ответ очевиден, конечно можно, если хранить std::unique_ptr и создавать его при необходимости. Возник второй вопрос — можно ли сделать это поле опциональным и не делать динамических аллокаций. Кому интересно, прошу посмотреть на следующий код. Основная идея в следующем, первый вызов wait создает на стеке структуру с QWaitCondition и записывает ее указатель в AsyncValue, последующие вызовы wait просто проверяют, если указатель не пустой, по пользуются структурой по этому указателю, если указатель пустой — см. выше про первый вызов wait.
class AsyncValueBase : public QObject { ... struct Waiter { // основная переменная для ожидания QWaitCondition waitValue; // счетчик вторичных wait quint16 subWaiters = 0; // первый wait должен ждать всех последующих QWaitCondition waitSubWaiters; }; // указатель на структуру Waiter* m_waiter = nullptr; }; template <typename ValuePred, typename ErrorPred> void wait(ValuePred valuePred, ErrorPred errorPred) { // простой случай - мы получили значение или ошибку if (access(valuePred, errorPred)) return; // блокируем AsyncValue от изменений QMutexLocker writeLocker(&m_writeLock); // проверяем простой случай снова if (access(valuePred, errorPred)) return; // если данный вызов wait первый if (!m_waiter) { // создаем Waiter на стеке Waiter theWaiter; // выполняем этот код при выходе из if SCOPE_EXIT { // если есть последующие вызовы wait, // то они используют theWaiter if (m_waiter->subWaiters > 0) { // поэтому ждем пока subWaiters не обнулится do { m_waiter->waitSubWaiters.wait(&m_writeLock); } while (m_waiter->subWaiters != 0); } // больше никаких wait не осталось, // можно занулять поле и разрушать временный Waiter m_waiter = nullptr; }; // выставляем адрес локального Waiter в AsyncValue // чтобы последующие вызовы wait использовали его m_waiter = &theWaiter; // ждем пока AsyncValue не перейдет в состояние Value или Error // и оповестит нас do { m_waiter->waitValue.wait(&m_writeLock); } while (!access(valuePred, errorPred)); } // случай когда wait не первый else { // выполняем этот код при выходе из else SCOPE_EXIT { // убираем себя из счетчика ожидающих m_waiter->subWaiters -= 1; // если никого не осталось -> оповещаем первый wait if (m_waiter->subWaiters == 0) m_waiter->waitSubWaiters.wakeAll(); }; // добавляем себя в счетчик ожидающих m_waiter->subWaiters += 1; // ждем пока AsyncValue не перейдет в состояние Value или Error // и оповестит нас do { m_waiter->waitValue.wait(&m_writeLock); } while (!access(valuePred, errorPred)); } }
Как уже упоминалось, AsyncValue не имеет метода для асинхронных вычислений чтобы не привязываться к конкретной библиотеке. Вместо этого используются свободные функции, которые реализуют асинхронность тем или иным способом. Ниже приведён пример вычисления AsyncValue на пуле потоков:
template <typename AsyncValueType, typename Func, typename... ProgressArgs> bool asyncValueRunThreadPool(QThreadPool *pool, AsyncValueType& value, Func&& func, ProgressArgs&& ...progressArgs) { // создаём объект прогресс auto progress = std::make_unique<typename AsyncValueType::ProgressType>(std::forward<ProgressArgs>(progressArgs)...); // запоминаем его адрес auto progressPtr = progress.get(); // перемещаем прогресс в AsyncValue if (!value.startProgress(std::move(progress))) return false; QtConcurrent::run(pool, [&value, progressPtr, func = std::forward<Func>(func)](){ SCOPE_EXIT { // в конце функции сообщаем AsyncValue, что прогресс завершён value.completeProgress(progressPtr); }; // вычисляем AsyncValue func(*progressPtr, value); }); return true; }
В библиотеке реализовано ещё две схожих функции: asyncValueRunNetwork для обработки сетевых запросов и asyncValueRunThread, который выполняет операцию на вновь созданом потоке. Пользователи библиотеки могут легко создать свои собственные функции и использовать там использовать те асинхронные средства, которые они используют в других местах.
Для повышения безопасности класс AsyncValue был расширен еще одним шаблонным классом AsyncTrackErrorsPolicy, который позволяет реагировать на неправильное использование AsyncValue. Например, вот реализация по умолчанию функции AsyncTrackErrorsPolicy::inProgressWhileDestruct, которая вызовется, если AsyncValue будет разрушаться во время работы асинхронной операции:
void inProgressWhileDestruct() const { Q_ASSERT(false && "Destructing value while it's in progress"); }
Что касается виджетов, то их реализация достаточно проста и лаконична. AsyncWidget — это контейнер, которые содержит в себе виджет для показа ошибки, либо прогресса, либо значения в зависимости в каком состоянии сейчас находится AsyncValue.
virtual QWidget* createValueWidgetImpl(ValueType& value, QWidget* parent); virtual QWidget* createErrorWidgetImpl(ErrorType& error, QWidget* parent); virtual QWidget* createProgressWidgetImpl(ProgressType& progress, QWidget* parent);
Пользователь обязан переопределить лишь первую функцию, для отображения value, две остальных имеют реализации по умолчанию.
Библиотека qt-async получилась компактной, но в то же время достаточно полезной. Использование AsyncValue/AsyncWidget, там где раньше были синхронные функции и статический GUI, позволит вашим приложениям стать современными и более отзывчивыми.
Для тех, кто прочитал до конца бонус — видео работы демо приложения
