Pull to refresh

Асинхронные циклы и Stream API в Node.js 10

Reading time4 min
Views22K


В этом месяце выходит десятая версия Node.js, в которой нас ждет изменение поведения потоков (readable-stream), вызванное появлением асинхронных циклов for-await-of. Давайте разберемся что это такое и к чему нам готовиться.


Конструкция for-await-of.


Для начала давайте разберемся с тем как работают асинхронные циклы на простом примере. Для наглядности добавим завершившиеся промисы.


const promises = [
    Promise.resolve(1),
    Promise.resolve(2),
    Promise.resolve(3),
];

Обычный цикл пройдется по массиву promises и вернет сами значения:


for (const value of promises) {
    console.log(value);
}
// > Promise({resolved: 1})
// > Promise({resolved: 2})
// > Promise({resolved: 3})

Асинхронный цикл дождется разрешения промиса и вернет возвращаемое промисом значение:


for await (const value of promises) {
    console.log(value);
}
// > 1
// > 2
// > 3

Чтобы асинхронные циклы заработали в более ранних версиях Node.js используйте флаг --harmony_async_iteration.

ReadableStream и for-await-of


Объект ReadableStream получил свойство Symbol.asyncIterator, что позволяет ему также быть переданным в for-await-of цикл. Возьмем для примера fs.createReadableStream:


const readStream = fs.createReadStream(file);

const chunks = [];
for await (const chunk of readStream) {
    chunks.push(chunk);
}

console.log(Buffer.concat(chunks));

Как видно из примера теперь мы избавились от вызовов on('data', ... и on('end', ..., а сам код стал выглядеть нагляднее и предсказуемее.


Асинхронные генераторы


В некоторых случаях может понадобится дополнительная обработка получаемых данных, для этого используются асинхронные генераторы. Мы можем реализовать поиск по файлу регулярного выражения:


async function * search(needle, chunks) {
    let pos = 0;

    for await (const chunk of chunks) {
        let string = chunk.toString();

        while (string.length) {
            const match = string.match(needle);

            if (! match) {
                pos += string.length;
                break;
            }

            yield {
              index: pos + match.index,
              value: match[0],
            };

            string = string.slice(match.index + match[0].length);
            pos += match.index;
        }
    }
}

Посмотрим что получилось:


const stream = fs.createReadStream(file);

for await (const {index, value} of search(/(a|b)c/, stream)) {
    console.log('found "%s" at %s', value, index);
}

Согласитесь, достаточно удобно, мы на лету превратили строки в объекты и нам не понадобилось использовать TransformStream и думать, как перехватывать ошибки, которые могут возникнуть в двух разных стримах и т.п.


Пример Unix-like потоков


Задача с чтением файла достаточно распространенная, но не исчерпывающая. Давайте рассмотрим случаи, когда требуется потоковая обработка вывода наподобие unix-конвейеров. Для этого воспользуемся асинхронными генераторами, через которые пропустим результат выполнения команды ls.


Сначала мы создадим дочерний процесс const subproc = spawn('ls') и затем будем читать стандартный вывод:


for await (const chunk of subproc.stdout) {
    // ...
}

А так как stdout генерирует вывод в виде объектов Buffer, то первым делом добавим генератор, который будет приводить вывод из типа Buffer в String:


import {StringDecoder} from 'string_decoder';

async function *toString(chunks, encoding) {
    const decoder = new StringDecoder(encoding)
    for await (const chunk of chunks) {
        const string = decoder.write(chunk);
        if (string.length) {
            yield string;
        }
    }
    decoder.end();
}

Далее сделаем простой генератор, который будет разбивать вывод построчно. Тут важно учесть, что порция данных, передаваемая из createReadStream, имеет ограниченную максимальную длинну, а это значит, что нам может прийти как строка целиком или кусок очень длинной строки, так и несколько строк одновременно:


async function *chunksToLines(chunks) {
    let previous = '';
    for await (const chunk of chunks) {
        previous += chunk;
        while (true) {
            const i = previous.indexOf('\n');
            if (i < 0) {
                break;
            }
            yield previous.slice(0, i + 1);
            previous = previous.slice(i + 1);
        }
    }

    if (previous.length > 0) {
        yield previous;
    }
}

Так как каждое найденное значение все еще содержит перенос строки, создадим генератор для очистки значения от висящих пробельных символов:


async function *trim(values) {
    for await (const value of values) {
        yield value.trim();
    }
}

Последним действием будет непосредственно построчный вывод в консоль:


async function print(values) {
    for await (const value of values) {
        console.log(value);
    }
}

Объединим полученный код:


async function main() {
    const subproc = spawn('ls');

    await print(trim(chunksToLines(toString(subproc.stdout, 'utf8'))));

    console.log('DONE');
}

Как видим, код получился несколько трудночитаемым. Если мы захотим добавить еще несколько вызовов или параметры, то в результате получим кашу. Чтобы избежать и сделать код более линейным, давайте добавим функцию pipe:


function pipe(value, ...fns) {
    let result = value;

    for (const fn of fns) {
        result = fn(result);
    }

    return result;
}

Теперь вызов можно привести к следующему виду:


async function main() {
    const subproc = spawn('ls');

    await pipe(
        subproc.stdout,
        toString,
        chunksToLines,
        trim,
        print,
    );

    console.log('DONE');
}

Оператор |>


Нужно иметь в виду, что в стандарте ECMA должен появиться новый оператор конвейера |> позволяющий делать то же самое, что сейчас делает pipe:


async function main() {
    const subproc = spawn('ls');

    await subproc.stdout
    |> toString
    |> chunksToLines
    |> trim
    |> print;

    console.log('DONE');
}

Вывод


Как видите асинхронные циклы и итераторы сделали язык еще более выразительным, ёмким и понятным. Ад из коллбеков уходит все дальше в прошлое и скоро станет страшилкой, которой мы будем пугать внуков. А генераторы похоже займут свое место в иерархии JS и будут применяться по назначению.


Основу для данной статьи составил материал Акселя Раушмайера Using async iteration natively in Node.js.

В продолжение темы


Tags:
Hubs:
Total votes 32: ↑31 and ↓1+30
Comments37

Articles