Как стать автором
Обновить

Пишем на С++ вектор, умеющий расширяться без копирования элементов

Время на прочтение9 мин
Количество просмотров21K

В языке С есть функции malloc, free и realloc. При использовании последней вы можете написать этакий расширяющийся массив из примитивных типов или структур (классов-то нет), который, можно надеяться, не будет копировать все данные при каждом расширении. В С++ есть встроенный класс vector, который представляет из себя расширяющийся массив, но он так не умеет: при каждом расширении вектора выделяется новый участок памяти и все элементы перемещаются на него (по возможности, с использованием move-семантики). Но ведь, если можно каждый раз не копировать все старые элементы на новое место, вектор должен работать быстрее? В этой статье я попробую написать вектор, который умеет расширяться без копирования элементов.

Сначала я покажу, что стандартный вектор не умеет расширяться без копирования/перемещения, потом обсужу придуманное решение, потом собственно приведу реализацию вектора, а в конце сравню производительность со стандартным вектором.

Код приведён здесь.

Как работает стандартный вектор

Сначала давайте убедимся в том, что вектор при каждом расширении копирует (перемещает) все элементы. Для этого напишем класс, который содержит счётчик копирований, перемещений и вызовов конструктора:

class Int
class Int {
private:
    int value;

public:
    static int moves;
    static int copies;
    static int constructed;

    Int() = delete;

    explicit Int(int a): value(a) {
        constructed++;
    }

    ~Int() {
        constructed--;
    }

    Int(const Int& other) {
        value = other.value;
        copies++;
        constructed++;
    }

    Int(Int&& other) noexcept {
        value = other.value;
        other.value = 0;
        moves++;
        constructed++;
    }

    Int& operator = (const Int& other) = delete;

    Int& operator = (Int&& other) = delete;
};

Далее запустим простой код, чтобы увидеть, сколько копирований/перемещений произойдёт:

простой код
int main() {
    vector<Int> vec;
    for (int i = 0; i < 16; i++) {
        vec.emplace_back(i + 1);
    }
    cout << "copies " << Int::copies << " moves " << Int::moves << "\n";
    return 0;
}

Этот код выведет: "copies 0 moves 15" - можно легко прикинуть, что сначала вместимость вектора была 1, потом 2, потом 4 и так далее до 16. В итоге при расширениях произошло 1 + 2 + 4 + 8 = 15 перемещений. Плохо! Если в классе Int удалить move-конструктор, то вместо 15-и перемещений будет 15 копирований.

Идея решения

Сначала убедимся в том, что функцию realloc из С использовать нельзя.

Функция realloc принимает на вход указатель на начало уже выделенного участка памяти и размер в байтах. Она пытается выделить кусок памяти нужного размера сразу вслед за уже выделенным участком памяти и вернуть изначальный указатель - таким образом размер выделенной памяти будет увеличен без копирования. Если же выделить новый участок памяти сразу вслед за данным невозможно, то данная функция выделит новый участок памяти и скопирует туда весь старый участок памяти, после чего освободит его.

Проблема в том, что произвольные С++ объекты нельзя копировать просто так - нужно вызывать конструктор копирования или перемещения, а потом вызывать деструктор старого объекта. Ничего такого, конечно, realloc не делает, поэтому использовать его мы не сможем.

Если бы в стандартной библиотеке С была функция, назовём её try_realloc, которая подобно realloc пытается увеличить данный ей участок памяти, а в случае неудачи ничего не делает, то мы могли бы её и использовать. Но такой, насколько мне известно, нет. Поэтому придётся делать что-то другое.

Одним решением могло бы быть написание собственных функций malloc, free, realloc и try_realloc. Однако, написание эффективной реализации этих функций - вопрос сложный. Поэтому пойдём другим путём.

mmap

В Linux есть системный вызов mmap . Если среди флагов указать MAP_ANONYMOUS, то этот системный вызов занимается по сути тем, что выделяет процессу новую память (возможно, по сути ядро ОС не будет сразу же выделять столько же физической памяти, сколько её попросят, но вот участок виртуальной памяти оно выделит, под который по мере необходимости будет выделять страницы физической памяти). Если же указать ненулевой первый аргумент mmap и указать среди флагов MAP_FIXED_NOREPLACE,то ядро по возможности выделит нам память ровно там, где мы попросили, а если не сможет - вернёт ошибку. А если среди флагов указать просто MAP_FIXED, то мы рискуем выстрелить себе в ногу, поскольку в итоге ядро может удалить какую-то другую память, поэтому так мы делать не будем.

Давайте посмотрим, как это работает. Ниже приведён код, который выделяет одну страницу памяти где получится, потом пытается сразу после неё получить ещё одну страницу, потом - две и так далее.

код
void experiment() {
    const int PAGE_SIZE = 4096;
    char *addr = (char *) mmap(nullptr, PAGE_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    int total_alloc = 1;
    auto next_addr = addr + PAGE_SIZE * total_alloc;
    for (int i = 0; i < 15; i++) {
        char *new_addr = (char *) mmap(next_addr, PAGE_SIZE * total_alloc, PROT_READ | PROT_WRITE,
                                       MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED_NOREPLACE, -1, 0);
        if (new_addr == MAP_FAILED) {
            cerr << "MAP_FAILED\n" << total_alloc;
            return ;
        }
        next_addr = new_addr + PAGE_SIZE * total_alloc;
        total_alloc *= 2;
    }
    cerr << "returning 0" << endl;
    cout << "total_alloc: " << total_alloc << endl;
}

вывод:

MAP_FAILED
1

Увы, но этот код выводит на моей машине выделяет всего одну страницу памяти, а при попытке выделить ещё одну сразу же после неё происходит ошибка. В чём проблема? Видимо, память сразу после изначально выделенной страницы уже занята. Нам нужно каким-то образом найти большой незанятый участок виртуальной памяти. В принципе, можно было бы изучить то, что и куда ядро Linux размещает в виртуальной памяти, найти место, где начинается какой-нибудь большой участок и захардкодить его. На моей машине, если в примере выше поставить как первый аргумент mmap адрес 0x7f45c73fd000, то выделяется запрошенное количество памяти. Но такой подход будет зависеть от конкретной реализации. Кроме того, придётся думать, как разным векторам найти разные участки виртуальной памяти. Поэтому можно сделать так: вначале с помощью mmap получаем адрес большого свободного куска, а потом сразу же отдаём его, но в дальнейшем начинаем просить память с того адреса, который получили от первого вызова:

Hidden text
void experiment() {
    const int PAGE_SIZE = 4096;
    int initial_pages = 32768;
    char *start_addr = (char *) mmap(nullptr, PAGE_SIZE * initial_pages, PROT_READ | PROT_WRITE,
                                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (start_addr == MAP_FAILED) {
        cerr << "initial mmap failed\n";
        return ;
    }
    munmap(start_addr, PAGE_SIZE * initial_pages);
    char *addr = (char *) mmap(start_addr, PAGE_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    // printf("start_addr: %lx\n", (long )start_addr);
    int total_alloc = 1;
    auto next_addr = addr + PAGE_SIZE * total_alloc;
    for (int i = 0; i < 15; i++) {
        char *new_addr = (char *) mmap(next_addr, PAGE_SIZE * total_alloc, PROT_READ | PROT_WRITE,
                                       MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED_NOREPLACE, -1, 0);
        if (new_addr == MAP_FAILED) {
            cerr << "MAP_FAILED\n" << total_alloc;
            return ;
        }
        next_addr = new_addr + PAGE_SIZE * total_alloc;
        total_alloc *= 2;
    }
    cout << "total_alloc: " << total_alloc << endl;
}

Теперь перейдём к написанию вектора.

Реализация вектора

Внутри вектора будем хранить указатель на имеющийся у вектора участок памяти, количество элементов в векторе и количество страниц памяти, которое выделено.

template<typename T>
class MyVector {
public:
    explicit MyVector(size_t max_capacity = 32768);

    ~MyVector();
 
    MyVector(const MyVector&) = delete;

    MyVector(MyVector&& other) noexcept;

    MyVector& operator = (const MyVector&) = delete;

    MyVector& operator = (MyVector&& other)  noexcept;

    T& operator [] (size_t ind);

    const T& operator [] (size_t ind) const;

    void pop();

    void push_back(const T& element);

    template<typename ...Args>
    void emplace_back(Args ...args);

private:
    static const int PROT = PROT_WRITE | PROT_READ;
    static int DEFAULT_FLAGS;

    static size_t PAGE_SIZE;

    void clear();

    bool need_increasing();

    void increase_capacity();

    void fallback_allocate();

    char* raw_data;
    size_t pages_allocated;
    size_t size;
};

Методы pop, push_back и операторы[] устроены вполне стандартно. Приведу здесь код конструктора, деструктора и методов, расширяющих вектор.

template<typename T>
MyVector<T>::MyVector(size_t max_capacity, bool huge_pages) {
    char* initial_address = (char* ) mmap(nullptr, (max_capacity * sizeof(T) / PAGE_SIZE + 1) * PAGE_SIZE, PROT,
                                          DEFAULT_FLAGS, -1, 0);
    if (initial_address == MAP_FAILED) {
        throw runtime_error(string("MAP_FAILED1 ") + strerror(errno));
    }
    munmap(initial_address, (max_capacity * sizeof(T) / PAGE_SIZE + 1) * PAGE_SIZE);
    raw_data = (char* ) mmap(initial_address, PAGE_SIZE, PROT, DEFAULT_FLAGS | MAP_FIXED_NOREPLACE, -1, 0);
    if (raw_data == MAP_FAILED) {
        throw runtime_error(string("MAP_FAILED2 ") + strerror(errno));
    }
    pages_allocated = 1;
    size = 0;
}

max_capacity - это совет от пользователя вектора, который будет подсказывать, сколько максимально элементов могут положить в вектор, но вектор будет работать и если элементов будет больше. Итак, вначале конструктор находит большой свободный кусок виртуальной памяти (с помощью mmap и затем munmap), затем выделает одну страницу памяти.

Деструктор вызывает метод clear, который приведём ниже:

template<typename T>
void MyVector<T>::clear() {
    T* data = (T*) raw_data;
    for (size_t i = 0; i < size; i++) {
        data[i].~T();
    }
    if (raw_data) {
        munmap(raw_data, pages_allocated * PAGE_SIZE);
    }
    pages_allocated = 0;
    size = 0;
    raw_data = nullptr;
}

Вызываем деструкторы и возвращаем память.

template<typename T>
void MyVector<T>::increase_capacity() {
    char* new_mem = (char* ) mmap(raw_data + pages_allocated * PAGE_SIZE, pages_allocated * PAGE_SIZE, PROT,
                                  DEFAULT_FLAGS | MAP_FIXED_NOREPLACE, -1, 0);
    if (new_mem == MAP_FAILED) {
        fallback_allocate();
        return;
    }
    pages_allocated *= 2;
}

Пытаемся выделить новый участок памяти сразу вслед за уже выделенным. Если не получилось - вызываем другой метод, который выделит новую память где получится и копирует туда все элементы вектора:

template<typename T>
void MyVector<T>::fallback_allocate() {
    //cerr << "fallback_allocate called " << pages_allocated << endl;
    T* data = (T*) raw_data;
    T* new_memory = (T *) mmap(nullptr, pages_allocated * 2 * PAGE_SIZE, PROT, DEFAULT_FLAGS, -1, 0);
    if (new_memory == MAP_FAILED) {
        throw runtime_error(string("MAP_FAILED in fallback_allocate ") + strerror(errno));
    }
    for (size_t i = 0; i < size; i++) {
        new (&new_memory[i]) T(std::move(data[i]));
        data[i].~T();
    }
    munmap(raw_data, pages_allocated * PAGE_SIZE);
    pages_allocated *= 2;
    raw_data = (char *) new_memory;
}

Тестируем производительность

Для тестирования будем запускать следующий код, замеряя время и раскомментруя разные строки:

// компилируем так: g++ -O2 main.cpp
// запускаем так: time ./a.out
int main() {
    {
        int initial_size = 100000000;
        // auto my_vec = MyVector<Int>(PAGE_SIZE / sizeof(Int));
        auto my_vec = MyVector<Int>(initial_size * 2);
        //auto my_vec = vector<Int>();
        //my_vec.reserve(PAGE_SIZE / sizeof(Int));
        // my_vec.reserve(initial_size);
        for (size_t i = 0; i < initial_size; i++) {
            my_vec.emplace_back(i);
        }
    }
    printf("copied: %d, moved: %d, left: %d\n", Int::copies, Int::moves, Int::constructed);
    return 0;
}

В конце код выводит число копирований и перемещений элементов типа Int, а ещё число оставшихся в конце. Последнее число должно быть нулём - если это не так, значит, какие-то деструкторы не вызвались.

Первый запуск - мой вектор, с большим max_capacity в конструкторе:

copied: 0, moved: 0, left: 0

real 0m0,245s
user 0m0,136s
sys 0m0,109s

Всё хорошо - копирований нет, перемещений тоже, все деструкторы отработали.

Второй запуск - используем стандартный вектор, но резервируем место для 4096 / sizeof(Int) элементов - столько мой вектор может хранить до первого расширения.

copied: 0, moved: 134216704, left: 0

real 0m0,732s
user 0m0,472s
sys 0m0,252s

Как и ожидалось, видим, что стандартный вектор неистово перемещает элементы по памяти, работает сильно дольше.

Проверим, сколько времени будет работать стандартный вектор, если ему ничего не придётся перемещать (установим правильный .reserve)

copied: 0, moved: 0, left: 0

real 0m0,411s
user 0m0,307s
sys 0m0,104s

Перемещений нет, но работает всё равно не так быстро, как мой вектор.

Под конец, посмотрим, как будет работать мой вектор, если он не будет заранее знать, сколько элементов мы положим (передадим в конструктор небольшое число).

copied: 0, moved: 134215680, left: 0

real 0m0,634s
user 0m0,389s
sys 0m0,245s

На этот раз ему пришлось перемещать элементы, но работает довольно быстро.

Выводы

Пока что данной реализации надо либо дать какую-то хорошую оценку максимального количества элементов в векторе, чтобы она ничего не перемещала по памяти. Если такая оценка есть, то её можно дать и стандартному вектору в метод reserve, но в случае моей реализации, она не займёт столько памяти сразу же, а только когда (и если) будет действительно столько элементов в векторе. Если такой оценки нет, то можно просто передать большое число - на 64-ёх битных машинах виртуальной памяти очень много, скорее всего участок свободной виртуальной памяти найдётся. При этом в реальности, пока не понадобится, ни физическая, ни виртуальная память израсходована не будет.

Отдельно хочу отметить странность - в моих замерах получилось, что, если стандартному вектору правильно сообщить, сколько элементов будет, он всё равно будет работать медленнее. Почему так - не очень понятно, мне думается, что есть какой-то подвох. Если честно, когда я начинал писать этот код, я вообще не думал, что будет работать быстрее - я предполагал, что из-за большого числа системных вызовов работать будет не очень быстро.

P. S.

На самом деле, стандартные дек, стек и очередь не перемещают свои элементы никуда, но видимо, за счёт более медленного выполнения некоторых операций.

Теги:
Хабы:
Всего голосов 15: ↑11 и ↓4+9
Комментарии70

Публикации

Работа

Программист C++
123 вакансии
QT разработчик
9 вакансий

Ближайшие события