В этом месяце выходит десятая версия Node.js, в которой нас ждет изменение поведения потоков (readable-stream), вызванное появлением асинхронных циклов for-await-of. Давайте разберемся что это такое и к чему нам готовиться.
Конструкция for-await-of.
Для начала давайте разберемся с тем как работают асинхронные циклы на простом примере. Для наглядности добавим завершившиеся промисы.
const promises = [
Promise.resolve(1),
Promise.resolve(2),
Promise.resolve(3),
];
Обычный цикл пройдется по массиву promises и вернет сами значения:
for (const value of promises) {
console.log(value);
}
// > Promise({resolved: 1})
// > Promise({resolved: 2})
// > Promise({resolved: 3})
Асинхронный цикл дождется разрешения промиса и вернет возвращаемое промисом значение:
for await (const value of promises) {
console.log(value);
}
// > 1
// > 2
// > 3
Чтобы асинхронные циклы заработали в более ранних версиях Node.js используйте флаг --harmony_async_iteration
.
ReadableStream и for-await-of
Объект ReadableStream получил свойство Symbol.asyncIterator
, что позволяет ему также быть переданным в for-await-of цикл. Возьмем для примера fs.createReadableStream
:
const readStream = fs.createReadStream(file);
const chunks = [];
for await (const chunk of readStream) {
chunks.push(chunk);
}
console.log(Buffer.concat(chunks));
Как видно из примера теперь мы избавились от вызовов on('data', ...
и on('end', ...
, а сам код стал выглядеть нагляднее и предсказуемее.
Асинхронные генераторы
В некоторых случаях может понадобится дополнительная обработка получаемых данных, для этого используются асинхронные генераторы. Мы можем реализовать поиск по файлу регулярного выражения:
async function * search(needle, chunks) {
let pos = 0;
for await (const chunk of chunks) {
let string = chunk.toString();
while (string.length) {
const match = string.match(needle);
if (! match) {
pos += string.length;
break;
}
yield {
index: pos + match.index,
value: match[0],
};
string = string.slice(match.index + match[0].length);
pos += match.index;
}
}
}
Посмотрим что получилось:
const stream = fs.createReadStream(file);
for await (const {index, value} of search(/(a|b)c/, stream)) {
console.log('found "%s" at %s', value, index);
}
Согласитесь, достаточно удобно, мы на лету превратили строки в объекты и нам не понадобилось использовать TransformStream и думать, как перехватывать ошибки, которые могут возникнуть в двух разных стримах и т.п.
Пример Unix-like потоков
Задача с чтением файла достаточно распространенная, но не исчерпывающая. Давайте рассмотрим случаи, когда требуется потоковая обработка вывода наподобие unix-конвейеров. Для этого воспользуемся асинхронными генераторами, через которые пропустим результат выполнения команды ls
.
Сначала мы создадим дочерний процесс const subproc = spawn('ls')
и затем будем читать стандартный вывод:
for await (const chunk of subproc.stdout) {
// ...
}
А так как stdout генерирует вывод в виде объектов Buffer, то первым делом добавим генератор, который будет приводить вывод из типа Buffer в String:
import {StringDecoder} from 'string_decoder';
async function *toString(chunks, encoding) {
const decoder = new StringDecoder(encoding)
for await (const chunk of chunks) {
const string = decoder.write(chunk);
if (string.length) {
yield string;
}
}
decoder.end();
}
Далее сделаем простой генератор, который будет разбивать вывод построчно. Тут важно учесть, что порция данных, передаваемая из createReadStream, имеет ограниченную максимальную длинну, а это значит, что нам может прийти как строка целиком или кусок очень длинной строки, так и несколько строк одновременно:
async function *chunksToLines(chunks) {
let previous = '';
for await (const chunk of chunks) {
previous += chunk;
while (true) {
const i = previous.indexOf('\n');
if (i < 0) {
break;
}
yield previous.slice(0, i + 1);
previous = previous.slice(i + 1);
}
}
if (previous.length > 0) {
yield previous;
}
}
Так как каждое найденное значение все еще содержит перенос строки, создадим генератор для очистки значения от висящих пробельных символов:
async function *trim(values) {
for await (const value of values) {
yield value.trim();
}
}
Последним действием будет непосредственно построчный вывод в консоль:
async function print(values) {
for await (const value of values) {
console.log(value);
}
}
Объединим полученный код:
async function main() {
const subproc = spawn('ls');
await print(trim(chunksToLines(toString(subproc.stdout, 'utf8'))));
console.log('DONE');
}
Как видим, код получился несколько трудночитаемым. Если мы захотим добавить еще несколько вызовов или параметры, то в результате получим кашу. Чтобы избежать и сделать код более линейным, давайте добавим функцию pipe
:
function pipe(value, ...fns) {
let result = value;
for (const fn of fns) {
result = fn(result);
}
return result;
}
Теперь вызов можно привести к следующему виду:
async function main() {
const subproc = spawn('ls');
await pipe(
subproc.stdout,
toString,
chunksToLines,
trim,
print,
);
console.log('DONE');
}
Оператор |>
Нужно иметь в виду, что в стандарте ECMA должен появиться новый оператор конвейера |>
позволяющий делать то же самое, что сейчас делает pipe
:
async function main() {
const subproc = spawn('ls');
await subproc.stdout
|> toString
|> chunksToLines
|> trim
|> print;
console.log('DONE');
}
Вывод
Как видите асинхронные циклы и итераторы сделали язык еще более выразительным, ёмким и понятным. Ад из коллбеков уходит все дальше в прошлое и скоро станет страшилкой, которой мы будем пугать внуков. А генераторы похоже займут свое место в иерархии JS и будут применяться по назначению.
Основу для данной статьи составил материал Акселя Раушмайера Using async iteration natively in Node.js.
В продолжение темы
- Iterables and Iterators [EN].
- Asynchronous iteration [EN].