Node.js Стримы предоставляют мощное средство для эффективной обработки большого обьема данных с малым использованием данных. Если у вас есть большой файл, то он скорее всего не сможет целиком загрузиться в память, и гораздо эффективнее будет обрабатывать его, разбив на более мелкие куски. Node.js Стримы существуют уже достаточно давно, однако их использование сопряжено с некоторыми трудностями. Само API не очень очевидное, а писать конструкции вида req.on('data', ... с неявными строковыми параметрами не очень удобно.

Рассмотрим такой жизненный пример, как запись большого файла в БД, например после его загрузки через веб-интерфейс. Допустим у нас есть текстовый файл на 1 миллион строчек. Есть вероятность, что он поместится в память, если загрузить его целиком. Но если вы не единственный пользователь вашего приложения, и несколько пользователей могут выполнять это действие одновременно, то такая нагрузка уже будет некорректной.

Воспользуемся потоковым чтением через Stream. Он будет читать файл и передавать его нам в виде буффера байтов, безотносительно разбивки по строкам. В примерах, нам приводят удобные конструкции с gzip с использованием pipe в стримах

import * as fs from 'node:fs';
import * as zlib from 'node:zlib';
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w)

И для построчного чтения тоже хотелось бы сделать аналогичное. Представим, что у нас есть некоторый Transform стрим, который преобразует данные в строки, а затем Writable стрим, для записи в базу.

import * as fs from 'node:fs';
const reader = fs.createReadStream('file.txt');
const transformToLines = /* some stream */;
const writeToDB = /* some stream */;
reader.pipe(transformToLines).pipe(writeToDB);

Выглядит, хорошо, но такого встроенного средства для построчного чтения файлов нет. Есть отдельные пакеты в NPM и есть встроенный readline.

const rl = createInterface({
  input: reader,
  crlfDelay: Infinity,
});

Основная проблема в том - что он не является инстансом Stream. Это интфейс, и мы не можем использовать его в pipe.

Можно было бы написать конструкцию вида:

const readFile = readline.createInterface({
  input: reader,
  output: writeToDB,
  crlfDelay: Infinity,
});

Но она выглядит громоздкой и противоречит красивым примерам с .pipe. Также можно найти негативные комментарии в сети про работу модуля readline, и есть мнение, что он больше ориентирован на работу с stdin.

Для разбивки стрима на строки, есть популярный модуль split.

import * as split from 'split'
reader.pipe(split()).pipe(writeToDB)

Это то что нам нужно, но мы пойде�� более сложным путем и напишем его сами. При потоковом чтении файла, мы получаем данные в chunk в виде буффера, размер которого ограничен highWaterMark в байтах.

В одном chunk может лежать несколько строчек текста, а может быть меньше одной. При этом chunk обрезает данные безотносительно конца строки - он про него ничего не знает. Поэтому одна строчка может быть разделена между двумя chunk. Остаток строки от предыдущего будем класть в acc: string. Для простоты, будем везде считать, что везде кодировка utf8.

import { Transform, TransformCallback } from 'node:stream';

export class LineStream extends Transform {
    readonly eol = /\r?\n/

    protected acc: string = ''

    _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
        const str = this.acc + Buffer.from(chunk).toString('utf8')
        const lines = str.split(this.eol)
        if(lines.length === 0) {
            return callback()
        }
        this.acc = lines.pop() || ''
        for (const line of lines) {
            this.push(line)
        }
        callback()
    }

    _flush(callback: TransformCallback) {
        if(this.acc) {
            this.push(this.acc)
        }
        callback()
    }

    constructor() {
        super({ objectMode: true })
    }
}

Еще одна проблема - мультибайтовые символы. Например символ Евро € - это E2 82 AC, и он может быть разделен между двумя chunk. Чтобы правильно обьединить их, воспользуемся встроенным модулем string_decoder. Пример его использования:

import { StringDecoder } from 'node:string_decoder'
const decoder: StringDecoder = new StringDecoder('utf8')

//...Запись каждого из chunk...
const text = decoder.write(chunk)
console.log(text)

//...Финальное обращение
const remaining = decoder.end()
console.log(remaining)

Применяем эту логику к LineStream и получаем:

import { Transform, TransformCallback } from 'node:stream'
import { StringDecoder } from 'node:string_decoder'

export class LineStream extends Transform {
    readonly eol = /\r?\n/

    protected acc: string = ''
    protected decoder: StringDecoder = new StringDecoder('utf8')

    _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
        const str = this.acc + this.decoder.write(chunk)
        const lines = str.split(this.eol)
        if(lines.length === 0) {
            return callback();
        }
        this.acc = lines.pop() || ''
        for (const line of lines) {
            this.push(line);
        }
        callback();
    }

    _flush(callback: TransformCallback) {
        const remaining = this.decoder.end()
        if(this.acc || remaining) {
            this.push(this.acc + remaining)
        }
        callback();
    }

    constructor() {
        super({ objectMode: true })
    }
}

Теперь мы получили Stream, который передает дальше по одной строке. Для оптимальной записи в базу данных, делаем группировку по 10000 строчек, используя для этого очень похожую логику на основе Transform.

import { Transform, TransformCallback } from 'stream'

export class BatchStream extends Transform {
    protected acc: string[] = [];

    _transform(chunk: string, encoding: BufferEncoding, callback: TransformCallback) {
        this.acc.push(chunk);
        if (this.acc.length === 10000) {
            this.push(this.acc)
            this.acc = []
        }

        callback()
    }

    _flush(callback: TransformCallback) {
        if (this.acc.length > 0) {
            this.push(this.acc)
        }
        callback()
    }

    constructor() {
        super({ objectMode: true })
    }
}

И теперь добавляем стрим пакетной записи в базу. Он принимает массив строк и записывает их в одно действие. Ожидание подключения к БД прописываем в _construct, а также не забываем закрыть подключение в конце. Для простоты примера, все параметры хардкодим.

import { Writable } from 'node:stream'
import * as mysql from 'mysql2';

export class SqlWriteStream extends Writable {

    protected connection: mysql.Connection

    readonly sql = 'INSERT INTO test (value) VALUES ?'

    constructor() {
        super({ objectMode: true })
        this.connection = mysql.createConnection({
            host: "127.0.0.1",
            user: "user",
            password: "pass",
            database: "db"
        });
    }

    _construct?(callback: (error?: Error | null) => void): void {
        this.connection.connect((err) => {
            if (err) callback(err)
            else callback()
        })
    }

    _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
        const values = chunk.map((x: any) => [x])
        this.connection.query(this.sql, [values], (err, result) => {
            if (err) callback(err)
            else callback()
        });
    }

    _destroy(error: Error | null, callback: (error?: Error | null) => void) {
        this.connection.end()
    }
}

Теперь цепочка pipe будет иметь вид:

import { createReadStream } from 'node:fs'

//1.Потоковое чтение файла в виде байтового буффера
const reader = createReadStream('file.txt', 'utf8')

//2.Преобразование одного буффера в несколько строк, передача их далее по одной
const transformToLines = new LineStream()

//3.Группировка строк в 10 000 записей для записи в базу
const batchStream = new BatchStream()

//4.Пакетная запись в базу данных
const writeToDB = new SqlWriteStream()

const stream = reader.pipe(transformToLines).pipe(batchStream).pipe(writeToDB)

Для тестов, создадим файл с одним миллионом строк:

seq 1 1000000 > file.txt

Запускаем скрипт, и получаем время выполнения:

stream: 6.860s
Max memory consumption: 50 Mb

В примерах выше я использовал много допущений, вроде кодировки utf8 и хардкода многих констант. Все это корректно будет вынести, например в опции, но на суть процесса не влияет.