
В этом месяце выходит десятая версия Node.js, в которой нас ждет изменение поведения потоков (readable-stream), вызванное появлением асинхронных циклов for-await-of. Давайте разберемся что это такое и к чему нам готовиться.
Конструкция for-await-of.
Для начала давайте разберемся с тем как работают асинхронные циклы на простом примере. Для наглядности добавим завершившиеся промисы.
const promises = [ Promise.resolve(1), Promise.resolve(2), Promise.resolve(3), ];
Обычный цикл пройдется по массиву promises и вернет сами значения:
for (const value of promises) { console.log(value); } // > Promise({resolved: 1}) // > Promise({resolved: 2}) // > Promise({resolved: 3})
Асинхронный цикл дождется разрешения промиса и вернет возвращаемое промисом значение:
for await (const value of promises) { console.log(value); } // > 1 // > 2 // > 3
Чтобы асинхронные циклы заработали в более ранних версиях Node.js используйте флаг --harmony_async_iteration.ReadableStream и for-await-of
Объект ReadableStream получил свойство Symbol.asyncIterator, что позволяет ему также быть переданным в for-await-of цикл. Возьмем для примера fs.createReadableStream:
const readStream = fs.createReadStream(file); const chunks = []; for await (const chunk of readStream) { chunks.push(chunk); } console.log(Buffer.concat(chunks));
Как видно из примера теперь мы избавились от вызовов on('data', ... и on('end', ..., а сам код стал выглядеть нагляднее и предсказуемее.
Асинхронные генераторы
В некоторых случаях может понадобится дополнительная обработка получаемых данных, для этого используются асинхронные генераторы. Мы можем реализовать поиск по файлу регулярного выражения:
async function * search(needle, chunks) { let pos = 0; for await (const chunk of chunks) { let string = chunk.toString(); while (string.length) { const match = string.match(needle); if (! match) { pos += string.length; break; } yield { index: pos + match.index, value: match[0], }; string = string.slice(match.index + match[0].length); pos += match.index; } } }
Посмотрим что получилось:
const stream = fs.createReadStream(file); for await (const {index, value} of search(/(a|b)c/, stream)) { console.log('found "%s" at %s', value, index); }
Согласитесь, достаточно удобно, мы на лету превратили строки в объекты и нам не понадобилось использовать TransformStream и думать, как перехватывать ошибки, которые могут возникнуть в двух разных стримах и т.п.
Пример Unix-like потоков
Задача с чтением файла достаточно распространенная, но не исчерпывающая. Давайте рассмотрим случаи, когда требуется потоковая обработка вывода наподобие unix-конвейеров. Для этого воспользуемся асинхронными генераторами, через которые пропустим результат выполнения команды ls.
Сначала мы создадим дочерний процесс const subproc = spawn('ls') и затем будем читать стандартный вывод:
for await (const chunk of subproc.stdout) { // ... }
А так как stdout генерирует вывод в виде объектов Buffer, то первым делом добавим генератор, который будет приводить вывод из типа Buffer в String:
import {StringDecoder} from 'string_decoder'; async function *toString(chunks, encoding) { const decoder = new StringDecoder(encoding) for await (const chunk of chunks) { const string = decoder.write(chunk); if (string.length) { yield string; } } decoder.end(); }
Далее сделаем простой генератор, который будет разбивать вывод построчно. Тут важно учесть, что порция данных, передаваемая из createReadStream, имеет ограниченную максимальную длинну, а это значит, что нам может прийти как строка целиком или кусок очень длинной строки, так и несколько строк одновременно:
async function *chunksToLines(chunks) { let previous = ''; for await (const chunk of chunks) { previous += chunk; while (true) { const i = previous.indexOf('\n'); if (i < 0) { break; } yield previous.slice(0, i + 1); previous = previous.slice(i + 1); } } if (previous.length > 0) { yield previous; } }
Так как каждое найденное значение все еще содержит перенос строки, создадим генератор для очистки значения от висящих пробельных символов:
async function *trim(values) { for await (const value of values) { yield value.trim(); } }
Последним действием будет непосредственно построчный вывод в консоль:
async function print(values) { for await (const value of values) { console.log(value); } }
Объединим полученный код:
async function main() { const subproc = spawn('ls'); await print(trim(chunksToLines(toString(subproc.stdout, 'utf8')))); console.log('DONE'); }
Как видим, код получился несколько трудночитаемым. Если мы захотим добавить еще несколько вызовов или параметры, то в результате получим кашу. Чтобы избежать и сделать код более линейным, давайте добавим функцию pipe:
function pipe(value, ...fns) { let result = value; for (const fn of fns) { result = fn(result); } return result; }
Теперь вызов можно привести к следующему виду:
async function main() { const subproc = spawn('ls'); await pipe( subproc.stdout, toString, chunksToLines, trim, print, ); console.log('DONE'); }
Оператор |>
Нужно иметь в виду, что в стандарте ECMA должен появиться новый оператор конвейера |> позволяющий делать то же самое, что сейчас делает pipe:
async function main() { const subproc = spawn('ls'); await subproc.stdout |> toString |> chunksToLines |> trim |> print; console.log('DONE'); }
Вывод
Как видите асинхронные циклы и итераторы сделали язык еще более выразительным, ёмким и понятным. Ад из коллбеков уходит все дальше в прошлое и скоро станет страшилкой, которой мы будем пугать внуков. А генераторы похоже займут свое место в иерархии JS и будут применяться по назначению.
Основу для данной статьи составил материал Акселя Раушмайера Using async iteration natively in Node.js.
В продолжение темы
- Iterables and Iterators [EN].
- Asynchronous iteration [EN].
