«Простота — требование, необходимое для обеспечения надёжности», — Эдсгер Дейкстра

Невидимая структура данных

В каждой программе используется стек — стек вызовов. Каждый вызов функции записывает в стек кадр, каждый возврат извлекает его. Он настолько фундаментален, что мы редко о нём задумываемся.

Но когда нам нужен собственный стек или очередь, крайне важно правильно выбрать реализацию.

Однажды я отлаживал вылет прошивки во встраиваемой системе RISC-V. У системы был планировщик задач, использующий очередь для управления ожидающими задачами. При большой нагрузке система вылетала с переполнением стека.

Переполнение стека? Очередь должна была находиться в куче, а не в стеке.

Проблема заключалась не в самой очереди, а в том, как она была реализована. Для очереди использовался связанный список, и каждый вызов malloc() выполнял распределение из пула памяти, делившего пространство со стеком. Под нагрузкой очередь разрасталась, пул фрагмен��ировался и рано или поздно стеку не оставалось места для роста.

Как же мы устранили проблему? Заменили очередь на основе связанного списка кольцевым буфером — очередью на основе массива фиксированного размера, получив при этом отсутствие динамического распределения, предсказуемое использование памяти и десятикратный рост скорости.

Стек: массив против связанного списка

Давайте начнём со стеков. В учебниках приводится две реализации:

Стек на основе массива:

#define MAX_SIZE 1000

typedef struct {
    int data[MAX_SIZE];
    int top;
} stack_t;

void push(stack_t *s, int value) {
    if (s->top < MAX_SIZE) {
        s->data[s->top++] = value;
    }
}

int pop(stack_t *s) {
    if (s->top > 0) {
        return s->data[--s->top];
    }
    return -1;  // Error
}

Стек на основе связанного списка:

typedef struct node {
    int value;
    struct node *next;
} node_t;

typedef struct {
    node_t *top;
} stack_t;

void push(stack_t *s, int value) {
    node_t *node = malloc(sizeof(node_t));
    node->value = value;
    node->next = s->top;
    s->top = node;
}

int pop(stack_t *s) {
    if (s->top) {
        node_t *node = s->top;
        int value = node->value;
        s->top = node->next;
        free(node);
        return value;
    }
    return -1;  // Error
}

Сравнение из учебника:

  • Массив: push/pop за O(1), но фиксированный размер

  • Связанный список: push/pop за O(1), неограниченный размер

Реальность:

$ perf stat -e cycles,cache-misses ./stack_benchmark
Array stack (1000 ops):
    12,000 cycles
        45 cache-misses

Linked list stack (1000 ops):
   450,000 cycles
    2,100 cache-misses

Связанный список в 37 раз медленнее!

Почему?

  1. Оверхед malloc/free: каждый push/pop вызывает распределитель (~100 тактов)

  2. Промахи кэша: узлы разбросаны по памяти

  3. Следование по указателям: каждый pop следует по указателю (промах кэша)

Когда их использовать:

  • Стек на основе массива: почти всегда (встраиваемые системы, ситуации, когда критична производительность)

  • Стек на основе связанного списка: когда размер непредсказуем, а памяти в избытке

Очередь: кольцевой буфер

Очереди сложнее, чем стеки, потому что необходимо выполнять добавление с одного конца, а удаление с другого.

Наивная очередь на основе массива (плохое решение):

typedef struct {
    int data[MAX_SIZE];
    int front;
    int rear;
} queue_t;

void enqueue(queue_t *q, int value) {
    if (q->rear < MAX_SIZE) {
        q->data[q->rear++] = value;
    }
}

int dequeue(queue_t *q) {
    if (q->front < q->rear) {
        return q->data[q->front++];
    }
    return -1;  // Ошибка
}

Проблема: после множества операций front и rear достигают MAX_SIZE, даже если очередь пуста.

Изначально:             [_, _, _, _, _]  front=0, rear=0
Добавление в очередь:   [1, 2, 3, _, _]  front=0, rear=3
Извлечение из очереди:  [_, 2, 3, _, _]  front=1, rear=3
Извлечение из очереди:  [_, _, 3, _, _]  front=2, rear=3
Добавление в очередь:   [_, _, 3, 4, 5]  front=2, rear=5
Добавление в очередь:   ЗАПОЛНЕНА!       front=2, rear=5 (но элементов всего 3!)

Решение: кольцевой буфер (круговой массив)

typedef struct {
    int data[MAX_SIZE];
    int head;
    int tail;
    int count;
} ring_buffer_t;

void enqueue(ring_buffer_t *q, int value) {
    if (q->count < MAX_SIZE) {
        q->data[q->tail] = value;
        q->tail = (q->tail + 1) % MAX_SIZE;
        q->count++;
    }
}

int dequeue(ring_buffer_t *q) {
    if (q->count > 0) {
        int value = q->data[q->head];
        q->head = (q->head + 1) % MAX_SIZE;
        q->count--;
        return value;
    }
    return -1;  // Ошибка
}

Как это работает:

Изначально:             [_, _, _, _, _]  head=0, tail=0, count=0
Добавление в очередь:   [1, _, _, _, _]  head=0, tail=1, count=1
Добавление в очередь:   [1, 2, _, _, _]  head=0, tail=2, count=2
Добавление в очередь:   [1, 2, 3, _, _]  head=0, tail=3, count=3
Извлечение из очереди:  [_, 2, 3, _, _]  head=1, tail=3, count=2
Добавление в очередь:   [_, 2, 3, 4, _]  head=1, tail=4, count=3
Добавление в очередь:   [_, 2, 3, 4, 5]  head=1, tail=0, count=4  (Хвост уходит в начало!)
Добавление в очередь:   [6, 2, 3, 4, 5]  head=1, tail=1, count=5  (Очередь заполнена)

Производительность:

$ perf stat -e cycles ./queue_benchmark
Ring buffer (1M ops):
    15,000,000 cycles
         1,234 cache-misses

Linked list queue (1M ops):
   520,000,000 cycles
       980,000 cache-misses

Кольцевой буфер в 35 быстрее!

Оптимизация операции деления с остатком

У кольцевого буфера есть одна проблема, связанная с производительностью: операция деления с остатком % MAX_SIZE.

На многих процессорах (особенно встраиваемых), деление/деление с остатком выполняется медленно (10-40 тактов).

Оптимизация 1: размер, равный степени двойки

Если MAX_SIZE равен степени двойки 2, то деление с остатком превращается в побитовую AND:

#define MAX_SIZE 1024  // Должен быть степенью двойки
#define MASK (MAX_SIZE - 1)

void enqueue(ring_buffer_t *q, int value) {
    if (q->count < MAX_SIZE) {
        q->data[q->tail] = value;
        q->tail = (q->tail + 1) & MASK;  // Fast!
        q->count++;
    }
}

Бенчмарк:

Версия с делением с остатком:     15 000 000 тактов
Версия с побитовым AND:            8 500 000 тактов  (в 1,76 раза быстрее)

Оптимизация 2: избавление от поля count

Вместо отслеживания count воспользуемся тем, что head == tail означает пустую очередь:

typedef struct {
    int data[MAX_SIZE];
    int head;
    int tail;
} ring_buffer_t;

int is_empty(ring_buffer_t *q) {
    return q->head == q->tail;
}

int is_full(ring_buffer_t *q) {
    return ((q->tail + 1) & MASK) == q->head;
}

void enqueue(ring_buffer_t *q, int value) {
    if (!is_full(q)) {
        q->data[q->tail] = value;
        q->tail = (q->tail + 1) & MASK;
    }
}

int dequeue(ring_buffer_t *q) {
    if (!is_empty(q)) {
        int value = q->data[q->head];
        q->head = (q->head + 1) & MASK;
        return value;
    }
    return -1;
}

Компромисс: занимает один слот (максимальный размер становится равным MAX_SIZE - 1), но проще и немного быстрее.

Кольцевой буфер без блокировок (единственный источник/потребитель)

Во встроенных системах с прерываниями или многоядерностью часто необходимы безопасные по потокам очереди.

Для единственного источника и единственного потребителя можно создать кольцевой буфер без блокировок:

typedef struct {
    volatile int data[MAX_SIZE];
    volatile int head;  // Записывает только потребитель
    volatile int tail;  // Записывает только источник
} spsc_ring_buffer_t;

// Источник (обработчик прерываний или другое ядро)
void enqueue(spsc_ring_buffer_t *q, int value) {
    int next_tail = (q->tail + 1) & MASK;
    if (next_tail != q->head) {  // Не заполнено
        q->data[q->tail] = value;
        __sync_synchronize();  // Барьер памяти
        q->tail = next_tail;
    }
}

// Потребитель (основной поток)
int dequeue(spsc_ring_buffer_t *q) {
    if (q->head != q->tail) {  // Не пусто
        int value = q->data[q->head];
        __sync_synchronize();  // Барьер памяти
        q->head = (q->head + 1) & MASK;
        return value;
    }
    return -1;
}

Основные пункты:

  • volatile: предотвращает кэширование значений компилятором

  • Барьеры памяти: гарантируют упорядочивание в моделях слабой памяти (ARM, RISC-V)

  • Единственный источник/потребитель: нет необходимости в атомарных операциях

Версия для RISC-V (явный fence):

void enqueue(spsc_ring_buffer_t *q, int value) {
    int next_tail = (q->tail + 1) & MASK;
    if (next_tail != q->head) {
        q->data[q->tail] = value;
        asm volatile("fence w, w" ::: "memory");  // Store-store fence
        q->tail = next_tail;
    }
}

Очередь с приоритетами: двоичная куча

Иногда нам требуется очередь, элементы которой имеют приоритеты. Её стандартная реализация — двоичная куча.

Двоичная куча на основе массива:

typedef struct {
    int data[MAX_SIZE];
    int size;
} heap_t;

void heap_push(heap_t *h, int value) {
    if (h->size >= MAX_SIZE) return;

    // Вставка в конец
    int i = h->size++;
    h->data[i] = value;

    // Поднятие наверх
    while (i > 0) {
        int parent = (i - 1) / 2;
        if (h->data[i] <= h->data[parent]) break;

        // Замена местами
        int temp = h->data[i];
        h->data[i] = h->data[parent];
        h->data[parent] = temp;

        i = parent;
    }
}

int heap_pop(heap_t *h) {
    if (h->size == 0) return -1;

    int result = h->data[0];

    // Перемещение последнего элемента в корень
    h->data[0] = h->data[--h->size];

    // Опускание вниз
    int i = 0;
    while (1) {
        int left = 2 * i + 1;
        int right = 2 * i + 2;
        int largest = i;

        if (left < h->size && h->data[left] > h->data[largest])
            largest = left;
        if (right < h->size && h->data[right] > h->data[largest])
            largest = right;

        if (largest == i) break;

        // Замена местами
        int temp = h->data[i];
        h->data[i] = h->data[largest];
        h->data[largest] = temp;

        i = largest;
    }

    return result;
}

Поведение кэша:

  • Плюсы: на основе массивов, последовательная память

  • Минусы: паттерн произвольного доступа при поднимании вверх/опускании вниз

Производительность: O(log n), но хорошее поведение кэша при малых размерах куч.

Встраиваемые системы: очереди фиксированного размера

Во встраиваемых системах нормой являются очереди фиксированного размера:

Почему?

  1. Предсказуемая память: отсутствие malloc/free

  2. Детерминированная производительность: нет оверхеда распределения

  3. Безопасность для реального времени: нет неограниченных операций

  4. Простота: проще проверять и отлаживать

Пример: буфер приёма UART

#define UART_BUFFER_SIZE 256  // Степень двойки

typedef struct {
    uint8_t data[UART_BUFFER_SIZE];
    volatile uint16_t head;
    volatile uint16_t tail;
} uart_buffer_t;

uart_buffer_t uart_rx_buffer = {0};

// Вызывается из прерывания UART
void uart_rx_isr(void) {
    uint8_t byte = UART_DATA_REG;

    uint16_t next_tail = (uart_rx_buffer.tail + 1) & (UART_BUFFER_SIZE - 1);
    if (next_tail != uart_rx_buffer.head) {
        uart_rx_buffer.data[uart_rx_buffer.tail] = byte;
        uart_rx_buffer.tail = next_tail;
    } else {
        // Буфер полон, отклоняем байт (или устанавливаем флаг ошибки)
    }
}

// Вызывается из основного цикла
int uart_read(void) {
    if (uart_rx_buffer.head == uart_rx_buffer.tail) {
        return -1;  // Пусто
    }

    uint8_t byte = uart_rx_buffer.data[uart_rx_buffer.head];
    uart_rx_buffer.head = (uart_rx_buffer.head + 1) & (UART_BUFFER_SIZE - 1);
    return byte;
}

Главные особенности:

  • Фиксированный размер (256 байт)

  • Степень двойки для быстрого деления с остатком

  • Отсутствие блокировки (единственный производитель/потребитель)

  • Безопасно для ISR (volatile, барьеры памяти имплицитны в ISR)

Пример из реального мира: планировщик задач

Вернёмся к моему вылету прошивки. Вот, как код выглядел до и после:

До (очередь на основе связанного списка):

typedef struct task {
    void (*func)(void);
    struct task *next;
} task_t;

task_t *task_queue = NULL;

void schedule_task(void (*func)(void)) {
    task_t *task = malloc(sizeof(task_t));  // Медленно, фрагментация
    task->func = func;
    task->next = NULL;

    // Добавление в конец очереди
    if (!task_queue) {
        task_queue = task;
    } else {
        task_t *curr = task_queue;
        while (curr->next) curr = curr->next;  // Обход за O(n)!
        curr->next = task;
    }
}

void run_tasks(void) {
    while (task_queue) {
        task_t *task = task_queue;
        task_queue = task->next;
        task->func();
        free(task);  // Медленно
    }
}

Проблемы:

  • malloc/free в ISR (плохая практика)

  • Добавление в очередь за O(n) (обход всего списка)

  • Фрагментация памяти

  • Непредсказуемая производительность

После (кольцевой буфер):

#define MAX_TASKS 32

typedef struct {
    void (*funcs[MAX_TASKS])(void);
    volatile uint8_t head;
    volatile uint8_t tail;
} task_queue_t;

task_queue_t task_queue = {0};

void schedule_task(void (*func)(void)) {
    uint8_t next_tail = (task_queue.tail + 1) & (MAX_TASKS - 1);
    if (next_tail != task_queue.head) {
        task_queue.funcs[task_queue.tail] = func;
        task_queue.tail = next_tail;
    }
    // Если заполнена, задача отклоняется (может установить флаг ошибки)
}

void run_tasks(void) {
    while (task_queue.head != task_queue.tail) {
        void (*func)(void) = task_queue.funcs[task_queue.head];
        task_queue.head = (task_queue.head + 1) & (MAX_TASKS - 1);
        func();
    }
}

Улучшения:

  • Нет malloc/free

  • Добавление в очередь и удаление из очереди за O(1)

  • Фиксированная память (128 байт)

  • Предсказуемая производительность

  • Безопасность для ISR

Результат: вылеты прекратились, планирование задач стало быстрее в 10 раз.

Подведём итог

Вылет прошивки из-за «переполнения стека» на самом был проблемой очереди. Динамическое распределение очереди на основе связанного списка фрагментировало пул памяти, деливший пространство со стеком. После её замены на кольцевой буфер фиксированного размера вылеты прекратились, а планирование задач ускорилось в 10 раз. Невидимая структура данных стала видимой из-за своего сбоя.

Стеки:

  • На основе массивов: быстрые, имеют фиксированный размер, удобны для кэша

  • На основе связанных списков: медленные (malloc/free), неограниченный размер

  • Рекомендация: если размер не абсолютно непредсказуем, используйте стеки на основе массивов

Очереди:

  • На основе кольцевого буфера: быстрые, имеют фиксированный размер, удобны для кэша

  • На основе связанного списка: медленные, неограниченный размер

  • Рекомендация: используйте кольцевой буфер, особенно во встраиваемых системах

Оптимизации:

  • Размер, равный степени двойки, для быстрого деления с остатком (побитовое AND)

  • Отсутствие блокировки при единственном производителе/потребителе

  • Устранение поля count (жертвуем одним слотом ради простоты)

Встраиваемые системы:

  • Очереди фиксированного размера (предсказуемая память)

  • Нет malloc/free (детермированость, безопасность для реального времени)

  • Безопасность для ISR (volatile, барьеры памяти)

  • Размеры, равные степени двойки (быстрые операции)

Очереди с приоритетами:

  • Двоичная куча: O(log n), на основе массива, хорошее поведение кэша

  • Использовать для малых или средних куч (< 10 тысяч элементов)

В следующей главе: хэш-таблицы сочетают в себе скорость массивов с гибкостью динамических структур, однако конфликты кэша могут уничтожить производительность. Мы узнаем, как создавать удобные для кэша хэш-таблицы.