Организация рабочих потоков: синхронизационный канал

    Представьте себе архитектуру типичного приложения:

    Есть рабочий поток движка, выполняющий какую-то функциональность, допустим копирование файлов (архивирование, поиск простых чисел). В общем что-то длительное.
    Данный поток должен периодически сообщать информацию о текущем копируемом файле, а также уметь обрабатывать ошибки, допустим ошибка нехватки места на диске.

    Графический интерфейс такого приложения должен позволять запускать процесс копирования файлов, уметь приостановить копирование, а также, в случае ошибки, отобразить соответствующий диалог с вопросом к пользователю.

    Казалось бы, как можно допустить ошибку в такой простой ситуации?

    Проблемы многопоточности


    Когда в программе появляется дополнительный поток — сразу же возникает проблема взаимодействия между потоками. Даже если поток ничего не делает и ни с кем не общается, всегда есть проблема правильной остановки потока.

    Даже при работе с высокоуровневыми классами-обертками над потоками, легко сделать что-то не так, если до конца не понимать правильность работы с потоками. По этому в данной статье будет идти речь о работе с потоками на уровне WinAPI.

    И так, вернемся к нашему примеру.

    Рабочий поток движка должен каким-то образом сообщать потоку GUI о своем состоянии (текущий копируемый файл), уведомлять о постановке на паузу, а так-же инициировать сообщение об ошибке.

    Два основных способа уведомлений — асинхронный и синхронный


    Асинхронный способ — рабочий поток уведомляет о своем состоянии асинхронными сообщениями (PostMessage).
    После посылки такого сообщения, поток, как правило, не дожидается ответа и продолжает свою работу.
    А в случае невозможности продолжать, поток ожидает вызова управляющей команды от GUI.

    Синхронный способ — рабочий поток уведомляет о своем состоянии синхронными вызовами (SendMessage), с ожиданием завершения обработки таких вызовов.
    Такой способ удобен тем, что рабочий поток, в момент обработки сообщений, находится в заранее известном состоянии. Нет необходимости в излишней синхронизации.

    Также не мало важен третий вариант — опрос состояния движка по таймеру. Таким образом лучше всего реализовывать часто меняющиеся состояния, например прогресс выполнения. Но этот способ не относится к теме данной статьи.

    В асинхронном способе есть и свои преимущества, но речь пойдет о синхронных сообщениях, основная выгода которых — простота.

    Подводные камни: SendMessage + остановка потока


    Когда я вижу рабочий поток, то сразу задаюсь вопросом как он взаимодействует с GUI и как его при этом останавливают.

    Будьте внимательны, если рабочий поток прямым или косвенным образом вызывает блокирующую функцию SendMessage для GUI потока. На примере WinAPI, это может быть что-нибудь совсем безобидное, например какой нибудь вызов SetText, который внутри вызывает SendMessage WM_SETTEXT. В этом случае нужно быть особо внимательным при попытке остановки потока в обработчиках нажатия на кнопки и при закрытии приложения (в случае если GUI поток является основным потоком приложения). Это не совсем очевидно, дальше я попытаюсь объяснить.

    Правильный способ завершить поток — это дождаться завершения, с использованием одной из функций WaitFor, передав параметром HANDLE потока. Притом дожидаться полной остановки потока обязательно — никаких таймаутов с последующим вызовом TerminateThread. Например:

    // INFINITE означает, что - функция не вернет управление до тех пор, пока поток не завершится
    WaitForSingleObject(hThread, INFINITE);
    

    Если этого не сделать, возможны непредсказуемые последствия (зависания и падения программы).
    Особенно, если поток находится внутри динамически подключаемой библиотеки, которая должна тут-же выгрузиться.

    И так, еще раз о проблеме SendMessage: если мы будем в обработчике оконных сообщений ждать завершения потока, то мы таким образом заблокируем эту самую обработку оконных сообщений. А рабочий поток, в свою очередь, пошлет сообщение и будет ждать пока его обработают. Таким образом мы гарантированно получим взаимную блокировку потоков (deadlock).
    Один из вариантов решения в случае синхронных сообщений — не просто ждать завершение потока, а прокручивать оконные сообщения, пока поток не завершиться (костыль конечно, но тоже имеет право на существование)

    Вторая архитектурная проблема — если рабочий поток вызывает напрямую код GUI, то необходимо позаботиться о синхронизации. Синхронизация потоков получается размазанной по всей программе.

    Вариант решения перечисленных проблем


    Рабочий поток должен быть изолирован внутри интерфейса движка.

    Все уведомления от движка должны приходить синхронно и в контексте клиентского потока, по принципу COM Single-Threaded Apartments.

    Вызовы должны происходить синхронно и прозрачно: рабочий поток вызывает некую функцию, которая не возвращает управление до тех пор, пока поток GUI не обработает этот вызов.
    Но при этом рабочий поток должен иметь возможность завершиться, даже в момент вызова такой функции.

    В итоге интерфейс движка для GUI будет однопоточным, что существенно упростит работу с таким движком.

    Вариант реализации и пример на C++


    Для реализации этого поведения можно создать повторно используемый объект, который будет обеспечивать переключение контекста потоков при вызове кода GUI.

    Я назвал такой объект — синхронизационный канал.

    И так, делаем некий синхронизационный канал, при помощи которого рабочий поток движка будет вызывать функции обратного вызова, реализуемые GUI.

    Канал будет иметь функцию Execute, с параметром boost::function, куда можно передать функтор, созданный boost::bind. Таким образом, с использованием данного канала, можно будет вызвать функцию обратного вызова с любой сигнатурой, например:
    class IEngineEvents
    {
    public:
        virtual void OnProgress(int progress) = 0;
        ...
    };
    //где-то в движке...
    IEngineEvents* listener; //указатель на объект, реализуемый GUI
    syncChannel.Execute(boost::bind(&IEngineEvents::OnProgress, listener, 30));
    

    Функция Execute, как говорилось раньше, синхронная — она не завершается до тех пор, пока функция обратного вызова не будет завершена. Кроме исключения, описанного ниже.

    Канал также должен иметь функцию Close, действие которой следующее: все вызовы функции Execute завершаются, новые вызовы функции Execute не проходят. Рабочий поток освобождается и, таким образом, решается проблема остановки рабочего потока — можно использовать функцию WaitFor без необходимости прокрутки оконных сообщений.

    Для переключения контекста потоков в примере используется стандартная очередь сообщений Win32 потока и функция PostThreadMessage.

    Принцип работы следующий: рабочий поток посылает сообщение при помощи PostThreadMessage, и далее ждет, пока это сообщение не будет обработано, для этого в примере используется отдельный объект событие.
    В это время GUI поток должен обрабатывать свои оконные сообщения, одним из которых должно быть сообщение от рабочего потока, которое он обязан обработать и известить рабочий поток о завершении обработки, используя объект событие.

    Данная реализация предполагает функцию ProcessMessage, которую необходимо вызывать из цикла обработки оконных сообщений или оконной процедуры. Возможна реализации и без такой функции, например канал может создавать себе невидимое окно, и обрабатывать все сообщения внутри. Кроме того, возможны реализации без использования оконных сообщений в принципе.

    Хотелось бы еще сказать, что пример несет лишь ознакомительный характер, и не является готовым решением.
    // SyncChannel.h
    class CSyncChannel
    {
    public:
        typedef boost::function<void()> CCallback;
    
    public:
        CSyncChannel(void);
        ~CSyncChannel(void);
    
    public:
        bool Create(DWORD clientThreadId);
        void Close();
        bool Execute(CCallback callback);
        bool ProcessMessage(MSG msg);
    
    private:
        DWORD                           m_clientThreadId;
        CCallback                       m_callback;
        HANDLE                          m_deliveredEvent;
        volatile bool                   m_closeFlag;
    };
    

    // SyncChannel.cpp
    UINT WM_SYNC_CHANNEL_COMMAND = WM_APP + 500;
    CSyncChannel::CSyncChannel(void) : m_closeFlag(true)
    {}
    CSyncChannel::~CSyncChannel(void)
    {}
    bool CSyncChannel::Create(DWORD clientThreadId)
    {
        if (!m_closeFlag)
        {
            return false;
        }
        m_clientThreadId = clientThreadId;
        m_deliveredEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        if (!m_deliveredEvent)
        {
            return false;
        }
        m_closeFlag = false;
        return true;
    }
    void CSyncChannel::Close()
    {
        m_closeFlag = true;
        if (m_deliveredEvent)
        {
            CloseHandle(m_deliveredEvent);
            m_deliveredEvent = NULL;
        }
    }
    bool CSyncChannel::Execute(CCallback callback)
    {
        // Эта функция может быть вызвана с любого потока.
        // Дело в том, что некоторые функции движка могут вызываться клиентским потоком.
        // Например функция Pause(), в которой может быть
        // тут-же вызвана функция обратного вызова,
        // изменяющая состояние движка на что нибудь вроде "pause pending"
        if (m_closeFlag)
        {
            return false;
        }
        if (GetCurrentThreadId() == m_clientThreadId)
        {
            // Если вызывающий поток - это клиентский поток,
            // то мы должны вызвать колбек напрямую, без переключения контекста потоков.
            // Иначе поток сам себе пошлет сообщение, и будет ждать пока он сам его обработает,
            // что привело бы к блокировке потоков - поток будет ждать сам себя.
            callback();
        }
        else
        {
            // Функция Execute была вызвана из рабочего потока,
            // по этому мы должны послать сообщение клиентскому потоку,
            // и вызвать функцию обратного вызова уже в нем.
    
            // Сохраняем функцию обратного вызова для того,
            // чтобы она была вызвана в клиентском потоке.
            // Данная реализация предполагает один рабочий поток, и один клиентский,
            // если рабочих потоков будет сразу несколько,
            // то здесь необходимо добавить синхронизацию.
            m_callback = callback;
    
            // Сбрасываем объект событие для того, чтобы клиентский поток
            // мог нам его установить после того, как он обработает вызов
            ResetEvent(m_deliveredEvent);
    
            // Уведомляем клиентский поток о том, что необходимо
            // вызвать функцию обратного вызова.
            // Для этого посылаем в клиентский поток сообщение, получив которое,
            // клиентский поток должен будет вызвать функцию CSyncChannel::ProcessMessage()
            if (!PostThreadMessage(m_clientThreadId, WM_SYNC_CHANNEL_COMMAND, NULL, NULL))
            {
                return false;
            }
    
            // Ждем, пока клиенсткий поток вызовет функцию CSyncChannel::ProcessMessage(),
            // в которой установится событие m_deliveredEvent,
            // либо пока не будет установлен флаг m_closeFlag
            // Можно заменить флаг m_closeFlag на объект событие
            // и использовать WaitForMultipleObjects, но канал будет закрываться не часто,
            // и моментальной реакции на это не требуется.
            DWORD waitResult = WAIT_TIMEOUT;
            while (waitResult == WAIT_TIMEOUT && !m_closeFlag)
            {
                waitResult = WaitForSingleObject(m_deliveredEvent, 100);
            }
            if (waitResult != WAIT_OBJECT_0)
            {
                // Мы не дождались сообщения о доставке, а значит мы дождались флага закрытия
                return false;
            }
        }
        // Функция обратного вызова была успешно вызвана в клиентском потоке
        return true;
    }
    bool CSyncChannel::ProcessMessage(MSG msg)
    {
        // Эта функция вызывается только из клиентского потока
        if (msg.message != WM_SYNC_CHANNEL_COMMAND)
        {
            // Клиентский код вызывает эту функцию для всех сообщений потока,
            // фильтруем не наши сообщения
            return false;
        }
        if (!m_closeFlag)
        {
            // Мы переключились в контекст клиентского потока,
            // и теперь мы можем вызвать функцию обратного вызова
            m_callback();
    
            // После обработки вызова, отпускаем рабочий поток.
            // Для этого необходимо установить объект событие
            SetEvent(m_deliveredEvent);
        }
        return true;
    }
    

    Пример использования класса CSyncChannel смотрите в следующей статье — Организация рабочих потоков: управление состоянием движка.
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 8

      0
      Приведенный в начале стаьи пример с копрованием можно решить и без многопоточности — порты завершения ввода вывода. Да, с дополнительным потоком оно как бы проще, но UI и многопоточность очень много несут подводных камней — некоторые действия зависят от того в каком потоке вы что-то инициализировали, а в каком что-то пытаетесь рисовать.
      Мое мнение — стремиться минимизировать количество потоков в программе, т.к. потоки дают весьма «простое» на первый взгляд архитектурное решение, но со временем накапливают множество потенциальных ошибок, которые произойдут в самый неподходящий момент.
        0
        Замените «копирование файлов» на что-нибудь другое, что необходимо реализовывать самому, без доступного асинхронного API.
        Предлагаемое мной решение — как-раз таки не допускать многопоточность в UI, а изолировать рабочие потоки в движке.
          0
          Напоминает одно из прошлых решений, в котором был некоторый менеждер сообщений. В менеждер можно было складывать сообщения от разных компонент из разных потоков и подписываться на сообщения, которые обрабатывались в одном потоке.
            0
            Я не говорил что идея новаторская, я даже упоминал что это что-то подобное COM STA.
            Идея стати в том, чтобы показать проблемы многопоточности, и как их можно разрешить простым способом.
            Люди упорно продолжают допускать грубые ошибки при работе с потоками, и даже не задумываются об этом.
        0
        Хотелось бы узнать мнение минусующих.
        Почему данная тема не достойна того, чтобы быть на хабре?
          0
          Я несколько раз прочитал но не понял как это работает. Можно пример как это использовать?
          И не помешало бы комментарии в код
          if (GetCurrentThreadId() == m_clientThreadId)
          {
          callback();
          return true;
          }
          else
          {
          m_callback = callback;
          ResetEvent(m_deliveredEvent);

          если треды совпали делаем коллбек, если нет делаем какую то околесицу…
            0
            Коментарии добавил, чуть позже добавлю пример использования.
            Если вкратце, то объект CSyncChannel должен использоваться внутри интерфейса движка.
            Движок использует канал для того, чтобы оповещать GUI о изменении своего состояния и о ошибках.
            GUI подписывается на эти сообщения, используя какой-то интерфейс, вроде такого:

            class IEngineEvents
            {
            public:
                virtual void OnProgress(int progress) = 0;
                virtual bool OnError(int code) = 0;
            };
            


            Движок вызывает функции GUI, используя следующую конструкцию:

            IEngineEvents* listener; //указатель на объект, реализуемый GUI
            syncChannel.Execute(boost::bind(&IEngineEvents::OnProgress, listener, 30));
            
              0
              >>Можно пример как это использовать?
              Вот, пожалуйста:
              habrahabr.ru/post/141783/

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое