Организация рабочих потоков: управление состоянием движка

    Данная статья является продолжением статьи — Организация рабочих потоков: синхронизационный канал. Продолжение родилось как попытка написать пример использования подхода с синхронными сообщениями.

    В этой части я хочу на примере показать, как можно организовать управление и отображение состояния движка с рабочим потоком, используя синхронные сообщения между потоками. И показать, как при этом обойти проблему взаимной блокировки потоков при закрытии приложения.

    Давайте вернемся к примеру с предыдущей статьи. У нас есть графический интерфейс, отображающий состояние движка с рабочим потоком. Допустим движок можно запустить, остановить, поставить на паузу и соответственно снять с паузы. Для реализации такого поведения проще всего применить что-то подобное шаблонам проектирования конечный автомат и наблюдатель.

    Для начала определимся с набором состояний движка. Движок у нас будет асинхронным, по этому в наборе состояний будут также и переходные состояния. У меня получился следующий набор состояний: NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending.

    Эти состояния прежде всего будут использоваться для изменения состояния GUI. Тоесть GUI будет получать уведомление о изменении состояния движка, и соответствующим образом отображать это состояние. Например, при переходе в состояние NotStarted, GUI должно показать кнопку «Старт», а кнопки «Пауза» «Продолжить» и «Остановить» должны быть заблокированными. Соответственно при переходе в состояние Paused кнопки «Старт» и «Пауза» будут заблокированными, а кнопки «Продолжить» и «Остановить» должны быть разблокированными.

    Давайте посмотрим, как может выглядеть обработчик уведомления о изменении состояния движка, на примере WTL диалога с соответствующими управляющими кнопками:
    void CMainDlg::OnStateChanged(EngineState::State state)
    {
        // Это функция обратного вызова, она вызывается только из GUI потока по средством класса CSyncChannel,
        // по этому нет необходимости в дополнительной синхронизации при доступе к членам класса диалога
    
        // В случае Pending состояний, блокируем все кнопки, дальше мы разблокируем только нужные кнопки
        // Так было сделано для уменьшения количества строчек в примере
        GetDlgItem(IDC_BUTTON_START).EnableWindow(FALSE);
        GetDlgItem(IDC_BUTTON_STOP).EnableWindow(FALSE);
        GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(FALSE);
        GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(FALSE);
        switch (state)
        {
        case EngineState::NotStarted:
            GetDlgItem(IDC_BUTTON_START).EnableWindow(TRUE);
            break;
        case EngineState::Started:
            GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
            GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(TRUE);
            break;
        case EngineState::Paused:
            GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
            GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(TRUE);
            break;
        }
    }
    

    Для управления своим состоянием, движок будет иметь соответствующий набор функций: Start, Pause, Resume, Stop, которые в классе диалога будут вызываться из соответствующих обработчиков нажатия на кнопки.

    За счет использования синхронных сообщений, переход из одного состояния в другое будет выполняться синхронно по отношению к потоку GUI. То есть, пока поток GUI находится в обработчике нажатия на кнопку «Старт», движок не может перейти в состояние Started или Paused асинхронно, он ждет пока обработчик нажатия на кнопку «Старт» завершится. Это существенно упрощает управление состояниями движка.

    Переход в ожидающие состояния, такие как StartPending, осуществляется внутри вызова управляющих функций, таких как Start, и по этому, после выхода из функции Start, движок будет иметь состояние StartPending. То есть уведомление о переходе в состояние StartPending будет вызвано синхронно, еще до завершения вызова функции Start.

    Посмотрим на реализацию движка.
    Привожу класс движка полностью, т.к. при работе с многопоточностью любая упущенная деталь может играть большую роль.
    //Engine.h
    namespace EngineState
    {
        enum State {
            NotStarted,
            StartPending,
            Started,
            PausePending,
            Paused,
            ResumePending,
            StopPending
        };
    };
    class IEngineEvents {
    public:
        virtual void OnStateChanged(EngineState::State state) = 0;
    };
    class CEngine {
    public:
        CEngine(IEngineEvents* listener);
        ~CEngine(void);
    public:
        // Управляющие команды для рабочего потока
        void Start();
        void Stop();
        void Pause();
        void Resume();
    public:
        // GUI поток должен вызывать эту функцию для всех своих сообщений
        bool ProcessMessage(MSG& msg);
    private:
        void WaitForThread();
        static DWORD WINAPI ThreadProc(LPVOID param);
        void Run();
        bool CheckStopAndPause();
        void ChangeState(EngineState::State state);
        void OnStateChanged(EngineState::State state);
    private:
        CSyncChannel                m_syncChannel; // Этот класс был описан в предыдущей статье
        IEngineEvents*              m_listener;
        HANDLE                      m_hThread;
        volatile EngineState::State m_state;
    };
    
    

    // Engine.cpp
    CEngine::CEngine(IEngineEvents* listener) :
        m_listener(listener),
        m_hThread(NULL),
        m_state(EngineState::NotStarted)
    {
        m_syncChannel.Create(GetCurrentThreadId());
    }
    CEngine::~CEngine(void)
    {
        // Если рабочий поток не был остановлен, необходимо установить флаг завершения потока
        m_state = EngineState::StopPending;
    
        // Необходимо дождаться, пока завершиться рабочий поток.
        // Если рабочий поток в этот момент пошлет сообщение на GUI, то он заблокируется.
        // Для того, чтобы разрешить ситуацию с взаимной блокировкой потоков,
        // закрываем  m_syncChannel, после чего рабочий поток будет разблокирован.
        m_syncChannel.Close();
    
        // Теперь рабочий поток успешно завершиться.
        WaitForThread();
    }
    void CEngine::WaitForThread()
    {
        if (m_hThread)
        {
            // Ожидание завершения рабочего потока.
            // Рабочий поток завершиться при переходе в одно из состояний:
            // StopPending или NotStarted
            _ASSERT(m_state == EngineState::StopPending || m_state == EngineState::NotStarted);
    
            // Ждем полной остановки рабочего потока
            DWORD waitResult = WaitForSingleObject(m_hThread, INFINITE);
            _ASSERT(waitResult == WAIT_OBJECT_0);
    
            // Рабочий поток полностью завершился, теперь можно освободить HANDLE рабочего потока
            CloseHandle(m_hThread);
            m_hThread = NULL;
        }
    }
    void CEngine::Start()
    {
        // Это управляющая комманда, вызывается из GUI потока
        // Запустить рабочий поток можно только в том случае, если он еще не запущен
        // Для этого проверяем состояние движка
        if (m_state == EngineState::NotStarted)
        {
            // Если функция Start была вызвана повторно, после завершения рабочего потока,
            // предыдущий рабочий поток может некоторое время продолжать работать
            // Необходимо дождаться его полного завершения перед тем, как создавать новый поток
            WaitForThread();
    
            // Создаем новый рабочий поток,
            // передаем параметром this, чтобы поток мог вызвать функцию Run для текущего объекта
            m_hThread = CreateThread(NULL, 0, CEngine::ThreadProc, this, 0, NULL);
            if (m_hThread)
            {
                // Переключаем состояние движка в StartPending
                // Рабочий поток уже мог вызвать функцию ChangeState(EngineState::Started)
                // Но так, как для переключения состояния используется SyncChannel,
                // то состояние не может быть изменено асинхронно,
                // рабочий поток будет ждать, пока не завершится обработка нажатия на кнопку "Start"
                // И по этому состояние StartPending гарантированно придет перед тем,
                // как придет состояние Started от рабочего потока
                ChangeState(EngineState::StartPending);
            }
        }
    }
    void CEngine::Stop()
    {
        // Это управляющая комманда, вызывается из GUI потока
        if (m_state != EngineState::NotStarted && m_state != EngineState::StopPending)
        {
            // Устанавливаем флаг остановки, если он еще не установлен и поток еще работает
            ChangeState(EngineState::StopPending);
        }
        // Рабочий поток остановится асинхронно, после выхода этой функции
        // поток еще будет продолжать работать
        // После остановки рабочий поток вызовет функцию ChangeState(EngineState::Stopped)
        // Для ожидания полной остановки потока необходимо вызвать функцию WaitForThread
    }
    void CEngine::Pause()
    {
        // Это управляющая комманда, вызывается из GUI потока
        if (m_state == EngineState::Started)
        {
            // Переход в состояние PausePending возможен только из состояния Started
            ChangeState(EngineState::PausePending);
        }
    }
    void CEngine::Resume()
    {
        // Это управляющая комманда, вызывается из GUI потока
        if (m_state == EngineState::Paused)
        {
            // Переход в состояние ResumePending возможен только из состояния Paused
            ChangeState(EngineState::ResumePending);
        }
    }
    bool CEngine::ProcessMessage(MSG& msg)
    {
        // GUI поток вызывает эту функцию для всех своих сообщений
        return m_syncChannel.ProcessMessage(msg);
    }
    DWORD WINAPI CEngine::ThreadProc(LPVOID param)
    {
        // Это статическая функция потока, получаем указатель
        // на объект класса и вызываем его функцию Run
        reinterpret_cast<CEngine*>(param)->Run();
        return 0;
    }
    void CEngine::Run()
    {
        // Оповещаем GUI о том, что рабочий поток запустился
        ChangeState(EngineState::Started);
        for (;;)
        {
            // Это функция рабочего потока, здесь выполняется какая-то работа
            // При этом необходимо периодически проверять флаг остановки и паузы
            if (!CheckStopAndPause())
            {
                break;
            }
            // Выполняется важная работа - хорошо поспать, мечта всех родителей :)
            Sleep(1000);
        }
        // Оповещаем о том, что рабочий поток завершился
        ChangeState(EngineState::NotStarted);
        // После переключения в состояние NotStarted рабочий поток
        // еще какое-то время продолжает свое выполнение по этому необходимо
        // дождаться завершения рабочего потока с использованием функции WaitForSingleObject
    }
    bool CEngine::CheckStopAndPause()
    {
        // Эта функция вызывается периодически из рабочего потока.
        // Частота вызова этой функции влияет на отзывчивость GUI.
        if (m_state == EngineState::StopPending)
        {
            // Была вызывана функция Stop, необходимо остановить рабочий поток
            return false;
        }
        if (m_state == EngineState::PausePending)
        {
            // Ставим рабочий поток на паузу,
            // для этого оповещаем GUI поток о изменении состояния
            // и входим в цикл ожидания дальнейших команд
            ChangeState(EngineState::Paused);
            while (m_state == EngineState::Paused)
            {
                Sleep(100);
            }
            if (m_state == EngineState::StopPending)
            {
                // Была вызывана функция Stop, необходимо остановить рабочий поток
                return false;
            }
            // Снимаем рабочий поток с паузы
            // Для этого оповещаем GUI о изменении состояния и возвращаем управление рабочему потоку
            ChangeState(EngineState::Started);
        }
        // Рабочий поток может продолжать свое выполнение
        return true;
    }
    void CEngine::ChangeState(EngineState::State state)
    {
        // Эта функция может быть вызвана рабочим потоком,
        // транслируем вызов в GUI поток, используя m_syncChannel
        m_syncChannel.Execute(boost::bind(&CEngine::OnStateChanged, this, state));
    }
    void CEngine::OnStateChanged(EngineState::State state)
    {
        // Эта функция вызывается только из GUI потока, устанавливаем переменную m_state
        // Таким образом изменение переменной m_state будет происходить только из одного потока
        // и не требует использования объектов синхронизации, таких как критические секции или мьютексы
        // То есть состояние рабочего потока не может переключится асинхронно по отношению к GUI потоку
        m_state = state;
        m_listener->OnStateChanged(m_state);
    }
    

    Использование класса CEngine.
    В моем примере объект класса CEngine объявлен членом класса WTL диалога.
    Набор функций диалога сводится к обработчикам нажатия на клавиши «Старт», «Стоп», «Продолжить», «Пауза» которые перевызывают соответствующие фукнции объекта CEngine.

    Также класс диалога подписывается на уведомления о изменении состояния движка при помощи интерфейса IEngineEvents, реализация функции OnStateChanged этого интерфейса была приведена в начале статьи.

    Для обеспечения трансляции сообщений потока в класс CEngine, класс диалога устанавливает себя фильтром оконных сообщений, используя стандартные методы WTL::CMessageLoop::AddMessageFilter, и реализует интерфейс WTL::CMessageFilter:
    class CMessageFilter
    {
    public:
    	virtual BOOL PreTranslateMessage(MSG* pMsg) = 0;
    };
    

    Реализация функции PreTranslateMessage сводится к перевызову функции CEngine::ProcessMessage

    Для решения проблемы взаимной блокировки потоков при закрытии приложения, в классе диалога никаких дополнительный действий не требуется. Эта проблема полностью решена за счет использования CSyncChannel::Close в деструкторе класса CEngine. Таким образом рабочий поток полностью инкапсулирован внутри класса CEngine, что дает ощутимые преимущества при работе с таким классом.
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 7

      +1
      А почему вы не воспользовались готовым FSM—решением типа того же Boost FSM?
        0
        Возможно по этому — habrahabr.ru/post/141477/
        А если серьезно, то я с опаской отношусь к фреймворкам, особенно если они работают в многопоточной среде.
        Любая недокументированная, или слабо документированная мелочь может привести к неправильному использованию и как результат к трудно обнаруживаемым ошибкам.
        Любой такой фреймворк нужно досконально изучать и в первую очередь изучать на примерах.
          +1
          Ну… тот же бустец, например, — изрядно оттестированная вещь, используемая в продакшыне многих крупных проектов.

          Ок, посыл статьи теперь понятен, спасибо.
            0
            В моем примере, используется boost::function + boost::bind. Я считаю использование этих вещей здесь оправданным. Использование же обобщенного фреймворка конечных автоматов я считаю излишним.
            Сложность такого фреймворка в разы превышает сложность моей предметной области.
            Посыл статьи — показать проблемы многопоточности. Как фреймворк конечных автоматов поможет мне в илюстрации этих проблем?
              0
              Гм. Я и написал, что посыл статьи теперь понятен, ну.
                +1
                Извините, я принял Ваш предыдущий комментарий за сарказм.
                  0
                  Сам таким страдаю, бывает.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое