Опять асинхронщина на колбэках. В массивах

var nodes = arrayThatLeaks( linklist )
  .leakMap( xhrLoad )
  .leakFilter( responseNotEmpty )
  .leakMap( insertInDocument );
// или теперь ещё и так
Promise.resolve(
  arrayThatLeaks( linklist )
    .leakMap( xhrLoad )
    .leakFilter( responseNotEmpty )
    .leakMap( makeDomNode )
  )
.then( insertNodesInDocument );

Подобная запись встретилась как-то в комментариях, и то сподвигло на эксперимент – упражнение для мозгов. Асинхронные операции над массивами в форме записи цепочкой.

Ссылки, предупреждения и оправдания
Пройти и разрушать, а если отвалится скачать и молотить.

Определенно, что здесь представлен очередной велосипед, и кто-то сочтет за невежество сам факт появления этого кода. Но как упомянуто, это эксперимент – попытка очередного кодера «оседлать» однопоточную асинхронность javascript. Кто заглянет в код, тот будет обескуражен его качеством, а имена функций и переменных могут запутать чтеца в попытках осмысления. Но есть надежда, что кому-то этот код сможет пригодиться и не пропадет даром.

Естественно, что обработчики методов должны быть написаны с учетом асинхронности происходящего. Тут сделано по-простому:

function responseNotEmpty( r, v ) {
  if( /* что-то c v*/ ) r( true );
  else r( false );
}

Первым аргументом в обработчике идет служебная процедура – приёмник результата, второй аргумент – элемент массива. Результирующий массив nodes будет заполняться значениями по мере их прохождения через цепочку процедур. Порядок элементов в результирующем массиве зависит от того, какой из элементов обработается раньше, то есть, значения начального массива теряют свой индекс, даже на map().

Была мысль обратиться к современным подходам Обещаний и Асинхронных ожиданий, но они были отброшены по необъяснимым причинам. В итоге получен код, состоящий из потенциальных утечек в половине функций. А его структура есть чудовищное воплощение callback-hell – 5 ступеней на сотню строк.

Обдумано два способа


Сначала хотелось создавать большой объект с внутренним контролем состояния и исполнения всей цепочки асинхронных обработчиков. То представлялось в виде многомерного массива, целой карты с результатами и флагами, и микрофункциями, и флагами, и счетчиками, и флагами. Это было хорошо для одного метода map(), где длина массива не меняется. Но метод фильтрации сделает часть карты ненужной. Пусть даже интерпретатор и сделает ненужные ссылки фантомными для физической памяти, но это же надо постараться в коде, чтобы эти ссылки не трогать. Плюс реализация параллельных ветвей методов заставляет разращивать карту в нескольких измерениях почти бесконтрольно.

var a1 = arrayThatLeaks( [/**/] );
a1.leakMap(/**/).leakFilter(/**/).leakMap(/**/); // ветка 1
a1.leakFilter(/**/).leakReduce(/**/); // ветка 2

Второй и текущий вариант предполагает создавать отдельный набор контрольных данных для каждого вызова метода массива. Здесь упомянутые трудности схлопываются в переменную счетчик (почти). А для связки методов в цепочку введена процедура обмена takeforward(), вся «магия» местной асинхронщины. Эта процедура внутри замыкания одного метода получает внутренние процедуры из замыкания следующего метода. Что конкретно передавать выяснялось в процессе написания, оказалось, что достаточно процедуры запуска обработчика и синхронизирующей процедуры контроля счетчиков.

Детально


Первым делом, как полагается, пришлось от конкретики разных методов выделить общие вспомогательные процедурки в функцию chain(). Здесь оборачиваются обработчики и приёмники из методов, и происходит связывание с предыдущим методом через аргумент giveback(), которым обычно является предыдущая процедура takeforward(). Из chain() выходит результирующий массив, расширенный «утечковыми» методами. Расширяется массив в функции expa(), где каждый метод есть результат работы chain(). При этом, для создания метода в chain() передаются синхронизатор, приёмник и предобработчик метода, которые оборачиваются и получают немного ссылок из замыкания chain().

Такая схема работает для простых по смыслу методов map() и filter(). А вот для reduce() нужно создавать внешние хранилища промежуточных значений, что порождает анонимку над chain(). Возможно, в дальнейшем, когда методов станет больше, chain() усложнится для лучшего описания методов с сильносвязанными значениями, типа сортировки.

Процедуры, определяющие логику метода:


Синхронизатор метода запускается из внутренней процедуры контроля счетчиков и получает счетчики своего и предыдущего замыкания chain(), флаг завершения предыдущейго метода, ссылку на запуск обработчика своего метода, ссылку на запуск следующего метода и ссылку на массив результатов. Для простых методов здесь вычисляется только завершенность исполнения метода. Для сложных дополнительно определяется очередность запуска обработчиков с подстановкой всех необходимых значений, поэтому в синхронизатор приходят ссылки на запуск. Каждый раз, когда синхронизируется один метод, синхронизируются все последующие – это позволяет отрабатывать логику методов наперед.

Приёмник результатов вставляется аргументом во внешний обработчик (аргумент метода) и получает оттуда значение. Каждый приёмник в обертке связывается с конкретным индексом, может отправить какое-нибудь значение по цепочке дальше и запускает синхронизацию. Запуск цепочки здесь необязателен потому, что далеко не всегда полученному значению необходимо продолжать обработку. И индекс можно вставить любой, лишь бы без повторов.

Предобработчик метода возник лишь для реализации логики свертки, конечно же, он будет полезен и для каких-нибудь других методов. Суть предобработчика в перехвате значений из цепочки обработки. Для reduce() это важно, так как обработчик требует два значения, а из цепочки приходит одно, а что ещё важнее, логика reduce() требует вовсе остановить цепочку. К тому же, предобработчик может запустить синхронизацию – но это скорее костыль, чтобы не распылять по процедурам ссылки на запуск.

В итоге


Таким образом, цепочка методов разворачивается и уплотняется скрытыми процедурами для обеспечения контролируемого асинхронного исполнения (а как иначе?), и по ней обрабатываются значения массива, где-то они проходят без ожидания остальных, где-то ждут, где-то пропадают. Утечки здесь закономерны в силу цепочечности замыканий, и всякая кривая ссылка будет держать всю цепочку. Есть ещё концептуальный вопрос поведения в случаях опустошения массива, выбрасывать ошибку или сохранять в результаты.

UPD: Теперь можно совмещать с обещаниями. Для этого появилась проверка в обертке приёмника данных и внешний метод .then(), который лишь прикидывается обещанием.

Similar posts

Ads
AdBlock has stolen the banner, but banners are not teeth — they will be back

More

Comments 9

    +2
    Код у вас получился весьма сложночитаемый, я поленился в него вникать, извините.

    Пример использования мне показался притянут за уши. Было бы интересно вычисления выполнять в WebWorker'е, тогда асинхронность будет оправдана.

    Но зачем все это, если есть промисы? Пример с использованием каррированных функций:

    ```js
    Promise
    .resolve( [ 1.1, 5.4, 8.3, 2.7, 4.9, 4.4, 7] )
    .then(mapAsync(plus(3)))
    .then(filterAsync(greater(3)))
    /*… */
    ```
      0
      Промисы – это дело предпочтений и знаний, мои остановились на колбэках. Про webworker-ы говорят, что они не имеют доступа к DOM, а так хочется.

      Вероятно, когда-нибудь переделаю свой код, чтобы он мог жевать промисы, рассовывать цепочки по worker-ам, читать мысли… но пока я лишь доказал для себя свою маленькую идею.

      А пример на числах для упрощенного восприятия вычислений в консоли.
        0
        Посмотрите Rx.js и нативные Stream в node.js
      0
      А какова должна быть практическая польза от этого дела. Еще не совсем понятно само определение «утечек», что это?

      P.S. Что-то мне подсказывает, что на самом деле вам нужны Observables — действительно мощнецкий инструмент для борьбы с асинхронностью. Посмотрите в сторону RxJS/xstream/most.js. Или я что-то упустил? :)
        0
        Пользы пока пренебрежимо мало, мой код работает только с массивами. А «утечка» – стандартный эффект неспособности сборщика мусора вытравить ненужное.

        Как мне показалось, rxjs, xstream, stream работают с единичными значениями. Для обработки массива по соответствующим массиву принципам нужно дописывать что-то внутри или снаружи этих библиотек. А mostjs уж очень «функциональнен», сходу толком не разобрал, но опять же – не массивы.

        В целом, обсерверы – те же события, а события – это колбэки, назначаемые из другого места, которое считают правильнее.
        hithim
          0
          Они работают с единичными значениями «over time» — то есть, с течением времени. Вот и асинхронность. Никто не мешает сделать поток из массива и разом пустить его через цепочку операторов стрима. Получится своего рода «асинхронный массив» (хотя и не совсем, но результат, как мне кажется, будет именно тот, что нужен).

          Тут, собственно, весь вопрос сводится к тому, что именно вы хотите делать с массивами с практической точки зрения?
            0
            Свернуть через удаленный ресурс, например.

            Конечно же, методов понадобится побольше, бэкенд нужно подготовить для такого и обеспечить транспортировку данных. Важно же то, что завязка на массив – пусть и малая часть всей логики – может быть учтена, а не создана вновь.
        +1

        Не понятно как в этом коде обрабатывать ошибки, видится что никак.

          0
          Пока никак, да.

        Only users with full accounts can post comments. Log in, please.