Данная статья является продолжением статьи — Организация рабочих потоков: синхронизационный канал. Продолжение родилось как попытка написать пример использования подхода с синхронными сообщениями.
В этой части я хочу на примере показать, как можно организовать управление и отображение состояния движка с рабочим потоком, используя синхронные сообщения между потоками. И показать, как при этом обойти проблему взаимной блокировки потоков при закрытии приложения.
Давайте вернемся к примеру с предыдущей статьи. У нас есть графический интерфейс, отображающий состояние движка с рабочим потоком. Допустим движок можно запустить, остановить, поставить на паузу и соответственно снять с паузы. Для реализации такого поведения проще всего применить что-то подобное шаблонам проектирования конечный автомат и наблюдатель.
Для начала определимся с набором состояний движка. Движок у нас будет асинхронным, по этому в наборе состояний будут также и переходные состояния. У меня получился следующий набор состояний: NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending.
Эти состояния прежде всего будут использоваться для изменения состояния GUI. Тоесть GUI будет получать уведомление о изменении состояния движка, и соответствующим образом отображать это состояние. Например, при переходе в состояние NotStarted, GUI должно показать кнопку «Старт», а кнопки «Пауза» «Продолжить» и «Остановить» должны быть заблокированными. Соответственно при переходе в состояние Paused кнопки «Старт» и «Пауза» будут заблокированными, а кнопки «Продолжить» и «Остановить» должны быть разблокированными.
Давайте посмотрим, как может выглядеть обработчик уведомления о изменении состояния движка, на примере WTL диалога с соответствующими управляющими кнопками:
Для управления своим состоянием, движок будет иметь соответствующий набор функций: Start, Pause, Resume, Stop, которые в классе диалога будут вызываться из соответствующих обработчиков нажатия на кнопки.
За счет использования синхронных сообщений, переход из одного состояния в другое будет выполняться синхронно по отношению к потоку GUI. То есть, пока поток GUI находится в обработчике нажатия на кнопку «Старт», движок не может перейти в состояние Started или Paused асинхронно, он ждет пока обработчик нажатия на кнопку «Старт» завершится. Это существенно упрощает управление состояниями движка.
Переход в ожидающие состояния, такие как StartPending, осуществляется внутри вызова управляющих функций, таких как Start, и по этому, после выхода из функции Start, движок будет иметь состояние StartPending. То есть уведомление о переходе в состояние StartPending будет вызвано синхронно, еще до завершения вызова функции Start.
Посмотрим на реализацию движка.
Привожу класс движка полностью, т.к. при работе с многопоточностью любая упущенная деталь может играть большую роль.
Использование класса CEngine.
В моем примере объект класса CEngine объявлен членом класса WTL диалога.
Набор функций диалога сводится к обработчикам нажатия на клавиши «Старт», «Стоп», «Продолжить», «Пауза» которые перевызывают соответствующие фукнции объекта CEngine.
Также класс диалога подписывается на уведомления о изменении состояния движка при помощи интерфейса IEngineEvents, реализация функции OnStateChanged этого интерфейса была приведена в начале статьи.
Для обеспечения трансляции сообщений потока в класс CEngine, класс диалога устанавливает себя фильтром оконных сообщений, используя стандартные методы WTL::CMessageLoop::AddMessageFilter, и реализует интерфейс WTL::CMessageFilter:
Реализация функции PreTranslateMessage сводится к перевызову функции CEngine::ProcessMessage
Для решения проблемы взаимной блокировки потоков при закрытии приложения, в классе диалога никаких дополнительный действий не требуется. Эта проблема полностью решена за счет использования CSyncChannel::Close в деструкторе класса CEngine. Таким образом рабочий поток полностью инкапсулирован внутри класса CEngine, что дает ощутимые преимущества при работе с таким классом.
В этой части я хочу на примере показать, как можно организовать управление и отображение состояния движка с рабочим потоком, используя синхронные сообщения между потоками. И показать, как при этом обойти проблему взаимной блокировки потоков при закрытии приложения.
Давайте вернемся к примеру с предыдущей статьи. У нас есть графический интерфейс, отображающий состояние движка с рабочим потоком. Допустим движок можно запустить, остановить, поставить на паузу и соответственно снять с паузы. Для реализации такого поведения проще всего применить что-то подобное шаблонам проектирования конечный автомат и наблюдатель.
Для начала определимся с набором состояний движка. Движок у нас будет асинхронным, по этому в наборе состояний будут также и переходные состояния. У меня получился следующий набор состояний: NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending.
Эти состояния прежде всего будут использоваться для изменения состояния GUI. Тоесть GUI будет получать уведомление о изменении состояния движка, и соответствующим образом отображать это состояние. Например, при переходе в состояние NotStarted, GUI должно показать кнопку «Старт», а кнопки «Пауза» «Продолжить» и «Остановить» должны быть заблокированными. Соответственно при переходе в состояние Paused кнопки «Старт» и «Пауза» будут заблокированными, а кнопки «Продолжить» и «Остановить» должны быть разблокированными.
Давайте посмотрим, как может выглядеть обработчик уведомления о изменении состояния движка, на примере WTL диалога с соответствующими управляющими кнопками:
void CMainDlg::OnStateChanged(EngineState::State state) { // Это функция обратного вызова, она вызывается только из GUI потока по средством класса CSyncChannel, // по этому нет необходимости в дополнительной синхронизации при доступе к членам класса диалога // В случае Pending состояний, блокируем все кнопки, дальше мы разблокируем только нужные кнопки // Так было сделано для уменьшения количества строчек в примере GetDlgItem(IDC_BUTTON_START).EnableWindow(FALSE); GetDlgItem(IDC_BUTTON_STOP).EnableWindow(FALSE); GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(FALSE); GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(FALSE); switch (state) { case EngineState::NotStarted: GetDlgItem(IDC_BUTTON_START).EnableWindow(TRUE); break; case EngineState::Started: GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE); GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(TRUE); break; case EngineState::Paused: GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE); GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(TRUE); break; } }
Для управления своим состоянием, движок будет иметь соответствующий набор функций: Start, Pause, Resume, Stop, которые в классе диалога будут вызываться из соответствующих обработчиков нажатия на кнопки.
За счет использования синхронных сообщений, переход из одного состояния в другое будет выполняться синхронно по отношению к потоку GUI. То есть, пока поток GUI находится в обработчике нажатия на кнопку «Старт», движок не может перейти в состояние Started или Paused асинхронно, он ждет пока обработчик нажатия на кнопку «Старт» завершится. Это существенно упрощает управление состояниями движка.
Переход в ожидающие состояния, такие как StartPending, осуществляется внутри вызова управляющих функций, таких как Start, и по этому, после выхода из функции Start, движок будет иметь состояние StartPending. То есть уведомление о переходе в состояние StartPending будет вызвано синхронно, еще до завершения вызова функции Start.
Посмотрим на реализацию движка.
Привожу класс движка полностью, т.к. при работе с многопоточностью любая упущенная деталь может играть большую роль.
//Engine.h namespace EngineState { enum State { NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending }; }; class IEngineEvents { public: virtual void OnStateChanged(EngineState::State state) = 0; }; class CEngine { public: CEngine(IEngineEvents* listener); ~CEngine(void); public: // Управляющие команды для рабочего потока void Start(); void Stop(); void Pause(); void Resume(); public: // GUI поток должен вызывать эту функцию для всех своих сообщений bool ProcessMessage(MSG& msg); private: void WaitForThread(); static DWORD WINAPI ThreadProc(LPVOID param); void Run(); bool CheckStopAndPause(); void ChangeState(EngineState::State state); void OnStateChanged(EngineState::State state); private: CSyncChannel m_syncChannel; // Этот класс был описан в предыдущей статье IEngineEvents* m_listener; HANDLE m_hThread; volatile EngineState::State m_state; };
// Engine.cpp CEngine::CEngine(IEngineEvents* listener) : m_listener(listener), m_hThread(NULL), m_state(EngineState::NotStarted) { m_syncChannel.Create(GetCurrentThreadId()); } CEngine::~CEngine(void) { // Если рабочий поток не был остановлен, необходимо установить флаг завершения потока m_state = EngineState::StopPending; // Необходимо дождаться, пока завершиться рабочий поток. // Если рабочий поток в этот момент пошлет сообщение на GUI, то он заблокируется. // Для того, чтобы разрешить ситуацию с взаимной блокировкой потоков, // закрываем m_syncChannel, после чего рабочий поток будет разблокирован. m_syncChannel.Close(); // Теперь рабочий поток успешно завершиться. WaitForThread(); } void CEngine::WaitForThread() { if (m_hThread) { // Ожидание завершения рабочего потока. // Рабочий поток завершиться при переходе в одно из состояний: // StopPending или NotStarted _ASSERT(m_state == EngineState::StopPending || m_state == EngineState::NotStarted); // Ждем полной остановки рабочего потока DWORD waitResult = WaitForSingleObject(m_hThread, INFINITE); _ASSERT(waitResult == WAIT_OBJECT_0); // Рабочий поток полностью завершился, теперь можно освободить HANDLE рабочего потока CloseHandle(m_hThread); m_hThread = NULL; } } void CEngine::Start() { // Это управляющая комманда, вызывается из GUI потока // Запустить рабочий поток можно только в том случае, если он еще не запущен // Для этого проверяем состояние движка if (m_state == EngineState::NotStarted) { // Если функция Start была вызвана повторно, после завершения рабочего потока, // предыдущий рабочий поток может некоторое время продолжать работать // Необходимо дождаться его полного завершения перед тем, как создавать новый поток WaitForThread(); // Создаем новый рабочий поток, // передаем параметром this, чтобы поток мог вызвать функцию Run для текущего объекта m_hThread = CreateThread(NULL, 0, CEngine::ThreadProc, this, 0, NULL); if (m_hThread) { // Переключаем состояние движка в StartPending // Рабочий поток уже мог вызвать функцию ChangeState(EngineState::Started) // Но так, как для переключения состояния используется SyncChannel, // то состояние не может быть изменено асинхронно, // рабочий поток будет ждать, пока не завершится обработка нажатия на кнопку "Start" // И по этому состояние StartPending гарантированно придет перед тем, // как придет состояние Started от рабочего потока ChangeState(EngineState::StartPending); } } } void CEngine::Stop() { // Это управляющая комманда, вызывается из GUI потока if (m_state != EngineState::NotStarted && m_state != EngineState::StopPending) { // Устанавливаем флаг остановки, если он еще не установлен и поток еще работает ChangeState(EngineState::StopPending); } // Рабочий поток остановится асинхронно, после выхода этой функции // поток еще будет продолжать работать // После остановки рабочий поток вызовет функцию ChangeState(EngineState::Stopped) // Для ожидания полной остановки потока необходимо вызвать функцию WaitForThread } void CEngine::Pause() { // Это управляющая комманда, вызывается из GUI потока if (m_state == EngineState::Started) { // Переход в состояние PausePending возможен только из состояния Started ChangeState(EngineState::PausePending); } } void CEngine::Resume() { // Это управляющая комманда, вызывается из GUI потока if (m_state == EngineState::Paused) { // Переход в состояние ResumePending возможен только из состояния Paused ChangeState(EngineState::ResumePending); } } bool CEngine::ProcessMessage(MSG& msg) { // GUI поток вызывает эту функцию для всех своих сообщений return m_syncChannel.ProcessMessage(msg); } DWORD WINAPI CEngine::ThreadProc(LPVOID param) { // Это статическая функция потока, получаем указатель // на объект класса и вызываем его функцию Run reinterpret_cast<CEngine*>(param)->Run(); return 0; } void CEngine::Run() { // Оповещаем GUI о том, что рабочий поток запустился ChangeState(EngineState::Started); for (;;) { // Это функция рабочего потока, здесь выполняется какая-то работа // При этом необходимо периодически проверять флаг остановки и паузы if (!CheckStopAndPause()) { break; } // Выполняется важная работа - хорошо поспать, мечта всех родителей :) Sleep(1000); } // Оповещаем о том, что рабочий поток завершился ChangeState(EngineState::NotStarted); // После переключения в состояние NotStarted рабочий поток // еще какое-то время продолжает свое выполнение по этому необходимо // дождаться завершения рабочего потока с использованием функции WaitForSingleObject } bool CEngine::CheckStopAndPause() { // Эта функция вызывается периодически из рабочего потока. // Частота вызова этой функции влияет на отзывчивость GUI. if (m_state == EngineState::StopPending) { // Была вызывана функция Stop, необходимо остановить рабочий поток return false; } if (m_state == EngineState::PausePending) { // Ставим рабочий поток на паузу, // для этого оповещаем GUI поток о изменении состояния // и входим в цикл ожидания дальнейших команд ChangeState(EngineState::Paused); while (m_state == EngineState::Paused) { Sleep(100); } if (m_state == EngineState::StopPending) { // Была вызывана функция Stop, необходимо остановить рабочий поток return false; } // Снимаем рабочий поток с паузы // Для этого оповещаем GUI о изменении состояния и возвращаем управление рабочему потоку ChangeState(EngineState::Started); } // Рабочий поток может продолжать свое выполнение return true; } void CEngine::ChangeState(EngineState::State state) { // Эта функция может быть вызвана рабочим потоком, // транслируем вызов в GUI поток, используя m_syncChannel m_syncChannel.Execute(boost::bind(&CEngine::OnStateChanged, this, state)); } void CEngine::OnStateChanged(EngineState::State state) { // Эта функция вызывается только из GUI потока, устанавливаем переменную m_state // Таким образом изменение переменной m_state будет происходить только из одного потока // и не требует использования объектов синхронизации, таких как критические секции или мьютексы // То есть состояние рабочего потока не может переключится асинхронно по отношению к GUI потоку m_state = state; m_listener->OnStateChanged(m_state); }
Использование класса CEngine.
В моем примере объект класса CEngine объявлен членом класса WTL диалога.
Набор функций диалога сводится к обработчикам нажатия на клавиши «Старт», «Стоп», «Продолжить», «Пауза» которые перевызывают соответствующие фукнции объекта CEngine.
Также класс диалога подписывается на уведомления о изменении состояния движка при помощи интерфейса IEngineEvents, реализация функции OnStateChanged этого интерфейса была приведена в начале статьи.
Для обеспечения трансляции сообщений потока в класс CEngine, класс диалога устанавливает себя фильтром оконных сообщений, используя стандартные методы WTL::CMessageLoop::AddMessageFilter, и реализует интерфейс WTL::CMessageFilter:
class CMessageFilter { public: virtual BOOL PreTranslateMessage(MSG* pMsg) = 0; };
Реализация функции PreTranslateMessage сводится к перевызову функции CEngine::ProcessMessage
Для решения проблемы взаимной блокировки потоков при закрытии приложения, в классе диалога никаких дополнительный действий не требуется. Эта проблема полностью решена за счет использования CSyncChannel::Close в деструкторе класса CEngine. Таким образом рабочий поток полностью инкапсулирован внутри класса CEngine, что дает ощутимые преимущества при работе с таким классом.
