В этой статье я опишу две абстракции-классы, написанные средствами nodejs, которые предоставляют функционал распределения запросов по открытым каналам (tcp-socket). При этом учитывается общая загруженность системы и, если каналов не хватает, открываются новые, по мере уменьшения общего количества запросов — «лишние» каналы закрываются.
Этот клиент можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.
В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.
Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:
Теперь непосредственно класс Balanсer, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса
Как все это проверить? Для тестирования я использую запрос «select pg_sleep(1)», который выполняется 1 секунду и имитирует запрос к базе.
10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100.
Использованный скрипт:
Все исходники доступны на гитхабе.
Клиент еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.
Этот клиент можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.
В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.
Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:
Класс Connect
/* Конструктор класса коннект, в качестве аргумента строка формата "pg://USER:PASSWORD@HOST:PORT/DATABASE" */
function Connect(connString) {
// сохраняем параметры в свойстве объекта
this._connString = connString;
// свойство отвечающее, за запуск обработки запросов
this._isRun = false;
// максимальное количество запросов помещенных в сокет, после которого будет вызвано событие "maxCount"
this._maxQueryCount = 100;
// служебное свойство, используемое в методе _nextTick
this._worked = false;
// количество запросов, висящих на коннекте
this._queryCount = 0;
// "движок" класса
this._emitter = new (require('events').EventEmitter);
// делаем "селфи"
var self = this;
// на открытие коннекта создаем обработчик "open", в котором регистрируем массив коннектов
this._emitter.on('open', function() {
self._arrayQuery = [];
});
// на событие ошибки будет сгенерирована ошибка, которая если не навесить обработчик, повалит выполнение скрипта
this._emitter.on('error', function(err) {
throw err;
});
// на событие достижения лимита этого коннекта, пометим его флагом
this._emitter.on('maxCount', function(message) {
self._setMax = true;
});
// при создании экземпляра класса открываем коннект до базы, здесь может быть открытие любого коннекта,
// который представляет собой по сути net.Socket
pg.connect(this._connString, function(err, client, done) {
if (err) {
return self._emitter.emit('error', err);
}
// запишем в "внутреннее" свойство ссылку на клиент, который общается с базой
self._client = client;
// "мягкое закрытие" клиента
self._done = done;
// вызываем событие готовности (передаем событие далее по цепочке)
self._emitter.emit('open');
});
}
/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Connect.prototype.on = function(typeEvent, func) {
if(typeEvent == 'error') {
// если это обработчик на ошибки подменяем стандартный обработчик пользовательским
this._emitter.removeAllListeners('error');
}
this._emitter.addListener(typeEvent, func);
};
/* метод, которые запускает работу по обработке запросов */
Connect.prototype.start = function() {
this._isRun = true;
this._nextTick();
};
/* метод, которые останавливает работу по обработке запросов */
Connect.prototype.stop = function() {
this._isRun = false;
};
/* метод, возвращающий состоянии коннекта (заполнен оли он) */
Connect.prototype.isFull = function() {
return this._setMax;
};
/*
метод, закрывающий мягко коннект
(т.е. если на коннекте висят запросы, программа дождется их выполнения и закроет коннект)
*/
Connect.prototype.close = function () {
if(this._done) {
this._emitter.emit('close');
this._done();
} else {
this._emitter.emit('error', new Error('connect is not active'));
}
};
/* метод, возвращающий массив обрабатываемых запросов */
Connect.prototype.queryQueue = function () {
return this._arrayQuery;
};
/*
главный рабочий метод класса - добавление запроса.
В качестве аргументов сам запрос в виде строки, параметры запроса, коллбэк на завершении запроса
*/
Connect.prototype.addQuery = function (query, params, cb) {
if(!(typeof query == 'string')) {
return this._emitter.emit('error', new Error('not valid query'));
}
if( !(typeof params == "object") || !(params instanceof Array) ) {
return this._emitter.emit('error', new Error('not valid argument'));
}
this._queryCount++;
this._arrayQuery.push({ query: query, params: params, callback: cb });
if(this._queryCount > this._maxQueryCount) {
this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases');
}
this._nextTick();
};
/* метод по манипулированию максимальным количеством запросов в коннекте */
Connect.prototype.maxQueryCount = function (count) {
if(count) {
this._maxQueryCount = count;
} else {
return this._maxQueryCount;
}
};
/* возвращает количество обрабатываемых запросов */
Connect.prototype.queryCount = function () {
return this._queryCount;
};
/*
внутренний метод класса, ответственный за выполнение запросов,
в данном случае реализован вариант, когда все запросы сразу отправляются
к базе, возможна реализация в случае с последовательным выполнением
запросы хранятся во внутреннем буффере (массиве _arrayQuery)
и отправляются к базе по мере выполнения предыдущего
*/
Connect.prototype._nextTick = function() {
var self = this;
if(this._worked) {
return;
}
while(this._isRun && this._arrayQuery.length>0) {
this._worked = true;
var el = this._arrayQuery.shift();
// здесь используется синтаксис библиотеки pg, к которой мы привязаны
this._client.query(el.query, el.params, function(err, result) {
self._queryCount--;
if(err) {
return el.callback(err);
}
el.callback(null, result);
if(self._queryCount==0) {
self._emitter.emit('drain');
self._setMax = false;
}
})
}
this._worked = false;
};
Теперь непосредственно класс Balanсer, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса
Класс Balancer
/* конструктор класса, который будет распределять запросы */
function Balancer(minCountConnect, maxCountConnect) {
// записываем в свойство максимальный предел открытых коннектов до базы
this._maxCountConnect = maxCountConnect;
// записываем в свойство минимальный предел открытых коннектов до базы
this._minCountConnect = minCountConnect;
// массив коннектов
this._connectArray = [];
// закрываемые коннекты
this._closedConnect = [];
// массив задач
this._taskArray = [];
// служебный флаг
this._run = false;
// движок класса
this._emitter = new (require('events').EventEmitter);
// запускаем инициализацию
this._init();
}
/* метод инициализации класса, открывающий коннекты последовательно, один за другим */
Balancer.prototype._init = function() {
this._cursor = 0;
this.activQuery = 0;
var self = this;
var i=0;
// рекурсивный вызов функции, добавляющей новый коннект
var cycle = function() {
i++;
if(i<self._minCountConnect) {
self._addNewConnect(cycle);
} else {
self._emitter.emit('ready');
}
};
this._addNewConnect(cycle);
};
/* собственно метод, открывающий соединение, используем класс коннекта */
Balancer.prototype._addNewConnect = function(cb) {
var self = this;
var connect = new GPSconnect(connString);
connect.on('open', function() {
self._connectArray.push(connect);
cb();
});
};
/* метод, по проверке "загруженности" коннекта и возвращающий индекс коннекта */
Balancer.prototype._cycle = function(pos) {
for (var i=pos;i<this._connectArray.length;i++) {
if( !(this._connectArray[i].isFull()) )
break;
}
return i;
};
/* метод, заполняющий коннект запросами */
Balancer.prototype._next = function(connect, el) {
connect.addQuery(el.query, el.params, el.cb);
connect.start();
this._distribution();
};
/*
Главный метод класса - распределяет запросы между коннектами.
Распределение проходит по принципу "Round-robin" с проверкой на загруженность коннекта.
Это нужно в случае, если какой то запрос оказался "тяжелым",
чтобы снять нагрузку с этого коннекта и перераспределить запросы на другие коннекты
код оформлен конечно криво, надеюсь в скором времени поправить
*/
Balancer.prototype._distribution = function() {
if(this._taskArray.length>0) {
var el = this._taskArray.shift();
this._cursor = this._cycle(this._cursor);
var self = this;
if(this._cursor<this._connectArray.length) {
var connect = this._connectArray[this._cursor];
this._next(connect, el);
this._cursor++;
} else {
this._cursor = this._cycle(0);
if(this._cursor<this._connectArray.length) {
var connect = this._connectArray[this._cursor];
this._next(connect, el);
this._cursor++;
} else if( this._connectArray.length<this._maxCountConnect) {
self._addNewConnect(function() {
self._cursor = self._connectArray.length-1;
var connect = self._connectArray[self._cursor];
self._next(connect, el);
});
} else {
for (var i=0;i<this._connectArray.length;i++) {
if( this.activQuery/this._connectArray.length > this._connectArray[i].queryCount() ) {
break;
}
}
if(i==this._connectArray.length) {
i = 0;
}
this._cursor = i;
var connect = this._connectArray[this._cursor];
this._next(connect, el);
}
}
} else {
this._run = false;
}
};
/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Balancer.prototype.on = function(typeEvent, func) {
this._emitter.addListener(typeEvent, func);
};
/*
метод, вызываемый для проверки количества открытых коннектов, и если необходимости в таком количестве нет
"лишние" коннекты исключается из системы распределения
*/
Balancer.prototype._removeLoad = function() {
var self = this;
var temp = this._connectArray[0].maxQueryCount().toFixed();
var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp;
if(currentCount< this._connectArray.length ) {
while( this._connectArray.length != currentCount ) {
var poppedConnect = this._connectArray.pop();
if(poppedConnect.queryCount()==0) {
poppedConnect.close();
} else {
poppedConnect.index = self._closedConnect.length;
poppedConnect.on('drain', function() {
poppedConnect.close();
self._closedConnect.slice(poppedConnect.index, 1);
});
self._closedConnect.push(poppedConnect);
}
}
}
};
/*
Cобственно метод, который предоставляет вход-трубу, через который добавляются все запросы.
Параметр tube, возможно использовать для дифференсации запросов между собой,
пока он никак не используется.
*/
Balancer.prototype.addQuery = function(tube, query, params, cb) {
this.activQuery++;
var self = this;
this._removeLoad();
var wrappCb = function() {
self.activQuery--;
cb.apply(this, arguments);
};
this._taskArray.push({ query: query, params: params, cb: wrappCb });
if(!this._run) {
this._run = true;
this._distribution();
}
};
Как все это проверить? Для тестирования я использую запрос «select pg_sleep(1)», который выполняется 1 секунду и имитирует запрос к базе.
10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100.
Использованный скрипт:
var balancer = new Balancer(10,100);
balancer.on('ready', function() {
var y=0;
var time = +new Date();
for(var i=0;i<10000; i++) {
balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) {
if(err) console.log(err);
y++;
if(y==10000) {
console.log(balancer._connectArray.length);
console.log(+new Date()-time);
}
});
}
});
Все исходники доступны на гитхабе.
Клиент еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.