Pull to refresh

Неблокируемая очередь сообщений для двух потоков

Algorithms *
Sandbox
Несколько лет назад, при работе над своим небольшим игровым проектом, у меня возникла необходимость реализовать передачу сообщений от одного потока другому. В ходе поисков вариантов решения появилась идея реализовать неблокируемую очередь.

Подробности под катом.

Постановка задачи.
Есть 2 потока — пишущий (в дальнейшем Писатель) и читающий (Читатель). Необходима очередь для передачи сообщений от Писателя к Читателю, не использующая блокировки (мютексы или другие методы блокировки).

После некоторых размышлений получилось такое решение: чтоб потоки не мешали друг другу нужно использовать очередь, состоящую из двух частей. Одна часть используется Писателем, а другая — Читателем. Алгоритм состоит в следующем:
  1. Писатель записывает сообщение в свою часть очереди и проверяет очередь Читателя. Если она пуста, то очередь Писателя передается Читателю, а для Писателя создается новая очередь.
  2. Читатель проверяет свою часть очереди и если она не пуста, то забирает из нее значения.

Реализацию я выкладывал на RSDN (ссылка приведена в конце статьи), но после некоторого обсуждения эта тема затихла. И вот сейчас хотел бы поделиться своей наработкой здесь.

Собственно код:

#define LOCK_FREE_QUEUE2_USE_FLUSH 1

template <class TYPE> class LockFreeQueue2
{
public:
    LockFreeQueue2();
    ~LockFreeQueue2();

    // Добавление данных в очередь
    void Enqueue(TYPE* data);

    // Чтение данных из очереди
    bool Dequeue(volatile TYPE*& data);

#if LOCK_FREE_QUEUE2_USE_FLUSH
    // Сброс очереди Читателю. Используется Писателем
    bool Flush();
#else
    // Установка флага окончания работы Писателя
    void SetWriterFinished();
#endif

private:
    volatile TYPE *readerTop; // Вершина Читательской очереди
    TYPE *writerTop, *writerBottom; // Начало и конец Писательской очереди

#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    volatile bool isWriterFinished;
#endif
};

template<class TYPE>
LockFreeQueue2<TYPE>::LockFreeQueue2()
{
    readerTop = writerTop = writerBottom = NULL;
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    isWriterFinished = false;
#endif
}

template<class TYPE>
LockFreeQueue2<TYPE>::~LockFreeQueue2()
{
}

template<class TYPE>
void LockFreeQueue2<TYPE>::Enqueue(TYPE* data)
{
    data->next = NULL;

    if (writerTop) // Если очередь Писателя уже существует, добавляем елемент в ее конец
    {
        writerBottom->next = data;
        writerBottom = data;
    }
    else // Новая очередь Писателя
    {
        writerTop = writerBottom = data;
    }

    if (!readerTop) // Если у Читателя пусто, сбрасываем ему Писательскую очередь
    {
        readerTop = writerTop;
        writerTop = NULL;
    }
}

template<class TYPE>
bool LockFreeQueue2<TYPE>::Dequeue(volatile TYPE*& data)
{
    if (!readerTop) // Если нечего читать
    {
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
        if (isWriterFinished && writerTop) // Проверяем Писетельскую очередь, если он закончил свою работу
        {
            readerTop = writerTop;
            writerTop = NULL;
        }
        else
#endif
        {
            return false; // Нечего читать
        }
    }

    // Читательская очередь не пуста - читаем
    data = readerTop;
    readerTop = readerTop->next;

    return true;
}

#if LOCK_FREE_QUEUE2_USE_FLUSH
template<class TYPE>
bool LockFreeQueue2<TYPE>::Flush()
{
    if (!writerTop) return true;

    if (!readerTop) // Проверяем можно ли сбросить Читателю свой остаток
    {
        readerTop = writerTop;
        return true;
    }
    return false;
}
#else
template<class TYPE>
void LockFreeQueue2<TYPE>::SetWriterFinished()
{
    isWriterFinished = true;
}
#endif


Дефайн LOCK_FREE_QUEUE2_USE_FLUSH используется для компиляции двух разних вариантов поведения очереди при окончании работы Писателя. Когда значение равно 1, Писатель должен вызывать метод Flush, пока не скинет свою чать очереди Читателю. Если же значение равно 0, тогда Писатель просто устанавливает значение переменной isWriterFinished в true и останавливается. Читатель же сам заберет в конце оставшуюся часть.
Тут похоже также нужен флаг окончания работы Читателя, чтоб Писатель не ждал бесконечно.

К данной реализации можно добавить еще некоторые фичи. Например, можно добавить счетчик елементов. Он также должен быть отдельным для каждой части очереди, чтоб не происходило одновременного доступа к переменной-счетчику. Сумма этих счетчиков и будет общим количеством элементов в очереди.
Еще можно добавить ивент для Читателя, по которому он будет просыпаться, когда Писатель передает ему свою часть. Но тут не все так просто и возможны проблемы с одновременным доступом.
Данную очередь нельзя применять для ситуации один Писатель и много Читателей, но можно использовать маршрутизатор (диспатчер) и по одной очереди на каждого читателя. Маршрутизатор должен будет определять кому из читателей добавлять в очередь сообщение. В простейшем случае этим критерием может быть минимальное количество елементов в очереди.

Перед публикацией статьи я погуглил немного по данной теме, но не нашел ничего похожего. Возможно я плохо искал, так что в случае, если я изобрел велосипед, или если найдете ошибки в реализации, пишите в комментарии.

Ссылка на обсуждение в RSDN.
Tags:
Hubs:
Total votes 38: ↑26 and ↓12 +14
Views 4.1K
Comments Comments 34