Не так давно мне выпала честь сделать реализацию демонов (воркеров) на Node.js для использования во всех проектах, разрабатываемых нашей компанией. Мы часто разрабатываем проекты, где необходимо процессить и распределенно хранить видео файлы на нескольких серверах, так что нам нужно иметь готовый инструмент для этого.
Постановка задачи:
- Разработка должна быть на Node.js. Мы давно используем эту платформу для разработки всех наших проектов, так что здесь это оправданный выбор.
- Все ноды в кластере должны быть равнозначными. Не должно быть специального управляющего звена или мастера. В противном случае остановка мастера может привести к останову всего кластера.
- Задачи должны находиться в таблице MySQL. Это намного гибче и информативнее, чем использовать какой-нибудь MQ. Всегда можно получить доступ ко всем задачам, оценить очередь, переназначить их на другую ноду и т.д.
- Каждый воркер должен уметь обрабатывать несколько задач одновременно.
Сразу привожу ссылку на GitHub того, что получилось: (https://github.com/pipll/node-daemons).
Запуск воркеров
Каждый воркер это отдельный процесс Node.js. Для создания процессов воркеров используется встроенный модуль cluster. Он также контролирует падения воркеров и заново их запускает.
'use strict';
const config = require('./config/config');
const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
const models = require('./models');
// Таймер для проверки количества воркеров и остановки мастера
let shutdownInterval = null;
if (cluster.isMaster) {
// Запускаем воркеры
_.each(config.workers, (conf, name) => {
if (conf.enabled) {
startWorker(name);
}
});
} else {
// Инициализируем воркер
let name = process.env.WORKER_NAME;
let WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
let worker = null;
if (WorkerClass) {
worker = new WorkerClass(name, config.workers[name]);
// Запускаем воркер
worker.start();
// Подписываемся на событие, когда воркер остановлен
worker.on('stop', () => {
process.exit();
});
}
// Подписываемся на события от мастера
process.on('message', message => {
if ('shutdown' === message) {
if (worker) {
worker.stop();
} else {
process.exit();
}
}
});
}
// Shutdown
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);
// Метод запуска воркера
function startWorker(name) {
let worker = cluster.fork({WORKER_NAME: name}).on('online', () => {
logger.info('Start %s worker #%d.', name, worker.id);
}).on('exit', status => {
// Когда воркер был остановлен
if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) {
// Если воркер был остановлен контролируемо, то ни чего не делаем
logger.info('Worker %s #%d was killed.', name, worker.id);
} else {
// Если воркер был остановлен неконтролируемо, то запускаем его еще раз
logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
startWorker(name);
}
});
}
// Метод остановки кластера
function shutdownCluster() {
if (cluster.isMaster) {
clearInterval(shutdownInterval);
if (_.size(cluster.workers) > 0) {
// Посылаем сигнал останова каждому воркеру
logger.info('Shutdown workers:', _.size(cluster.workers));
_.each(cluster.workers, worker => {
try {
worker.send('shutdown');
} catch (err) {
logger.warn('Cannot send shutdown message to worker:', err);
}
});
// Ожидаем останов всех воркеров
shutdownInterval = setInterval(() => {
if (_.size(cluster.workers) === 0) {
process.exit();
}
}, config.shutdownInterval);
} else {
process.exit();
}
}
}
Я хотел бы обратить внимание на несколько моментов:
- Для запуска воркера используется fork процесса, при этом в переменной окружения
WORKER_NAME
передается название запускаемого воркера. - Когда воркер неконтролируемо завершает работу, мы перезапускаем его.
- Чтобы контролируемо остановить воркеры, мы посылаем им сигнал shutdown. Воркер реагирует на это событие и после окончания выполнения задачи делает
process.exit()
. - Мастер наблюдает за количеством воркеров с помощью
setInterval
и когда все воркеры будут остановлены делаетprocess.exit()
.
Компонент базового воркера
Этот компонент предназначен для выполнения периодических процессов и не работает с очередью задач.
'use strict';
const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');
class Worker extends EventEmitter {
constructor(name, conf) {
super();
this.name = name;
// Значения настроек по умолчанию
this.conf = _.defaults({}, conf, {
sleep: 1000 // Задержка между запусками
});
this.logger = log4js.getLogger('worker-' + name);
// Флаг останова воркера
this.stopped = true;
// Таймер для задержки между запусками loop мотода
this.timer = null;
// Состояние воркера
this.state = null;
}
// Метод запуска воркера
start() {
this.logger.info('Start');
this.stopped = false;
this.state = WorkerStates.STATE_IDLE;
return this._startLoop();
}
// Метод останова воркера
stop() {
this.logger.info('Stop');
this.stopped = true;
if (this.state === WorkerStates.STATE_IDLE) {
// Останавливаем таймер задержки
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
this.state = WorkerStates.STATE_STOP;
// Вызываем событие останова воркера
this.emit('stop');
}
}
// Метод для выполнения бизнес задач
loop() {
return Promise.resolve();
}
// Метод запуска и обработки loop метода
_startLoop() {
this.state = WorkerStates.STATE_WORK;
return this.loop().catch(err => {
this.logger.warn('Loop error:', err);
}).finally(() => {
this.state = WorkerStates.STATE_IDLE;
if (!this.stopped) {
// Делаем повторный запуск по окончании работы loop метода
this.timer = setTimeout(() => {
this._startLoop();
}, this.conf.sleep);
} else {
this.state = WorkerStates.STATE_STOP;
// Вызываем событие останова воркера
this.emit('stop');
}
});
}
}
module.exports = Worker;
Код простейшего воркера может выглядеть так:
'use strict';
const Promise = require('bluebird');
const Worker = require('../components/worker');
class Sample extends Worker {
loop() {
this.logger.info("Loop method");
return Promise.resolve().delay(30000);
}
}
module.exports = Sample;
Некоторые особенности:
loop
метод предназначен для наследования в потомках и реализации бизнес задач. Возвращаемым значением этого метода должен бытьPromise
.- По окончании работы
loop
метода он запускается заново через указанное в настройках воркера время. - Воркер имеет три состояния:
- STATE_IDLE — во время паузы между запусками
loop
метода. - STATE_WORK — во время работы
loop
метода. - STATE_STOP — после останова воркера.
- STATE_IDLE — во время паузы между запусками
Компонент воркера обработки задач
Это основной компонент, предназначенный для параллельной обработки задач из таблицы MySQL.
'use strict';
const config = require('../config/config');
const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');
class TaskWorker extends Worker {
constructor(name, conf) {
super(name, conf);
// Значения настроек по умолчанию
this.conf = _.defaults({}, this.conf, {
maxAttempts: 3, // Максимальное количество попыток запуска задачи
delayRatio: 300000, // Базовое значение отсрочки запуска задачи
count: 1, // Максимальное количество одновременно обрабатываемых задач
queue: '', // Название очереди для получения задач
update: 3000 // Интервал обновления статуса задачи
});
// Счетчик одновременно обрабатываемых задач
this.count = 0;
}
loop() {
if (this.count < this.conf.count && !this.stopped) {
// Получение очередной задачи
return this._getTask().then(task => {
if (task) {
// Увеличение счетчика одновременно обрабатываемых задач
this.count++;
// Запуск периодического обновления статуса задачи
let interval = setInterval(() => {
return models.sequelize.transaction(t => {
return task.touch({transaction: t});
});
}, this.conf.update);
// Запуск метода обработки задачи
this.handleTask(task.get({plain: true})).then(() => {
// Завершение задачи
return models.sequelize.transaction(t => {
return task.complete({transaction: t}).then(() => {
this.logger.info('Task completed:', task.id);
});
});
}).catch(err => {
// Задача не была выполнена - перезапуск задачи
this.logger.warn('Handle error:', err);
return this.delay(task).then(delay => {
return models.sequelize.transaction(t => {
return task.fail(delay, {transaction: t}).then(() => {
this.logger.warn('Task failed:', task.id);
});
});
});
}).finally(() => {
clearInterval(interval);
this.count--;
}).done();
return null;
}
});
} else {
return Promise.resolve();
}
}
// Метод обработки задачи
handleTask() {
return Promise.resolve();
}
// Метод вычисления отсрочки задачи после неуспешного запуска
delay(task) {
return Promise.resolve().then(() => {
return task.attempts * this.conf.delayRatio;
});
}
// Метод получения задачи для обработки
_getTask() {
return models.sequelize.transaction({autocommit: false}, t => {
return models.Task.scope({
method: ['forWork', this.conf.queue, config.node_id]
}).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
if (task) {
return task.work(config.node_id, {transaction: t});
}
});
});
}
_startLoop() {
this.state = WorkerStates.STATE_WORK;
return this.loop().catch(err => {
this.logger.warn('Loop error:', err);
}).finally(() => {
if (this.count === 0) {
this.state = WorkerStates.STATE_IDLE;
}
if (this.stopped && this.count === 0) {
this.state = WorkerStates.STATE_STOP;
this.emit('stop');
} else {
this.timer = setTimeout(() => {
this._startLoop();
}, this.conf.sleep);
}
});
}
}
module.exports = TaskWorker;
Код простейшего воркера может выглядеть так:
'use strict';
const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');
class Sample extends TaskWorker {
handleTask(task) {
this.logger.info('Sample Task:', task);
return Promise.resolve().delay(30000);
}
}
module.exports = Sample;
Особенности работы компонента:
- Для получения задачи из базы данных используется конструкция
SELECT ... FOR UPDATE
и последующийUPDATE
записи в базе данных в одной транзакции с отключенным автокоммитом. Это позволяет получить эксклюзивный доступ к задаче даже при одновременных запросах с нескольких серверов. - Во время обработки задачи запускается периодический процесс обновления статуса задачи в базе данных. Это необходимо чтобы отличить долго работающую задачу от неожиданно завершенной задачи без обновления статуса.
- Статус обработанной задачи определяется статусом
Promise
возвращаемого методомhandleTask
. При успехе задача помечается как выполненная. В противном случае задача помечается как провальная и запускается с отсрочкой, задаваемой в методеdelay
.
Модель работы с задачами
Для работы с моделями базы данных используется модуль sequelize. Все задачи находятся в таблице tasks. Таблица имеет следующую структуру:
Поле | Тип | Описание |
---|---|---|
id |
integer, autoincrement | ID задачи |
node_id |
integer, nullable | ID ноды, для которой предназначена задача |
queue |
string | Очередь задачи |
status |
enum | Статус задачи |
attempts |
integer | Количество попуток запуска задачи |
priority |
integer | Приоритет задачи |
body |
string | Тело задачи в JSON формате |
start_at |
datetime, nullable | Дата и время начала обработки задачи |
finish_at |
datetime, nullable | Дата и время окончания обработки задачи (аналог TTL) |
worker_node_id |
integer, nullable | ID ноды, которая начала обработку задачи |
worker_started_at |
datetime, nullable | Дата и время начала обработки задачи |
checked_at |
datetime, nullable | Дата и время обновления статуса работы задачи |
created_at |
datetime, nullable | Дата и время создания задачи |
updated_at |
datetime, nullable | Дата и время изменения задачи |
Задача может быть назначена как конкретной ноде, так и любой ноде в кластере. Это регулируется полем node_id
во время создания задачи. Задачу можно запустить с отсрочкой (поле start_at
) и с ограничением временем обработки (поле finish_at
).
Различные воркеры работают с различными очередями задач, задаваемыми в поле queue
. Приоритет обработки задается в поле priority
(чем больше, тем выше приоритет). Количество перезапусков задачи сохраняется в поле attempts
. В теле задачи (поле body
) в JSON формате передаются параметры, необходимые воркеру.
Поле checked_at
выполняет роль признака работающей задачи. Значение его все время меняется во время работы задачи. Если значение поля checked_at
долго не менялось, а задача находится в статусе working, то задача считается проваленной и статус ее меняется на failure.
'use strict';
const moment = require('moment');
module.exports = function(sequelize, Sequelize) {
return sequelize.define('Task', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true
},
node_id: {
type: Sequelize.INTEGER
},
queue: {
type: Sequelize.STRING,
allowNull: false
},
status: {
type: Sequelize.ENUM,
values: ['pending', 'working', 'done', 'failure'],
defaultValue: 'pending',
allowNull: false
},
attempts: {
type: Sequelize.INTEGER,
defaultValue: 0,
allowNull: false
},
priority: {
type: Sequelize.INTEGER,
defaultValue: 10,
allowNull: false
},
body: {
type: Sequelize.TEXT,
set: function(body) {
return this.setDataValue('body', JSON.stringify(body));
},
get: function() {
try {
return JSON.parse(this.getDataValue('body'));
} catch (e) {
return null;
}
}
},
start_at: {
type: Sequelize.DATE
},
finish_at: {
type: Sequelize.DATE
},
worker_node_id: {
type: Sequelize.INTEGER
},
worker_started_at: {
type: Sequelize.DATE
},
checked_at: {
type: Sequelize.DATE
}
}, {
tableName: 'tasks',
freezeTableName: true,
underscored: true,
scopes: {
forWork: function(queue, node_id) {
return {
where: {
node_id: {
$or: [
null,
node_id
]
},
queue: queue,
status: 'pending',
start_at: {
$or: [
null,
{
$lt: moment().toDate()
}
]
},
finish_at: {
$or: [
null,
{
$gte: moment().toDate()
}
]
}
},
order: [
['priority', 'DESC'],
['attempts', 'ASC'],
[sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC']
]
};
}
},
instanceMethods: {
fail: function(delay, options) {
this.start_at = delay ? moment().add(delay, 'ms').toDate() : null;
this.attempts = sequelize.literal('attempts + 1');
this.status = 'failure';
return this.save(options);
},
complete: function(options) {
this.status = 'done';
return this.save(options);
},
work: function(node_id, options) {
this.status = 'working';
this.worker_node_id = node_id;
this.worker_started_at = moment().toDate();
return this.save(options);
},
check: function(options) {
this.checked_at = moment().toDate();
return this.save(options);
}
}
});
};
Жизненный цикл задач
Все задачи проходят следующий жизненный цикл:
- Новая задача создается со статусом pending.
- Когда наступает время обработки задачи, ее получает первый свободный воркер, переводит в статус working и заполняет поля
worker_node_id
иworker_started_at
. - Во время обработки задачи воркер с некоторой периодичностью (по умолчанию каждые 10 секунд) обновляет поле
checked_at
для нотификации о корректной работе. - По окончании работы над задачей может быть несколько вариантов развития событий:
4.1. Если задача успешно завершилась, то задача переводится в статус done.
4.2. Если выполнение задачи провались, то задача переводится в статус failure, увеличивается количествоattempts
и происходит отсрочка запуска на заданное количество времени (вычисляется в методеdelay
основываясь на количестве попыток и настройкеdelayRatio
).
В проекте так же есть встроенный модуль Manager
, который запускается на каждой ноде и делает следующую обработку задач:
- Переводит зависшие задачи в статус failure.
- Переводит проваленные задачи в статус pending для новой обработки.
- Удаляет невыполненные задачи, срок запуска которых уже истек.
- Удаляет успешно завершенные задачи с отсрочкой в 1 час (может быть настроено).
- Удаляет проваленные завершенные задачи с израсходованным количеством попыток запуска с отсрочкой в 3 дня (может быть настроено).
Кроме того этот модуль работает с включением/отключением нод.
'use strict';
const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');
class Manager extends Worker {
constructor(name, conf) {
super(name, conf);
this.conf = _.defaults({}, this.conf, {
maxUpdate: 30000, // 30 seconds
maxCompleted: 3600000, // 1 hour
maxFailed: 259200000 // 3 days
});
}
loop() {
return models.sequelize.transaction(t => {
return Promise.resolve()
.then(() => {
return this._checkCurrentNode(t);
})
.then(() => {
return this._activateNodes(t);
})
.then(() => {
return this._pauseNodes(t);
})
.then(() => {
return this._restoreFrozenTasks(t);
})
.then(() => {
return this._restoreFailedTasks(t);
})
.then(() => {
return this._deleteDeadTasks(t);
})
.then(() => {
return this._deleteCompletedTasks(t);
})
.then(() => {
return this._deleteFailedTasks(t);
});
});
}
_checkCurrentNode(t) {
return models.Node.findById(config.node_id, {transaction: t}).then(node => {
if (node) {
return node.check();
}
});
}
_activateNodes(t) {
return models.Node.update({
is_active: true
}, {
where: {
is_active: false,
checked_at: {
$gte: moment().subtract(2 * this.conf.sleep).toDate()
}
},
transaction: t
}).spread(count => {
if (count > 0) {
this.logger.info('Activate nodes:', count);
}
});
}
_pauseNodes(t) {
return models.Node.update({
is_active: false
}, {
where: {
is_active: true,
checked_at: {
$lt: moment().subtract(2 * this.conf.sleep).toDate()
}
},
transaction: t
}).spread(count => {
if (count > 0) {
this.logger.info('Pause nodes:', count);
}
});
}
_restoreFrozenTasks(t) {
return models.Task.update({
status: 'failure',
attempts: models.sequelize.literal('attempts + 1')
}, {
where: {
status: 'working',
checked_at: {
$lt: moment().subtract(this.conf.maxUpdate).toDate()
}
},
transaction: t
}).spread(count => {
if (count > 0) {
this.logger.info('Restore frozen tasks:', count);
}
});
}
_restoreFailedTasks(t) {
let where = [{status: 'failure'}];
let conditions = this._failedTasksConditions();
if (conditions.length) {
where.push({$or: conditions});
}
return models.Task.update({
status: 'pending',
worker_node_id: null,
worker_started_at: null
}, {
where: where,
transaction: t
}).spread(count => {
if (count > 0) {
this.logger.info('Restore failure tasks:', count);
}
});
}
_deleteDeadTasks(t) {
return models.Task.destroy({
where: {
status: 'pending',
finish_at: {
$lt: moment().toDate()
}
},
transaction: t
}).then(count => {
if (count > 0) {
this.logger.info('Delete dead tasks:', count);
}
});
}
_deleteCompletedTasks(t) {
return models.Task.destroy({
where: {
status: 'done',
checked_at: {
$lt: moment().subtract(this.conf.maxCompleted).toDate()
}
},
transaction: t
}).then(count => {
if (count > 0) {
this.logger.info('Delete completed tasks:', count);
}
});
}
_deleteFailedTasks(t) {
let where = [
{status: 'failure'},
{checked_at: {
$lt: moment().subtract(this.conf.maxFailed).toDate()
}}
];
let conditions = this._failedTasksConditions();
if (conditions.length) {
where.push({$or: conditions});
}
return models.Task.destroy({
where: where,
transaction: t
}).then(count => {
if (count > 0) {
this.logger.info('Delete failed tasks:', count);
}
});
}
_failedTasksConditions() {
let conditions = [];
_.each(config.workers, (worker) => {
if (worker.queue) {
let item = {queue: worker.queue};
if (worker.maxAttempts !== undefined) {
item.attempts = {
$lt: worker.maxAttempts
};
}
conditions.push(item);
}
});
return conditions;
}
}
module.exports = Manager;
Выводы и планы на будущее
В целом получился неплохой и достаточно надежный инструмент для работы с фоновыми задачами, которым мы можем поделиться с сообществом. Основные идеи и принципы, используемые в этом проекте, мы выработали за несколько лет работы над различными проектами.
В планах развития проекта:
- Hotreload воркеров без перезапуска мастер процесса. Чтобы можно было обновлять код отдельных воркеров, не вмешиваясь в работу других.
- Добавление информации о прогрессе задачи (поле
progress
) и добавление метода в модульTaskWorker
для обновления этой информации. - Создание различных интерфейсов для работы с задачами и нодами:
- CLI
- Web
- API
- Создание базовых воркеров:
- Процессинг видео файлов.
- Репликация файлов между серверами для надежного хранения.
- Тесты для воркеров.
Буду рад конструктивной критике и отвечу на интересующие вопросы в комментариях.