Есть много вариаций на тему данного паттерна, но большинство примеров не подходит для многопоточных приложений.
В этой статье я хочу поделится опытом применения паттерна в многопоточных приложениях и опишу основные проблемы, с которыми мне приходилось сталкиваться.
Цель данной стати — обратить внимание разработчиков на проблемы, с которыми можно столкнуться при создании многопоточных приложений. Выявить подводные камни в реализации коммуникации между компонентами в многопоточном приложении.
Если Вам необходимо готовое решение, обратите внимание на библиотеку Signals2, котрая включена в boost с мая 2009-го года.
Я не пытаюсь предоставить решение, которое можно было бы использовать в готовом виде. Но тем не менее, ознакомившись с материалом, можно обойтись без использования сторонних библиотек, в тех проектах, в которых они по каким-либо причинам не доступны или нежелательны (драйвера, низкоуровневые приложения и т.п.).
NotificationSender — объект, рассылающий сообщения.
Как правило это рабочий поток, извещающий об изменении своего состояния, которое необходимо отобразить на пользовательском интерфейсе.
NotificationListener — объект, реализующий обработку уведомлений.
Как правило это объект, который управляет отображением части пользовательского интерфейса связанного с фоновой задачей.
Таких объектов может быть множество, при этом они могут подключаться/отключаться динамически (к примеру открытие далогового окна, где показываются детали выполнения задачи)
NotificationDispatcher — объект, управляющий подписчиками и рассылкой сообщений.
Рассылка сообщений всем подписчикам.
Процесс подписки/прекшащения подписки.
Время жизни объектов.
В данной статье описан метод синхронной рассылки сообщений. Это означает, что вызов функции SendMessage будет синхронным, и поток, вызывающий этот метод будет ожидать завершения обработки сообщений всеми подписчиками. В ряде случаев такой подход удобней ассинхронной рассылки, но при этом в нем есть трудности с прекращением подписки.
Здесь уникальный идентификатор подписчика — адресс объекта подписчика, функция GetSubscriberId возвращает всегда одинаковое значение для одного объекта подписчика в не зависимости от преобразования типов.
В примере есть проблема, не связанная с многопоточностью. Эта проблема проявляется, когда мы пытаемся отписаться внутри обработчика MessageHandler. Данная проблемма будет решена копированием списка подписчиков перед вызовом MessageHandler.
С одним потоком такой код будет работать довольно стабильно.
Давайте посмотрим что будет при работе нескольких потоков.
Рано или позно произойдет креш.
Проблема заключается в добавлении/удалении подписчиков и одновременной рассылке уведомлений (многопоточный доступ к CDispatcher::m_SubscriberList в нашем примере).
Здесь необходима синхронизация доступа к списку подписчиков.
Синхронизация доступа была реализована при помощи объектов синхронизации (Critical section или Mutex).
Для большей переносимости и для того, чтобы не отвлекаться от сути происходящего, абстрагируемся от прямых вызовов платформенно-зависимых функций, типа EnterCriticalSection. Для этого служит класс CLock.
Для устойчивости к с++ исключениям удобно использовать технологию RAII, а именно класс CScopeLocker, который в конструкторе захватывает объект синхронизации, а в деструкторе освобождает его.
При такой реализации программа не будет падать, но нас поджидает еще одна неприятная ситуация.
Допустим у нас есть некий поток, выполняющий какую-то фоновую задачу и есть окно, где отображается ход выполнения этой задачи.
Как правило, поток посылает уведомление классу окна, который в свою очередь вызывает системную функцию SendMessage, которая инициирует какие-то действия в контексте оконной процедуры.
Системная функция SendMessage является блокирующей, она отсылает сообщение потоку окна и ждет пока тот его обработает.
Если подключение/отключение объекта-слушателя будет происходить также в контексте оконной процедуры (в потоке окна) возможна взаимная блокировка потоков, так называемый deadlock.
Такой deadlock может воспроизоводится крайне редко (в момент вызова Subscribe/Unsubscribe и одновременном вызове MessageHandler в отдельном потоке)
Следующий код эмулирует ситуацию с блокирующим вызовом системной ф-ции SendMessage.
Проблема заключается в том, что главный поток захватывает глобальный объект синхронизации g_Lock (при аналогии с оконной процедурой — выполняется в контексте оконного потока), и затем вызывает метод Subscribe/Unsubscribe, который внутри пытается захватить второй объект синхронизации CDispatcher::m_Lock.
В этот момент рабочий поток посылает уведомление, захватив при этом CDispatcher::m_Lock в функции CDispatcher::SendMessage, и затем пытается захватить глобальный объект синхронизации g_Lock (при аналогии с оконом — вызывает системную функцию SendMessage).
Это можно назвать класическим deadlock-ом.
Проблема скрывается в функции CDispatcher::SendMessage().
Здесь должно соблюдаться правило — нельзя вызывать callback-функцию захватив при этом какой-либо объект синхронизации.
Итак, убираем блокировку при рассылке уведомлений.
После того, как мы убрали deadlock, появилась другая проблема — время жизни объектов-подписчиков.
У нас больше нет гарантии, что метод MessageHandler не будет вызван после вызова Unsubscribe, и по этому мы не можем удалять объект-подписчик непосредственно после вызова Unsubscribe.
В данной ситуации проще всего контролировать время жизни объектов-подписчиков с использованием счетчика ссылок.
Для этого можно исползовать технологию COM — унаследовать интерфейс CSubscriber от IUnknown и использовать ATL CComPtr для списка объектов-подписчиков, тоесть заменить std::vector<CSubscriber*> на std::vector<CComPtr>.
Но такая реализация чревата дополнительными расходами на реализацию классов-подписчиков, так как в каждом из них должны быть реализованы методы AddRef/Release и ненужный QueryInterface, хотя если в проекте активно используется COM, то такой подход может иметь приемущество.
Для контроля времени жизни объектов-подписчиков с исползованием счетчика ссылок хорошо подойдут умные указатели.
В данной реализации я заменил «голый» указатель CSubscriber* на «умный» указатель со счетчиком ссылок, такой оказался в библиотеке boost.
Также в функцию Unsubscribe я добавил переменную toRelease для того, чтобы вызвать деструктор объекта-подписчика уже после вызова Unlock (нельзя вызывать callback-функцию, включая деструктор объекта подписчика, захватив при этом какой-либо объект синхронизации).
Cтоит обратить внимание на то, что в функции SendMessage происходит копирование списка умных указателей (после копирования все указатели увеличивают свои счетчики ссылок, а при выходе из функции уменьшают, что и контролирует время жизни объектов-подписчиков)
Как правило вызов функции SendMessage будет происходить намного чаще чем Subscribe/Unsubscribe. При большом количестве подписчиков узким местом может стать копирование списка подписчиков внутри SendMessage.
Копирование списка подписчиков можно перенести в функции Subscribe/Unsubscribe. Это будет похоже на методику из lock-free алгоритмов.
Объект CDispatcher будет хранить список подписчиков не на прямую, а при помощи умного указателя. Внутри функции SendMessage мы будем получать указатель на текущий список подписчиков и работать с ним. В функциях Subscribe/Unsubscribe мы будем каждый раз создавать новый список подписчиков и перенаправлять указатель внутри объекта CDispatcher на новый список подписчиков. Таким образом в то время, когда указатель на список подписчиков в объекте CDispatcher будет указывать уже на новый список подписчиков, ф-ция SendMessage по прежнему будет работать со старым списком. Так как старый список подписчиков никто не изменяет, то все будет работать стабильно в многопоточной среде.
В принципе, можно несколько модифицировать функции Subscribe/Unsubscribe и реализовать полностью lock-free алгоритм, но это уже другая тема.
Медот Unsubscribe является асинхронным и не гарантирует после своего завершения полное прекращение рассылки, половинное решение — подписчик получает уведомление о прекращении подписки при помощи ф-ции UnsubscribeHandler. Для реализации этого поведения добавлен промежуточный класс CSubscriberItem, который в своем деструкоторе вызывает ф-цию UnsubscribeHandler.
Библиотека boost::signals2 статья
Умные указатели Джефф Элджер
Resource Acquisition Is Initialization (RAII) википедия
Комментарии к первой версии этой статьи можно найти здесь
В этой статье я хочу поделится опытом применения паттерна в многопоточных приложениях и опишу основные проблемы, с которыми мне приходилось сталкиваться.
Цель данной стати — обратить внимание разработчиков на проблемы, с которыми можно столкнуться при создании многопоточных приложений. Выявить подводные камни в реализации коммуникации между компонентами в многопоточном приложении.
Если Вам необходимо готовое решение, обратите внимание на библиотеку Signals2, котрая включена в boost с мая 2009-го года.
Я не пытаюсь предоставить решение, которое можно было бы использовать в готовом виде. Но тем не менее, ознакомившись с материалом, можно обойтись без использования сторонних библиотек, в тех проектах, в которых они по каким-либо причинам не доступны или нежелательны (драйвера, низкоуровневые приложения и т.п.).
Предметная область
Действующие лица
NotificationSender — объект, рассылающий сообщения.
Как правило это рабочий поток, извещающий об изменении своего состояния, которое необходимо отобразить на пользовательском интерфейсе.
NotificationListener — объект, реализующий обработку уведомлений.
Как правило это объект, который управляет отображением части пользовательского интерфейса связанного с фоновой задачей.
Таких объектов может быть множество, при этом они могут подключаться/отключаться динамически (к примеру открытие далогового окна, где показываются детали выполнения задачи)
NotificationDispatcher — объект, управляющий подписчиками и рассылкой сообщений.
Взаимодействие между объектами
Рассылка сообщений всем подписчикам.
Процесс подписки/прекшащения подписки.
Время жизни объектов.
В данной статье описан метод синхронной рассылки сообщений. Это означает, что вызов функции SendMessage будет синхронным, и поток, вызывающий этот метод будет ожидать завершения обработки сообщений всеми подписчиками. В ряде случаев такой подход удобней ассинхронной рассылки, но при этом в нем есть трудности с прекращением подписки.
Простейшая реализация для однопоточной среды
typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; };
Здесь уникальный идентификатор подписчика — адресс объекта подписчика, функция GetSubscriberId возвращает всегда одинаковое значение для одного объекта подписчика в не зависимости от преобразования типов.
Пример использования
class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { wprintf(L"%d\n", *((int*)pContext)); } }; int _tmain(int argc, _TCHAR* argv[]) { CDispatcher Dispatcher; CListener Listener1; CListener Listener2; Dispatcher.Subscribe(&Listener1); Dispatcher.Subscribe(&Listener2); for(int i = 0; i < 5; ++i) { Dispatcher.SendMessage(&i); } Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); return 0; }
Отключение подписчика внутри обработчика сообщений
В примере есть проблема, не связанная с многопоточностью. Эта проблема проявляется, когда мы пытаемся отписаться внутри обработчика MessageHandler. Данная проблемма будет решена копированием списка подписчиков перед вызовом MessageHandler.
Переходим к многопоточной среде
С одним потоком такой код будет работать довольно стабильно.
Давайте посмотрим что будет при работе нескольких потоков.
CDispatcher g_Dispatcher; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); } return 0; }
Рано или позно произойдет креш.
Проблема заключается в добавлении/удалении подписчиков и одновременной рассылке уведомлений (многопоточный доступ к CDispatcher::m_SubscriberList в нашем примере).
Здесь необходима синхронизация доступа к списку подписчиков.
Синхронизация доступа к списку подписчиков
class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
Синхронизация доступа была реализована при помощи объектов синхронизации (Critical section или Mutex).
Для большей переносимости и для того, чтобы не отвлекаться от сути происходящего, абстрагируемся от прямых вызовов платформенно-зависимых функций, типа EnterCriticalSection. Для этого служит класс CLock.
Для устойчивости к с++ исключениям удобно использовать технологию RAII, а именно класс CScopeLocker, который в конструкторе захватывает объект синхронизации, а в деструкторе освобождает его.
При такой реализации программа не будет падать, но нас поджидает еще одна неприятная ситуация.
Борьба с взаимной блокировкой потоков (deadlock)
Допустим у нас есть некий поток, выполняющий какую-то фоновую задачу и есть окно, где отображается ход выполнения этой задачи.
Как правило, поток посылает уведомление классу окна, который в свою очередь вызывает системную функцию SendMessage, которая инициирует какие-то действия в контексте оконной процедуры.
Системная функция SendMessage является блокирующей, она отсылает сообщение потоку окна и ждет пока тот его обработает.
Если подключение/отключение объекта-слушателя будет происходить также в контексте оконной процедуры (в потоке окна) возможна взаимная блокировка потоков, так называемый deadlock.
Такой deadlock может воспроизоводится крайне редко (в момент вызова Subscribe/Unsubscribe и одновременном вызове MessageHandler в отдельном потоке)
Следующий код эмулирует ситуацию с блокирующим вызовом системной ф-ции SendMessage.
CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //Эмулируем блокирующий вызов SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { //Эмулируем контекст оконной процедуры (обработчик оконного сообщения) g_Lock.Lock(); g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); g_Lock.Unlock(); } return 0; }
Проблема заключается в том, что главный поток захватывает глобальный объект синхронизации g_Lock (при аналогии с оконной процедурой — выполняется в контексте оконного потока), и затем вызывает метод Subscribe/Unsubscribe, который внутри пытается захватить второй объект синхронизации CDispatcher::m_Lock.
В этот момент рабочий поток посылает уведомление, захватив при этом CDispatcher::m_Lock в функции CDispatcher::SendMessage, и затем пытается захватить глобальный объект синхронизации g_Lock (при аналогии с оконом — вызывает системную функцию SendMessage).
Поток окна A -> B Рабочий поток B -> A
Это можно назвать класическим deadlock-ом.
Проблема скрывается в функции CDispatcher::SendMessage().
Здесь должно соблюдаться правило — нельзя вызывать callback-функцию захватив при этом какой-либо объект синхронизации.
Итак, убираем блокировку при рассылке уведомлений.
void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } }
Контроль времени жизни подписчиков
После того, как мы убрали deadlock, появилась другая проблема — время жизни объектов-подписчиков.
У нас больше нет гарантии, что метод MessageHandler не будет вызван после вызова Unsubscribe, и по этому мы не можем удалять объект-подписчик непосредственно после вызова Unsubscribe.
В данной ситуации проще всего контролировать время жизни объектов-подписчиков с использованием счетчика ссылок.
Для этого можно исползовать технологию COM — унаследовать интерфейс CSubscriber от IUnknown и использовать ATL CComPtr для списка объектов-подписчиков, тоесть заменить std::vector<CSubscriber*> на std::vector<CComPtr>.
Но такая реализация чревата дополнительными расходами на реализацию классов-подписчиков, так как в каждом из них должны быть реализованы методы AddRef/Release и ненужный QueryInterface, хотя если в проекте активно используется COM, то такой подход может иметь приемущество.
Для контроля времени жизни объектов-подписчиков с исползованием счетчика ссылок хорошо подойдут умные указатели.
Простая реализация для многопоточной среды
typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; class CDispatcher { private: typedef std::vector<CSubscriberPtr> CSubscriberList; public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CSubscriberPtr toRelease; CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { toRelease = m_SubscriberList[i]; m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
В данной реализации я заменил «голый» указатель CSubscriber* на «умный» указатель со счетчиком ссылок, такой оказался в библиотеке boost.
Также в функцию Unsubscribe я добавил переменную toRelease для того, чтобы вызвать деструктор объекта-подписчика уже после вызова Unlock (нельзя вызывать callback-функцию, включая деструктор объекта подписчика, захватив при этом какой-либо объект синхронизации).
Cтоит обратить внимание на то, что в функции SendMessage происходит копирование списка умных указателей (после копирования все указатели увеличивают свои счетчики ссылок, а при выходе из функции уменьшают, что и контролирует время жизни объектов-подписчиков)
Тестируем
CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //Эмулируем блокирующий вызов SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); for(;;) { boost::shared_ptr<CListener> pListener1(new CListener); boost::shared_ptr<CListener> pListener2(new CListener); //Эмулируем контекст оконной процедуры (обработчик оконного сообщения) g_Lock.Lock(); g_Dispatcher.Subscribe(pListener1); g_Dispatcher.Subscribe(pListener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId()); g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId()); g_Lock.Unlock(); } return 0; }
Соптимизированная реализация для многопоточной среды
Как правило вызов функции SendMessage будет происходить намного чаще чем Subscribe/Unsubscribe. При большом количестве подписчиков узким местом может стать копирование списка подписчиков внутри SendMessage.
Копирование списка подписчиков можно перенести в функции Subscribe/Unsubscribe. Это будет похоже на методику из lock-free алгоритмов.
Объект CDispatcher будет хранить список подписчиков не на прямую, а при помощи умного указателя. Внутри функции SendMessage мы будем получать указатель на текущий список подписчиков и работать с ним. В функциях Subscribe/Unsubscribe мы будем каждый раз создавать новый список подписчиков и перенаправлять указатель внутри объекта CDispatcher на новый список подписчиков. Таким образом в то время, когда указатель на список подписчиков в объекте CDispatcher будет указывать уже на новый список подписчиков, ф-ция SendMessage по прежнему будет работать со старым списком. Так как старый список подписчиков никто не изменяет, то все будет работать стабильно в многопоточной среде.
В принципе, можно несколько модифицировать функции Subscribe/Unsubscribe и реализовать полностью lock-free алгоритм, но это уже другая тема.
Медот Unsubscribe является асинхронным и не гарантирует после своего завершения полное прекращение рассылки, половинное решение — подписчик получает уведомление о прекращении подписки при помощи ф-ции UnsubscribeHandler. Для реализации этого поведения добавлен промежуточный класс CSubscriberItem, который в своем деструкоторе вызывает ф-цию UnsubscribeHandler.
namespace Observer { ////////////////////////// // Subscriber ////////////////////////// typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; virtual void UnsubscribeHandler() = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; ////////////////////////////////////////////////////////////////////// // Dispatcher /////////////////////////////////// class CDispatcher { private: class CSubscriberItem { public: CSubscriberItem(CSubscriberPtr pSubscriber) :m_pSubscriber(pSubscriber) { } ~CSubscriberItem() { m_pSubscriber->UnsubscribeHandler(); }; CSubscriberPtr Subscriber()const {return m_pSubscriber;} private: CSubscriberPtr m_pSubscriber; }; typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr; typedef std::vector<CSubscriberItemPtr> CSubscriberList; typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr; public: CDispatcher() { } private: CDispatcher(const CDispatcher&){} CDispatcher& operator=(const CDispatcher&){return *this;} public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { //Declaration of the next shared pointer before ScopeLocker //prevents release of subscribers from under lock CSubscriberListPtr pNewSubscriberList(new CSubscriberList()); //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(m_pSubscriberList) { //Copy existing subscribers pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end()); } for(size_t i = 0; i < pNewSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } //Add new subscriber to new subscriber list pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber))); //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { //Declaration of the next shared pointers before ScopeLocker //prevents release of subscribers from under lock CSubscriberItemPtr pSubscriberItemToRelease; CSubscriberListPtr pNewSubscriberList; //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return false; } pNewSubscriberList = CSubscriberListPtr(new CSubscriberList()); for(size_t i = 0; i < m_pSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == id) { pSubscriberItemToRelease = pSubscriberItem; } else { pNewSubscriberList->push_back(pSubscriberItem); } } //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; if(!pSubscriberItemToRelease.get()) { return false; } return true; } void SendMessage(void* pContext) { CSubscriberListPtr pSubscriberList; { CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return; } //Get shared pointer to an existing list of subscribers pSubscriberList = m_pSubscriberList; } //pSubscriberList pointer to copy of subscribers' list for(size_t i = 0; i < pSubscriberList->size(); ++i) { (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext); } } private: CSubscriberListPtr m_pSubscriberList; CLock m_Lock; }; }; //namespace Observer
Ссылки
Библиотека boost::signals2 статья
Умные указатели Джефф Элджер
Resource Acquisition Is Initialization (RAII) википедия
Комментарии к первой версии этой статьи можно найти здесь
