Хабр Курсы для всех
РЕКЛАМА
Практикум, Хекслет, SkyPro, авторские курсы — собрали всех и попросили скидки. Осталось выбрать!

scoped_lock lock(map);
auto v = map[key];
// Работаем с v...
access_lock() у контейнера или gc?flip_and_wait()?A входит в критическую секцию, то есть после выхода из access_lock(), он находит в контейнере указатель на объект E, сохраняет этот указатель локально и начинает работать с E.B удаляет из контейнера объект E и вызывает synchronize(), чтобы дождаться когда все потоки выйдут из своих критических секций, а после этого удалить E. Но сразу же выходит из synchronize(), так как идентификатор grace-периода у A равен 0, и текущий идентификатор grace-периода равен 0, то есть считается, что A находится в критической секции, которая началась уже после вызова synchronize(), что не так. Поэтому поток B удаляет E, а после этого в A происходит обращение к не выделенному участку памяти.flip_and_wait()?synchronize() один раз вызывать flip_and_wait(), где для каждого потока ждать завершения критической секции, в которой он находится в данный момент? Вроде бы при двойном flip_and_wait() так и происходит: сначала ждём завершения критических секций начатых в grace-периоде 0, а потом в grace-периоде 1, то есть всех критических секций.rcu.retire_ptr(). А rcu.retire_ptr() приводит (или может приводить в случае буферизованного rcu) к rcu.synchronize(), который ожидает завершения текущего grace-периода, то есть снятия блокировки. Получим deadlock. get() — поиск элемента по ключу, и extract() — удаление по ключу, оба возвращают указатель на найденный элемент. В RCU-специализациях таких контейнеров придется блокировать RCU перед вызовом явно в пользовательском коде, — именно то, что вы написали.flip_and_wait() приводит к проблеме. Отсюда же вытекает необходимость идентификатора grace-периода в один бит.flip_and_wait() в synchronize().access_lock(). В этом методе они успели прочитать текущее значение g_nGlobalCtl, в котором текущий grace-id = 0, но ещё не успели записать его в свои m_nThreadCtl. rcu.synchronize(). Вызывается flip_and_wait(), который меняет текущий grace-id на 1 (grace-id — идентификатор текущего grace-периода) и ждет окончания потоков A и C. Они ещё не успели указать, что находятся в критической секции, так что flip_and_wait() со свистом пролетает (в смысле завершается). Текущий grace-id=1.g_nGlobalCtl (в котором grace-id=0) в своих локальных m_nThreadCtl. Таким образом, они находятся в grace-период 0. Упс! Пока что только нарушение семантики RCU, ничего страшного.rcu.synchronize(). Вызывается flip_and_wait(), который меняет текущий grace-id на 0 и ждет выхода A из предыдущего (xor — см. check_grace_period()) grace-периода. У потока A grace-id=0, текущий grace-id=0, их xor = 0 — можно выходить из rcu.synchronize() и удалять E.flip_and_wait() нас здесь спасает.erase(E_key) {
найти объект E;
вызвать synchronize(), чтобы дождаться момента, когда можно быть уверенным что никто не держит ссылок на E;
удалить объект E из map;
}
erase(E_key) {
используя lock-free алгоритм исключить указатель на E из map;
вызвать synchronize(), чтобы дождаться момента, когда можно быть уверенным что никто не держит указателей на E;
освободить память, занимаемую E;
}
rcu.synchronize(). Правильно?flip_and_wait():B вызывает rcu.synchronize(). Так как A и C ещё не вошли в критические секции, то он выходит из rcu.synchronize() и выполняет что хотел.C хочет дождаться момента, когда можно освободить память, занимаемую E. Он вызывает rcu.synchronize() и, так как никаких grace-id нет, а поток A находится в критической секции, то ждёт выхода A из критической секции.A завершает работу с E и выходит из критической секции.C дождался выхода A из критической секции, так как другие потоки вне критической секции, он освобождает память, занимаемую E.atomic.fetch_and_add для инкремента счетчика критической секции, а при выходе — atomic.fetch_and_sub. Считается (не без оснований), что RMW-атомики довольно тяжелы, и чем мощнее железо, тем они тяжелее (это уже мое наблюдение). В RCU стремились как можно более облегчить read-часть, поэтому предложили схему, где access_lock/access_unlock использует только атомарные чтение/запись. // g_nGlobalCtl - не нужна
struct thread_record {
std::atomic<uint32_t> nThreadCtl;
thread_record * pNext;
thread_record(): nThreadCtl(0), pNext(nullptr) {}
};
// c_nControlBit и c_nNestMask не нужны
void access_lock()
{
thread_record * pRec = get_thread_record();
assert( pRec != nullptr );
uint32_t tmp = pRec->nThreadCtl.load( std::memory_order_relaxed );
if ( tmp == 0 ) {
// Так как grace-id нет, то nThreadCtl выступает лишь в качестве счётчика вложенности
pRec->nThreadCtl.store(1, std::memory_order_relaxed );
std::thread_fence( std::memory_order_acquire );
}
else
pRec->nThreadCtl.fetch_add( 1, std::memory_order_relaxed );
}
void access_unlock()
{
thread_record * pRec = get_thread_record();
assert( pRec != nullptr );
pRec->nThreadCtl.fetch_sub( 1, std::memory_order_release );
}
bool check_grace_period( thread_record * pRec )
{
uint32_t const v = pRec->nThreadCtl.load( std::memory_order_relaxed );
// Проверка нахождения в критической секции простая, так как не надо сравнивать grace-id
return 0 != v;
}
std::mutex g_Mutex;
void synchronize()
{
std::atomic_thread_fence( std::memory_order_acquire );
{
cds::lock::scoped_lock<std::mutex> sl( g_Mutex );
// Двойной вызов flip_and_wait() не нужен
wait();
}
std::atomic_thread_fence( std::memory_order_release );
}
// Никакого flip нет, потому функция называется просто wait
void wait()
{
// flip не нужен, так как нет grace-id
for (thread_record* pRec = g_ThreadList.head(std::memory_order_acquire);
pRec!= nullptr;
pRec = pRec->m_pNext )
{
while ( check_grace_period( pRec ))
{
sleep( 10 ); // ждем 10 миллисекунд
CDS_COMPILER_RW_BARRIER ;
}
}
}
synchronize(). Некоторые критические секции могли начаться после исключения элемента, но до вызова synchronize(), так что можно было бы не ждать окончания таких критических секций, но ожидание их завершения лишь скажется на времени работы synchronize(), но не нарушит логику работы. Так как корректность алгоритма важнее, то будем ждать окончания завершения таких критических секций.synchronize() проверяем каждый поток, если он находится в критической секции, то ждём её завершения и переходим к следующему потоку.
flip_and_wait(), это когда сначала для потока 1 ждём выхода из критической секции, в которую он вошёл при grace-id == 0, потом пока ждали выхода других потоков из их критических секций, поток 1 опять входит в критическую секцию при grace-id == 1, при втором вызове flip_and_wait() надо опять ждать выход из критической секции потока 1:
synchronize(), а другие читатели очень плотно работают со структурой данных, постоянно входя/выходя в/из критические секции чтения, то grace-период может никогда не закончиться.flip_and_wait() время ожидания ограничено. Да, придется ждать окончания grace-id=0 и grace-id=1, но ситуация с бесконечным ожиданием может возникнуть только при крахе потока-читателя (или ошибки в реализации RCU), но крах потока обычно приводит к аварийному завершению всей программы. Без этих граничных случаев (крах потока/ошибка) ожидание гарантированно конечное.fetch_add/fetch_sub.g_ThreadList, то каждый поток будем ждать по одному разу. Если же будут постоянно появляться новые потоки и добавляться в конец списка, то тогда ожидание может быть бесконечным. Это недостаток 1.flip_and_wait() определяется временем выполнения самой длинной критической секции среди всех потоков. А в моём варианте, ожидание определяется как сумма критических секций для всех потоков. То есть, при большом количестве потоков-читателей занимает намного больше времени.synchronize() поток будет вытеснен (а это может произойти и при выборе неподходящей back-off стратегии: sleep() или yield()), то он может и не заметить, что один из читателей вышел из критической секции и вошел в неё снова. Получается, что ожидающий поток не должен вытесняться вовсе, — очень жесткое, часто невыполнимое (так как мы не можем управлять ядром ОС) требование.Что-то очень напоминающее reference counting. Да, такая схема должна работать, но в ней вам придется при входе в критическую секцию чтения вызывать атомарную RMW-операцию atomic.fetch_and_add для инкремента счетчика критической секции, а при выходе — atomic.fetch_and_sub.
PS: В моем предыдущем комментарии рассуждения насчет «RCU использует только атомарные чтение/запись» прошу считать бредом. Конечно, используются RMW fetch_add/fetch_sub.
#include <atomic>
#include <array>
#include <thread>
template <typename Value_type>
struct Cell
{
Value_type read()
{
++_readers_counter;
auto result = _value;
--_readers_counter;
return result;
}
std::atomic<short> _readers_counter;
Value_type _value;
};
template <typename Value_type>
class Lock_free_cell
{
Lock_free_cell() :
_read_cell_pointer_id(0),
_pointers({new Cell<Value_type>{0},
new Cell<Value_type>{0},
new Cell<Value_type>{0},
new Cell<Value_type>{0}})
{ }
Value_type read()
{
auto cell_pointer = _pointers[_read_cell_pointer_id.load() % 4];
auto result = cell_pointer->read();
return result;
}
bool write(const Value_type& new_value)
{
while (_is_write.test_and_set())
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto read_pointer_index = _read_cell_pointer_id.load();
_pointers[(read_pointer_index + 1) % 4]->_value = new_value;
++_read_cell_pointer_id;
auto previous_read_pointer = _pointers[read_pointer_index % 4];
while (previous_read_pointer->_readers_counter.load() != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
_is_write.clear();
return true;
}
private:
std::atomic<unsigned char> _read_cell_pointer_id;
std::atomic_flag _is_write;
std::array<Cell<Value_type>*, 4> _pointers;
};
Lock-free структуры данных. Внутри. RCU