node-seq на новый лад (опять про асинхронность)

Здравствуй, Хабр! Пишу тебе на правах слоупока. Ведь в то время как космические корабли с вертикальным взлетом и посадкой бороздят просторы мирового океана, а самые нетерпеливые вовсю используют фичи ES6 в своих проектах я принес тебе очередную библиотеку для облегчения жизни асинхронщика.

Конечно, давно уже есть миллион реализаций обещаний, а для желающих — async. Есть, также, широкоизвестная в узких кругах библиотека Seq от небезызвестного товарища Substack. Я начал пользоваться ей практически с первых дней моего яваскрипта и использовал везде где мог. Предлагаемый этой библиотекой подход кажется мне более понятным и логичным для обуздания асинхронной лапши чем подход используемый, например, в async. Смотрите сами:

var fs = require('fs');
var Hash = require('hashish');
var Seq = require('seq');

Seq()
  .seq(function () {
    fs.readdir(__dirname, this);
  })
  .flatten()
  .parEach(function (file) {
    fs.stat(__dirname + '/' + file, this.into(file));
  })
  .seq(function () {
    var sizes = Hash.map(this.vars, function (s) { return s.size })
    console.dir(sizes);
  });

Все настолько просто и понятно, что даже объяснять лень. К сожалению, библиотека давно не развивается и не поддерживается. К тому же, за время использования накопился список багов на которые так или иначе натыкался, список фич которых хотелось, список претензий — потому что не все работало так же хорошо как в моем воображении. Однажды, я в очередной раз я наткнулся на несоврешенство мира и решил, что настал Этот Момент. Пора форкнуть и пофиксить. Я часто так поступаю, но то что делает эта библиотека казалось мне настоящей Магией.

И вот счастливый и преисполненный решимости делаю git clone, cd, gvim, и пытаюсь понять что тут происходит. Не понимаю. Автор использует еще пару своих библиотек и для просветления необходимо разобраться сначала с ними. Через пару часов мне надоедает и я обнаруживаю Фатальный Недостаток. Сказано — сделано. Сажусь и пишу с нуля Библиотеку Мечты. Как ни странно, никакой магии во всем этом не оказалось. Прототип был готов за вечер. Затем, еще какое то время, я допиливал его уже используя в реальном проекте, заменив им Seq полностью. И вот получилось то, что получилось. Давайте познакомимся.

YAFF — Yet Another Flow Framework.

В целом, я старался сделать его совместимым с Seq. И большинство кода мигрировалось просто заменой импорта (было var Seq = require('seq'); стало var Seq = require('yaff');). Разумеется, заменить пришлось кое что еще. Seq использует метод .catch() для ловли блох. Напрмиер, вышепрведенный кусок кода можно изменить вот так:

var fs = require('fs');
var Hash = require('hashish');
var Seq = require('seq');

Seq()
  .seq(function () {
    fs.readdir(__dirname, this);
  })
  .flatten()
  .parEach(function (file) {
    fs.stat(__dirname + '/' + file, this.into(file));
  })
  .catch(function (err)(
    console.error(err);
  ))
  .seq(function () {
    var sizes = Hash.map(this.vars, function (s) { return s.size })
    console.dir(sizes);
  });

Эта конструкция ужасна тем, что после того как мы «поймали» ошибку мы можем продолжать. Так нельзя. Во-первых, не понятно что делать если parEach (или другие подобные методы) выбросят несколько ошибок. Ловить только первую? Ловить все? А если мы уже ушли далеко вниз а в каком-нибудь parEach выше по таймеру выскочила ошибка? А если ниже у нас уже нет никаких catch? А если те catch что ниже не подготовлены для обработки ошибок которые вылезут по таймеру из forEach? Возникает много вопросов без ответов. Поэтому, я решил, что в каждом YAFF должна быть только одна конструкция для обработки ошибок и она должна быть в конце. А чтобы не нарушать традиции nodejs пусть она обрабатывает еще и результат. Получается красота. Убедитесь:

var fs = require('fs');
YAFF(['./', '../'])
  .par(function (path) {
    fs.readdir(path, this);
  })
  .par(function (path) {
    fs.readdir(path, this);
  })
  .flatten()
  .parMap(function (file) {
    fs.stat(__dirname + '/' + file, this);
  })
  .map(function (stat) {
    return stat.size;
  })
  .unflatten()
  .finally(function (e, sizes) {
    if (e)
      throw e;
    log(sizes);
  });

Если предположить, что это все внтури асинхронной функции то в finally мы можем просто кинуть предоставленный нам коллбэк, а-ля:

var listDirs = function (dirs, cb) {
  YAFF(dirs)
    [волшебные пузырьки]
    .finally(cb);
};

Удобно? Мне тоже нравится. Вообще, эта библиотека построена относительно концепции стека-массива аргументов (стоило написать об этом с самого начала). И все методы что здесь есть так или иначе этот стек меняют применяясь к нему или к отдельным его элементам. Скажем, стайка функций обернутых в par возьмет по одному элементу из стека, в той очередности в которой написаны эти самые par и только после того как все par отстреляют коллбэки (а коллбэк внутри всех методов это this) YAFF перейдет к тому что у него дальше на очереди. Пусть дальше у нас несколько функций завернутых в seq. YAFF будет применять к ним весь стек и заменять его на то что вернет в коллбэк обернутая функция. Вот код, чтобы было понятно:

YAFF(['one', 'two', 'three'])
  .par(function (one) {
    this(null, one);
  })
  .par(function (two) {
    this(null, two);
  })
  .par(function (three) {
    this(null, three);
  })
  .seq(function (one, two, three) {
    this(null, one, three);
  })
  .seq(function (one, three) {
    this(null, null);
  })
  .seq(function () {
    this(null, 'and so on and so forth');
  })

Разумеется, если кто-то вызовет свой коллбэек с ненулевым первым аргументом(ошибка) то YAFF тут же плюнет на все функции, что там еще остались, и пойдет выполнять то, что написано в finally. Если вы забыли написать finally или посчитали, что в вашем коде ошибок уж точно не может быть то YAFF, в случае ошибки, бесцеремонно кинет исключение. Так что лучше, чтобы finally был.

Еще, здесь есть всякие синхронные функции для работы со стеком аргументов как с массивом: push, pull, filter, set, reduce, flatten, extend, unflatten, empty, shift, unshift, splice, reverse, save(name), load(fromName). Фуф, вроде все. Названия говорят сами за себя, но если что — не стесняйтесь спрашивать и заглядывать в исходник. Там один файл (main.js) и все очень просто устроено.

Ну и, конечно, то ради чего все затевалось — асинхронные функции для обработки массивов данных: forEach(YAFF не станет ждать когда закончится обработка этого блока, результаты работы этого блока не повлияют на стек. YAFF сразу перейдет к следующему хендлеру в цепочке), seqEach, parEach ( YAFF подождет пока отстреляются все функции, но результаты на стек не повлияют). seqMap, parMap, seqFilter, parFilter — делают то, что вы ожидаете; YAFF ждет пока они отработают а результаты работы этих блоков заменяют те значения что были на стеке раньше. Кроме того, у всех методов с префиксом par после функции можно указать число. Это число — лимит одновременно работающих асинхронных функций. Как то так:

var resizePhotos(photos, cb) {
  YAFF(photos)
    .parMap(function (photo) {
      asyncReisze(photo.image, photo.params, this);
    }, 10)
    .unaflatten()
    .finally(cb);
}

В этом примере мы асинхронно ресайзим пачку фотографий. Чтобы не перегружать сервер мы ресайзим не больше 10 картинок одновременно для каждого клиента. unflatten нужен для того, чтобы размазанные по стеку фотографии собрать в массив который будет одним аргументом для коллбэка.

Еще у YAFF есть методы mseq и mpar — это реверанс в сторону пользователей async. Принимают эти методы массив функций которые будут исполнены последовательно или параллельно. С тем же успехом можно было бы написать кучу seq() и par(), но иногда хочется нагенерировать функций динамически. У нас же все-таки функциональный язык, да?

Чтобы окончательно вас запутать я придумал следующий пример и нарисовал картинку (в отчаянной надежде что она все прояснит):

YAFF(['./'])
  .mseq([
    function (path1) {
      fs.readdir(path1, this);
    },
    function (arg) {
      this(null, arg);
    }
  ])
  .flatten()
  .parMap(function (file) {
    fs.stat(__dirname + '/' + file, this);
  })
  .map(function (stat) {
    return stat.size;
  })
  .unflatten()
  .finally(function (e, sizes) {
    log(sizes);
  });

Большая картинка


Надеюсь, что мне удалось понятно рассказать то, что я хотел; вы прониклись идеей и я не один такой кто не понимает зачем нужен async.

Все конкретные пожелания и предложения лучше оформлять в виде issues (на английском, не стесняйтесь — я тоже плохо его знаю, будем тренировать словесность вместе) на гитхабе или даже в виде пул-реквестов.

Ах да, библиотека есть на npmjs.org.

P.S. Только что в порыве страсти добавил синхронный метод apply — теперь все остальные синхронные методы можно выбросить. Но я оставлю для удобства и совместимости.
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 9

    +2
    Во-первых, не понятно что делать если [...] выбросят несколько ошибок
    Ну тут, очевидно, два варианта. Либо ловить первую, либо собирать их в массив. Promise.all работает по первой стратегии: если один из входов фейлит, то фейлит и их композиция с этой ошибкой. Тут плюс в том, что не нужно дожидаться ответов от всех входов, можно продолжать после первого фейла. И с другой стороны, попробуйте придумать ситуацию, когда нам действительно нужно получить весь массив ошибок? Как правило, нам достаточно того, что ошибка совершилась, и нужно переходить к её обработке. А что там с остальным, так разберёмся, когда этот блок перестанет выдавать ошибку.

    А если мы уже ушли далеко вниз а в каком-нибудь parEach выше по таймеру выскочила ошибка?
    Я, наверно, не до конца понимаю идею этой библиотеки. Как мы можем «уйти вниз», если ещё не получены все результаты? Документация грит:
    parEach waits for all actions to call this() before moving along to the next action in the chain.
    Если она ждёт ответа ото всех входов, то она дождётся и любой ошибки.

    Поэтому, я решил, что в каждом YAFF должна быть только одна конструкция для обработки ошибок и она должна быть в конце.
    Так нельзя, это всё равно что предлагать в синхронном программировании сделать один блок try-catch на всю программу и обязать программистов размещать catch в main. Многоуровневые catch просто необходимы. Например, у нас есть асинхронная либа, которая кормит нас такими потоками (flow). У неё есть внутренние ошибки, например, вызванные отсутствием файлов. Но на выходе, она преобразует невнятные внутренние ошибки в понятные ошибки из своего API. Или ещё пример: либа скармливает нам типизированные ошибки, мы хотим отлавливать их по-отдельности, на разных уровнях. Для всего этого нужны многоуровневые catch.

    Смысль финального catch, это отловить все ошибки, чтобы программа хотя бы не падала, но в действительности, может быть ещё много промежуточных catch под более конкретные ситуации.
      0
      Есть еще forEach и он не ждет ничего.
      Со временем я пришел к тому, что в моем коде остался только один catch и после seq для обработки удачного результата. Это примерно как в эрланге let it crash — мы программируем только оптимистичный случай когда все работает как надо. Если что-то пошло не так — ловим ошибку в finally и возвращаем тому кто вызывал: другая функция поймает ошибку точно так что в свой finally и таким образом она всплывет к вызывающему.
      Catch вызывает много проблем с пониманием того куда передается поток управления. Вы использовали оригинальную библиотеку (Seq)?
        +1
        мы программируем только оптимистичный случай когда все работает как надо
        Да, понял стратегию. Это достаточно просто и наглядно, и ещё я думаю более производительно в реализации.

        Оригинальную библиотеку я не использовал, потому у меня и возник такой вопрос. Я много использую Promise, но от основной идеи далеко не уйдёшь, асинхронный поток он и есть поток. В вашем примере указан как раз perEach, как я понял из документации, он-таки дожидается результата. forEach — нет, но в документации не сказано, что forEach сам может быть эмитентом ошибки. Вероятно, forEach нужен для вызова сайд-эффектов, поэтому он не влияет на состояние потока. В других либах для этого есть функция tap или doto.
          0
          К слову, это библиотека не является заменой обещаниям. Я использую и то и другое. Свою библиотеку — когда нужно организовать такой вот асинхронный поток. Обещания — когда этот поток может вызываться из разных мест или результаты работы нужны в разных местах, ну и для кеширования(или лучше сказать мемоизации?). Они прекрасно друг друга дополняют.
            +1
            Рекомендую обратить внимание на Highland. Он несовместим с node-seq, как ваша библиотека, но в целом очень интересен, например, умеет принимать всё, от массивов до функций, промисов и потоков ноды. Просто ознакомьтесь с некоторыми решениями, очень интересная либа.

            А по поводу отлова ошибок я бы хотел развеять ваше предубеждение, что их поведение неочевидно. Как правило, по взгляду на код можно определить как срабатывают catch, есть простое правило:
            1. catch отлавливает все ошибки, возникшие выше него.
            2. Если отработавший catch не выбросил ошибку сам, то она считается отловленной и следующие обработчики ошибок не срабатывают.
            3. Если же он породил новую ошибку, то она будет передана ниже.
            Так это реализовано в Promise и это очень наглядно (и тут полная аналогия с синхронным try-catch, только без многих уровней вложенности). Я думаю в node-seq такой же механизм, поэтому задавал вам уточняющий вопрос.
            +1
            А с catch в оригинальной библиотеке происходили чудеса. В каких то случаях, после того как туда ловилась одна ошибка из того же parEach, библиотека переходила к тому, что написано после catch. И у меня даже были какие то костыли для обхода этой ситуации. Просто в какой то момент надоело и решил сначала исправить, а потом и переписать. Вот, смотрите:
              this.Seq([1,2,3,4,5,6,7])
                .parEach(function (num) {
                  if (num % 2 == 0)
                    return setTimeout(function () {this('ha-ha!')}.bind(this), 100)
                  this(null, num);
                })
                .catch(function () {
                  log('orig catch:', arguments);
                })
                .seq(function () {
                  log('regual seq', arguments);
                  this();
                })
                .catch(function () {
                  log('should not be called', arguments)
                });
            

            Этот код (использует оригинальную библиотеку) даст такой результат:
            orig catch: { '0': 'ha-ha!', '1': 1 }
            regual seq { '0': 1, '1': 2, '2': 3, '3': 4, '4': 5, '5': 6, '6': 7 }
            orig catch: { '0': 'ha-ha!', '1': 3 }
            regual seq {}
            orig catch: { '0': 'ha-ha!', '1': 5 }
            regual seq {}
            

            т.е. элемент после catch будет вызван несколько раз. Перый раз — как положено, с аргументами. А потом — непонтно как.
        0
        Я тоже искал библиотеки для асинхронной разработки. Хотелось, чтоб был чейнинг, только асинхронный. Собственно я даже начал делать либу. Однако потом я ушёл в iojs и генераторы :)
        Мне кажется, что асинхронный чейнинг наиболее удобный путь, конечно не считая генераторов. В течении пары месяцев точно сяду за неё и доделаю.
          0
          В общем то, получается примерно то же, что придумал substack и переписал я.
          iojs, конечно, хорошо. Но существующие проекты и на 0,12 ноду перевести не всегда можно и насчет будущего iojs у меня пока сомнения; поживем — посмотрим как они будут развиваться и куда двигаться. (Ужасно не люблю когда вот так вот плодят сущности.)
            0
            Тут особо нечего придумывать, идея на поверхности. Главная заморочка — архитектура.

            Не вижу особой сложности в переносе проектов на 0.12. Нативные модули можно с nan собрать. А в js, кажется, stream как-то переписали. Но gulp на iojs работает, и многие другие проекты работают. Так что ничего страшного.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое