Задача
В ходе разработки интерактивного приложения у меня возникла необходимость передавать данные из одного потока в другой, при этом избегая каких либо задержек как в передающем так и в принимающем потоке. Данные должны передаваться один-за-другим, т.е. в виде очереди.
Я нашел два существующих решения используемых для безопасной передачи данных из одного потока в другой – использование взаимоисключающих объектов (MUTually EXclusive objects) и паттерна блокировок: Read-Write Lock Pattern. Однако данные решения не подходили для решения моей задачи. MUTEX объекты в один момент времени могут быть использованы только одним потоком, таким образом, если один поток использует MUTEX объект, другой поток должен ждать. Второй вариант имеет аналогичный недостаток – если идет чтение, то поток производящий запись должен ждать снятия блокировки, и наоборот – если идет запись, то поток производящий чтение ожидает окончание записи.
Поскольку существующие решения не подходили, пришлось разрабатывать решение, которое бы помогло в данном случае. Исходя из задачи требования к решению были следующими:
- Используемая структура данных – очередь.
- Решение должно поддерживать один поток производящий запись данных и один поток считывающий данные.
- Решение должно позволять записывать и считывать данные одновременно.
Решение
Решение основывалось на очереди построенной на основе однонаправленного связанного списка, в которую был добавлен еще один, постоянно присутствующий в ней элемент, т.е. очередь никогда не становилась пустой. Критерием того, что в очереди нет элементов являлся факт того, что указатели на голову и на хвост очереди показывали на один и тот же элемент, т.е. были равны:
При записи сначала создается новый элемент и только потом переносится указатель на хвост очереди. Благодоря тому, что указатель на хвост переносится в последний момент, когда данные в очереди уже есть избегается возможность конфликта записи и чтения:
При чтении сначала переносится указатель на голову очереди, затем удаляется пустой элемент очереди, и наконец производится чтение данных из очереди. При этом прочтенные данные удаляются, остается только «упаковка», которая становится новым пустым элементом:
В итоге чтение и запись полностью изолированны друг от друга, т.е. два потока могут читать и писать одновременнно, никак не мешая друг-другу. Конфликтов удается избегать за счет постоянного наличия хотя бы одного элемента в очереди.
Реализация
Элемент очереди был реализован в виде шаблона следующим образом:
template <class E>
class QueueItem
{
public:
E* data;
QueueItem* next;
QueueItem(E* data);
};
template <class E>
QueueItem<E>::QueueItem(E* data)
{
this->data = data;
next = NULL;
}
При создании очереди сразу же создается пустой элемент:
template <class T>
Queue<T>::Queue()
{
QueueItem<T>* stub = new QueueItem<T>(NULL);
head = stub;
tail = stub;
}
Проверка на пустоту очереди производится простым сравнением указателей на голову и на хвост очереди:
template <class T>
bool Queue<T>::empty()
{
return head == tail;
}
При записи нового элемента сначала элемент записывается в список и только затем переносится указатель на хвост очереди:
template <class T>
void Queue<T>::enqueue(T* value)
{
QueueItem<T>* item = new QueueItem<T>(value);
item->data = value;
item->next = NULL;
tail->next = item;
tail = item;
}
Чтение производится стандартно:
template <class T>
T* Queue<T>::dequeue()
{
if (head == tail)
return NULL; // queue is empty
QueueItem<T>* tmp = head;
head = head->next;
delete tmp;
return head->data;
}
Важно отметить что ответственность по возвращению памяти, использованной для передачи данных лежит на потоке-потребителе.
Итог
Данное решение является примером реализации потокобезопасной очереди, который может быть применен в некоторых частных случаях, а именно когда:
- Сущность задачи предполагает использование очереди.
- Необходимо избежать задержек при записи и чтении из очереди (например в компьютерных играх или других интерактивных программах).
- Есть только один поток передающий данные и только один поток принимающий данные. Данное ограничение может быть расширенно на случай несколький передающих и одного принимающего потока будет создания отдельной очереди для каждого передающего потока.