Как стать автором
Обновить
143.77
Тензор
Разработчик системы СБИС

Эффективная FIFO-обработка для Node.js и Chrome

Время на прочтение9 мин
Количество просмотров7.7K

"По классике" FIFO-очередь для обработки некоторого потока задач обычно реализуется в виде связанного списка элементов. Но для JavaScript такой подход нехорош - он требует либо создания "обвязки" над элементом очереди в виде дополнительного объекта, содержащего ссылки на сам элемент и указатель на следующий, либо превращения элемента в объект и расширения его таким же указателем.

FIFO-очередь на связном списке
FIFO-очередь на связном списке
Расширение элементов очереди указателями
Расширение элементов очереди указателями

В таких нагруженных системах, как коллектор нашего сервиса мониторинга PostgreSQL-серверов, создание и последующая подчистка Garbage Collector'ом подобных избыточных объектов и полей - непозволительная роскошь.

Но если внимательно посмотреть на эту схему, то можно заметить, что сами элементы очереди A, B, C линейно упорядочены. Так нельзя ли использовать в качестве очереди обычный массив с его .push() и .shift()?..

Использование массива в качестве очереди
Использование массива в качестве очереди

Массив-как-очередь

Давайте сначала напишем микро-тест, который продемонстрирует нам производительность такой очереди в зависимости от ее длины:

// test.js
const queue = [];

const usec = hrtb => {
  const us = process.hrtime(hrtb);
  return us[0] * 1e9 + us[1];
};

const totalLength = 1 << 20; // обрабатываем миллион элементов

console.log('scale | push, us | shift, us');
for (let n = 0; n <= 16; n++) { // перебираем размеры массивов по 2^N
  const ln = 1 << n;
  let tw = 0;
  let tr = 0;
  for (let iter = 0; iter < (totalLength >> n); iter++) { // прогоняем 1M/2^N итераций
    // записываем 2^N случайных целых чисел в массив
    {
      const hrt = process.hrtime();
      for (let i = 0; i < ln; i++) {
        queue.push(Math.random() * 1e9 | 0);
      }
      tw += usec(hrt);
    }
    // считываем все числа
    {
      const hrt = process.hrtime();
      while (queue.length) {
        queue.shift();
      }
      tr += usec(hrt);
    }
  }
  // выводим усредненные данные на один элемент
  console.log(`${n.toString().padStart(5)} | ${(tw/totalLength | 0).toString().padStart(8)} | ${(tr/totalLength | 0).toString().padStart(9)}`);
}

Запустим на текущей LTS-версии 16.17, минимизируя влияние GC:

$ node --expose-gc test

И получим красивый, но не очень приятный график:

Средняя длительность одной операции, нс
Средняя длительность одной операции, нс

.push() держится молодцом, укладываясь в 20нс на интервале длин очереди [32..8192], а за его пределами подрастая до 50-100нс.

А вот с .shift() картина другая. Если к 16-элементному массиву время снижается до 14нс, то к 8192 оно дорастает до 100нс, а затем резкий рост вплоть до 10000нс/элемент при длине массива 65536.

То есть чтобы просто прочитать, даже без какой-либо обработки, все элементы очереди такого размера, потребуется почти секунда! И такое поведение движка V8, лежащего в основе Node.js и Chrome - не новость, есть даже открытая таска Performance: make Array.shift an O(1) operation.

Кольцевой буфер

Но для подобных задач и незачем постоянно двигать элементы в самом массиве - достаточно в фиксированном массиве двигать только лишь указатели на "начало", откуда мы будем читать следующий элемент, и "конец", куда будем записывать.

Такая структура данных называется кольцевым буфером.

Схема работы кольцевого буфера
Схема работы кольцевого буфера

У такой структуры есть два неприятных для производительности момента:

  • необходимость постоянных операций с длиной массива;

  • его конечность, то есть в какой-то момент пишущий "хвост" может догнать "голову" с обратной стороны и начать перезаписывать еще непрочитанный контент.

"Хвост" догнал "голову"
"Хвост" догнал "голову"

Давайте напишем собственную реализацию, которая частично устранит эти проблемы. Лишь частично, поскольку фиксированная длина массива может быть не только проблемой, но и благом - в случае, когда мы можем частью самых старых задач в очереди пожертвовать ради производительности.

if против математики

Начнем нашу реализацию с самого простого варианта: head и tail всегда численно находятся внутри буфера, а если tail уже "убежал за край", то мы сохраним эту информацию в отдельном over-флаге:

class RingBuffer {
  _buffer;
  _tail = 0;
  _head = 0;
  _over = false;

  constructor(len) {
    this._buffer = Array(len).fill();
  }

  get length() {
    return this._over * this._buffer.length + this._tail - this._head;
  }

  push(val) {
    if (this._tail === this._head && this._over) {
      // "хвост" догнал "с обратной стороны" - надо продвинуть и "голову"
      if (this._head++ === this._buffer.length) {
        this._head = 0;
        this._over = false; // при переходе "головы" флаг сбрасываем
      }
    }
    this._buffer[this._tail++] = val;
    if (this._tail === this._buffer.length) {
      this._tail = 0;
      this._over = true;    // при переходе "хвоста" флаг устанавливаем
    }
    return this.length;
  }

  shift() {
    if (this._head !== this._tail || this._over) {
      const val = this._buffer[this._head++];
      if (this._head === this._buffer.length) {
        this._head = 0;
        this._over = false; // при переходе "головы" флаг сбрасываем
      }
      return val;
    }
  }
}

Соответственно, в test.js нам надо поменять инициализацию очереди:

const queue = new RingBuffer(65536); // мы точно знаем, что размер очереди не больше
RingBuffer.shift() существенно быстрее Array.shift()
RingBuffer.shift() существенно быстрее Array.shift()

Но в этой реализации достаточно много сравнений и сложных условий, которые CPU, традиционно, не очень любит.

Давайте чуть изменим логику работы, и if-сравнения пересечения границы массива заменим на операцию взятия остатка по модулю %. Но вспомним, что для некоторых чисел эту операцию можно заменить на & (побитовое "И") - такими числами являются 2^N.

Тесты показывают, что (Math.random() * 1e6 | 0) & 0xFFFF на 1-2% быстрее, чем (Math.random() * 1e6 | 0) % 65536, что сущая мелочь в общей массе, но все-таки идет на пользу общей производительности.

Поэтому мы будем длину массива всегда устанавливать равной некоторой степени 2 - все равно непринципиально, будете вы ее задавать как 1000 или как 1024.

В новой реализации, чтобы не городить сложных условий, будем всегда придерживаться следующей логики:

  • head <= tail - то есть численно читающая "голова" всегда левее, чем пишущий "хвост"

  • 0 <= head < 2 ^ pow - "голова" всегда находится внутри массив

  • 0 <= tail < 2 ^ (pow + 1) - "хвост" может численно убегать за границу массива не более, чем на его длину

  • когда "голова" переходит через границу массива, от "хвоста" отсекаем все лишнее, чтобы он попал в диапазон массива

Возвращаем "хвост" при переходе "головы"
Возвращаем "хвост" при переходе "головы"
class Pow2Buffer {
  _buffer;
  _tail = 0;
  _head = 0;
  _mask; // битовая маска

  constructor(pow) {
    this._buffer = Array(1 << pow).fill(); // .length = 100..00b
    this._mask = this._buffer.length - 1;  //    mask =  11..11b
  }

  get length() {
    return this._tail - this._head;
  }

  push(val) {
    if ((this._tail & this._mask) === this._head && this._tail > this._head) {
      // "хвост" догнал "с обратной стороны" - надо продвинуть и "голову"
      this._head++;
      this._head &= this._mask;
      this._tail = this._head + this._mask; // mask = buffer.length - 1
    }
    this._buffer[this._tail++ & this._mask] = val;
    return this.length;
  }

  shift() {
    if (this._head < this._tail) {
      const val = this._buffer[this._head++];
      if ((this._head & this._mask) === 0) {
        this._head = 0;
        this._tail &= this._mask; // при переходе "головы" вгоняем "хвост" в границы массива
      }
      return val;
    }
  }
}

Теперь при вызове мы указываем степень 2, поэтому в test.js поправим так:

const queue = new Pow2Buffer(16); // 2^16 = 65536
2^N рулит!
2^N рулит!

В этой реализации .push() "стоит" около 16нс, а .shift() - вообще стремится к 2нс.

Добавляем гибкости

Заметим, что ради производительности, мы не "зануляем" прочитанную ячейку массива, поэтому объект остается доступен в памяти и не может быть зачищен GC все то время, пока "хвост" не пройдет весь остальной массив и не перезапишет ячейку новым элементом.

Это побуждает нас использовать в качестве буфера как можно более короткий массив - исходя из графика задержек, 256 элементов будет достаточно эффективно.

С другой стороны, при кратких пиковых нагрузках 256 элементов в очереди нам может не хватить, и мы начнем терять данные, хотя легко можем этого избежать. Достаточно всего лишь позволить буферу при необходимости расширяться и "схлапываться", когда потребность пропадает.

Логично пытаться расширить массив только тогда, когда это действительно необходимо - то есть когда "хвост догнал голову", и запись элемента уже приведет к утрате данных. Тогда, если мы еще не достигли верхнего ограничения на разрешенный размер буфера, мы просто вставляем сегмент из пустых элементов в текущей позиции так, чтобы длина массива удвоилась:

Удвоение длины массива
Удвоение длины массива

Со "схлапыванием" ситуация похожая, с той лишь разницей, что делать мы ее будем пытаться только при переходе "головы" через границу массива:

"Уполовинивание" длины массива
"Уполовинивание" длины массива

Конечно, может оказаться так, что "хвост" уже успел сдвинуться во вторую половину массива. Ничего страшного - тогда просто не будем никак массив менять в этом случае.

class Pow2Buffer {
  _buffer;
  _tail = 0;
  _head = 0;
  _mask;
  _MIN_BUFFER; // минимальный, стартовый размер буфера
  _MAX_BUFFER; // максимально допустимый размер буфера

  constructor(powMin, powMax) {
    this._MAX_BUFFER = 1 << powMax;
    this._MIN_BUFFER = 1 << powMin;
    this._buffer = Array(this._MIN_BUFFER).fill();
    this._mask = this._buffer.length - 1;
  }

  get length() {
    return this._tail - this._head;
  }
  
  _tryExpand() {
    const ln = this._buffer.length;
    if (ln < this._MAX_BUFFER) {
      this._buffer.splice(this._head, 0, ...Array(ln).fill()); // удваиваем длину
      this._mask = this._buffer.length - 1;
      this._tail += ln; // "хвост" теперь снова за границей массива
      this._head += ln; // а "голова" - в конце вставленного сегмента
      return true;
    }
  }

  push(val) {
    if ((this._tail & this._mask) === this._head && this._tail > this._head) {
      if (!this._tryExpand()) { // пытаемся расширить массив
        // вот если расширять уже некуда - тогда двигаем "голову"
        this._head++;
        this._head &= this._mask;
        this._tail = this._head + this._mask;
      }
    }
    this._buffer[this._tail++ & this._mask] = val;
    return this.length;
  }

  _tryCollapse() {
    const ln = this._buffer.length;
    if (ln > this._MIN_BUFFER && (this._tail << 1) < ln) { // "хвост" в первой половине массива
      // подбираем наиболее близкую 2^N-длину не меньше powMin
      this._buffer.length = Math.max(1 << Math.ceil(Math.log2(this._tail + 1)), this._MIN_BUFFER);
      this._mask = this._buffer.length - 1;
      return true;
    }
  }

  shift() {
    if (this._head < this._tail) {
      const val = this._buffer[this._head++];
      if ((this._head & this._mask) === 0) {
        this._head = 0;
        this._tail &= this._mask;
        this._tryCollapse(); // пытаемся "схлопнуть" массив
      }
      return val;
    }
  }
}

Попробуем заведомо заставить наш буфер расширяться в некоторых тестах:

const queue = new Pow2Buffer(8, 16); // размер буфера в диапазоне [256..65536]

Как можно заметить, производительность пострадала лишь немного в тестах, когда буфер вынужден расширяться по несколько раз:

Наводим "красоту"

Осталось совсем немного - сделать все "стильно, модно, молодежно".

Иногда бывает необходимо "заглянуть" в следующий элемент очереди без его извлечения - поддержим обращение queue[0] по аналогии с обычным массивом. Для этого прямо в конструкторе объявим геттер для свойства .0:

  constructor(powMin, powMax) {
    this._MAX_BUFFER = 1 << powMax;
    this._MIN_BUFFER = 1 << powMin;
    this._buffer = Array(this._MIN_BUFFER).fill();
    this._mask = this._buffer.length - 1;
    // поддержка обращения к "следующему" (нулевому) элементу очереди
    Object.defineProperty(this, 0, {
      get : () => this._head < this._tail ? this._buffer[this._head] : undefined
    });
  }

Можно было бы поддержать доступ по произвольному индексу через Proxy так или так, но тогда через него пойдут и все обращения к локальным полям, что явно снизит производительность. К тому же, такой задачи обычно и не возникает.

Давайте также добавим нашей очереди возможность перебора в цикле for .. of. Для этого нам понадобится объявить примитивный генератор:

  * [Symbol.iterator]() {
    while (this.length) {
      yield this.shift();
    }
  }

А заодно - методы .toArray() и .toString():

  toArray() {
    return this._tail <= this._buffer.length // если "хвост" в границах массива
      ? this._buffer
          .slice(this._head, this._tail)     // просто вырезаем [head, tail]
      : this._buffer
          .slice(this._head)
          .concat(                           // иначе "клеим" [head, ...] + [..., tail]
            this._buffer
              .slice(0, this._tail & this._mask)
          );
  }

  toString() {
    return this.toArray().toString();
  }

Ну, и сделаем, наконец, все приватные свойства и методы такими с точки зрения языка, заменив префикс _ на #:

class Pow2Buffer {
  #buffer;
  #tail = 0;
  #head = 0;
  #mask;
  #MIN_BUFFER;
  #MAX_BUFFER;

  constructor(powMin, powMax) {
    this.#MAX_BUFFER = 1 << powMax;
    this.#MIN_BUFFER = 1 << powMin;
    this.#buffer = Array(this.#MIN_BUFFER).fill();
    this.#mask = this.#buffer.length - 1;

    Object.defineProperty(this, 0, {
      get : () => this.#head < this.#tail ? this.#buffer[this.#head] : undefined
    });
  }

  get length() {
    return this.#tail - this.#head;
  }

  #tryExpand() {
    const ln = this.#buffer.length;
    if (ln < this.#MAX_BUFFER) {
      this.#buffer.splice(this.#head, 0, ...Array(ln).fill());
      this.#mask = this.#buffer.length - 1;
      this.#tail += ln;
      this.#head += ln;
      return true;
    }
  }

  push(val) {
    if ((this.#tail & this.#mask) === this.#head && this.#tail > this.#head) {
      if (!this.#tryExpand()) {
        this.#head++;
        this.#head &= this.#mask;
        this.#tail = this.#head + this.#mask;
      }
    }
    this.#buffer[this.#tail++ & this.#mask] = val;
    return this.length;
  }

  #tryCollapse() {
    const ln = this.#buffer.length;
    if (ln > this.#MIN_BUFFER && (this.#tail << 1) < ln) {
      this.#buffer.length = Math.max(1 << Math.ceil(Math.log2(this.#tail + 1)), this.#MIN_BUFFER);
      this.#mask = this.#buffer.length - 1;
      return true;
    }
  }

  shift() {
    if (this.#head < this.#tail) {
      const val = this.#buffer[this.#head++];
      if ((this.#head & this.#mask) === 0) {
        this.#head = 0;
        this.#tail &= this.#mask;
        this.#tryCollapse();
      }
      return val;
    }
  }

  * [Symbol.iterator]() {
    while (this.length) {
      yield this.shift();
    }
  }

  toArray() {
    return this.#tail <= this.#buffer.length
      ? this.#buffer
          .slice(this.#head, this.#tail)
      : this.#buffer
          .slice(this.#head)
          .concat(
            this.#buffer
              .slice(0, this.#tail & this.#mask)
          );
  }

  toString() {
    return this.toArray().toString();
  }
}

На этом сегодня все. Надеюсь, данный концепт позволит вам получить более производительные Node.js-решения.

Теги:
Хабы:
Всего голосов 16: ↑16 и ↓0+16
Комментарии7

Публикации

Информация

Сайт
sbis.ru
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия