Представьте, что вы едете в ночном поезде. Чтобы гарантированно выйти на нужной станции, придется не спать всю ночь и внимательно отслеживать остановки. Свою станцию вы не пропустите, но сойдете с поезда уставшим. Другой способ: узнать из расписания предполагаемое время прибытия поезда, поставить будильник на нужное время с небольшим запасом и лечь спать. Этого вполне достаточно, чтобы не пропустить свою станцию, но, если поезд задержится, пробуждение окажется слишком ранним. Также есть вероятность не услышать будильник и в итоге проспать остановку. Идеальным решением было бы лечь спать, положившись на то, что кто-нибудь или что-нибудь разбудит вас незадолго до реального прибытия поезда на нужную станцию.
Какое отношение этот пример имеет к работе с потоками в программировании? Дело в том, что решить задачу синхронизации конкурентных операций (как в примере выше — хочется поспать, но не проспать остановку) можно также несколькими способами. Если какой-то поток ожидает, пока другой поток завершит выполнение своей задачи, есть несколько вариантов развития событий. Меня зовут Александр, я разработчик на С++ в дивизионе Телеком YADRO, в этой статье я разберу несколько вариантов эффективной организации ожидания потоков, сравнив «академический» подход из книги «C++ Concurrency in Action» с реализацией в продакшене.
Итак, сначала рассмотрим основные способы ожидания события одним потоком от другого.
Вариант 1. Первый поток может постоянно проверять состояние флага в совместно используемых данных, защищенных мьютексом, а второй поток будет обязан установить флаг по завершении своей задачи. В итоге, постоянно проверяя состояние флага, первый поток впустую тратит ценное процессорное время, а когда мьютекс заблокирован ожидающим потоком, его нельзя заблокировать никаким другим потоком.
Это как не спать всю ночь, разговаривая с машинистом поезда: ему приходится вести поезд медленнее, поскольку вы его постоянно отвлекаете, поэтому ваш путь удлиняется. Аналогично этому ожидающий поток потребляет ресурсы, которые могли бы использовать другие потоки, имеющиеся в системе, и ожидание может неоправданно затянуться.
Вариант 2. Введение ожидающего потока в спящий режим на короткий промежуток времени между проверками с помощью функции std::this_thread::sleep_for(). Однако и этот подход не лишен недостатков: поток просыпается вхолостую, выполняя избыточные проверки. А если интервал сна выбран слишком большим, реакция на событие становится медленной и непредсказуемой.
Как же организовать ожидание эффективно, без потери ресурсов и задержек? Ответ кроется в механизмах синхронизации, позволяющих потокам засыпать и просыпаться строго по сигналу. Чтобы понять, как это работает на практике, перейдем к классической задаче многопоточного программирования.
Постановка задачи многопоточного программирования
Классическая абстрактная модель, идеально иллюстрирующая необходимость ожидания событий, — это задача «Производитель — Потребитель» (Producer-Consumer). В основе лежит взаимодействие двух или более потоков, разделяющих общий буфер данных. Поток-производитель генерирует элементы (задачи, пакеты данных, сообщения) и помещает их в буфер. Поток-потребитель, в свою очередь, извлекает элементы из буфера и обрабатывает их.
На первый взгляд, задача тривиальна, однако в многопоточной среде она порождает несколько критических требований:
Потокобезопасность. Доступ к общему буферу должен быть строго синхронизирован, чтобы избежать гонок данных (race conditions) и нарушения внутренней структуры контейнера.
Согласование темпов. Производитель может работать быстрее потребителя (риск переполнения буфера) или медленнее (риск опустошения буфера). Поток-потребитель не должен тратить процессорное время на проверку пустого буфера, а производитель — на ожидание свободного места.
Эффективное ожидание. Как и в примере с поездом, потребитель не должен «спрашивать» буфер постоянно. Ему нужно перейти в режим ожидания и быть разбуженным ровно в тот момент, когда в буфере появится новый элемент.
Именно эта задача станет полигоном для сравнения подходов к синхронизации конкурентных операций. В следующем разделе мы рассмотрим эталонное решение из книги, а затем перейдем к production-пригодному варианту, использующему механизмы условных переменных (std::condition_variable) и стандартных контейнеров C++.
Решение задачи с помощью примитива std::condition_variable
Как же воспользоваться std::condition_variable, чтобы справиться с классической задачей «Производитель — Потребитель»? Как позволить потоку, ожидающему результатов работы, находиться в спячке, пока идет обработка данных? Вот один из способов решения с помощью условной переменной.
Если вам хорошо знаком этот способ решения задачи синхронизации конкурентных операций, то можете сразу перейти к моему production-варианту.
Пример кода, который мы будем рассматривать в тексте, я взял из книги Энтони Уильямса «C++ Concurrency in Action». Автор также описал, как разработать обобщенную очередь (глава 4, подглава 4.1), но мне вариант из книги кажется не очень применимым в реальном проекте. Почему? Подробно описал в конце статьи. А пока посмотрим на этот пример.
Ожидание завершения обработки данных с помощью std::condition_variable:
std::mutex mut; std::queue<data_chunk> data_queue; std::condition_variable data_cond; void data_preparation_thread() { while(more_data_to_prepare()) { data_chunk const data=prepare_data(); { std::lock_guard<std::mutex> lk(mut); data_queue.push(data); } data_cond.notify_one(); } } void data_processing_thread() { while(true) { std::unique_lock<std::mutex> lk(mut); data_cond.wait( lk,[]{return !data_queue.empty();}); data_chunk data=data_queue.front(); data_queue.pop(); lk.unlock(); process(data); if(is_last_chunk(data)) break; } }
Прежде всего здесь есть очередь data_queue, используемая для передачи данных между потоками. Когда данные готовы, поток, готовящий данные, блокирует мьютекс, защищающий очередь, с помощью std::lock_guard и помещает данные в очередь. Затем он вызывает в отношении экземпляра std::condition_variable компонентную функцию notify_one() для оповещения ожидающего потока, если он есть.
Обратите внимание: код для помещения данных в очередь находится в более узкой области видимости, поэтому условную переменную уведомляют после разблокировки мьютекса. Так, при немедленном пробуждении ожидающего потока ему не придется снова быть заблокированным, ожидая разблокировки мьютекса.
По другую сторону существует обрабатывающий поток. Первым делом он блокирует мьютекс, но теперь уже с помощью std::unique_lock, а не std::lock_guard. Почему — скоро объясню. Затем поток вызывает в отношении std::condition_variable функцию wait(), передавая ей объект-блокировку и лямбда-функцию, которая выражает ожидаемое условие.
Затем реализация функции wait() проверяет условие, вызывая предоставленную лямбда-функцию, и возвращает управление, если условие соблюдено (лямбда-функция вернула true). Если условие не соблюдено (лямбда-функция вернула false), функция wait() снимает блокировку с мьютекса и вводит поток в состояние заблокированности, или ожидания.
Когда условная переменная уведомлена путем вызова функции notify_one() из потока, занимающегося подготовкой данных, поток пробуждается (разблокируется), повторно получает блокировку мьютекса и снова проверяет условие, возвращаясь из wait() со все еще заблокированным мьютексом, если условие выполнено. Если условие не выполнено, поток снимает блокировку с мьютекса и возобновляет ожидание.
Именно поэтому нужен std::unique_lock, а не std::lock_guard. Ожидающий поток должен разблокировать мьютекс во время ожидания и после него заблокировать его снова, а std::lock_guard этой гибкости не обеспечивает. Если мьютекс остается заблокированным на период спячки потока, поток подготовки данных не сможет заблокировать мьютекс, чтобы добавить элемент в очередь, и ожидающий поток никогда не сможет увидеть его условие выполненным.
В ходе вызова wait() условная переменная может проверять предоставленное условие любое количество раз. Но всегда делает это с заблокированным мьютексом и немедленно вернет управление, если и только если предоставленная для тестирования условия функция вернет значение true.
Когда ожидающий поток повторно получает блокировку мьютекса и проверяет условие не в ответ на извещение от другого потока, это действие называется ложным пробуждением (spurious wake). Поскольку количество и частота любых ложных пробуждений не регламентируются по определению, не рекомендую задействовать для проверки условия функцию с побочными эффектами. Иначе можно спровоцировать неоднократное возникновение побочных эффектов.

По сути, применение std::condition_variable::wait, по сравнению с использованием схемы «занят — ожидайте», является оптимизацией. И действительно, соответствующий, но все еще далекий от идеала метод реализации представляет собой простой цикл:
template <typename Predicate> void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred) { while(!pred()) { lk.unlock(); lk.lock(); } }
Ваш код должен быть готов к работе с подобной минимальной реализацией функции wait() наравне с реализацией, пробуждающейся только с вызовом notify_one() или notify_all().
Использование очереди для передачи данных между потоками, как в первом примере решения, — широко распространенный сценарий. При качественной проработке проекта синхронизация может ограничиться самой очередью, а это существенно сокращает возможное количество проблем синхронизации и состояний гонки. Поэтому давайте поработаем над извлечением обобщенной потокобезопасной очереди из первого примера.
Итак, извлечем push() и wait_and_pop() из нашего первого, наивного решения задачи:
#include <queue> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { private: std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public: void push(T new_value) { { std::lock_guard<std::mutex> lk(mut); data_queue.push(std::move(new_value)); } data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } };
А теперь — посмотрим, как изменилась программа, представленная в самом начале, после применения очереди, которая содержит синхронизацию и условную переменную:
threadsafe_queue<data_chunk> data_queue; void data_preparation_thread() { while(more_data_to_prepare()) { data_chunk const data = prepare_data(); data_queue.push(data); } } void data_processing_thread() { while(true) { data_chunk data; data_queue.wait_and_pop(data); process(data); if(is_last_chunk(data)) break; } }
Теперь мьютекс и условная переменная содержатся в экземпляре threadsafe_queue, поэтому отдельные переменные уже не требуются, а для вызова push() не нужна внешняя синхронизация. Кроме того, за ожидание условной переменной отвечает функция wait_and_pop(). Теперь не составляет труда создать еще одно переопределение функции wait_and_pop(), а остальные функции можно легко дополнить.
Посмотрим на финальную реализацию очереди (также ссылка на полную очередь из книги — подглава 4.1.2, листинг 4.5).
Полное определение класса потокобезопасной очереди
#include <queue> #include <memory> #include <mutex> #include <condition_variable> template<typename T> class threadsafe_queue { private: mutable std::mutex mut; std::queue<T> data_queue; std::condition_variable data_cond; public: threadsafe_queue() { } threadsafe_queue(threadsafe_queue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue = other.data_queue; } void push(T new_value) { { std::lock_guard<std::mutex> lk(mut); data_queue.push(std::move(new_value)); } data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if(data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } };
Несмотря на то, что empty() — const-метод, а параметр копирующего конструктора является const-ссылкой, у других потоков могут быть не-const-ссылки на объект. И эти другие потоки способны вызывать изменяющие компонентные функции, поэтому потребность в блокировке мьютекса сохраняется. Поскольку блокировка мьютекса — изменяющая операция, объект мьютекса должен быть помечен как изменяемый, mutable, и его можно заблокировать в функции empty() и в копирующем конструкторе.
Опыт применения решения из книги
Изучив учебный пример, мне захотелось воспользоваться этим инструментом в реальном проекте. Опишу задачу, которая стояла передо мной.
Контекст. При реализации проекта требовалось передавать файлы между виртуальными машинами (способ передачи и средства не важны, важна архитектура).
Проблема. Главный поток, обрабатывающий клиентские запросы, был вынужден тратить время на обработку файлов и их распределение по нескольким ВМ. Это блокировало возможность принимать новые запросы от клиентов.
Решение. Перенести логику обработки и отправки файлов в отдельный поток, а клиента оповещать о прогрессе загрузки файлов с помощью PUB/SUB-сообщений.
При более подробном изучении вопроса и попытке использовать пример на практике я выявил несколько моментов, которые не раскрыты в учебном примере из книги.
Важно корректно завершить цикл
Учебный пример предполагает бесконечный цикл обработки:
void data_processing_thread() { while(true) // Как корректно выйти из цикла? { data_chunk data; data_queue.wait_and_pop(data); process(data); if(is_last_chunk(data)) break; // Но что если «последний элемент» не дошел? } }
Для исправления нужно добавить механизм сигнала завершения (shutdown flag) и модифицировать очередь так, чтобы wait_and_pop() мог разблокироваться не только при появлении данных, но и при получении команды завершения.
Стоит добиваться универсальности через шаблоны
Например, template <typename ItemType> class SafeQueue.
Благодаря шаблонам мы получаем:
Универсальность. Один и тот же класс работает с любыми типами данных — от простых
intдо сложных структур с файлами, сообщениями или контекстом обработки.Типобезопасность. Компилятор проверяет типы на этапе компиляции, исключая ошибки приведения типов в рантайме.
Удобство использования. Не нужно писать обертки или кастовать типы.
// Работает с любым типом «из коробки» SafeQueue<std::string> stringQueue; SafeQueue<FileTransferTask> taskQueue; SafeQueue<protobuf::Message> messageQueue;
От учебного примера к production-коду: анализ реализации SafeQueue
Представленная ниже реализация SafeQueue — это эволюция учебного примера из книги в инструмент, готовый к использованию в реальном проекте.
Потокобезопасная очередь с условными переменными:
#pragma once #include <atomic> #include <chrono> #include <condition_variable> #include <mutex> #include <queue> /** * @brief SafeQueue class */ template <typename ItemType> class SafeQueue { private: std::queue<ItemType> itemsQueue; std::mutex itemsQueueMutex; std::condition_variable itemIsAvailableCv; bool forceExitFromWaiting; public: SafeQueue() : forceExitFromWaiting(false){}; ~SafeQueue() = default; bool isEmpty() { std::lock_guard<std::mutex> itemsQueueLock(itemsQueueMutex); return itemsQueue.empty(); } std::size_t size() { std::lock_guard<std::mutex> itemsQueueLock(itemsQueueMutex); return itemsQueue.size(); } void clear() { std::lock_guard<std::mutex> itemsQueueLock(itemsQueueMutex); while (!itemsQueue.empty()) { itemsQueue.pop(); } } void push(const ItemType &item) { { std::lock_guard<std::mutex> itemsQueueLock(itemsQueueMutex); itemsQueue.push(item); } itemIsAvailableCv.notify_one(); } bool waitWithTimeoutAndPop(ItemType &poppedItem, uint32_t timeoutMs = 0) { std::unique_lock<std::mutex> itemsQueueLock(itemsQueueMutex); itemIsAvailableCv.wait_for(itemsQueueLock, std::chrono::milliseconds(timeoutImMs), [&]() { return !itemsQueue.empty() || forceExitFromWaiting; }); if (forceExitFromWaiting) return false; popedItem = itemsQueue.front(); itemsQueue.pop(); return true; } bool waitUntilDataIsAvailableAndPop(ItemType &popedItem) { std::unique_lock<std::mutex> itemsQueueLock(itemsQueueMutex); itemIsAvailableCv.wait(itemsQueueLock, [&]() { return !itemsQueue.empty() || forceExitFromWaiting; }); if (forceExitFromWaiting) return false; popedItem = std::move(itemsQueue.front()); itemsQueue.pop(); return true; } void releaseThreadsWaitingItems() { { std::lock_guard<std::mutex> itemsQueueLock(itemsQueueMutex); forceExitFromWaiting = true; } itemIsAvailableCv.notify_all(); } }; // class SafeQueue
Ключевые отличия от учебного примера
Аспект | Пример из книги | Реализация SafeQueue |
Завершение работы | Отсутствует механизм остановки |
|
Таймауты ожидания | Только бесконечное |
|
Варианты | Две перегрузки: | Одна версия с выходным параметром |
Вспомогательные методы | Только |
|
Семантика копирования | Есть копирующий конструктор | Отсутствует (упрощение, предотвращение скрытых копий) |
Универсальность | Конкретный тип | Шаблонный класс
|
Почему я убрал дублирующие определения методов?
В учебном примере для каждой операции извлечения предоставлены две версии:
// Версия 1: значение через ссылку bool try_pop(T& value); // Версия 2: возврат через shared_ptr std::shared_ptr<T> try_pop();
В production-коде оставлена только первая версия. Вот почему:
Так мы избегаем ненужных аллокаций.
std::shared_ptrтребует динамического выделения памяти при создании. Для высоконагруженных систем, где очередь обрабатывает тысячи элементов в секунду, это создает избыточную нагрузку на аллокатор и фрагментирует память.Получаем гибкость на стороне вызывающего кода. Если потребителю действительно нужна семантика владения через
shared_ptr, он может реализовать функционал самостоятельно:
MyType item; if (queue.waitUntilDataIsAvailableAndPop(item)) { auto ptr = std::make_shared<MyType>(std::move(item)); // используем ptr... }
Такой подход не добавляет накладных расходов тем, кому достаточно работы со значением.
Преимущества production-решения
Внедрен флаг forceExitFromWaiting;:
Грейсфул-шатдаун: метод
releaseThreadsWaitingItems()устанавливает флаг и будит потоки, позволяя им корректно завершиться.Защита от вечного ожидания: даже если производитель завершил работу, потребители не зависнут в
wait().
Добавлена поддержка таймаутов: bool waitWithTimeoutAndPop(ItemType &popedItem, uint32_t timeoutImMs = 0);:
Позволяет реализовать дедлайны и retry-логику.
Значение 0 по умолчанию сохраняет поведение «ждать бесконечно» для обратной совместимости.
Использует стандартный
std::chrono::milliseconds— безопасно и читаемо.
Явные и самодокументируемые имена методов.
Метод | Что делает | Почему это лучше |
waitUntilDataIsAvailableAndPop | Блокирует поток до появления данных | Сразу ясно, что это блокирующая операция |
waitWithTimeoutAndPop | Ждет не дольше указанного времени | Явный контракт: «Могу вернуть таймаут» |
releaseThreadsWaitingItems | Сигнал завершения для ожидающих потоков | Понятно без чтения реализации |
Есть потокобезопасные вспомогательные методы:
bool isEmpty(); std::size_t size(); void clear();
Нет копирующего конструктора. Очередь нельзя случайно скопировать, что предотвращает тонкие баги с разделяемым состоянием.
Нет перегрузок «на все случаи жизни», а значит, интерфейс остается узким и сфокусированным на основной задаче.
Вывод
На мой взгляд, реализация SafeQueue демонстрирует зрелый подход к переносу академического примера в реальную разработку. В итоге мы получаем:
Безопасность: атомарные операции, инкапсуляция синхронизации.
Производительность: минимум аллокаций, lock-free-флаг, узкий интерфейс.
Удобство использования: явные имена, таймауты, вспомогательные методы
Расширяемость: наличие точек роста для будущих улучшений
Это именно тот тип кода, который хочется видеть в основе многопоточных систем: предсказуемый, тестируемый и готовый к нагрузкам.
А какие изменения в «академический» код из книжки внесли бы вы? Или у вас есть корректировки к production-реализации? Делитесь своими лайфхаками в организации многопоточности на С++ в комментариях.