Pull to refresh

Организация рабочих потоков: управление состоянием движка

Reading time8 min
Views3K
Данная статья является продолжением статьи — Организация рабочих потоков: синхронизационный канал. Продолжение родилось как попытка написать пример использования подхода с синхронными сообщениями.

В этой части я хочу на примере показать, как можно организовать управление и отображение состояния движка с рабочим потоком, используя синхронные сообщения между потоками. И показать, как при этом обойти проблему взаимной блокировки потоков при закрытии приложения.

Давайте вернемся к примеру с предыдущей статьи. У нас есть графический интерфейс, отображающий состояние движка с рабочим потоком. Допустим движок можно запустить, остановить, поставить на паузу и соответственно снять с паузы. Для реализации такого поведения проще всего применить что-то подобное шаблонам проектирования конечный автомат и наблюдатель.

Для начала определимся с набором состояний движка. Движок у нас будет асинхронным, по этому в наборе состояний будут также и переходные состояния. У меня получился следующий набор состояний: NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending.

Эти состояния прежде всего будут использоваться для изменения состояния GUI. Тоесть GUI будет получать уведомление о изменении состояния движка, и соответствующим образом отображать это состояние. Например, при переходе в состояние NotStarted, GUI должно показать кнопку «Старт», а кнопки «Пауза» «Продолжить» и «Остановить» должны быть заблокированными. Соответственно при переходе в состояние Paused кнопки «Старт» и «Пауза» будут заблокированными, а кнопки «Продолжить» и «Остановить» должны быть разблокированными.

Давайте посмотрим, как может выглядеть обработчик уведомления о изменении состояния движка, на примере WTL диалога с соответствующими управляющими кнопками:
void CMainDlg::OnStateChanged(EngineState::State state)
{
    // Это функция обратного вызова, она вызывается только из GUI потока по средством класса CSyncChannel,
    // по этому нет необходимости в дополнительной синхронизации при доступе к членам класса диалога

    // В случае Pending состояний, блокируем все кнопки, дальше мы разблокируем только нужные кнопки
    // Так было сделано для уменьшения количества строчек в примере
    GetDlgItem(IDC_BUTTON_START).EnableWindow(FALSE);
    GetDlgItem(IDC_BUTTON_STOP).EnableWindow(FALSE);
    GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(FALSE);
    GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(FALSE);
    switch (state)
    {
    case EngineState::NotStarted:
        GetDlgItem(IDC_BUTTON_START).EnableWindow(TRUE);
        break;
    case EngineState::Started:
        GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
        GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(TRUE);
        break;
    case EngineState::Paused:
        GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
        GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(TRUE);
        break;
    }
}

Для управления своим состоянием, движок будет иметь соответствующий набор функций: Start, Pause, Resume, Stop, которые в классе диалога будут вызываться из соответствующих обработчиков нажатия на кнопки.

За счет использования синхронных сообщений, переход из одного состояния в другое будет выполняться синхронно по отношению к потоку GUI. То есть, пока поток GUI находится в обработчике нажатия на кнопку «Старт», движок не может перейти в состояние Started или Paused асинхронно, он ждет пока обработчик нажатия на кнопку «Старт» завершится. Это существенно упрощает управление состояниями движка.

Переход в ожидающие состояния, такие как StartPending, осуществляется внутри вызова управляющих функций, таких как Start, и по этому, после выхода из функции Start, движок будет иметь состояние StartPending. То есть уведомление о переходе в состояние StartPending будет вызвано синхронно, еще до завершения вызова функции Start.

Посмотрим на реализацию движка.
Привожу класс движка полностью, т.к. при работе с многопоточностью любая упущенная деталь может играть большую роль.
//Engine.h
namespace EngineState
{
    enum State {
        NotStarted,
        StartPending,
        Started,
        PausePending,
        Paused,
        ResumePending,
        StopPending
    };
};
class IEngineEvents {
public:
    virtual void OnStateChanged(EngineState::State state) = 0;
};
class CEngine {
public:
    CEngine(IEngineEvents* listener);
    ~CEngine(void);
public:
    // Управляющие команды для рабочего потока
    void Start();
    void Stop();
    void Pause();
    void Resume();
public:
    // GUI поток должен вызывать эту функцию для всех своих сообщений
    bool ProcessMessage(MSG& msg);
private:
    void WaitForThread();
    static DWORD WINAPI ThreadProc(LPVOID param);
    void Run();
    bool CheckStopAndPause();
    void ChangeState(EngineState::State state);
    void OnStateChanged(EngineState::State state);
private:
    CSyncChannel                m_syncChannel; // Этот класс был описан в предыдущей статье
    IEngineEvents*              m_listener;
    HANDLE                      m_hThread;
    volatile EngineState::State m_state;
};


// Engine.cpp
CEngine::CEngine(IEngineEvents* listener) :
    m_listener(listener),
    m_hThread(NULL),
    m_state(EngineState::NotStarted)
{
    m_syncChannel.Create(GetCurrentThreadId());
}
CEngine::~CEngine(void)
{
    // Если рабочий поток не был остановлен, необходимо установить флаг завершения потока
    m_state = EngineState::StopPending;

    // Необходимо дождаться, пока завершиться рабочий поток.
    // Если рабочий поток в этот момент пошлет сообщение на GUI, то он заблокируется.
    // Для того, чтобы разрешить ситуацию с взаимной блокировкой потоков,
    // закрываем  m_syncChannel, после чего рабочий поток будет разблокирован.
    m_syncChannel.Close();

    // Теперь рабочий поток успешно завершиться.
    WaitForThread();
}
void CEngine::WaitForThread()
{
    if (m_hThread)
    {
        // Ожидание завершения рабочего потока.
        // Рабочий поток завершиться при переходе в одно из состояний:
        // StopPending или NotStarted
        _ASSERT(m_state == EngineState::StopPending || m_state == EngineState::NotStarted);

        // Ждем полной остановки рабочего потока
        DWORD waitResult = WaitForSingleObject(m_hThread, INFINITE);
        _ASSERT(waitResult == WAIT_OBJECT_0);

        // Рабочий поток полностью завершился, теперь можно освободить HANDLE рабочего потока
        CloseHandle(m_hThread);
        m_hThread = NULL;
    }
}
void CEngine::Start()
{
    // Это управляющая комманда, вызывается из GUI потока
    // Запустить рабочий поток можно только в том случае, если он еще не запущен
    // Для этого проверяем состояние движка
    if (m_state == EngineState::NotStarted)
    {
        // Если функция Start была вызвана повторно, после завершения рабочего потока,
        // предыдущий рабочий поток может некоторое время продолжать работать
        // Необходимо дождаться его полного завершения перед тем, как создавать новый поток
        WaitForThread();

        // Создаем новый рабочий поток,
        // передаем параметром this, чтобы поток мог вызвать функцию Run для текущего объекта
        m_hThread = CreateThread(NULL, 0, CEngine::ThreadProc, this, 0, NULL);
        if (m_hThread)
        {
            // Переключаем состояние движка в StartPending
            // Рабочий поток уже мог вызвать функцию ChangeState(EngineState::Started)
            // Но так, как для переключения состояния используется SyncChannel,
            // то состояние не может быть изменено асинхронно,
            // рабочий поток будет ждать, пока не завершится обработка нажатия на кнопку "Start"
            // И по этому состояние StartPending гарантированно придет перед тем,
            // как придет состояние Started от рабочего потока
            ChangeState(EngineState::StartPending);
        }
    }
}
void CEngine::Stop()
{
    // Это управляющая комманда, вызывается из GUI потока
    if (m_state != EngineState::NotStarted && m_state != EngineState::StopPending)
    {
        // Устанавливаем флаг остановки, если он еще не установлен и поток еще работает
        ChangeState(EngineState::StopPending);
    }
    // Рабочий поток остановится асинхронно, после выхода этой функции
    // поток еще будет продолжать работать
    // После остановки рабочий поток вызовет функцию ChangeState(EngineState::Stopped)
    // Для ожидания полной остановки потока необходимо вызвать функцию WaitForThread
}
void CEngine::Pause()
{
    // Это управляющая комманда, вызывается из GUI потока
    if (m_state == EngineState::Started)
    {
        // Переход в состояние PausePending возможен только из состояния Started
        ChangeState(EngineState::PausePending);
    }
}
void CEngine::Resume()
{
    // Это управляющая комманда, вызывается из GUI потока
    if (m_state == EngineState::Paused)
    {
        // Переход в состояние ResumePending возможен только из состояния Paused
        ChangeState(EngineState::ResumePending);
    }
}
bool CEngine::ProcessMessage(MSG& msg)
{
    // GUI поток вызывает эту функцию для всех своих сообщений
    return m_syncChannel.ProcessMessage(msg);
}
DWORD WINAPI CEngine::ThreadProc(LPVOID param)
{
    // Это статическая функция потока, получаем указатель
    // на объект класса и вызываем его функцию Run
    reinterpret_cast<CEngine*>(param)->Run();
    return 0;
}
void CEngine::Run()
{
    // Оповещаем GUI о том, что рабочий поток запустился
    ChangeState(EngineState::Started);
    for (;;)
    {
        // Это функция рабочего потока, здесь выполняется какая-то работа
        // При этом необходимо периодически проверять флаг остановки и паузы
        if (!CheckStopAndPause())
        {
            break;
        }
        // Выполняется важная работа - хорошо поспать, мечта всех родителей :)
        Sleep(1000);
    }
    // Оповещаем о том, что рабочий поток завершился
    ChangeState(EngineState::NotStarted);
    // После переключения в состояние NotStarted рабочий поток
    // еще какое-то время продолжает свое выполнение по этому необходимо
    // дождаться завершения рабочего потока с использованием функции WaitForSingleObject
}
bool CEngine::CheckStopAndPause()
{
    // Эта функция вызывается периодически из рабочего потока.
    // Частота вызова этой функции влияет на отзывчивость GUI.
    if (m_state == EngineState::StopPending)
    {
        // Была вызывана функция Stop, необходимо остановить рабочий поток
        return false;
    }
    if (m_state == EngineState::PausePending)
    {
        // Ставим рабочий поток на паузу,
        // для этого оповещаем GUI поток о изменении состояния
        // и входим в цикл ожидания дальнейших команд
        ChangeState(EngineState::Paused);
        while (m_state == EngineState::Paused)
        {
            Sleep(100);
        }
        if (m_state == EngineState::StopPending)
        {
            // Была вызывана функция Stop, необходимо остановить рабочий поток
            return false;
        }
        // Снимаем рабочий поток с паузы
        // Для этого оповещаем GUI о изменении состояния и возвращаем управление рабочему потоку
        ChangeState(EngineState::Started);
    }
    // Рабочий поток может продолжать свое выполнение
    return true;
}
void CEngine::ChangeState(EngineState::State state)
{
    // Эта функция может быть вызвана рабочим потоком,
    // транслируем вызов в GUI поток, используя m_syncChannel
    m_syncChannel.Execute(boost::bind(&CEngine::OnStateChanged, this, state));
}
void CEngine::OnStateChanged(EngineState::State state)
{
    // Эта функция вызывается только из GUI потока, устанавливаем переменную m_state
    // Таким образом изменение переменной m_state будет происходить только из одного потока
    // и не требует использования объектов синхронизации, таких как критические секции или мьютексы
    // То есть состояние рабочего потока не может переключится асинхронно по отношению к GUI потоку
    m_state = state;
    m_listener->OnStateChanged(m_state);
}

Использование класса CEngine.
В моем примере объект класса CEngine объявлен членом класса WTL диалога.
Набор функций диалога сводится к обработчикам нажатия на клавиши «Старт», «Стоп», «Продолжить», «Пауза» которые перевызывают соответствующие фукнции объекта CEngine.

Также класс диалога подписывается на уведомления о изменении состояния движка при помощи интерфейса IEngineEvents, реализация функции OnStateChanged этого интерфейса была приведена в начале статьи.

Для обеспечения трансляции сообщений потока в класс CEngine, класс диалога устанавливает себя фильтром оконных сообщений, используя стандартные методы WTL::CMessageLoop::AddMessageFilter, и реализует интерфейс WTL::CMessageFilter:
class CMessageFilter
{
public:
	virtual BOOL PreTranslateMessage(MSG* pMsg) = 0;
};

Реализация функции PreTranslateMessage сводится к перевызову функции CEngine::ProcessMessage

Для решения проблемы взаимной блокировки потоков при закрытии приложения, в классе диалога никаких дополнительный действий не требуется. Эта проблема полностью решена за счет использования CSyncChannel::Close в деструкторе класса CEngine. Таким образом рабочий поток полностью инкапсулирован внутри класса CEngine, что дает ощутимые преимущества при работе с таким классом.
Tags:
Hubs:
Total votes 8: ↑5 and ↓3+2
Comments7

Articles