Речь пойдет о достаточно типичном для приложений реального времени шаблоне взаимодействия между поставщиком и потребителем информации. Поставщик (писатель) пописывает, потребитель (читатель) почитывает. Асинхронно по отношению друг к другу, каждый — в своём темпе. Обмен происходит отдельными сообщениями, и, как это часто бывает с информацией, привязанной к текущему времени, читателю интересна только самая свежая информация, то есть только последнее сообщение, отправленное писателем. Если какое-то из предыдущих сообщений пропущено — не страшно, всё равно пропущенные данные непоправимо устарели.

Наиболее эффективная техника, реализующая подобное взаимодействие, известна как тройная буферизация (Triple Buffering). Именно о ней и о её небольших обобщениях пойдет речь в данной статье.

Самый известный пример использования тройной буферизации — разделение процессов формирования изображения и вывода изображения на экран, в этом случае тройной буфер может быть реализован на аппаратном уровне.

Но, конечно, область применения данной техники намного шире. Мы ограничимся чисто «софтварными» применениями тройной буферизации как средства коммуникации между активностями внутри одного процесса. «Активность» в данном случае может быть нитью (thread), но не обязательно. Например, в ядре операционной системы одним из участников коммуникации может быть обработчик прерывания. Это возможно, поскольку тройной буфер реализуется без блокировок, исключительно на атомарных операциях.

Идея тройной буферизации проста. Пока писатель пишет в один буфер, читатель читает другой. А зачем же третий? В третьем хранится самая «свежая» информация, полностью сформированная писателем. Если читатель вдруг захочет её почитать — она всегда доступна, никого ждать не придётся. Никаких копирований данных не происходит, только «атомарное» переключение ролей буферов.

Ниже мы подробнее остановимся на реализации этого механизма, но сначала посмотрим на него в более широком контексте: а что если читателей и писателей много? Как эта техника выглядела бы в самом общем случае? Оставив пока заботу об эффективности, мы легко обнаружим, что концептуально подобное взаимодействие легко реализуется с помощью атомарного разделяемого указателя (atomic<shared_ptr<Message> >):

#include <array>
#include <atomic>
#include <memory>
#include <chrono>
#include <thread>
#include <iostream>

// Пусть класс, представляющий сообщение, называется Message:
struct Message{
    enum{LENGTH = 100}; // Максимальная длина сообщения
    std::array<char, LENGTH> text{0};  // Текст сообщения
};

using MessagePtr = std::shared_ptr<Message>;

// Канал общения поставщика и потребителя:
std::atomic<MessagePtr> channel{nullptr};

std::atomic<bool> shutdown_flag{false}; // (когда кончать)

inline void
writer(){ // (отдельная нить)
    unsigned count = 0;

    while(!shutdown_flag.load()){
        // Создаем сообщение:
        MessagePtr msg = MessagePtr(new Message{});

        // Формируем содержание:
        snprintf(
                msg->text.data(), Message::LENGTH,
                "Count: %u", ++count
        );

        // Отправляем...
        channel.store(MessagePtr(msg));

        std::this_thread::sleep_for(std::chrono::milliseconds(600));
    }
}

inline void
reader(){ // (другая нить)
    while(!shutdown_flag.load()){
        MessagePtr msg = channel.load();
        if(msg == nullptr){
            std::cout << "\n no data available\n";
        }else{
            // Читаем сообщение:
            std::cout << "\nMessage received: "
                    <<  msg->text.data() << "\n";
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(900));
    }
}

inline void
triple_buf_example1(){
    std::cout << "\n\nreader-writer interaction using atomic<shared_ptr>:\n";
    shutdown_flag.store(false);
    std::thread t2(reader);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::thread t1(writer);

    std::this_thread::sleep_for(std::chrono::milliseconds(7400));

    shutdown_flag.store(true);
    t1.join();
    t2.join();
}

В этом примере один читатель и один писатель, но понятно, что и тех и других можно понаделать сколько угодно, и всё будет великолепно работать. Замечательная особенность атомарного разделяемого указателя (доступен, начиная со стандарта C++20) состоит в том, что операции загрузки и выгрузки (и все прочие операции, в том числе CAS, то есть сравнение с обменом) соответствующим образом атомарно модифицируют счетчики использования объектов, на которые смотрят участвующие в операции «умные» указатели.

В нашем случае объект-сообщение исчезает, как только исчезает последняя ссылка на него — либо со стороны одного из участников взаимодействия, либо со стороны самого атомарного указателя, когда в него записывают ссылку на следующий, новый объект.

Таким образом, в теории с атомарным разделяемым указателем всё замечательно. Проблема — сложность его lock-free реализации, так что на некоторых платформах atomic<shared_ptr<Message> > может оказаться реализованным с блокировкой. Ну и, конечно, создание объектов сообщений из кучи (оператором new) не добавляет быстродействия, предсказуемости по времени и, по крайней мере, теоретически, чревато фрагментацией памяти. Но для нас сейчас атомарный разделяемый указатель — скорее концептуальная модель, так что обозначенные выше проблемы нас не сильно волнуют.

Теперь оценим, сколько же объектов-сообщений одновременно может быть в системе с M писателями и N читателями. Максимально M объектов удерживают писатели, неустанно в них что-то записывая. Максимально N объектов удерживают читатели (хотя иногда двое могут читать одно и то же сообщение, но мы рассматриваем самый «прожорливый» случай, когда каждый читатель занят своим отдельным экземпляром). И, наконец, один объект обычно удерживает сам атомарный разделяемый указатель. Опять же, это может быть одно из сообщений, на которое уже положили глаз и читают, но мы рассматриваем самый «расточительный» по памяти случай, когда сообщение, удерживаемое атомарным указателем, ещё только ждёт своего читателя.

Мы, однако, предполагаем, что участники обмена достаточно экономны и не плодят лишних экземпляров разделяемых указателей на ранее использованные ими сообщения, то есть каждый участник всегда довольствуется только одним указателем.

В результате получается, что в каждый момент времени у нас имеется максимально M + N + 1 «живых» объектов-сообщений в системе. При одном читателе и одном писателе получается 1 + 1 + 1 = 3 объекта-сообщения. Вот вам и тройной буфер. Это неудивительно, поскольку тройной буфер — это и есть сильно оптимизированная редукция общего случая к варианту с единственным писателем и единственным читателем.

Теперь, переходя от концепции к практике, займёмся этой самой оптимизированной редукцией для случая, когда читателей и писателей не слишком много (но не обязательно по одному). Разумеется, первое, что надо сделать на этом пути — вместо динамического распределения памяти заготовить заранее пул из K = M + N + 1 объектов-сообщений (или хотя бы отвести память под них, если не хочется конструировать сообщения в пуле заранее).

А второе (моделируя работу shared_ptr!) — завести по счетчику использования на каждое сообщение из пула. И еще — завести место под номер (индекс) сообщения в пуле, которое содержит «самую свежую» информацию, моделируя тот самый экземпляр atomic<shared_ptr<Message> >, который в нашем примере выполнял роль коммуникационного канала между писателями и читателями.

И, наконец, если участников не очень много — запаковать все эти счетчики использования вкупе с индексом «самого свежего» сообщения в один-единственный интегральный атомарный тип. Тогда все нужные операции доступа, как со стороны писателя, так и со стороны читателя, можно реализовать простейшим CAS-циклом над этим интегральным атомиком. Но понятно, что количество буферов (и, соответственно, количество участников взаимодействия) будет ограничено вместимостью упомянутого интегрального типа. Так, 32-битный атомик позволяет завести семь буферов (шесть участников). 64-битный атомик позволяет иметь до 15 буферов (14 участников обмена). Это в известной мне реализации, может быть, можно изощриться в кодировке и добавить чуть-чуть еще, но принципиально больше «втиснуть» не получится.

В минимальном варианте набор операций над нашим оптимизированным коммуникационным каналом сводится к следующему.

Со стороны писателя:

(1) поиск и получение свободного сообщения из пула для формирования новой записи (то есть поиск сообщения с нулевым счетчиком использования) и (2) коммит записанного.

Со стороны читателя:

(1) начало чтения и (2) конец чтения.

Все стартовые операции (под номером 1) приводят к инкременту счетчика использования соответствующего сообщения, а читательская операция под номером (2) приводит к декременту.

Писательская же операция под номером (2), то есть коммит, немного сложнее. Она
А) заменяет индекс «самого свежего» сообщения на индекс сообщения, только что сформированного писателем,
Б) декрементирует счетчик использования «старого» (только что замененного) сообщения.

Счетчик использования нового сообщения писателем при коммите не изменяется, поскольку, с одной стороны, писатель его «отпускает», но с другой стороны — то же сообщение «принимает» коммуникационный канал.

Если мы не конструируем объекты сообщений в пуле заранее, то писательская операция (1) должна вызывать конструктор (точнее, placement new, ибо прямой вызов конструктора в C++ невозможен), а декремент счетчика использования (неважно откуда вызванный) при обнулении счетчика должен приводить к деструкции сообщения. Другой вариант реализации: писатель сам деструктирует содержимое нового сообщения, только что полученного из пула (предположим, что все сообщения в пуле изначально сконструированы), и на его месте конструирует новое. Подробнее на этих (чисто «плюсовых») технических деталях я останавливаться не буду.

Думаю, что для любителей побаловаться на досуге lock-free алгоритмами реализация описанных выше операций будет несложным упражнением. Для тех же, кому самостоятельно упражняться не хочется, могу предложить реализацию со своего гитхаба (https://github.com/hmvyp/ntuplebuf/). Признаюсь, код не блещет изяществом, но вполне рабочий, более или менее оттестированный и используется в реальных проектах.

Нужно заметить, что функциональность этого кода несколько выходит за рамки озвученного в начале статьи шаблона взаимодействия, ибо предусмотрен еще один режим работы, который я назвал транзакцией. Транзакция позволяет писателю «накапливать» данные до тех пор, пока читатель не извлечет накопленое. Под «накоплением» подразумевается формирование следующего сообщения на основе информации из предыдущего (то есть сами сообщения как таковые не «копятся», просто новое сообщение может как-то учитывать содержимое предыдущего). Со стороны читателя происходит «разрушающее» чтение, то есть с опустошением данных в канале. Транзакцию я рассматривал только при двух участниках, одном писателе и одном читателе, придавать смысл другим конфигурациям я не пытался (хотя не исключено, что смысл этот можно найти и придать).

Транзакция может оказаться неуспешной для писателя — если во время подготовки нового сообщения читатель забрал старое. Тогда писателю имеет смысл просто повторить попытку, начав новое сообщение «с чистого листа», забыв о прошлом. Вторая попытка (после неудачи) будет всегда успешна.

При использовании транзакций количество необходимых буферов остается равным магической тройке. Хотя на первый взгляд кажется, что потребуется лишний буфер, раз писатель в транзакции выступает одновременно и в роли собственно писателя (нового сообщения), и в роли читателя (предыдущего сообщения, на основании которого он формирует новое). Однако предыдущее сообщение не может отличаться от сообщения в коммуникационном канале (если оно там еще осталось), ибо что-либо иное в коммуникационный канал просто некому записать, кроме самого писателя (читатель может лишь опустошить канал, ничего не записывая). Так что лишний буфер не нужен.

Опять же, повторюсь, всё это уже выходит за рамки рассматриваемого в статье шаблона взаимодействия, в котором «устаревшие» данные неинтересны читателю, а действия читателя неинтересны писателю (при транзакциях интересы участников переплетены сильнее).

Так что оставим транзакции в покое и вернёмся к исходной постановке задачи. Наш пример с одним писателем и одним читателем с использованием библиотеки ntuplebuf будет выглядеть примерно так:

#include "ntuplebuf/ntuplebuf_dyn.hpp"

// Канал общения поставщика и потребителя:
ntuplebuf::NTupleBufferDynAllocTyped<
    uint32_t,   // тип атомарного управляющего кода
    3,          // количество буферов (не более 7 для 32-разрядного атомика)
    Message     // тип сообщения
> channel3b;


inline void
writer3b(){ // (отдельная нить)
    unsigned count = 0;

    Message* msg = nullptr; // в начале - пусто

    while(!shutdown_flag.load()){
        // Создаем сообщение:
        channel3b.start_writing(&msg);

        if(msg) {
            // Формируем содержание:
            snprintf(
                    msg->text.data(), Message::LENGTH,
                    "Count: %u", ++count
            );

            // Отправляем...
            channel3b.commit(&msg);
        }else{
            // значит, в программе баг:
            // недостаточное количество буферов и т.п.
            std::cout << "\n A bug encountered!!!";
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(600));
    }
}

inline void
reader3b(){ // (другая нить)
    Message* msg = nullptr; // в начале - пусто

    while(!shutdown_flag.load()){
        channel3b.start_reading(&msg);

        if(msg == nullptr){
            std::cout << "\n no data available\n";
        }else{
            // Читаем сообщение:
            std::cout << "\nMessage received: "
                    <<  msg->text.data() << "\n";
        }

        channel3b.free(&msg);
            // free() освобождает буфер и обнуляет указатель msg
            // на сообщение. Вызов free() опционален, если что,
            // буфер освободит следующий start_reading()

        std::this_thread::sleep_for(std::chrono::milliseconds(900));
    }
}

inline void
triple_buf_example2(){
    std::cout << "\n\nreader-writer interaction using ntuplebuf:\n";
    shutdown_flag.store(false);

    std::thread t2(reader3b);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::thread t1(writer3b);

    std::this_thread::sleep_for(std::chrono::milliseconds(7400));

    shutdown_flag.store(true);
    t1.join();
    t2.join();
}

Класс Message должен иметь конструктор по умолчанию и не должен предъявлять каких-либо необычных требований к выравниванию (то есть более сильных, чем std::max_align_t). Память под объекты-сообщения при конструировании канала инициализируется нулями, то есть мусора там не будет, даже если умолчательный конструктор сообщения тривиален (ничего не делает).

Все объекты-сообщения умолчательно конструируются при инициализации коммуникационного канала («тройного» или «N-рного» буфера) и окончательно деструктируются деструктором канала. При запросе писателем нового «чистого» сообщения старое содержание сообщения деструктируется, и объект-сообщение заново умолчательно конструируется «на месте».

Я не уверен, что описанный порядок конструирования-деструктирования оптимален для всех случаев жизни, может быть, иногда захочется изначально зарезервировать неинициализированную память, а конструировать и деструктировать сообщения уже во время соответствующих операций, но это несколько сложнее в реализации (в части определения момента деструкции и её синхронизации, в текущем дизайне конструирование и деструктирование совершает только писатель).

В крайнем случае — код проекта открыт (public domain), и никто не мешает, например, надстраивать собственные обертки вокруг управляющей структуры, которая обеспечивает всю «атомарную» бухгалтерию и абсолютно индифферентна к содержанию и расположению в памяти объектов-сообщений (в упомянутом проекте на гитхабе эта бухгалтерия сосредоточена в файле ntuplebuf/ntuplebuf.hpp)

Ну и, конечно, никто не мешает поискать другие реализации (на том же гитхабе их по крайней мере несколько) или вообще всё реализовать самостоятельно. Если данная статья послужит импульсом к такому решению, то она, как мне кажется, уже выполнит своё предназначение.