Как стать автором
Обновить

Node.js Streams для чайников или как работать с потоками

Время на прочтение8 мин
Количество просмотров88K
Я думаю многие не раз слышали про Node js Streams, но так ни разу и не использовали, либо использовали, не задумываясь как же они работают, запайпили (pipe) стрим и норм. Давайте же разберемся что такое стримы, запайпить (pipe), чанки (chunk — часть данных) и все такое))



Почему важно понимать как устроены стримы в Node js? Ответ прост: многие встроенные модули в Node js реализуют стримы, такие как HTTP requests/responses, fs read/write, zlib, crypto, TCP sockets и другие. Также стримы вам понадобятся, к примеру, при обработке больших файлов, при работе с картинками. Возможно вы и не будете писать свой собственный стрим, однако понимание как это работает сделает вас более компетентным разработчиком.

Итак, что же такое стрим(далее по тексту буду использовать вместо Stream (поток)). Стрим — это концепция, c помощью которой можно обрабатывать данные небольшими частями, что позволяет задействовать небольшой объем оперативной памяти. Также с ее помощью мы можем разбить обработку каждой части на независимые друг от друга модули (функции либо классы). Например, мы можем сразу сжать часть данных, потом зашифровать и записать в файл. Основная идея в том, чтобы не работать с данными целиком, а поочередно обрабатывать часть данных.

В Node js есть 4 вида стримов:

  • Readable — чтение
  • Writable — запись
  • Duplex — чтение и запись
  • Transform — вид Duplex потока, который может изменять данные

Более подробную информацию вы сможете найти на официальном сайте, а сейчас перейдем к практике.

Простой пример


Думаю, что многие уже использовали стримы, даже не подозревая об этом. В этом примере мы просто отправляем клиенту файл.

// 1 - (используя стримы) загружаем часть файла и отправляем ее, до тех пор пока не отправим весь файл
const getFile = async (req, res, next) => {
  const fileStream = fs.createReadStream('path to file');

  res.contentType('application/pdf');

  fileStream.pipe(res);
};

// 2 - (не используя стримы) загружаем файл полностью в память и затем отправляем
const getFile = async (req, res, next) => {
  const file = fs.readFileSync('path to file');

  res.contentType('application/pdf');

  res.send(file);
};

Разница лишь в том, что в первом случае мы загружаем часть файла и отправляем ее, таким образом, не загружая оперативную память сервера. Во втором случае мы сразу загружаем файл целиком в оперативную память и только потом отправляем.

Далее в статье разберем каждый стрим по отдельности. Стрим можно создать используя наследование или с помощью функции-конструктора.

const { Readable } = require('stream');

// 1 - Используя конструктор
const myReadable = new Readable(opt);

// 2 - Наследуя класс 
class myReadable extends Readable {
  constructor(opt) {
    super(opt);
  }
}

Во всех примерах я буду использовать 2 способ.

Readable stream


Давайте рассмотрим как же нам создать Readable стрим в NodeJS.

const { Readable } = require('stream');

class myReadable extends Readable {
  constructor(opt) {
    super(opt);
  }

  _read(size) {}
}

Как видим из примера выше, этот класс принимает набор параметров. Мы рассмотрим только те, которые нужны для общего понимания работы Readable стрима, остальные вы можете посмотреть в документации. Нас интересует параметр highWaterMark и метод _read.

highWaterMark — это максимальное количество байтов внутреннего буфера стрима (по умолчанию 16кб) по достижению которого считывание из ресурса приостанавливается. Для того, чтобы продолжить считывание, нам нужно освободить внутренний буфер. Мы можем это сделать вызвав методы pipe, resume или подписавшись на событие data.

_read — это реализация приватного метода, который вызывается внутренними методами класса Readable. Он вызывается постоянно пока размер данных не достигнет highWaterMark.

Ну и последний метод, который нас интересует, это readable.push, он непосредственно и добавляет данные во внутренний буфер. Он возвращает true, но как только буфер будет заполнен, то вызов этого метода начнет возвращать false. Он может управляться методом readable._read.

Давайте теперь посмотрим пример для прояснения ситуации.

class Counter extends Readable {
  constructor(opt) {
    super(opt);

    this._max = 1000;
    this._index = 0;
  }

  _read() {
    this._index += 1;

    if (this._index > this._max) {
      this.push(null);
    } else {
      const buf = Buffer.from(`${this._index}`, 'utf8');

      console.log(`Added: ${this._index}. Could be added? `, this.push(buf));
    }
  }
}

const counter = new Counter({ highWaterMark: 2 });

console.log(`Received: ${counter.read().toString()}`);

Для начала скажу, что counter.read() — это не тот _read, который мы реализовали в классе. Тот метод является приватным, а этот — открытым, и он возвращает данные из внутреннего буфера. Когда мы выполним этот код, в консоли мы увидим следующее:



Что же тут произошло? При создании стрима new Counter({ highWaterMark: 2 }) мы указали, что размер нашего внутреннего буфера будет равняться 2-м байтам, т.е. может хранить 2 символа (1 символ = 1 байт). После вызова counter.read() стрим начинает считывание, записывает '1' во внутренний буфер и возвращает его. Затем он продолжает считывание, записывает '2'. Когда он запишет '3', то буфер будет заполнен, readable.push вернет false, и стрим будет ждать, пока внутренний буфер освободится. Т.к. в нашем примере нет логики на освобождения буфера, скрипт завершится.

Как и говорилось ранее, для того, чтобы чтение не прерывалось, нам нужно постоянно очищать внутренний буфер. Для этого мы подпишемся на событие data. Заменим последние 2 строчки следующим кодом.

const counter = new Counter({ highWaterMark: 2 });

counter.on('data', chunk => {
  console.log(`Received: ${chunk.toString()}`);
});

Теперь если мы запустим этот пример, то увидим, что все сработало как надо и в консоли выведутся цифры от 1 до 1000.

Writable stream


На самом деле он очень похож на Readable стрим, только предназначен для записи данных.

const { Writable } = require('stream');

class myWritable extends Writable {
  constructor(opt) {
    super(opt);
  }

  _write(chunk, encoding, callback) {}
}

Он принимает похожие параметры, как и Readable стрим. Нас интересуют highWaterMark и _write.

_write — это приватный метод, который вызывается внутренними методами класса Writable для записи порции данных. Он принимает 3 параметра: chunk (часть данных), encoding (кодировка, если chunk это строка), callback (функция, которая вызывается после успешной или неудачной записи).

highWaterMark — это максимальное количество байтов внутреннего буфера стрима (по умолчанию 16кб), по достижению которого stream.write начнет возвращать false.

Давайте перепишем предыдущий пример со счетчиком.

const { Writable } = require('stream');

class Counter extends Writable {
  _write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  }
}

const counter = new Counter({ highWaterMark: 2 });

for (let i = 1; i < 1000; i += 1) {
  counter.write(Buffer.from(`${i}`, 'utf8'));
}

По сути все просто, но есть один интересный нюанс, о котором стоит помнить! При создании стрима new Counter({ highWaterMark: 2 }) мы указали, что размер нашего внутреннего буфера будет равняться 2-м байтам, т.е. может хранить 2 символа (1 символ = 1 байт). Когда же счетчик дойдет до десяти, то буфер будет заполняться при каждом вызове write, соответственно, если бы запись осуществлялась в медленный источник, то все остальные данные при вызове write сохранялись бы в оперативную память, что могло бы вызвать ее переполнение (в данном примере это конечно же не важно, так как наш буфер 2 байта, но вот с большими файлами об этом нужно помнить). Когда возникает такая ситуация, нам надо подождать, пока стрим запишет текущую порцию данных, освободит внутренний буфер (вызовет событие drain), и затем мы можем возобновить запись данных. Давайте перепишем наш пример.

const { Writable } = require('stream');
const { once } = require('events');

class Counter extends Writable {
  _write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  }
}

const counter = new Counter({ highWaterMark: 2 });

(async () => {
  for (let i = 1; i < 1000; i += 1) {
    const canWrite = counter.write(Buffer.from(`${i}`, 'utf8'));

    console.log(`Can we write bunch of data? ${canWrite}`);

    if (!canWrite) {
      await events.once(counter, 'drain');
      console.log('drain event fired.');
    }
  }
})();

Метод events.once был добавлен в v11.13.0 и позволяет создать промис и подождать, пока определенное событие выполнится один раз. В этом примере мы проверяем, возможна ли запись данных в стрим, если нет, то ожидаем, пока буфер освободится, и продолжаем запись.

На первый взгляд это может показаться ненужным действием, но при работе с большими объемами данных, например файлами, которые весят больше 10гб, забыв сделать это, вы можете столкнуться с утечкой памяти.

Duplex stream


Он объединяет в себе Readable и Writable стримы, то есть мы должны написать реализацию двух методов _read и _write.

const { Duplex } = require('stream');

class myDuplex extends Duplex {
  constructor(opt) {
    super(opt);
  }

  _read(size) {}

  _write(chunk, encoding, callback) {}
}

Здесь нам интересны 2 параметра, которые мы можем передать в конструктор, это readableHighWaterMark и writableHighWaterMark, которые позволяют нам указать размер внутреннего буфера для Readable, Writable стримов соответственно. Вот так будет выглядеть реализация предыдущих двух примеров с помощью Duplex стрима.

const { Duplex } = require('stream');
const events = require('events');

class Counter extends Duplex {
  constructor(opt) {
    super(opt);

    this._max = 1000;
    this._index = 0;
  }

  _read() {
    this._index += 1;

    if (this._index > this._max) {
      this.push(null);
    } else {
      const buf = Buffer.from(`${this._index}`, 'utf8');

      this.push(buf);
    }
  }

  _write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  }
}

const counter = new Counter({ 
  readableHighWaterMark: 2, 
  writableHighWaterMark: 2 
});

(async () => {
  let chunk = counter.read();

  while (chunk !== null) {
    const canWrite = counter.write(chunk);

    console.log(`Can we write bunch of data? ${canWrite}`);

    if (!canWrite) {
      await events.once(counter, 'drain');
      console.log('drain event fired.');
    }

    chunk = counter.read();
  }
})();

Думаю, этот код не нуждается в пояснениях, так как он такой же, как и раньше, только в одном классе.

Transform stream


Этот стрим является Duplex стримом. Он нужен для преобразования порции данных и отправки дальше по цепочке. Его можно реализовать таким же способом, как и остальные стримы.

const { Transform } = require('stream');

class myTransform extends Transform {
  _ transform(chunk, encoding, callback) {}
}

Нас интересует метод _transform.

_transform — это приватный метод, который вызывается внутренними методами класса Transform для преобразования порции данных. Он принимает 3 параметра: chunk (часть данных), encoding (кодировка, если chunk это строка), callback (функция, которая вызывается после успешной или неудачной записи).

С помощью этого метода и будет происходить изменение порции данных. Внутри этого метода мы можем вызвать transform.push() ноль или несколько раз, который фиксирует изменения. Когда мы завершим преобразование данных, мы должны вызвать callback, который отправит все, что мы добавляли в transform.push(). Первый параметр этой callback функции — это ошибка. Также мы можем не использовать transform.push(), а отправить измененные данные вторым параметром в функцию callback (пример: callback(null, data)). Для того, чтобы понять как использовать этот вид стрима, давайте разберем метод stream.pipe.

stream.pipe — этот метод используется для соединения Readable стрима с Writable стримом, а также для создания цепочек стримов. Это значит, что мы можем считывать часть данных и передавать в следующий стрим для обработки, а потом в следующий и т д.

Давайте напишем Transform стрим, который будет добавлять символ * в начало и конец каждой части данных.

class CounterReader extends Readable {
  constructor(opt) {
    super(opt);

    this._max = 1000;
    this._index = 0;
  }

  _read() {
    this._index += 1;

    if (this._index > this._max) {
      this.push(null);
    } else {
      const buf = Buffer.from(`${this._index}`, 'utf8');

      this.push(buf);
    }
  }
}

class CounterWriter extends Writable {
  _write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  }
}

class CounterTransform extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const resultString = `*${chunk.toString('utf8')}*`;

      callback(null, resultString);
    } catch (err) {
      callback(err);
    }
  }
}

const counterReader = new CounterReader({ highWaterMark: 2 });
const counterWriter = new CounterWriter({ highWaterMark: 2 });
const counterTransform = new CounterTransform({ highWaterMark: 2 });

counterReader.pipe(counterTransform).pipe(counterWriter);

В этом примере я использовал Readable и Writable стримы из предыдущих примеров, а также добавил Transform. Как видим, получилось довольно просто.

Вот мы и рассмотрели как устроены стримы. Основная их концепция — это обработка данных по частям, что очень удобно и не требует расходов больших ресурсов. Также стримы можно использовать с итераторами, что делает их еще более удобным в использовании, но это уже совсем другая история.
Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+7
Комментарии10

Публикации

Истории

Работа

Ближайшие события

Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн
Антиконференция X5 Future Night
Дата30 мая
Время11:00 – 23:00
Место
Онлайн
Конференция «IT IS CONF 2024»
Дата20 июня
Время09:00 – 19:00
Место
Екатеринбург