Node.js Стримы предоставляют мощное средство для эффективной обработки большого обьема данных с малым использованием данных. Если у вас есть большой файл, то он скорее всего не сможет целиком загрузиться в память, и гораздо эффективнее будет обрабатывать его, разбив на более мелкие куски. Node.js Стримы существуют уже достаточно давно, однако их использование сопряжено с некоторыми трудностями. Само API не очень очевидное, а писать конструкции вида req.on('data', ... с неявными строковыми параметрами не очень удобно.
Рассмотрим такой жизненный пример, как запись большого файла в БД, например после его загрузки через веб-интерфейс. Допустим у нас есть текстовый файл на 1 миллион строчек. Есть вероятность, что он поместится в память, если загрузить его целиком. Но если вы не единственный пользователь вашего приложения, и несколько пользователей могут выполнять это действие одновременно, то такая нагрузка уже будет некорректной.
Воспользуемся потоковым чтением через Stream. Он будет читать файл и передавать его нам в виде буффера байтов, безотносительно разбивки по строкам. В примерах, нам приводят удобные конструкции с gzip с использованием pipe в стримах
import * as fs from 'node:fs'; import * as zlib from 'node:zlib'; const r = fs.createReadStream('file.txt'); const z = zlib.createGzip(); const w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w)
И для построчного чтения тоже хотелось бы сделать аналогичное. Представим, что у нас есть некоторый Transform стрим, который преобразует данные в строки, а затем Writable стрим, для записи в базу.
import * as fs from 'node:fs'; const reader = fs.createReadStream('file.txt'); const transformToLines = /* some stream */; const writeToDB = /* some stream */; reader.pipe(transformToLines).pipe(writeToDB);
Выглядит, хорошо, но такого встроенного средства для построчного чтения файлов нет. Есть отдельные пакеты в NPM и есть встроенный readline.
const rl = createInterface({ input: reader, crlfDelay: Infinity, });
Основная проблема в том - что он не является инстансом Stream. Это интфейс, и мы не можем использовать его в pipe.
Можно было бы написать конструкцию вида:
const readFile = readline.createInterface({ input: reader, output: writeToDB, crlfDelay: Infinity, });
Но она выглядит громоздкой и противоречит красивым примерам с .pipe. Также можно найти негативные комментарии в сети про работу модуля readline, и есть мнение, что он больше ориентирован на работу с stdin.
Для разбивки стрима на строки, есть популярный модуль split.
import * as split from 'split' reader.pipe(split()).pipe(writeToDB)
Это то что нам нужно, но мы пойдем более сложным путем и напишем его сами. При потоковом чтении файла, мы получаем данные в chunk в виде буффера, размер которого ограничен highWaterMark в байтах.
В одном chunk может лежать несколько строчек текста, а может быть меньше одной. При этом chunk обрезает данные безотносительно конца строки - он про него ничего не знает. Поэтому одна строчка может быть разделена между двумя chunk. Остаток строки от предыдущего будем класть в acc: string. Для простоты, будем везде считать, что везде кодировка utf8.
import { Transform, TransformCallback } from 'node:stream'; export class LineStream extends Transform { readonly eol = /\r?\n/ protected acc: string = '' _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) { const str = this.acc + Buffer.from(chunk).toString('utf8') const lines = str.split(this.eol) if(lines.length === 0) { return callback() } this.acc = lines.pop() || '' for (const line of lines) { this.push(line) } callback() } _flush(callback: TransformCallback) { if(this.acc) { this.push(this.acc) } callback() } constructor() { super({ objectMode: true }) } }
Еще одна проблема - мультибайтовые символы. Например символ Евро € - это E2 82 AC, и он может быть разделен между двумя chunk. Чтобы правильно обьединить их, воспользуемся встроенным модулем string_decoder. Пример его использования:
import { StringDecoder } from 'node:string_decoder' const decoder: StringDecoder = new StringDecoder('utf8') //...Запись каждого из chunk... const text = decoder.write(chunk) console.log(text) //...Финальное обращение const remaining = decoder.end() console.log(remaining)
Применяем эту логику к LineStream и получаем:
import { Transform, TransformCallback } from 'node:stream' import { StringDecoder } from 'node:string_decoder' export class LineStream extends Transform { readonly eol = /\r?\n/ protected acc: string = '' protected decoder: StringDecoder = new StringDecoder('utf8') _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) { const str = this.acc + this.decoder.write(chunk) const lines = str.split(this.eol) if(lines.length === 0) { return callback(); } this.acc = lines.pop() || '' for (const line of lines) { this.push(line); } callback(); } _flush(callback: TransformCallback) { const remaining = this.decoder.end() if(this.acc || remaining) { this.push(this.acc + remaining) } callback(); } constructor() { super({ objectMode: true }) } }
Теперь мы получили Stream, который передает дальше по одной строке. Для оптимальной записи в базу данных, делаем группировку по 10000 строчек, используя для этого очень похожую логику на основе Transform.
import { Transform, TransformCallback } from 'stream' export class BatchStream extends Transform { protected acc: string[] = []; _transform(chunk: string, encoding: BufferEncoding, callback: TransformCallback) { this.acc.push(chunk); if (this.acc.length === 10000) { this.push(this.acc) this.acc = [] } callback() } _flush(callback: TransformCallback) { if (this.acc.length > 0) { this.push(this.acc) } callback() } constructor() { super({ objectMode: true }) } }
И теперь добавляем стрим пакетной записи в базу. Он принимает массив строк и записывает их в одно действие. Ожидание подключения к БД прописываем в _construct, а также не забываем закрыть подключение в конце. Для простоты примера, все параметры хардкодим.
import { Writable } from 'node:stream' import * as mysql from 'mysql2'; export class SqlWriteStream extends Writable { protected connection: mysql.Connection readonly sql = 'INSERT INTO test (value) VALUES ?' constructor() { super({ objectMode: true }) this.connection = mysql.createConnection({ host: "127.0.0.1", user: "user", password: "pass", database: "db" }); } _construct?(callback: (error?: Error | null) => void): void { this.connection.connect((err) => { if (err) callback(err) else callback() }) } _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) { const values = chunk.map((x: any) => [x]) this.connection.query(this.sql, [values], (err, result) => { if (err) callback(err) else callback() }); } _destroy(error: Error | null, callback: (error?: Error | null) => void) { this.connection.end() } }
Теперь цепочка pipe будет иметь вид:
import { createReadStream } from 'node:fs' //1.Потоковое чтение файла в виде байтового буффера const reader = createReadStream('file.txt', 'utf8') //2.Преобразование одного буффера в несколько строк, передача их далее по одной const transformToLines = new LineStream() //3.Группировка строк в 10 000 записей для записи в базу const batchStream = new BatchStream() //4.Пакетная запись в базу данных const writeToDB = new SqlWriteStream() const stream = reader.pipe(transformToLines).pipe(batchStream).pipe(writeToDB)
Для тестов, создадим файл с одним миллионом строк:
seq 1 1000000 > file.txt
Запускаем скрипт, и получаем время выполнения:
stream: 6.860s Max memory consumption: 50 Mb
В примерах выше я использовал много допущений, вроде кодировки utf8 и хардкода многих констант. Все это корректно будет вынести, например в опции, но на суть процесса не влияет.
