Как стать автором
Обновить

Библиотека асинхронных виджетов qt-async

Время на прочтение10 мин
Количество просмотров7.2K
Всем доброго времени суток. Хочу немного рассказать о своём проекте qt-async, возможно кому-то он покажется интересным или даже полезным.

Асинхронность и многопоточность уже давно и серьезно входит в обиход разработчиков. Многие современные языки и библиотеки разрабатываются с прицелом на асинхронное использование. Язык С++ тоже потихоньку движется в этом направлении — появились std::thread, std::promise/future, должны вот-вот завезти корутины и networking. Библиотека Qt тоже не отстает, предлагая свои аналоги QThread, QRunnable, QThreadPool, QFuture и т.п. При этом виджетов для отображения асинхронных действий в Qt я не нашел (возможно плохо искал, поправьте, если я ошибаюсь).

Поэтому я решил восполнить недостаток и попробовать реализовать такой виджет самому. Многопоточная разработка — дело сложное, но интересное.

Прежде, чем приступить к реализации виджета, нужно описать модель, которую он будет представлять пользователю в виде окна. В самом общем виде работа виджета мне представляется следующим образом: в некоторый момент времени пользователь или система стартует асинхронную операцию. В этот момент виджет показывает прогресс операции или просто индикацию выполнения операции. Опционально пользователь имеет возможность отменить операцию. Далее асинхронная операция завершается двумя исходами: либо произошла ошибка и наш виджет показывает её, либо виджет показывает результат успешного выполнения операции.

Таким образом наша модель может находиться в одном из трех состояний:

  1. Progress — асинхронная операция выполняется
  2. Error — асинхронная операция завершилась ошибкой
  3. Value — асинхронная операция завершилась успешно

В каждом из состояний модель должна хранить соответствующие данные, поэтому я назвал модель AsyncValue. Важно заметить, что сама асинхронная операция не является частью нашей модели, она лишь переключает её состояние. Получается, что AsyncValue можно использовать с любой асинхронной библиотекой, соблюдая простой шаблон использования:

  1. В начале асинхронной операции перевести AsuncValue в состояние Progress
  2. В конце — либо в Error, либо в Value в зависимости от успешности операции
  3. Опционально в процессе выполнения операции можно обновлять Progress данные и слушать Stop флаг, если пользователь имеет возможность остановить операцию.

Вот схематичный пример с использованием QRunnable:

class MyRunnable : public QRunnable
{
public:
    MyRunnable(AsyncValue& value)
     : m_value(value)
    {}

    void run() final
    {
        m_value.setProgress(...);

        // do calculation

        if (success)
            m_value.setValue(...);
        else
            m_value.setError(...);
    }

private:
    AsyncValue& m_value;
}

Такая же схема для работы с std::thread:

AsyncValue value;
std::thread thread([&value] () {
    value.setProgress(...);

    // do calculation

    if (success)
        value.setValue(...);
    else
        value.setError(...);
});

Таким образом первая версия нашего класса могла бы выглядеть примерно так:

template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t>
class AsyncValue
{
public:
    using ValueType = ValueType_t;
    using ErrorType = ErrorType_t;
    using ProgressType = ProgressType_t;

    // public API

private:
    QReadWriteLock m_lock;
    std::variant<ValueType, ErrorType, ProgressType> m_value;
};

Каждый, кто сталкивался с классами, которые поддерживают многопоточность, знает, что интерфейс таких классов отличается от однопоточных аналогов. Например функция size() бесполезна и опасна в многопоточном векторе. Её результат сразу же может стать невалидным, так как вектор может модифицироваться в данный момент в другом потоке.

Пользователи класса AsyncValue должны иметь возможность получить доступ к данным класса. Выдавать копию данных может быть накладно, любой из типов ValueType/ErrorType/ProgressType могут быть тяжеловесными. Выдавать ссылку на внутренние данные опасно — в любой момент они могут стать невалидными. Предлагается следующее решение:

1. Доступ к данным давать через функции accessValue/accessError/accessProgress, в которые передаются лямбды, принимающие соответствующие данные. Например:

template <typename Pred>
bool accessValue(Pred valuePred)
{
    QReadLocker locker(&m_lock);

    if (m_value.index() != 0)
        return false;

    valuePred(std::get<0>(m_value));
    return true;
}

Таким образом, доступ к внутреннему значению осуществляется по ссылке и находится под локом на чтение. То есть ссылка в момент доступа не станет невалидной.

2. Пользователь AsyncValue в функции accessValue может запомнить ссылку на внутренние данные, при условии, что он подписан на сигнал stateChanged и после обработки сигнала обязан больше не пользоваться этой ссылкой, т.к. она станет невалидной.

При таких условиях потребитель AsyncValue всегда гарантированно имеет валидный и удобный доступ к данным. У данного решения несколько последствий, влияющих на реализацию класса AsyncValue.

Во-первых, наш класс должен посылать сигнал при изменении состояния, но при этом он шаблонный. Придется добавить базовый Qt класс, где определить сигнал, по которому виджет будет обновлять свое содержимое, а все заинтересованные обновлять ссылки на внутренние данные.

сlass AsyncValueBase : public QObject
{
    Q_OBJECT
    Q_DISABLE_COPY(AsyncValueBase)

signals:
    void stateChanged();
};

Во-вторых, момент рассылки сигнала должен быть заблокирован на чтение (чтобы AsyncValue нельзя было менять, пока все не обработают сигнал) и, самое важное, в этот момент должны быть валидные ссылки на новые и старые данные. Потому что в процессе рассылки сигнала часть потребителей AsyncValue еще пользуются старыми ссылками, а те, кто обработал сигнал — новыми.

Получается, что std::variant нам не подходит и придется хранить данные в динамической памяти, чтобы адреса новых и старых данных были неизменны.

Небольшое отступление.

Можно рассмотреть другие реализации класса AsyncValue, которые не требуют динамических аллокаций:

  1. Выдавать потребителям только копии внутренних данных AsyncValue. Как я писал раннее, такое решение может быть более неоптимально, если данные большие.
  2. Определить два сигнала вместо одного: stateWillChange/stateDidChange. Обязать потребителей по первому сигналу избавляться от старых ссылок и по второму сигналу получать новые ссылки. Данная схема, мне кажется, чрезмерно усложняет потребителей AsyncValue, т.к. у них появляются промежутки времени когда доступ к AsyncValue запрещен.

Получается следующая схематичная реализация функции setValue:

void AsyncValue::setValue(...)
{
    заблокировать m_lock на чтение
    скопировать указатели на старые внутренние данные в локальную переменную

    {
        повысить блокировку m_lock на запись
        поменять указатели на новые данные
        вернуть блокировку m_lock на чтение
    }

    разослать stateChanged сигнал
    удалить старые данные
    снять блокировку m_lock на чтение
};

Как видно, нам необходимо повышать блокировку m_lock на запись и возвращать обратно на чтение. К сожалению такой поддержки в классе QReadWriteLock нет. Достичь нужного функционала можно парой QMutex/QReadWriteLock. Вот реализация класса AsyncValue, приближенная к реальной:

// возможные состояния AsyncValue
enum class ASYNC_VALUE_STATE
{
    VALUE,
    ERROR,
    PROGRESS
};
Q_DECLARE_METATYPE(ASYNC_VALUE_STATE);

// базовый класс с сигналом и нешаблонными данными
class AsyncValueBase : public QObject
{
    Q_OBJECT
    Q_DISABLE_COPY(AsyncValueBase)

signals:
    void stateChanged(ASYNC_VALUE_STATE state);

protected:
    explicit AsyncValueBase(ASYNC_VALUE_STATE state, QObject* parent = nullptr);

    // пара локов для имитации PromoteToWriteLock/DemoteToReadLock
    QMutex m_writeLock;
    QReadWriteLock m_contentLock;
    // текущее состояние
    ASYNC_VALUE_STATE m_state;
};

template <typename ValueType_t, typename ErrorType_t, typename ProgressType_t>
class AsyncValueTemplate : public AsyncValueBase
{
    // данные
    struct Content
    {
        std::unique_ptr<ValueType_t> value;
        std::unique_ptr<ErrorType_t> error;
        std::unique_ptr<ProgressType+t> progress;
    };
    Content m_content;

public:
    using ValueType = ValueType_t;
    using ErrorType = ErrorType_t;
    using ProgressType = ProgressType_t;

    // создать новое значение
    template <typename... Args>
    void emplaceValue(Args&& ...arguments)
    {
        moveValue(std::make_unique<ValueType>(std::forward<Args>(arguments)...));
    }

    // положить новое значение
    void moveValue(std::unique_ptr<ValueType> value)
    {
        // локальная переменная для сохранения старых данных
        Content oldContent;

        // блокируем все emplaceXXX/moveXXX функции между собой
        QMutexLocker writeLocker(&m_writeLock);
        {
            // блокируем доступ к данным на запись
            QWriteLocker locker(&m_contentLock);

            // перемещаем указатели на старые данные
            oldContent = std::move(m_content);
            // выставляем новые данные
            m_content.value = std::move(value);
            // меняем состояние объекта
            m_state = ASYNC_VALUE_STATE::VALUE;

            // разблокируем доступ к данным
        }

        // рассылаем сигнал
        emitStateChanged();

        // снимаем блокировку для emplaceXXX/moveXXX функций
        // удаляем старые данные
    }

    // реализация аналогична value
    void emplaceError(Args&& ...arguments);
    void moveError(std::unique_ptr<ErrorType> error);
    void emplaceProgress(Args&& ...arguments);
    void moveProgress(std::unique_ptr<ProgressType> progress);

    template <typename Pred>
    bool accessValue(Pred valuePred)
    {
        // блокируем данные на чтение
        QReadLocker locker(&m_contentLock);

        // проверяем состояние объекта
        if (m_state != ASYNC_VALUE_STATE::VALUE)
            return false;

        // даем доступ к внутренним данным
        valuePred(*m_content.value);

        // снимаем блокировку с данных
        return true;
     }

    // аналогично accessValue
    bool accessError(Pred errorPred)
    bool accessProgress(Pred progressPred)
};

Для тех, кто не устал и не потерялся, продолжим.

Как можно заметить у нас есть функции accessXXX, которые не ждут пока AsyncValue перейдет в соответствующее состояние, а просто возвращают false. Иногда полезно синхронно дождаться пока в AsyncValue появится или значение или ошибка. По сути нам нужен аналог std::future::get. Вот сигнатура функции:

template <typename ValuePred, typename ErrorPred>
void wait(ValuePred valuePred, ErrorPred errorPred);

Для работы этой функции нам понадобится condition variable — объект синхронизации, который можно ждать в одном потоке и активизировать в другом. В функции wait нам следует ждать, а при смене состояния AsyncValue c Progress на Value или Error мы должны оповестить ждущих.

Добавление еще одного поля в класс AsyncValue, которое нужно в редких случаях, когда используют функцию wait, навела меня на размышления — можно ли сделать это поле опциональным? Ответ очевиден, конечно можно, если хранить std::unique_ptr и создавать его при необходимости. Возник второй вопрос — можно ли сделать это поле опциональным и не делать динамических аллокаций. Кому интересно, прошу посмотреть на следующий код. Основная идея в следующем, первый вызов wait создает на стеке структуру с QWaitCondition и записывает ее указатель в AsyncValue, последующие вызовы wait просто проверяют, если указатель не пустой, по пользуются структурой по этому указателю, если указатель пустой — см. выше про первый вызов wait.

class AsyncValueBase : public QObject
{
    ...
    struct Waiter
    {
        // основная переменная для ожидания 
        QWaitCondition waitValue;
        // счетчик вторичных wait
        quint16 subWaiters = 0;
        // первый wait должен ждать всех последующих
        QWaitCondition waitSubWaiters;
    };
    // указатель на структуру
    Waiter* m_waiter = nullptr;
};

    template <typename ValuePred, typename ErrorPred>
    void wait(ValuePred valuePred, ErrorPred errorPred)
    {
        // простой случай - мы получили значение или ошибку
        if (access(valuePred, errorPred))
            return;

        // блокируем AsyncValue от изменений
        QMutexLocker writeLocker(&m_writeLock);
        // проверяем простой случай снова
        if (access(valuePred, errorPred))
            return;

        // если данный вызов wait первый
        if (!m_waiter)
        {
            // создаем Waiter на стеке
            Waiter theWaiter;

            // выполняем этот код при выходе из if
            SCOPE_EXIT {
                // если есть последующие вызовы wait,
                // то они используют theWaiter 
                if (m_waiter->subWaiters > 0)
                {
                    // поэтому ждем пока subWaiters не обнулится
                    do
                    {
                         m_waiter->waitSubWaiters.wait(&m_writeLock);
                    } while (m_waiter->subWaiters != 0);
                }

                // больше никаких wait не осталось,
                // можно занулять поле и разрушать временный Waiter
                m_waiter = nullptr;
            };

            // выставляем адрес локального Waiter в AsyncValue
            // чтобы последующие вызовы wait использовали его
            m_waiter = &theWaiter;

            // ждем пока AsyncValue не перейдет в состояние Value или Error
            // и оповестит нас
            do
            {
                m_waiter->waitValue.wait(&m_writeLock);
            } while (!access(valuePred, errorPred));
        }
        // случай когда wait не первый
        else
        {
            // выполняем этот код при выходе из else
            SCOPE_EXIT {
                // убираем себя из счетчика ожидающих
                m_waiter->subWaiters -= 1;
                // если никого не осталось -> оповещаем первый wait
                if (m_waiter->subWaiters == 0)
                    m_waiter->waitSubWaiters.wakeAll();
            };

            // добавляем себя в счетчик ожидающих
            m_waiter->subWaiters += 1;

            // ждем пока AsyncValue не перейдет в состояние Value или Error
            // и оповестит нас
            do
            {
                m_waiter->waitValue.wait(&m_writeLock);
            } while (!access(valuePred, errorPred));
        }
    }

Как уже упоминалось, AsyncValue не имеет метода для асинхронных вычислений чтобы не привязываться к конкретной библиотеке. Вместо этого используются свободные функции, которые реализуют асинхронность тем или иным способом. Ниже приведён пример вычисления AsyncValue на пуле потоков:

template <typename AsyncValueType, typename Func, typename... ProgressArgs>
bool asyncValueRunThreadPool(QThreadPool *pool, AsyncValueType& value, Func&& func, ProgressArgs&& ...progressArgs)
{
    // создаём объект прогресс
    auto progress = std::make_unique<typename AsyncValueType::ProgressType>(std::forward<ProgressArgs>(progressArgs)...);
    //  запоминаем его адрес
    auto progressPtr = progress.get();

    // перемещаем прогресс в AsyncValue
    if (!value.startProgress(std::move(progress)))
        return false;

    QtConcurrent::run(pool, [&value, progressPtr, func = std::forward<Func>(func)](){
        SCOPE_EXIT {
            // в конце функции сообщаем AsyncValue, что прогресс завершён
            value.completeProgress(progressPtr);
        };

        // вычисляем AsyncValue
        func(*progressPtr, value);
    });

    return true;
}

В библиотеке реализовано ещё две схожих функции: asyncValueRunNetwork для обработки сетевых запросов и asyncValueRunThread, который выполняет операцию на вновь созданом потоке. Пользователи библиотеки могут легко создать свои собственные функции и использовать там использовать те асинхронные средства, которые они используют в других местах.

Для повышения безопасности класс AsyncValue был расширен еще одним шаблонным классом AsyncTrackErrorsPolicy, который позволяет реагировать на неправильное использование AsyncValue. Например, вот реализация по умолчанию функции AsyncTrackErrorsPolicy::inProgressWhileDestruct, которая вызовется, если AsyncValue будет разрушаться во время работы асинхронной операции:

    void inProgressWhileDestruct() const
    {
        Q_ASSERT(false && "Destructing value while it's in progress");
    }

Что касается виджетов, то их реализация достаточно проста и лаконична. AsyncWidget — это контейнер, которые содержит в себе виджет для показа ошибки, либо прогресса, либо значения в зависимости в каком состоянии сейчас находится AsyncValue.

    virtual QWidget* createValueWidgetImpl(ValueType& value, QWidget* parent);
    virtual QWidget* createErrorWidgetImpl(ErrorType& error, QWidget* parent);
    virtual QWidget* createProgressWidgetImpl(ProgressType& progress, QWidget* parent);
 

Пользователь обязан переопределить лишь первую функцию, для отображения value, две остальных имеют реализации по умолчанию.

Библиотека qt-async получилась компактной, но в то же время достаточно полезной. Использование AsyncValue/AsyncWidget, там где раньше были синхронные функции и статический GUI, позволит вашим приложениям стать современными и более отзывчивыми.

Для тех, кто прочитал до конца бонус — видео работы демо приложения

Теги:
Хабы:
Всего голосов 4: ↑4 и ↓0+4
Комментарии45

Публикации

Истории

Работа

Программист C++
151 вакансия
QT разработчик
12 вакансий

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн
10 – 11 октября
HR IT & Team Lead конференция «Битва за IT-таланты»
МоскваОнлайн
25 октября
Конференция по росту продуктов EGC’24
МоскваОнлайн
7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн