Об одном методе распределения памяти

  • Tutorial
image
Не секрет, что иногда выделение памяти требует отдельных решений. Например — когда память выделяется и освобождается стремительным домкратом потоком, в параллельных задачах.

В результате стандартный консервативный аллокатор выстраивает все запросы в очередь на pthread_mutex / critical section. И наш многоядерный процессор медленно и печально едет на первой передаче.

И что с этим делать? Познакомимся поближе с деталями реализации метода Scalable Lock-Free Dynamic Memory Allocation. Maged M. Michael. IBM Thomas J. Watson Research Center.

Самый простой код что я сумел найти — написан под LGPL камрадами Scott Schneider и Christos Antonopoulos. Его и рассмотрим.



Начнем издалека


image
Итак — как нам избавиться от ненужных локов?

Ответ лежит на поверхности — надо выделять память в lock-free списках. Хорошая идея — но как строятся такие списки?

На помощь нам спешат атомарные операции. Те самые, которые InterlockedCompareExchange. Но постойте — максимум на что мы можем рассчитывать это long long, он же __int64. И что делать? А вот что — определим свой собственный указатель с тегом.

Ограничив размер адреса в 46 бит, и мы можем в 64бит целое спрятать нужные нам дополнения, а они — понадобятся далее.

#pragma pack(1)
typedef struct {
    volatile unsigned __int64 top:46, ocount:18;
} top_aba_t;


Кстати, с учетом выравнивания по 8 / 16 байт — мы можем получить не 2 в 46 степени, а несколько больше. Стандартный прием — выданный адрес не может быть нечетным и более того — должен быть выровнен для плавающей точки.

И еще момент — код становится очень длинным. То есть стандартная портянка
desc->Next = queue_head;
queue_head = desc;

превращается вот в такие макароны
    descriptor_queue old_queue, new_queue;
    do {
        old_queue = queue_head;
        desc->Next = (descriptor*)old_queue.DescAvail;
        new_queue.DescAvail = (unsigned __int64)desc;
        new_queue.tag = old_queue.tag + 1;
    } while (!compare_and_swap64k(queue_head, old_queue, new_queue));

что сильно удлиняет код и делает его менее читабельным. Поэтому очевидные вещи убраны под спойлеры.

Lock-free FIFO queue


image
Раз у нас теперь есть собственный указатель, можем заняться списком.

// Pseudostructure for lock-free list elements.
// The only requirement is that the 5th-8th byte of
// each element should be available to be used as
// the pointer for the implementation of a singly-linked
// list. 
struct queue_elem_t {
    char                *_dummy;
    volatile queue_elem_t    *next;
};

И в нашем случае — не забыть про выравнивание, например так

typedef struct {
    unsigned __int64  _pad0[8];
    top_aba_t       both;
    unsigned __int64  _pad1[8];
} lf_fifo_queue_t;


Оборачиваем работу с атомарными функциями

image
Теперь определим парочку абстракций, чтобы наш код мог быть переносим (для Win32 например это реализуется вот так):

Обертка над атомами для Win32
#define fetch_and_store(address, value) InterlockedExchange((PLONG)(address), (value))
#define atmc_add(address, value) InterlockedExchangeAdd((PLONG)(address), (value))
#define compare_and_swap32(address, old_value, new_value) \
     (InterlockedCompareExchange(\
          (PLONG)(address), (new_value), (old_value))\
       == (old_value))
#define compare_and_swap64(address, old_value, new_value) \
    (InterlockedCompareExchange64(\
          (PLONGLONG)(address), (__int64)(new_value), (__int64)(old_value)) \
     == (__int64)(old_value))
#define compare_and_swap_ptr(address, old_value, new_value) \
    (InterlockedCompareExchangePointer((address), \ 
         (new_value), (old_value)) \
      == (old_value))


и добавим еще один метод, просто чтобы не отвлекаться на кастинг параметров к __int64 и передаче их по указателям.

#define compare_and_swap64k(a,b,c) \
     compare_and_swap64((volatile unsigned __int64*)&(a), \
           *((unsigned __int64*)&(b)), \
           *((unsigned __int64*)&(c)))

И вот мы уже готовы реализовать базовые функции (добавить и удалить).

Определяем базовые функции работы со списком

image
static inline int lf_fifo_enqueue(lf_fifo_queue_t *queue, void *element) {
    top_aba_t old_top;
    top_aba_t new_top;
    
    for(;;) {
        old_top.ocount = queue->both.ocount;
        old_top.top = queue->both.top;

        ((queue_elem_t *)element)->next = (queue_elem_t *)old_top.top;
        new_top.top = (unsigned __int64)element;
        new_top.ocount += 1;
        if (compare_and_swap64k(queue->both, old_top, new_top)) 
            return 0;
    }
}

На что тут надо обратить внимание — обычная операция по добавлению обернута циклом, выход из которого — мы успешно записали новое значение поверх старого, и при этом старое кто-то еще не поменял. Ну а если поменял — то повторяем все заново. И еще момент — в наш ocount записываем номер попытки, с которой нам это удалось. Пустячок, а каждая попытка выдает уникальное 64бит целое.

Именно на таком простом шаманстве и строятся lock-free структуры данных.

Аналогичным образом реализуется снятие с вершины нашего FIFO списка:

Аналогично lf_fifo_dequeue
static inline void *lf_fifo_dequeue(lf_fifo_queue_t *queue) {
    top_aba_t head;
    top_aba_t next;

    for(;;) {
        head.top = queue->both.top;
        head.ocount = queue->both.ocount;
        if (head.top == 0) return NULL;
        next.top = (unsigned __int64)(((struct queue_elem_t *)head.top)->next);
        next.ocount += 1;
        if (compare_and_swap64k(queue->both, head, next)) 
            return ((void *)head.top);
    }
}


Тут мы видим ровно то же самое — в цикле пробуем снять, если есть что и мы снимаем так что старое значение все еще правильное — то отлично, а нет — пробуем еще.

И конечно же инициализация такого элемента списка банальна — вот она:

lf_fifo_queue_init
static inline void lf_fifo_queue_init(lf_fifo_queue_t *queue) {
    queue->both.top = 0;
    queue->both.ocount = 0;
}



Собственно идея аллокатора

image
Перейдем теперь непосредственно к аллокатору. Аллокатор должен быть быстрым — поэтому разделим память на классы. Большой (напрямую берем у системы), и маленький, побитый на много-много маленьких подклассов размером от 8 байт до 2 килобайт.

Такой ход конем позволит нам решить две задачи. Первая — маленькие кусочки будут выделяться супербыстро, и за счет того что они разделены в таблицу — не будут лежать в одном огромном списке. А большие куски памяти не будут мешаться под ногами и приводить к проблемам с фрагментацией. Более того — поскольку в каждом подклассе у нас все блоки одного размера, работа с ними очень упрощается.

И еще момент! Выделение маленьких кусочков привяжем к нитям (а освобождение — нет). Таким образом мы убьем двух зайцев сразу — упростится контроль за выделением, и острова памяти выделенные для треда локально — не будут перемешиваться лишний раз.

Получим нечто вот такое:
image

А вот так данные будут раскладываться по суперблоку:
image
На картинке мы видим 4 случая
  1. Активный суперблок содержит 5 элементов организованных в список, 4 из них доступны для использования.
  2. А теперь мы зарезервировали следующий элемент (см credits)
  3. И в результате выдали блок номер 5
  4. А потом его вернули обратно (но попал он в partial list)


Опишем хип и дескриптор блока

image
Помолясь богине Техно, приступим.

Определим парочку констант

Балнальщина а-ля GRANULARITY и PAGE_SIZE
struct Descriptor;
typedef struct Descriptor descriptor;
struct Procheap;
typedef struct Procheap procheap;

#define TYPE_SIZE   sizeof(void*)
#define PTR_SIZE    sizeof(void*)
#define HEADER_SIZE (TYPE_SIZE + PTR_SIZE)

#define LARGE       0
#define SMALL       1

#define PAGESIZE    4096
#define SBSIZE      (16 * PAGESIZE)
#define DESCSBSIZE  (1024 * sizeof(descriptor))

#define ACTIVE      0
#define FULL        1
#define PARTIAL     2
#define EMPTY       3

#define MAXCREDITS  64 // 2^(bits for credits in active)
#define GRANULARITY 8


И займемся креативом, определим нужные типы данных. Итак, у нас будет много хипов — каждый в своем классе, да привязанных к текущему треду. В суперблоке будет два указателя — активный список и перераспределенный.

Вы спросите — а что это такое и почему так сложно?

Первое, оно же главное — выделять список поэлементно банально невыгодно. То есть — классический однонаправленный список при миллионах элементов превращается в ад и израиль. На каждый элемент списка надо выделить 8 / 16 байт, чтобы хранить два несчастных указателя на сами данные и на следующий элемент списка.

Выгодно ли это? Очевидно что — нет! И как следует поступить? Правильно, а давайте мы сгруппируем наши описатели списка в группы (stripe) по 500 (к примеру) элементов. И получим список не элементов, но — групп. Экономично, практично, работать с элементами можно как и в классическом варианте. Весь вопрос только в нестандартном выделении памяти.

Более того, все Next в пределах блока просто указывают на соседний элемент массива, и мы можем их явно проинициализировать сразу при выделении страйпа. По факту, последний Next в страйпе будет указывать на следующий страйп, но с точки зрения работы со списками — ничего не меняется.

Легко догадаться, что списки memory block descriptors строятся именно так.

И еще — Active и есть наш активный страйп с заранее выделенными кусочками памяти по эн байт, в котором и ведем выдачу памяти по принципу FIFO. Если есть место в страйпе — берем оттуда. Если нет — ищем уже в классическом списке Partial. Если нет ни там ни там — отлично, выделяем новый страйп.

Второе, такая «полосатость» ведет к некоторому перерасходу памяти, т. к. мы можем выделить страйп под 8 байтные кусочки памяти в виде массива в 64К, а запрошено будет пара кусочков всего. И еще, активные страйпы для каждой нити свои, что еще больше усугубит проблему.

Однако, если у нас действительно идет активная работа с памятью, это даст хороший выигрыш по скорости.

Вот сам хип

struct Procheap {
    volatile active     Active;     // initially NULL
    volatile descriptor*    Partial;    // initially NULL
    sizeclass*      sc;     // pointer to parent sizeclass
};

А вот что ему надо:

Что это за Active / Partial такие
typedef struct {
    unsigned __int64  ptr:58, credits:6;
} active;

/* We need to squeeze this in 64-bits, but conceptually
 * this is the case:
 *  descriptor* DescAvail;
 */
typedef struct {
    unsigned __int64  DescAvail:46, tag:18;
} descriptor_queue;

/* Superblock descriptor structure. We bumped avail and count 
 * to 24 bits to support larger superblock sizes. */
typedef struct {
    unsigned __int64  avail:24,count:24, state:2, tag:14;
} anchor;

typedef struct {
    lf_fifo_queue_t    Partial;    // initially empty
    unsigned int        sz;            // block size
    unsigned int        sbsize;    // superblock size
} sizeclass;



и собственно дескриптор — описатель нашего фрагмента.

struct Descriptor {
    struct queue_elem_t lf_fifo_queue_padding;
    volatile anchor    Anchor;     // anchor to superblock exact place
    descriptor*          Next;          // next element in list
    void*                     sb;              // pointer to superblock
    procheap*           heap;         // pointer to owner procheap
    unsigned int       sz;               // block size
    unsigned int       maxcount; // superblock size / sz
};

Как видно — ничего сверхъестественного. Описание хипа и суперблока нам нужны в дескрипторе, т. к. выделять память можно в одном треде, а освобождать — в другом.

Приступим к реализации

image
Сначала нам надо определить наши локальные переменные — хипы, размеры и прочее. Вот примерно так:

Как же без глобальных переменных
/* This is large and annoying, but it saves us from needing an 
 * initialization routine. */
sizeclass sizeclasses[2048 / GRANULARITY] =
                {
                {LF_FIFO_QUEUE_STATIC_INIT, 8, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 16, SBSIZE},
                ...
                {LF_FIFO_QUEUE_STATIC_INIT, 2024, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2032, SBSIZE},
                {LF_FIFO_QUEUE_STATIC_INIT, 2040, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2048, SBSIZE},
                };

#define LF_FIFO_QUEUE_STATIC_INIT   {{0, 0, 0, 0, 0, 0, 0, 0}, {0, 0}, {0, 0, 0, 0, 0, 0, 0, 0}}

__declspec(thread) procheap* heaps[2048 / GRANULARITY]; // =  { };

static volatile descriptor_queue queue_head;


Тут мы видим все то, что рассматривали в предыдущем разделе:

__declspec(thread) procheap* heaps[2048 / GRANULARITY]; // =  { };
static volatile descriptor_queue queue_head;

Это и есть наши хипы per thread. И — список всех дескрипторов per-process.

Malloc — как это работает

image
Рассмотрим собственно процесс выделения памяти подробнее. Легко увидеть несколько особенностей, а именно:

  1. Если запрошенный размер не имеет нашего хипа для малых размеров — просто запрашиваем у системы
  2. Выделяем память по очереди — сначала из активного списка, затем из списка фрагментов, и наконец — пробуем запросить у системы новый кусок.
  3. У нас всегда есть память в системе, а если нет — надо только подождать в цикле


Вот такое решение.

void* my_malloc(size_t sz) { 
    procheap *heap;
    void* addr;

    // Use sz and thread id to find heap.
    heap = find_heap(sz);

    if (!heap) 
        // Large block
        return alloc_large_block(sz);

    for(;;) { 
        addr = MallocFromActive(heap);
        if (addr) return addr;

        addr = MallocFromPartial(heap);
        if (addr) return addr;

        addr = MallocFromNewSB(heap);
        if (addr) return addr;
    } 
}

Углубимся в тему, рассмотрим каждую часть в отдельности. Начнем с начала — с добавления нового куска из системы.

MallocFromNewSB
static void* MallocFromNewSB(procheap* heap) {
    descriptor* desc;
    void* addr;
    active newactive, oldactive;

    *((unsigned __int64*)&oldactive) = 0;
    desc = DescAlloc();
    desc->sb = AllocNewSB(heap->sc->sbsize, SBSIZE);

    desc->heap = heap;
    desc->Anchor.avail = 1;
    desc->sz = heap->sc->sz;
    desc->maxcount = heap->sc->sbsize / desc->sz;

    // Organize blocks in a linked list starting with index 0.
    organize_list(desc->sb, desc->maxcount, desc->sz);

    *((unsigned __int64*)&newactive) = 0;
    newactive.ptr = (__int64)desc;
    newactive.credits = min(desc->maxcount - 1, MAXCREDITS) - 1;

    desc->Anchor.count = max(((signed long)desc->maxcount - 1 ) - 
           ((signed long)newactive.credits + 1), 0); // max added by Scott
    desc->Anchor.state = ACTIVE;

    // memory fence.
    if (compare_and_swap64k(heap->Active, oldactive, newactive)) { 
        addr = desc->sb;
        *((char*)addr) = (char)SMALL; 
        addr = (char*) addr + TYPE_SIZE;
        *((descriptor **)addr) = desc; 
        return (void *)((char*)addr + PTR_SIZE);
    } 
    else {
        //Free the superblock desc->sb.
        munmap(desc->sb, desc->heap->sc->sbsize);
        DescRetire(desc); 
        return NULL;
    }
}


Никаких чудес — всего лишь создание суперблока, дескриптора, и инициализация пустого списка. И добавление в список активных нового блока. Тут прошу обратить внимание на такой факт, что нет цикла с присваиванием. Если не удалось — значит не удалось. Почему так? Потому что функция и так вызывается из цикла, а если не удалось вставить в список — то это значит что вставил кто-то другой и надо проводить выделение памяти сначала.

Перейдем теперь к выделению блока из списка активных — ведь мы уже научились выделять суперблок и прочее.

MallocFromActive
static void* MallocFromActive(procheap *heap)  {
    active newactive, oldactive;
    descriptor* desc;
    anchor oldanchor, newanchor;
    void* addr;
    unsigned __int64 morecredits = 0;
    unsigned long next = 0;

    // First step: reserve block
    do { 
        newactive = oldactive = heap->Active;
        if (!(*((unsigned __int64*)(&oldactive)))) return NULL;
        if (oldactive.credits == 0)  *((unsigned __int64*)(&newactive)) = 0;
        else --newactive.credits;
    } while (!compare_and_swap64k(heap->Active, oldactive, newactive));

    // Second step: pop block
    desc = mask_credits(oldactive);
    do {
        // state may be ACTIVE, PARTIAL or FULL
        newanchor = oldanchor = desc->Anchor;
        addr = (void *)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz);
        next = *(unsigned long *)addr;
        newanchor.avail = next;
        ++newanchor.tag;

        if (oldactive.credits == 0) {
            // state must be ACTIVE
            if (oldanchor.count == 0) newanchor.state = FULL;
            else { 
                morecredits = min(oldanchor.count, MAXCREDITS);
                newanchor.count -= morecredits;
            }
        } 
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (oldactive.credits == 0 && oldanchor.count > 0) 
        UpdateActive(heap, desc, morecredits);

    *((char*)addr) = (char)SMALL; 
    addr = (char*) addr + TYPE_SIZE;
    *((descriptor**)addr) = desc; 
    return ((void*)((char*)addr + PTR_SIZE));
}

Собствено алгоритм прост, лишь слегка путает нас громоздкая форма записи. На самом же деле — если есть что брать, то берем новый кусок из суперблока, и отмечаем сей факт. Попутно проверяем — а не забрали ли мы последний кусочек, и если да — отмечаем и это.

Есть тут роано одна тонкость, а именно — если вдруг выяснилось, что мы взяли последний кусочек из суперблока, то вслед за нами следующий запрос приведет к добавлению нового суперблока взамен уже использованного. И как только мы это обнаружили — нам негде записать факт того что блок выделен. Поэтому мы заносим только что выделенный кусочек в partial list.

UpdateActive
static void UpdateActive(procheap* heap, descriptor* desc, unsigned __int64 morecredits) { 
    active oldactive, newactive;
    anchor oldanchor, newanchor;

    *((unsigned __int64*)&oldactive) = 0;
    newactive.ptr = (__int64)desc;
    newactive.credits = morecredits - 1;
    if (compare_and_swap64k(heap->Active, oldactive, newactive)) 
        return;

    // Someone installed another active sb
    // Return credits to sb and make it partial
    do { 
        newanchor = oldanchor = desc->Anchor;
        newanchor.count += morecredits;
        newanchor.state = PARTIAL;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));
    HeapPutPartial(desc);
}

Настала пора рассмотреть работу с дескрипторами, прежде чем мы перейдем к заключительной части этого эссе.

Memory Block Descriptors

image
Для начала, научимся дескриптор создавать. Но где? Вообще-то, если кто забыл — мы как раз пишем выделение памяти. Очевидным и красивым решением было бы использование тех же механизмов, что и для generic allocation, но увы — это будет всем известный анекдот pkunzip.zip. Поэтому принцип тот же — выделяем большой блок содержащий массив дескрипторов, и как только массив переполняется — создаем новвый и объединяем его с предыдущим в список.

DescAlloc
static descriptor* DescAlloc() {
    descriptor_queue old_queue, new_queue;
    descriptor* desc;
  
    for(;;) {
        old_queue = queue_head;
        if (old_queue.DescAvail) {
            new_queue.DescAvail = (unsigned __int64)((descriptor*)old_queue.DescAvail)->Next;
            new_queue.tag = old_queue.tag + 1;
            if (compare_and_swap64k(queue_head, old_queue, new_queue)) {
                desc = (descriptor*)old_queue.DescAvail;
                break;
            }
        }
        else {
            desc = AllocNewSB(DESCSBSIZE, sizeof(descriptor));
            organize_desc_list((void *)desc, DESCSBSIZE / sizeof(descriptor), sizeof(descriptor));

            new_queue.DescAvail = (unsigned long)desc->Next;
            new_queue.tag = old_queue.tag + 1;
            if (compare_and_swap64k(queue_head, old_queue, new_queue)) 
                break;
            munmap((void*)desc, DESCSBSIZE);   
        }
    }
    return desc;
}

Ну а теперь дело за не менее мощным колдунством — надо еще и научиться дескриптор обратно возвращать. Однако, возвращать мы будем в том же FIFO — потому что возвращать нам понадобится только если мы его взяли по ошибке, а этот факт обнаруживается сразу. Так что дело оказывается гораздо проще

DescRetire
void DescRetire(descriptor* desc) {
    descriptor_queue old_queue, new_queue;

    do {
        old_queue = queue_head;
        desc->Next = (descriptor*)old_queue.DescAvail;
        new_queue.DescAvail = (unsigned __int64)desc;
        new_queue.tag = old_queue.tag + 1;
    } while (!compare_and_swap64k(queue_head, old_queue, new_queue));
}


Helpers

image
Приведем также вспомогательные функции по инициализации списков и т. д. Функции настолько самоочевидны, что их даже описывать как-то смысла нет.
organize_list
static void organize_list(void* start, unsigned long count, unsigned long stride) {
    char* ptr;
    unsigned long i;
  
    ptr = (char*)start; 
    for (i = 1; i < count - 1; i++) {
        ptr += stride;
        *((unsigned long*)ptr) = i + 1;
    }
}

organize_desc_list
static void organize_desc_list(descriptor* start, unsigned long count, unsigned long stride) {
    char* ptr;
    unsigned int i;
 
    start->Next = (descriptor*)(start + stride);
    ptr = (char*)start; 
    for (i = 1; i < count - 1; i++) {
        ptr += stride;
        ((descriptor*)ptr)->Next = (descriptor*)((char*)ptr + stride);
    }
    ptr += stride;
    ((descriptor*)ptr)->Next = NULL;
}

mask_credits
static descriptor* mask_credits(active oldactive) {
    return (descriptor*)oldactive.ptr;
}

Суперблок просто запрашиваем у системы:
static void* AllocNewSB(size_t size, unsigned long alignement) {
    return VirtualAlloc(NULL, size, MEM_COMMIT, PAGE_READWRITE);
}

Точно так же получаем большие блоки, запросив чуть больше под заголовок блока:
alloc_large_block
static void* alloc_large_block(size_t sz) {
    void* addr = VirtualAlloc(NULL, sz + HEADER_SIZE, MEM_COMMIT, PAGE_READWRITE);

    // If the highest bit of the descriptor is 1, then the object is large 
    // (allocated / freed directly from / to the OS)
    *((char*)addr) = (char)LARGE;
    addr = (char*) addr + TYPE_SIZE;
    *((unsigned long *)addr) = sz + HEADER_SIZE;
    return (void*)((char*)addr + PTR_SIZE); 
}

А это — поиск в нашей таблице хипа адаптированного под нужный размер (bkb ноль если размер слишком велик):
find_heap
static procheap* find_heap(size_t sz) {
    procheap* heap;
  
    // We need to fit both the object and the descriptor in a single block
    sz += HEADER_SIZE;
    if (sz > 2048) return NULL;
  
    heap = heaps[sz / GRANULARITY];
    if (heap == NULL) {
        heap = VirtualAlloc(NULL, sizeof(procheap), MEM_COMMIT, PAGE_READWRITE);
        *((unsigned __int64*)&(heap->Active)) = 0;
        heap->Partial = NULL;
        heap->sc = &sizeclasses[sz / GRANULARITY];
        heaps[sz / GRANULARITY] = heap;
    }
    
    return heap;
}

А вот обертки для списков. Они добавлены исключительно для наглядности — кода с макаронами вокруг атомиков у нас более чем достаточно, чтобы в одну функцию yt складывать по надцать циклов вокруг compare and swap
ListGetPartial
static descriptor* ListGetPartial(sizeclass* sc) {
    return (descriptor*)lf_fifo_dequeue(&sc->Partial);
}

ListPutPartial
static void ListPutPartial(descriptor* desc) {
    lf_fifo_enqueue(&desc->heap->sc->Partial, (void*)desc);  
}

Удаление строится банальнейше — перестройкой списка во временный и обратно:
ListRemoveEmptyDesc
static void ListRemoveEmptyDesc(sizeclass* sc) {
    descriptor *desc;
    lf_fifo_queue_t temp = LF_FIFO_QUEUE_STATIC_INIT;

    while (desc = (descriptor *)lf_fifo_dequeue(&sc->Partial)) {
        lf_fifo_enqueue(&temp, (void *)desc);
        if (desc->sb == NULL) DescRetire(desc);
        else break;
    }
    while (desc = (descriptor *)lf_fifo_dequeue(&temp)) 
        lf_fifo_enqueue(&sc->Partial, (void *)desc);
}

И несколько оберток вокруг partial lists
RemoveEmptyDesc
static void RemoveEmptyDesc(procheap* heap, descriptor* desc) {
    if (compare_and_swap_ptr(&heap->Partial, desc, NULL)) 
        DescRetire(desc);
    else 
        ListRemoveEmptyDesc(heap->sc);
}

HeapGetPartial
static descriptor* HeapGetPartial(procheap* heap) { 
    descriptor* desc;
    do {
        desc = *((descriptor**)&heap->Partial); // casts away the volatile
        if (desc == NULL) return ListGetPartial(heap->sc);
    } while (!compare_and_swap_ptr(&heap->Partial, desc, NULL));
    return desc;
}

HeapPutPartial
static void HeapPutPartial(descriptor* desc) {
    descriptor* prev;
    do {
        prev = (descriptor*)desc->heap->Partial; // casts away volatile
    } while (!compare_and_swap_ptr(&desc->heap->Partial, prev, desc));
    if (prev) ListPutPartial(prev); 
}


Последний рывок — и выделять / освобождать готов!

image
И наконец мы готовы реализовать выделение памяти не в страйпе, все возможности для этого у нас уже есть.

Алгоритм прост — находим наш список, резервируем в нем место (попутно освобождая пустые блоки), и возвращаем его клиенту.

MallocFromPartial
static void* MallocFromPartial(procheap* heap) {
    descriptor* desc;
    anchor oldanchor, newanchor;
    unsigned __int64 morecredits;
    void* addr;
  
retry:
    desc = HeapGetPartial(heap);
    if (!desc) return NULL;

    desc->heap = heap;
    do {
        // reserve blocks
        newanchor = oldanchor = desc->Anchor;
        if (oldanchor.state == EMPTY) 
            DescRetire(desc); 
            goto retry;
        }

        // oldanchor state must be PARTIAL
        // oldanchor count must be > 0
        morecredits = min(oldanchor.count - 1, MAXCREDITS);
        newanchor.count -= morecredits + 1;
        newanchor.state = morecredits > 0 ? ACTIVE : FULL;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    do { 
        // pop reserved block
        newanchor = oldanchor = desc->Anchor;
        addr = (void*)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz);

        newanchor.avail = *(unsigned long*)addr;
        ++newanchor.tag;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (morecredits > 0) 
        UpdateActive(heap, desc, morecredits);

    *((char*)addr) = (char)SMALL; 
    addr = (char*) addr + TYPE_SIZE;
    *((descriptor**)addr) = desc; 
    return ((void *)((char*)addr + PTR_SIZE));
}

Теперь рассмотрим как вернуть нам память обратно в список. В общем — классический алгоритм: по переданному нам указателю восстанавливаем дескриптор, и по якорю запомненному в дескрипторе — попадаем в нужное нам место суперблока и отмечаем нужный кусочек как свободный, а кто до сих дочитал тому пиво. И конечно же пара проверок — не надо ли нам освободить весь суперблок, а то в нем он последний неосвобожденный.

void my_free(void* ptr) {
    descriptor* desc;
    void* sb;
    anchor oldanchor, newanchor;
    procheap* heap = NULL;

    if (!ptr) return;
    
    // get prefix
    ptr = (void*)((char*)ptr - HEADER_SIZE);  
    if (*((char*)ptr) == (char)LARGE) {
        munmap(ptr, *((unsigned long *)((char*)ptr + TYPE_SIZE)));
        return;
    }
    desc = *((descriptor**)((char*)ptr + TYPE_SIZE));
    
    sb = desc->sb;
    do { 
        newanchor = oldanchor = desc->Anchor;

        *((unsigned long*)ptr) = oldanchor.avail;
        newanchor.avail = ((char*)ptr - (char*)sb) / desc->sz;

        if (oldanchor.state == FULL) newanchor.state = PARTIAL;

        if (oldanchor.count == desc->maxcount - 1) {
            heap = desc->heap;
            // instruction fence.
            newanchor.state = EMPTY;
        } 
        else 
            ++newanchor.count;

        // memory fence.
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (newanchor.state == EMPTY) {
        munmap(sb, heap->sc->sbsize);
        RemoveEmptyDesc(heap, desc);
    } 
    else if (oldanchor.state == FULL) 
        HeapPutPartial(desc);
}

На что надо обратить внимание — освобожденные кусочки попадают в partial list, и было бы неплохо добавить проверку — а не попадает ли наш кусочек на вершину Active страйпа, это бы неплохо повысило эффективность вырожденного случая «выделяем и освобождаем в цикле». Но это уже в качестве домашнего задания.

Выводы


image
В данной крайне занудной и длинной работе мы рассмотрели как можно построить свой аллокатор на lock-free FIFO lists, узнали что это вообще такое, и освоили немало трюков по работе с атомиками. Надеюсь, умение группировать списки в страйпы поможет не только в деле написания своего менеджера памяти.

Дополнительные материалы

  1. Scalable Lock-Free Dynamic Memory Allocation
  2. Hoard memory allocator
  3. Scalable Locality-ConsciousMultithreaded Memory Allocation


И в заключение, вот парочка иллюстраций из [3] о скорости работы разных аллокаторов (картинка кликабельна).

image
image
Как видно, несмотря на явную простоту алгоритма, работает он на уровне лучших образцов, многие из которых появились сильно позже.

Update: спасибо SkidanovAlex за развернутое пояснение:

Из статьи не понятно, зачем нужен ocount. Это реализация так назвыемых tagged pointer. Идея в том, что если бы его не было, то возможен был бы такой сценарий (называется проблемой ABA — поэтому и структура в статье называется top_aba_t):

Пусть операция pop выглядит примерно так:

do {
  long snapshot = stack->head;
  long next = snapshot->next;
} while (!cas(&stack->head, next, snapshot));

В принципе код deque в статье делает именно это, но с ocount. Теперь рассмотрим такой сценарий: на вершине стека лежит элемент А, за ним элемент B (стек выглядит как A -> B). Мы запомнили snapshot = A, next = B.

В этот момент другой поток убрал элемент A, затем вставил элемент C, и вставил обратно А. Теперь стек выглядит как:
A -> C -> B.

Операция pop просыпается, и ее CAS срабатывает (stack->head == snapshot, они оба равны A), и заменяет stack->head на B. Элемент C теряется.

С ocount такого не произойдет, потому что свежевставленный A будет иметь другой ocount, и CAS провалится.

Но ocount спасает конечно только на практике. В теории после того как мы запомнили snapshot и next другой поток может удалить A 2^18 раз, пока ocount не примет такое же значение как было, и ABA проблема опять произойдет.

На современном железе конечно никто не отводит под указатель 48 бит. Вместо этого используется две 64-битных переменных, идущих подряд, первая под указатель, вторая под ocount (теоретический сценарий с вставкой элемента А много раз становится еще более теоретическим), и используется double cas.

Комментарии 62

    +8
    Ни черта не понятно.
    «Вот вам код — разбирайтесь, как он работает».
      –4
      и вообще статьи подобного толка не требуют ни одной строки кода, а иллюстрировать лучше на блок-схемах.
        +16
        Вы о чем, какие блок-схемы? Я их последний раз чертил тушью по ГОСТу на ватманских листах, когда диплом сдавал 20 лет назад. Уже тогда это выглядело явным маразмом. Код гораздо понятнее.

        Вообще, я вот по таким книжкам учился — ни одной блок-схемы и состоит из распечаток. Или по таким и таким.

        Тут вся соль именно в деталях реализации — в том как именно этот список построен, как его собрать в страйпы, что из этого получается. Вот суперблок размера 48, вот у него раздача прямо из суперблока, вот — классический однонаправленный список свободных блоков. Вот тут — попадает в список, вот тут — берется. Суть метода то именно в деталях.
          +24
          Всё отлично, не парьтесь. Многие читатели «мимо кассы» из-за того, что это же не про «копирастеры что-то накопирастили», а о программировании.
          Спасибо за статью.
            +4
            Полностью поддерживаю.
            +2
            Хмм, у вас удивительная статья — с одной стороны я понял чего в моем на досуге пописываемом просмотровщеке изображений лагает генерация превьюх (первый абзац), а остального я не бельмеса не понял — нужно перечитать.
              0
              и я по таким же книжкам учился…
              у меня на полке стоит второе издание Страуструпа 1987г на 257 листах. Сколько там сейчас листов? 1136…
              ни какого STL, только С++ как Си «с постинкрементом»

              спасибо за статью, в жизни пригодится
              сам свои пулы памяти «изобретаю»…
              некоторые готовые использовал — не понравилось.

                0
                А еще помню, С++ по Подбельскому учил.
                Лучше бы не учил по нему… там уж точно подход: Си с постинкрементом…

                для тех кто не в танке: спрашивают как-то у преподователя
                — почему в языке С++ после эС идет два плюса?
                — потому-что это новый язык, но все его продолжают использовать как Си, постинкремент
          +6
          С самого начала:
          "… превращается вот в такие макароны"
          descriptor_queue old_queue, new_queue;
          do {
          old_queue = queue_head;
          desc->Next = (descriptor*)old_queue.DescAvail;
          new_queue.DescAvail = (unsigned __int64)desc;
          new_queue.tag = old_queue.tag + 1;
          } while (!compare_and_swap64k(queue_head, old_queue, new_queue));

          Что это?! Что оно делает? Что за типы? Что за поля?
          А дальше всё становится запутанней и запутанней.
          Нет разобраться можно, то проще в инете где-нибудь почитать, А те, кто сходу это понимаю и так знают алгоритм и им не нужно это читать — парадокс.
            +2
            Я кстати этот самый кусок проиллюстрировал более понятными двумя строчками ;-)

            И добавил в текст парочку иллюстраций — возможно они будут более наглядными.
            0
            А я не понял, зачем нужен ocount, и как это так мы взяли и ограничили адресное пространство в 48 бит.
              +1
              Пока что 48 бит это то что можно реально адресовать, так что страшного ничего не вижу. Да, в kernel mode не сработает, там чуть сложнее, ибо старший бит / биты — индикация.

              Используется такой трюк чтобы не допускать возможных коллизий. Вот почему

              Note that if a thread is delayed at any point while executing this routine, other active threads will be able to proceed with their operations without waiting for the delayed thread, and every time a thread executes a full iteration of the loop, some operation must have made progress.

              If the CAS succeeds, then the increment of the current thread has taken effect. If the CAS fails, then the value of the counter must have changed during the loop. The only way the counter changes is if a CAS succeeds.

              Then, some other thread’s CAS must have succeeded during the loop and hence that other thread’s increment must have taken effect.
              +2
              Вообще «стандартный консервативный аллокатор» не так уж и плох, по крайней мере в gcc он и так оперирует аренами и всякую мелочь выделяет без локов.

              Во-вторых, если хочется совсем продвинутого, то естьgoogle tcmalloc, который делает это еще лучше. Работает как drop-in replacement для стандартного аллокатора.

              В тех 0.5% случаев, когда tcmalloc загнется скорее всего загнется и то что вы описали.

              А вообще за статью + конечно.
                +4
                Зато стандартный аллокатор от msvcrt — очень даже занимается тем что висит в giant locks (тьфу на него еще раз).

                Вообще je / tc аллокаторы гораздо сложнее для понимания, тут-то весь исходник в 30 килобайт, они обозримы. Конечно, это не тот простой код из книжки Кернигана, Ритчи и Фьюэра — но тоже боле-мене понятен.
                  0
                  Ну, строго говоря, когда меня перестанет устраивать аллокатор от МС, я вначале заменю его на что-то проверенное готовое (ну там от Гугла или из буста), а своё вряд ли стану писать. Но за саму идею и объяснение — респект.
                  0
                  Я в конец добавил пару графиков, там в общем видно — как аллокаторы работают в разных случаях. Позаимствовал из собственных источников картинку ;-)
                  +2
                  За статью плюс, синтаксис volatile unsigned __int64 top:46, ocount:18; увидел впервые, что за top и ocount? Гугл почему-то не помог (

                  P.S. Если не сложно — добавьте комментарии к коду. Хотя бы чуть-чуть.
                    +3
                    Битовые поля. 64бит целое делим на битовые поля — верхнюю (top) 46 бит (адрес) и нижнюю (ocount) 18 бит, для индикации прогресса в цикле. Зачем так странно — в общем известный ход.

                    CAS (который compare-and-swap) атомарен только сам по себе.

                    И если 2 треда одновременно наступят на это — у них может получиться весьма интересный набор эффектов (ABA problem). Чтобы такого не происходило — надо использовать индикатор прогресса, и в цикле этот ocount каждый раз наращивается. Т… о. каждый раз в compare-and-swap используется уникальное число.

                    Конечно, 18 бит под счетчик отводить это сурово — но в работе Michael именно так. Я к примеру когда реализовывал свои lock-free lists в kernel-mode использовал пару бит всего для этого, а тут — оставил ровно как есть.
                    0
                    Я правильно понял, что освобождённые блоки по FIFO используются? Но тогда на часто выделяемых/освобождаемых блоках загрязняется кэш процессора (особенно нынешние многоуровневые большие кэши). Для аллокаторов оптимально будет LIFO — тогда есть шанс что при частом обращении блок не будет покидать кэш второго-третьего уровня.
                    И ещё вопрос, что значит "Выделение маленьких кусочков привяжем к нитям (а освобождение — нет)."? А именно — не понял что значит "а освобождение — нет".
                      +1
                      Да, там FIFO в partial list. Это не самый лучший способ, я согласен. Однако тут я разбираю готовую работу, а не свое пишу, свое я бы немного не так делал ;-)

                      Да, и учтите, работа вышла аж в 2004м, кто тогда серьезно задумывался про lock-free? Все более менее известные аллокаторы появились позже, емнип. Исходная работа вообще была рассчитана на 32 бита и Linux, это уже последователи ее перенесли и адаптировали.

                      По выделенному тексту — я был косноязычен, согласен.

                      Идея тут вот в чем — при выделении памяти мы можем начальную таблицу proc heap (для блоков размером до 2К) выделить в TLS aka thread local storage. Тогда нам не надо будет задумываться о дополнительной синхронизации, мелочь — а приятно.

                      А вот при освобождении памяти назад — мы можем блок выделенный в одном треде освобождать в другом, поэтому в качестве penalty каждый memory block descriptor должен знать — из какого он proc heap.
                        0
                        Идея про TLS уже ближе к практике.

                        По хорошему проблема аллокации возникает только когда она нужна очень часто для мелких блоков в многопоточной среде.

                        При этом основные временные затраты уходят именно на само выделение памяти. Ну и в отрыве от железа не стоит заниматься такой оптимизацией. Есть целый ворох связанных вопросов — выравнивание, кэши процессора разного уровня и т.п.

                        Для решения этой проблемы нет особого смысла в создании специального аллокатора, который только все усложняет.

                        Достаточно простого кэша заранее выделенных блоков, где суперблоки будут выделяться per cpu или per thread, а при запросе на аллокацию оттуда просто с минимальными затратами будут отдаваться готовые блоки.

                        CAS не панацея — фактически те же локи, вид с боку. Спинлоки и мутексы по ходу на них же и реализуются обычно. Да и не портируемо получается — CAS на разных платформах разный.

                        Минимизация CAS и раскидывание достаточно больших суперблоков по CPU/thread позволяет обойти дискард кэша процессора при записи в выделенные блоки.

                        Реальная деаллокация здесь происходит только для суперблоков — мелкие блоки просто добавляются в список доступных.

                        При этом даже не нужно писать аллокатор — можно взять один из многих уже готовых.

                        Но еще проще решить вопрос сразу на уровне приложения. Пусть выделением памяти занимается нечто системное, а кэш аллокацию построить исходя из задачи. Ну и главное помнить, что частая аллокация и деаллокакация — признак ошибки в днк проектировании.
                          0
                          Еше немного дополню.

                          Да, решение с суперблоками под каждое ядро/поток будет неоптимальным в части фрагментации памяти.

                          Но вопрос в том, какова основная цель. Если скорость в какой-то опредеденной узкой задаче, то при локальных суперблоках будет получен максимальный результат. При этом чем более узкая задача, тем меньше будет и фрагментации.
                      0
                      В целом статья понравилась НО:
                      1) где КОД — Я ХОЧУ КОД — GitHub в этом Вам поможет, и не забудьте добавить README.md с кратким описанием исходников + ссылку на пост
                      2) по поводу compare_and_swap64 не проще ли бы было сразу обернуть в ASM CMPXCHG8B? Поскольку только она гарантировано дает 1 шаговый обмен, например Apple GCC на x64 разворачивает этот код в более чем 6 инструкций — что в многопоточной среде может привести к эпик фэйлу. (говорю поскольку были случаи)
                      3) про грамматику и мелкие недочеты — отписал в личку.
                      4) от себя лично бы просил добавить к заголовкам функций комментарии для autodoc.
                      5) картники — о боже, зачем JPG ??? — Перезалейте например их в PNG на habrastorage — так будет реально лучше.

                      В целом статья хорошая от меня +1 (жаль что кармы на это не хватает).
                      Продолжайте дальше.
                        +1
                        Эээ??? Там же для винды пример там есть специальная функция на этот счёт. А на других платформах, естественно, нужно иные макросы использовать? А где Вы в GCC нашли InterlockedExchangeAdd? А если нашли, то он будет работать он же Interlocked! ASM будет не кросс-платформенно и не понятно.
                          0
                          Это я про compare_and_swap64, естественно.
                            –1
                            :)
                            В GCC под Windows InterlockedExchangeAdd эта функция как-раз есть.
                            По тексту не было комментарий на эту тему, а вот пример для SSMAlloc — github.com/Naruil/SSMalloc/blob/master/include-x86_64/atomic.h о чем Я и говорил.
                              0
                              Да, конечно же я имел в виду Apple.
                              Приведённый Вами файл заголовка — под конкретную архитектуру и конкретный компилятор. Т.е. не переносимо. А автор явно написал, что это под Windows — там рекомендуется использовать именно InterlockedExchangeAdd. К тому же это понятнее — даже не знакомый с WinAPI поймёт что это, зачем и чем заменить на целевой платформе.
                            +2
                            Вам таки реально нужен код 2004 года? ;-)

                            Реализаций таких много, одна из них на гитхабе, как заказывали.

                            Это ж больше исследовательская работа, подробно расписанная самим автором, ссылки приведены. Код там кстати ровно такой же и без комментариев.
                            +1
                            лучше наверно оригинал прочитать
                            people.cs.vt.edu/~scschnei/papers/ismm06.pdf
                            people.cs.vt.edu/~scschnei/streamflow/
                              0
                              Это не оригинал, это дальнейшие исследования, более поздние. Последний график я утянул именно отсюда. А оригинал я привел первой ссылкой в конце.
                              +1
                              Совершенно непонятно назвачение #pragma pack(1) в первой структуре и наличие unsigned __int64 _pad0[8]; (да ещё два раза) во второй.

                              Какие-то глюки были и вы их так замаскировали?
                                –1
                                это старый как мир прием — если не указать pack(1) то компилятор может делать выравнивание переменных шире — например по 256 байтной границе в сумме с кросс-платформенностью может выдать глюки.
                                для x86 это НЕ принципиально, а вот для других платформ — вполне.
                                  +1
                                  Мне кажется, вы не понимаете, что делаете.

                                  Вначале вы принудительно указали компилятору, что для этой структуры не использовать требования к alignment. Как следствие, все interlocked функции будут глючить. Смотрите здесь, например: blogs.msdn.com/b/oldnewthing/archive/2004/08/30/222631.aspx

                                  Потом вы начали вставлять по 8*8=64 байта перед/после каждой top_aba_t (без этого же не работало, да?). В результате вместо 8-ми байтовой top_aba_t используется 130-байтовая структура.

                                  Вы точно разбираетесь в том, что пишете?

                                  ps: oх, вы не автор.
                                    0
                                    msdn.microsoft.com/en-us/library/2e70t5y1(v=vs.80).aspx — Я предлагал использовать CMPXCHG8B для аллоцирования памяти по 8байт за 1 такт, там глюков нету — Я уже применял такой прием для атомарного изменения ячейки памяти в общей среде выполнения.

                                    По поводу метода — это не ко мне.

                                    А к чему тогда комментарий?

                                +8
                                Несколько комментариев:
                                1. Структура данных в статье — это стек, а не очередь, она не FIFO, а LIFO. Реализовать lock-free очередь очень сложно. Почти все статьи о lock-free очередях на связных списках не верны.
                                2. Из статьи не понятно, зачем нужен ocount. Это реализация так назвыемых tagged pointer. Идея в том, что если бы его не было, то возможен был бы такой сценарий (называется проблемой ABA — поэтому и структура в статье называется top_aba_t):
                                Пусть операция pop выглядит примерно так:
                                do {
                                  long snapshot = stack->head;
                                  long next = snapshot->next;
                                } while (!cas(&stack->head, next, snapshot));
                                

                                В принципе код deque в статье делает именно это, но с ocount. Теперь рассмотрим такой сценарий: на вершине стека лежит элемент А, за ним элемент B (стек выглядит как A -> B). Мы запомнили snapshot = A, next = B.
                                В этот момент другой поток убрал элемент A, затем вставил элемент C, и вставил обратно А. Теперь стек выглядит как:
                                A -> C -> B
                                Операция pop просыпается, и ее CAS срабатывает (stack->head == snapshot, они оба равны A), и заменяет stack->head на B. Элемент C теряется.
                                С ocount такого не произойдет, потому что свежевставленный A будет иметь другой ocount, и CAS провалится.
                                ocount спасает конечно только на практике. В теории после того как мы запомнили snapshot и next другой поток может удалить A 2^18 раз, пока ocount не примет такое же значение как было, и ABA проблема опять произойдет.
                                На современном железе конечно никто не отводит под указатель 48 бит. Вместо этого используется две 64-битных переменных, идущих подряд, первая под указатель, вторая под ocount (теоретический сценарий с вставкой элемента А много раз становится еще более теоретическим), и используется double cas.

                                И пара мелких недочетов — в коде в реализации стека (очереди в терминологии статьи :)) ocount для новой переменной не копируется со старой, то есть он всегда будет равен единице (или, что более вероятно, какому-то мусору — переменная заводится на стеке, и конструктора у нее нет).
                                И вот такие конструкции:
                                        old_top.ocount = queue->both.ocount;
                                        old_top.top = queue->both.top;
                                

                                Вообще говоря опасны — queue->both может поменяться между этими двумя строками. Кажется, что в этом случае это не приведет ни к чему страшному, просто провалится cas и начнется другая итерация, но не понятно, почему не присвоить в одну операцию (например можно top_aba_t сделать union с long long, и копировать этот long long).
                                  +1
                                  Комментарий гораздо полезнее самой статьи, спасибо.
                                    0
                                    ПО поводу union — современные компиляторы, по-идее, в обоих случаях сгенерируют одинаковый код. И не понятно с чего бы union был лучше — если архитектура x32/x64 то всё равно будет не атомарно. Чем копирование юниона лучше-то?
                                      0
                                      union не обязателен, но важно не разбивать это присвоение на две команды. union упрощает с той точки зрения, что CAS принимает как аргумент long, и не придется делать каст, если сделать union.
                                      Ваша точка зрения, что компилятор сообразит эти два присвоения преобразовать в одно 64-ех битное копирование неверно.

                                      Почему -- смотрите под спойлер.
                                      В следующем коде, если бы ваше предположение было верно, все три блока в thread2 были бы идентичны, и ни один из них никогда бы ничего не вывел. На деле же скомпилированный G++ код многократно выводят OMG1
                                      #include <thread>
                                      #include <assert.h>
                                      
                                      struct test_t { union { struct { volatile int64_t a:48, b:16; }; int64_t i; }; };
                                      
                                      test_t test;
                                      
                                      void thread1()
                                      {
                                          while (true)
                                          {
                                              test_t x;
                                              x.a = (test.a + 1) % 1000;
                                              x.b = x.a;
                                              test.i = x.i;
                                          }
                                      }
                                      
                                      void thread2()
                                      {
                                          while (true)
                                          {
                                              {
                                                  test_t x;
                                                  x.a = test.a;
                                                  x.b = test.b;
                                                  if (x.a != x.b) printf("OMG 1, %ld != %ld\n", x.a, x.b);
                                              }
                                              {
                                                  test_t x = test;
                                                  if (x.a != x.b) printf("OMG 2, %ld != %ld\n", x.a, x.b);
                                              }
                                              {
                                                  test_t x; x.i = test.i;
                                                  if (x.a != x.b) printf("OMG 3, %ld != %ld\n", x.a, x.b);
                                              }
                                          }
                                      }
                                      
                                      int main()
                                      {
                                          assert(sizeof(test_t) == 8);
                                          std::thread t1(thread1);
                                          std::thread t2(thread2);
                                          t1.join();
                                          t2.join();
                                          return 0;
                                      }
                                      


                                      Ключи компиляции, которые я использовал:
                                      g++ -std=c++0x test.cpp -pthread -m64 -O2
                                        0
                                        … а каст приведёт к нарушению strict aliasing rules.
                                          0
                                          В данный момент только при -O3, и пока не затрагивает Visual Studio (судя по коду, windows).
                                          Но будет бомбой замедленного действия.
                                            0
                                            > В данный момент только при -O3

                                            Забыли добавить «в моей версии компилятора, которым я пользуюсь»

                                              +1
                                              Сорри, у g++ конечно.
                                          –2
                                          Крайне вредная оптимизация. Будет работать на некоторых компиляторах, некоторых архитектурах при некоторых настройках. Есть мнение, что так делать нельзя. Однажды что-то изменится в компиляторе и никто и никогда не узнает, почему программа перестала работать.
                                            +1
                                            Делая 64-ех битный CAS мы уже делаем определенные предположения об архитектуре (например, что она по меньшей мере 64-ех битная).

                                            Вариант в коде в статье использует casts:
                                            #define compare_and_swap64(address, old_value, new_value) \
                                                (InterlockedCompareExchange64(\
                                                      (PLONGLONG)(address), (__int64)(new_value), (__int64)(old_value)) \
                                                 == (__int64)(old_value))
                                            


                                            Мало того, что он использует cast, он использует C-style cast, который в С++ по хорошему не должен применяться никогда. Выше объяснили, почему такой вариант как раз-таки работать не обязан.

                                            Использование union — это хороший способ реализовать 64-ех битный tagged pointer в данной ситуации и избежать casts.
                                            Какой вариант вы предлагаете, если union — это «крайне вредная оптимизация»?
                                              0
                                              Кстати C-style cast это не хорошо, но не страшно. А варианту в статье, для полного счастье не хватает приведения к volatile в некоторых местах. Либо объявление, как volatile, и приведения к не-volatile там, где нужна оптимизация (да, есть случаи, когда можно убирать volatile, но это рискованный трюк).
                                              +1
                                              Приведите пример. В доках по union ясно сказано, что все его элементы хранятся в одной памяти, так что же по-вашему может пойти не так? Это не праздный интерес, я как раз люблю такие вещи делать, интересно знать, почему так нельзя.
                                                –1
                                                1. Вы не знаете, когда компилятор сохранит значение при присвоении. И сохранит ли он его вообще. А код многопоточный.
                                                2. Вы снова не знаете на какой архитектуре это будет работать. То, что вы делаете предположения на счёт того, что она 64-битная ни о чём не говорит. А если её потом соберут под x86? Писать в каждом файле НЕ СОБИРАТЬ ПОД x86? А если соберут под ARM? Там вообще всё несколько иначе работает.
                                                3. Вы в любом случае не знаете на какой архитектуре это будет работать. Кто-то когда-то расположит ваш union по не выравненному адресу и дальше поведение будет зависеть от количества процессоров, организации кэш-памяти, разрядности шины и т.п.
                                                4. Ваш код зависит от настроек компилятора — где гарантия, что в следующих версиях, поведение компилятора будет таким же? Есть всякого рода форки GCC, поведение которых отличает от эталона.
                                                5. При таком подходе есть риск однажды получить нарушение pointer aliasing, тогда уже кошмар начнётся — может самые непредсказуемым образом перестать работать с -O2.
                                                Вы пытаетесь найти оправдание плохому коду? Что-то вроде, я пишу под конкретный компилятор, конкретную архитектуру и конкретные настройки? Так это как-минимум не профессионально.
                                                  0
                                                  Во-первых, я не пытаюсь искать оправдание плохому коду, я искренне считаю его хорошим. Во-вторых, вы неубедительны, вы говорите, что что-то где-то когда-то может быть. Меня может автобус вечером сбить, например, я же не меняю свою жизнь из за этой вероятности. Вы либо приведите конкретный пример, либо ссылку в стандарт.
                                                  Пока мой код работал на win\linux на обоих 86- и 64-битных архитектурах и, соответственно, ОС. Машин не то чтобы много, штук 7-8 тестировалось. И под gcc(both win\linux) и под msvc компилерами работает. То есть пока все ок, ничего даже с -O3 не глючит. Код pure C98, к слову.
                                                  Под ARM, кстати, тоже работает (android-ndk), штук 8 мобильникопланшетов пробовалось.
                                                  Вот отсутствие проблем меня убеждает, а вероятность их в будущем меня может убедить только ссылкой в стандарт си.
                                                    0
                                                    Странный спор. Вы мне показывает плохой код, и говорите что он работает. При этом априори известно, что он может не работать. Ну, что тут сказать, пишете такой код дальше
                                                      0
                                                      априори известно, что он может не работать

                                                      В том-то и дело, что мне не известно. Я вас же и прошу подтверждение этого факта привести. Нельзя же просто на слово верить. Я не могу найти в литературе по си указания на ub или еще каких противопоказаний.
                                                        0
                                                        Факт чего? Того, что Вы НЕ знаете, какой код сгенерирует компилятор для не-volatile обращения? Это написано в стандарте C.
                                                          0
                                                          В стандарте С написано, что этот код будет работать. Вернее, я не нашел там ничего насчет того, что работать он не будет. Вы же заявляете, что может и нет.
                                                            0
                                                            В стандарте написано, что вы не можете делать предположений о том, когда и каким образом компилятор выполнит модификацию переменной. Для многопоточного когда этого достаточно. Что они должны были ещё написать?
                                                              0
                                                              Вы как-то отклоняетесь от темы при этом говоря верные вещи, но не касающиеся изначального вопроса добавлять в юнион long, чтобы одной строкой кода копировать\кастовать это значение. В многопоточном коде, как вы верно сказали, все может пойти крахом, но тут что добавлять в юнион лонг, что руками кастовать и копировать — одно и то же. В любом случае нужна блокировка или что-то неблокирующее. Я, собственно, хочу сказать что этот лонг в юнионе будет вести себя точно так же, как касты руками (по крайней мере не хуже).
                                                              Я нигде не говорил, что я забиваю на синхронизацию в многопоточной среде, расчитывая на атомарность чего-либо.
                                                                0
                                                                Вопрос именно в многопоточности, которая тут имеет место быть, и именно при работе с этой структурой. В иных случах (без многопоточности), если вложенная в uinion структура не слишком сложная (чтобы компилятор не запутался), то это использовать можно.
                                                                С многопоточностью же если хотим какой-то портабельности, то поле long нужно тоже сделать volatile.
                                                                Без многопоточности (и соответственно без volatile), такая оптимизация практически не имеет смысла, так как:
                                                                union {
                                                                struct {
                                                                int a;
                                                                int b;
                                                                };
                                                                long long l;
                                                                } u1,u2;
                                                                u1.a=u2.a; u1.b=u2.b;
                                                                u1.l=u2.l;

                                                                Последние две строки если оптимизация не '-O0' после компиляции должны быть эквивалентны (либо у компилятора должны быть основания не оптимизировать копирование — например, дальнейшее использование одного из параметров, когда можно избежать лишней команды пересылки). Об этом случае, кстати, говорится во множестве (всех?) всяких комментариев к стандарту C/C++ — не оптимизируйте копирование через union.
                                                                И ещё один момент, уж если и писать такой код, то обязательно указывая packed для структуры, т.к. если выравнивание на целевой платформе будет отличаться, то int64 может вовсе не равен двум int32.
                                                                Как-то так.
                                                  0
                                                  Кстати, если будет несколько таких присвоений, то компилятор может это соптимизировать в какие-нибудь блочные операции (в зависимости от процессора и архитектуры) и тогда вы вообще не сможете сказать, как оно работает.
                                                    0
                                                    Если оптимизация компилятора ломает корректность кода, то это вина компилятора и будет пофикшено. Да, баги такого типа — тот еще геморрой и проблемы, спорить не буду, но проблема не в моем коде, все-таки давайте исходить из того, что мы можем лично контролировать.
                                                    И вы не подумайте, что это я холивара ради спорю, мне реально стремно, что я ошибаюсь и все может рухнуть, и виноват буду я, а не разрабы компилера\либ\ос.
                                                      0
                                                      Мы НЕ можем контролировать архиеткутуру. Так как код должен быть портабельным или хотя бы легко адаптироваться. А мы НЕ знаем на каких архитектурах и компиляторах он будет работать.
                                                        0
                                                        И, кстати, это не вина компилятора — это и есть оптимизация. Компилятор не знает, что программист ССЗБ и собрался выстрелить себе в ногу. Никто этого фиксить не будет — это нормальное поведение. Для других случаев есть volatile, и для совсем уж тяжёлых — sheduler barrier и тому подобное.
                                                  0
                                                  Во второй задаче, в последнем варианте вообще может ничего не копироваться (и скорее всего не будет).
                                                  Во втором варианте второй задачи, тоже самое — может не быть копирования.
                                                  Первые вариант естественно будет приводит к коллизиям.
                                                  В любом случае, даже при известном компиляторе и архитектуре, никто ничего не гарантирует. Например может быть система с некогеренентным кэом процессоров. Или архитектура, где транзакция чтения 64-битного слова из памяти с меньшей разрядностью может быть прервана обращением другого процессора.
                                                  Или банальное нарушение выравнивание (вот напишете такой код, а он будет на packed структуре использоваться), и всё — глюки зависят от конкретной системы и процессора.
                                                  В общем, в данном случае два нарушения — приведение volatile к не-volatile в многопоточном выражении и нарушение aliasing (а если забыть и начать с указателями работать, так и вообще эпических зад начнётся из-за нарушения pointer aliasing)
                                                    +1
                                                    Короче, такой код писать нельзя ни в коем случае — этот код может работать только под конкретным компилятором, с конкретными настройками и на конкретной архитектуре (причём архитектура это не только x86/x64 и x86/ARM/MIPS, но и способ работы с памятью и кэшем, по поводу которых вообще нельзя делать никаких предположений).
                                                0
                                                Всё прочитал, ничего не понял.

                                                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                                Самое читаемое