В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части…
В C++11, работа с потокам осуществляется по средствам класса
В этом примере,
Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в
Программа напечатает в консоль 2. Если не использовать
Помимо метода
Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые
В последнем примере, я должен был синхронизировать доступ к вектору
Приведу пример использования
Программа должна выдавать примерно следующее:
Перед обращением к общим данным, мьютекс должен быть заблокирован методом
Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование
При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова
Теперь, результат работы программы будет следующего вида:
Вы, наверное, заметили, что при вызове
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:
С учетом этого, мы можем переписать класс контейнер следующим образом:
Можно поспорить насчет того, что метод
Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов
Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:
Объявлены они следующим образом:
Помимо «оберток» для мьютексов,
Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция
Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
Для решения этой проблемы можно использовать
Продолжение: условные переменные
Потоки
В C++11, работа с потокам осуществляется по средствам класса
std::thread
(доступного из заголовочного файла <thread>
), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.#include <thread>
void threadFunction()
{
// do smth
}
int main()
{
std::thread thr(threadFunction);
thr.join();
return 0;
}
В этом примере,
thr
— это объект, представляющий поток, в котором будет выполняться функция threadFunction()
. Вызов join
блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr
(а точнее threadFunction()
) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.void threadFunction(int i, double d, const std::string &s)
{
std::cout << i << ", " << d << ", " << s << std::endl;
}
int main()
{
std::thread thr(threadFunction, 1, 2.34, "example");
thr.join();
return 0;
}
Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в
std::ref
или std::cref
, как в примере:void threadFunction(int &a)
{
a++;
}
int main()
{
int a = 1;
std::thread thr(threadFunction, std::ref(a));
thr.join();
std::cout << a << std::endl;
return 0;
}
Программа напечатает в консоль 2. Если не использовать
std::ref
, то результатом работы программы будет 1.Помимо метода
join
, следует рассмотреть еще один, похожий метод — detach
. detach
позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join
.int main()
{
std::thread thr(threadFunction);
thr.detach();
return 0;
}
Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
try
{
std::thread thr1(threadFunction);
std::thread thr2(threadFunction);
thr1.join();
thr2.join();
}
catch (const std::exception &ex)
{
std::cout << ex.what() << std::endl;
}
Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
std::mutex g_mutex;
std::vector<std::exception_ptr> g_exceptions;
void throw_function()
{
throw std::exception("something wrong happened");
}
void threadFunction()
{
try
{
throw_function();
}
catch (...)
{
std::lock_guard<std::mutex> lock(g_mutex);
g_exceptions.push_back(std::current_exception());
}
}
int main()
{
g_exceptions.clear();
std::thread thr(threadFunction);
thr.join();
for(auto &e: g_exceptions)
{
try
{
if(e != nullptr)
std::rethrow_exception(e);
}
catch (const std::exception &e)
{
std::cout << e.what() << std::endl;
}
}
return 0;
}
Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые
<thread>
, в пространстве имен std::this_thread
: - get_id: возвращает id текущего потока
- yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
- sleep_for: блокирует выполнение текущего потока в течение установленного периода
- sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени
Блокировки
В последнем примере, я должен был синхронизировать доступ к вектору
g_exceptions
, чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле <mutex>
:- mutex: обеспечивает базовые функции lock() и unlock() и не блокируемый метод try_lock()
- recursive_mutex: может войти «сам в себя»
- timed_mutex: в отличие от обычного мьютекса, имеет еще два метода: try_lock_for() и try_lock_until()
- recursive_timed_mutex: это комбинация timed_mutex и recursive_mutex
Приведу пример использования
std::mutex
с упомянутыми ранее функциями-помощниками get_id()
и sleep_for()
:#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
std::mutex g_lock;
void threadFunction()
{
g_lock.lock();
std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(rand()%10));
std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
g_lock.unlock();
}
int main()
{
srand((unsigned int)time(0));
std::thread t1(threadFunction);
std::thread t2(threadFunction);
std::thread t3(threadFunction);
t1.join();
t2.join();
t3.join();
return 0;
}
Программа должна выдавать примерно следующее:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424
Перед обращением к общим данным, мьютекс должен быть заблокирован методом
lock
, а после окончания работы с общими данными — разблокирован методом unlock
.Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе
std::vector
), имеющий методы add()
для добавления одного элемента и addrange()
для добавления нескольких элементов.Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование
va_args
. Также, метод dump()
не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.template <typename T>
class container
{
std::mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
_lock.lock();
_elements.push_back(element);
_lock.unlock();
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
_lock.lock();
add(va_arg(arguments, T));
_lock.unlock();
}
va_end(arguments);
}
void dump()
{
_lock.lock();
for(auto e: _elements)
std::cout << e << std::endl;
_lock.unlock();
}
};
void threadFunction(container<int> &c)
{
c.addrange(3, rand(), rand(), rand());
}
int main()
{
srand((unsigned int)time(0));
container<int> cntr;
std::thread t1(threadFunction, std::ref(cntr));
std::thread t2(threadFunction, std::ref(cntr));
std::thread t3(threadFunction, std::ref(cntr));
t1.join();
t2.join();
t3.join();
cntr.dump();
return 0;
}
При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова
unlock
), что невозможно. Здесь и выходит на сцену std::recursive_mutex
, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock
бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange()
, чтобы не вызывались lock
и unlock
), заключается в замене мьютекса на std::recursive_mutex
.template <typename T>
class container
{
std::recursive_mutex _lock;
// ...
};
Теперь, результат работы программы будет следующего вида:
6334
18467
41
6334
18467
41
6334
18467
41
Вы, наверное, заметили, что при вызове
threadFunction()
, генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed);
инициализирует seed
только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:
- lock_guard: когда объект создан, он пытается получить мьютекс (вызывая
lock()
), а когда объект уничтожен, он автоматически освобождает мьютекс (вызываяunlock()
) - unique_lock: в отличие от
lock_guard
, также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных
С учетом этого, мы можем переписать класс контейнер следующим образом:
template <typename T>
class container
{
std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
_elements.push_back(element);
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
add(va_arg(arguments, T));
}
va_end(arguments);
}
void dump()
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e: _elements)
std::cout << e << std::endl;
}
};
Можно поспорить насчет того, что метод
dump()
должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex'
to ‘std::recursive_mutex &'
Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов
lock()
и unlock()
. Таким образом, аргумент lock_guard
не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable
, тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.template <typename T>
class container
{
mutable std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void dump() const
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e: _elements)
std::cout << e << std::endl;
}
};
Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:
defer_lock
типаdefer_lock_t
: не получать мьютексtry_to_lock
типаtry_to_lock_t
: попытаться получить мьютекс без блокировкиadopt_lock
типаadopt_lock_t
: предполагается, что у вызывающего потока уже есть мьютекс
Объявлены они следующим образом:
struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
Помимо «оберток» для мьютексов,
std
также предоставляет несколько методов для блокировки одного или нескольких мьютексов:- lock: блокирует мьютекс, используя алгоритм избегания deadlock'ов (используя
lock()
,try_lock()
иunlock()
) - try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны
Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция
exchange()
, которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером. template <typename T>
class container
{
public:
std::mutex _lock;
std::set<T> _elements;
void add(T element)
{
_elements.insert(element);
}
void remove(T element)
{
_elements.erase(element);
}
};
void exchange(container<int> &c1, container<int> &c2, int value)
{
c1._lock.lock();
std::this_thread::sleep_for(std::chrono::seconds(1)); // симулируем deadlock
c2._lock.lock();
c1.remove(value);
c2.add(value);
c1._lock.unlock();
c2._lock.unlock();
}
Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
int main()
{
srand((unsigned int)time(NULL));
container<int> cntr1;
cntr1.add(1);
cntr1.add(2);
cntr1.add(3);
container<int> cntr2;
cntr2.add(4);
cntr2.add(5);
cntr2.add(6);
std::thread t1(exchange, std::ref(cntr1), std::ref(cntr2), 3);
std::thread t2(exchange, std::ref(cntr2), std::ref(cntr1), 6);
t1.join();
t2.join();
return 0;
}
Для решения этой проблемы можно использовать
std::lock
, который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:void exchange(container<int> &c1, container<int> &c2, int value)
{
std::lock(c1._lock, c2._lock);
c1.remove(value);
c2.add(value);
c1._lock.unlock();
c2._lock.unlock();
}
Продолжение: условные переменные