Node.js Стримы предоставляют мощное средство для эффективной обработки большого обьема данных с малым использованием данных. Если у вас есть большой файл, то он скорее всего не сможет целиком загрузиться в память, и гораздо эффективнее будет обрабатывать его, разбив на более мелкие куски. Node.js Стримы существуют уже достаточно давно, однако их использование сопряжено с некоторыми трудностями. Само API не очень очевидное, а писать конструкции вида req.on('data', ... с неявными строковыми параметрами не очень удобно.
Рассмотрим такой жизненный пример, как запись большого файла в БД, например после его загрузки через веб-интерфейс. Допустим у нас есть текстовый файл на 1 миллион строчек. Есть вероятность, что он поместится в память, если загрузить его целиком. Но если вы не единственный пользователь вашего приложения, и несколько пользователей могут выполнять это действие одновременно, то такая нагрузка уже будет некорректной.
Воспользуемся потоковым чтением через Stream. Он будет читать файл и передавать его нам в виде буффера байтов, безотносительно разбивки по строкам. В примерах, нам приводят удобные конструкции с gzip с использованием pipe в стримах
import * as fs from 'node:fs';
import * as zlib from 'node:zlib';
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w)И для построчного чтения тоже хотелось бы сделать аналогичное. Представим, что у нас есть некоторый Transform стрим, который преобразует данные в строки, а затем Writable стрим, для записи в базу.
import * as fs from 'node:fs';
const reader = fs.createReadStream('file.txt');
const transformToLines = /* some stream */;
const writeToDB = /* some stream */;
reader.pipe(transformToLines).pipe(writeToDB);Выглядит, хорошо, но такого встроенного средства для построчного чтения файлов нет. Есть отдельные пакеты в NPM и есть встроенный readline.
const rl = createInterface({
input: reader,
crlfDelay: Infinity,
});Основная проблема в том - что он не является инстансом Stream. Это интфейс, и мы не можем использовать его в pipe.
Можно было бы написать конструкцию вида:
const readFile = readline.createInterface({
input: reader,
output: writeToDB,
crlfDelay: Infinity,
});Но она выглядит громоздкой и противоречит красивым примерам с .pipe. Также можно найти негативные комментарии в сети про работу модуля readline, и есть мнение, что он больше ориентирован на работу с stdin.
Для разбивки стрима на строки, есть популярный модуль split.
import * as split from 'split'
reader.pipe(split()).pipe(writeToDB)Это то что нам нужно, но мы пойде�� более сложным путем и напишем его сами. При потоковом чтении файла, мы получаем данные в chunk в виде буффера, размер которого ограничен highWaterMark в байтах.
В одном chunk может лежать несколько строчек текста, а может быть меньше одной. При этом chunk обрезает данные безотносительно конца строки - он про него ничего не знает. Поэтому одна строчка может быть разделена между двумя chunk. Остаток строки от предыдущего будем класть в acc: string. Для простоты, будем везде считать, что везде кодировка utf8.
import { Transform, TransformCallback } from 'node:stream';
export class LineStream extends Transform {
readonly eol = /\r?\n/
protected acc: string = ''
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
const str = this.acc + Buffer.from(chunk).toString('utf8')
const lines = str.split(this.eol)
if(lines.length === 0) {
return callback()
}
this.acc = lines.pop() || ''
for (const line of lines) {
this.push(line)
}
callback()
}
_flush(callback: TransformCallback) {
if(this.acc) {
this.push(this.acc)
}
callback()
}
constructor() {
super({ objectMode: true })
}
}Еще одна проблема - мультибайтовые символы. Например символ Евро € - это E2 82 AC, и он может быть разделен между двумя chunk. Чтобы правильно обьединить их, воспользуемся встроенным модулем string_decoder. Пример его использования:
import { StringDecoder } from 'node:string_decoder'
const decoder: StringDecoder = new StringDecoder('utf8')
//...Запись каждого из chunk...
const text = decoder.write(chunk)
console.log(text)
//...Финальное обращение
const remaining = decoder.end()
console.log(remaining)Применяем эту логику к LineStream и получаем:
import { Transform, TransformCallback } from 'node:stream'
import { StringDecoder } from 'node:string_decoder'
export class LineStream extends Transform {
readonly eol = /\r?\n/
protected acc: string = ''
protected decoder: StringDecoder = new StringDecoder('utf8')
_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
const str = this.acc + this.decoder.write(chunk)
const lines = str.split(this.eol)
if(lines.length === 0) {
return callback();
}
this.acc = lines.pop() || ''
for (const line of lines) {
this.push(line);
}
callback();
}
_flush(callback: TransformCallback) {
const remaining = this.decoder.end()
if(this.acc || remaining) {
this.push(this.acc + remaining)
}
callback();
}
constructor() {
super({ objectMode: true })
}
}Теперь мы получили Stream, который передает дальше по одной строке. Для оптимальной записи в базу данных, делаем группировку по 10000 строчек, используя для этого очень похожую логику на основе Transform.
import { Transform, TransformCallback } from 'stream'
export class BatchStream extends Transform {
protected acc: string[] = [];
_transform(chunk: string, encoding: BufferEncoding, callback: TransformCallback) {
this.acc.push(chunk);
if (this.acc.length === 10000) {
this.push(this.acc)
this.acc = []
}
callback()
}
_flush(callback: TransformCallback) {
if (this.acc.length > 0) {
this.push(this.acc)
}
callback()
}
constructor() {
super({ objectMode: true })
}
}
И теперь добавляем стрим пакетной записи в базу. Он принимает массив строк и записывает их в одно действие. Ожидание подключения к БД прописываем в _construct, а также не забываем закрыть подключение в конце. Для простоты примера, все параметры хардкодим.
import { Writable } from 'node:stream'
import * as mysql from 'mysql2';
export class SqlWriteStream extends Writable {
protected connection: mysql.Connection
readonly sql = 'INSERT INTO test (value) VALUES ?'
constructor() {
super({ objectMode: true })
this.connection = mysql.createConnection({
host: "127.0.0.1",
user: "user",
password: "pass",
database: "db"
});
}
_construct?(callback: (error?: Error | null) => void): void {
this.connection.connect((err) => {
if (err) callback(err)
else callback()
})
}
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
const values = chunk.map((x: any) => [x])
this.connection.query(this.sql, [values], (err, result) => {
if (err) callback(err)
else callback()
});
}
_destroy(error: Error | null, callback: (error?: Error | null) => void) {
this.connection.end()
}
}Теперь цепочка pipe будет иметь вид:
import { createReadStream } from 'node:fs'
//1.Потоковое чтение файла в виде байтового буффера
const reader = createReadStream('file.txt', 'utf8')
//2.Преобразование одного буффера в несколько строк, передача их далее по одной
const transformToLines = new LineStream()
//3.Группировка строк в 10 000 записей для записи в базу
const batchStream = new BatchStream()
//4.Пакетная запись в базу данных
const writeToDB = new SqlWriteStream()
const stream = reader.pipe(transformToLines).pipe(batchStream).pipe(writeToDB)Для тестов, создадим файл с одним миллионом строк:
seq 1 1000000 > file.txtЗапускаем скрипт, и получаем время выполнения:
stream: 6.860s
Max memory consumption: 50 MbВ примерах выше я использовал много допущений, вроде кодировки utf8 и хардкода многих констант. Все это корректно будет вынести, например в опции, но на суть процесса не влияет.
