Добрый день, Хабрахабр.
Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.
В первую очередь, хотелось бы рассказать, как это было реализовано изначально.
1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.
2. Процедура, которая обрабатывала документ и возвращала нам promise.
3. И головная процедура, отправляющая результат пользователю
Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.
В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:
Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:
Все это можно почитать здесь.
Теория закончилась, приступим к практике.
Задачей этого объекта будет создать нужные нам события и выдать Promise
Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.
Итак, базовая модель готова. Осталось сделать обвязку для функций с callback
Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска
Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve
После тестирования старый подход (через библиотеку Q) был переписанзаменено пару объявлений и запущен в тех же условиях. Результат положительный — 50-100 мс (вместо прежних 1300 мс).
Все исходники доступны на Github, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!
Предисловие
Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.
Как было устроена работа с Q
В первую очередь, хотелось бы рассказать, как это было реализовано изначально.
1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.
forEachSerial
function forEachSerial(array, func) {
var tickFunction = function (el, index, callback) {
if (func.length == 2)
func(el, callback);
else if (func.length == 3)
func(el, index, callback);
}
var functions = [];
array.forEach(function (el, index) {
functions.push(Q.nfcall(tickFunction, el, index));
});
return Q.all(functions);
}
2. Процедура, которая обрабатывала документ и возвращала нам promise.
documentToJSON
function documentToJSON(el, options) {
var obj = el.toObject(options);
var deferred = Q.defer();
var columns = ['...','...','...'];
forEachSerial(columns,function (el, next) {
el.somyAsyncFunction(options, function (err, result) {
if (err)
next(err);
else result.somyAsyncFunction2(options, function (err, result) {
if (!err)
obj[el] = result;
next(err);
});
});
}).then(function () {
deferred.resolve(obj);
}, function (err) {
deferred.reject(err);
});
return deferred.promise;
}
3. И головная процедура, отправляющая результат пользователю
sendResponse
exports.get = function (req, res, next) {
dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) {
if (err)
next(new errorsHandlers.internalError());
else {
var result = [];
forEachSerial(result,function (el, index, next) {
documentToJSON(el,options).then(function (obj) {
result[index] = obj;
next()
}, function (err) {
next(new errorsHandlers.internalError())
});
}).then(function () {
res.send({responce: result});
}, function (err) {
next(new errorsHandlers.internalError())
});
}
});
};
Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.
Пишем свою библиотеку promise
Как вообще «это» работает
В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:
Promise.then(step1).then(step2).then(step3).then(function () {
//All OK
}, function (err) {
//Error in any step
});
Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:
var promise = fs.stat("foo");
promise.addListener("success", function (value) {
// ok
})
promise.addListener("error", function (error) {
// error
});
Все это можно почитать здесь.
Теория закончилась, приступим к практике.
Начнем с простого — Deferred
Задачей этого объекта будет создать нужные нам события и выдать Promise
function deferred() {
this.events = new EventEmitter(); //Объект события
this.promise = new promise(this); // Возвращаемый нам Promise
this.thenDeferred = []; //Последующие обработчики, нужны для того что бы передать ошибку дальше по цепочке
var self = this;
//Вызывается в успешном случае
this.resolve = function () {
self.events.emit('completed', arguments);
}
//Вызывается в случае ошибки
this.reject = function (error) {
self.events.emit('error', error);
//Передаем ошибку дальше по цепочке
self.thenDeferred.forEach(function (el) {
el.reject(error);
});
}
}
Объект — Promise
Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.
function promise(def) {
this.def = def;
this.completed = false;
this.events = def.events;
var self = this;
var thenDeferred;
self._successListener = null;
self._errorListener = null;
//Результатом выполнения then - будет возвращаться новый promise
this.then = function (success, error) {
if (success)
self._successListener = success;
if (error)
self._errorListener = error;
thenDeferred = new deferred();
self.def.thenDeferred.push(thenDeferred);
return thenDeferred.promise;
}
//Обрабатываем успешное выполнение задачи
this.events.on('completed', function (result) {
// объекты, аргументы, массивы приводим к виду массива для передачи их в дальнейшем как атрибуты в функцию
var args = inputOfFunctionToArray(result);
//Если вдруг задача была уже выполнена, то дальше не проходим
if (self.completed) return;
self.completed = true;
if (self._successListener) {
var result;
try {
result = self._successListener.apply(self, args);
} catch (e) {
self.def.reject(e);
result;
}
//Если результатом функции Promise и есть последующие then, подключаемся к нему
var promise;
if (isPromise(result))
promise = result;
else if (result instanceof deferred)
promise = result.promise;
if (promise && thenDeferred) {
promise.then(function () {
var args = arguments;
process.nextTick(function () {
thenDeferred.resolve.apply(self, args);
});
}, function (error) {
process.nextTick(function () {
thenDeferred.reject(error);
});
});
} else if (thenDeferred)
process.nextTick(function () {
//Для скалярных параметров просто запускаем следующие then
thenDeferred.resolve.apply(self, [result]);
});
} else if (thenDeferred)
process.nextTick(function () {
thenDeferred.resolve.apply(self, []);
});
});
//Обрабатываем ошибки
this.events.on('error', function (error) {
if (self.completed) return;
self.completed = true;
if (self._errorListener)
process.nextTick(function () {
self._errorListener.apply(self, [error]);
});
});
}
Итак, базовая модель готова. Осталось сделать обвязку для функций с callback
PromiseFn
Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска
var promisefn = function (bind, fn) {
var def = new deferred();
//bind является не обязательным параметром
if (typeof bind === 'function' && !fn) {
fn = bind;
bind = def;
}
//Назначаем наш callback для данной функции
var callback = function (err) {
if (err)
def.reject(err);
else {
var args = [];
for (var key in arguments)
args.push(arguments[key]);
args.splice(0, 1);
def.resolve.apply(bind, args);
}
};
var result = function () {
var args = [];
for (var key in arguments)
args.push(arguments[key]);
args.push(callback);
process.nextTick(function () {
fn.apply(bind, args);
});
return def.promise;
}
return result;
}
И напоследок ALL — последовательное выполнение функций с callback
Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve
var all = function (functions) {
var def = new deferred();
process.nextTick(function () {
var index = -1;
var result = [];
var next = function (err, arguments) {
if (err) {
def.reject(err);
return;
}
if (arguments) result.push(inputOfFunctionToArray(arguments));
index++;
if (index >= functions.length) {
def.resolve(result);
} else process.nextTick(function () {
promisefn(functions[index])().then(function () {
var args = arguments;
process.nextTick(function () {
next(err, args);
});
}, function (err) {
process.nextTick(function () {
next(err);
});
});
});
}
process.nextTick(next);
});
return def.promise;
}
В заключение
После тестирования старый подход (через библиотеку Q) был переписан
Все исходники доступны на Github, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!