Не так давно мне выпала честь сделать реализацию демонов (воркеров) на Node.js для использования во всех проектах, разрабатываемых нашей компанией. Мы часто разрабатываем проекты, где необходимо процессить и распределенно хранить видео файлы на нескольких серверах, так что нам нужно иметь готовый инструмент для этого.
Постановка задачи:
- Разработка должна быть на Node.js. Мы давно используем эту платформу для разработки всех наших проектов, так что здесь это оправданный выбор.
- Все ноды в кластере должны быть равнозначными. Не должно быть специального управляющего звена или мастера. В противном случае остановка мастера может привести к останову всего кластера.
- Задачи должны находиться в таблице MySQL. Это намного гибче и информативнее, чем использовать какой-нибудь MQ. Всегда можно получить доступ ко всем задачам, оценить очередь, переназначить их на другую ноду и т.д.
- Каждый воркер должен уметь обрабатывать несколько задач одновременно.
Сразу привожу ссылку на GitHub того, что получилось: (https://github.com/pipll/node-daemons).
Запуск воркеров
Каждый воркер это отдельный процесс Node.js. Для создания процессов воркеров используется встроенный модуль cluster. Он также контролирует падения воркеров и заново их запускает.
'use strict'; const config = require('./config/config'); const _ = require('lodash'); const path = require('path'); const cluster = require('cluster'); const logger = require('log4js').getLogger('app'); const models = require('./models'); // Таймер для проверки количества воркеров и остановки мастера let shutdownInterval = null; if (cluster.isMaster) { // Запускаем воркеры _.each(config.workers, (conf, name) => { if (conf.enabled) { startWorker(name); } }); } else { // Инициализируем воркер let name = process.env.WORKER_NAME; let WorkerClass = require(path.join(__dirname, 'workers', name + '.js')); let worker = null; if (WorkerClass) { worker = new WorkerClass(name, config.workers[name]); // Запускаем воркер worker.start(); // Подписываемся на событие, когда воркер остановлен worker.on('stop', () => { process.exit(); }); } // Подписываемся на события от мастера process.on('message', message => { if ('shutdown' === message) { if (worker) { worker.stop(); } else { process.exit(); } } }); } // Shutdown process.on('SIGTERM', shutdownCluster); process.on('SIGINT', shutdownCluster); // Метод запуска воркера function startWorker(name) { let worker = cluster.fork({WORKER_NAME: name}).on('online', () => { logger.info('Start %s worker #%d.', name, worker.id); }).on('exit', status => { // Когда воркер был остановлен if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) { // Если воркер был остановлен контролируемо, то ни чего не делаем logger.info('Worker %s #%d was killed.', name, worker.id); } else { // Если воркер был остановлен неконтролируемо, то запускаем его еще раз logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id); startWorker(name); } }); } // Метод остановки кластера function shutdownCluster() { if (cluster.isMaster) { clearInterval(shutdownInterval); if (_.size(cluster.workers) > 0) { // Посылаем сигнал останова каждому воркеру logger.info('Shutdown workers:', _.size(cluster.workers)); _.each(cluster.workers, worker => { try { worker.send('shutdown'); } catch (err) { logger.warn('Cannot send shutdown message to worker:', err); } }); // Ожидаем останов всех воркеров shutdownInterval = setInterval(() => { if (_.size(cluster.workers) === 0) { process.exit(); } }, config.shutdownInterval); } else { process.exit(); } } }
Я хотел бы обратить внимание на несколько моментов:
- Для запуска воркера используется fork процесса, при этом в переменной окружения
WORKER_NAMEпередается название запускаемого воркера. - Когда воркер неконтролируемо завершает работу, мы перезапускаем его.
- Чтобы контролируемо остановить воркеры, мы посылаем им сигнал shutdown. Воркер реагирует на это событие и после окончания выполнения задачи делает
process.exit(). - Мастер наблюдает за количеством воркеров с помощью
setIntervalи когда все воркеры будут остановлены делаетprocess.exit().
Компонент базового воркера
Этот компонент предназначен для выполнения периодических процессов и не работает с очередью задач.
'use strict'; const _ = require('lodash'); const Promise = require('bluebird'); const log4js = require('log4js'); const EventEmitter = require('events'); const WorkerStates = require('./worker_states'); class Worker extends EventEmitter { constructor(name, conf) { super(); this.name = name; // Значения настроек по умолчанию this.conf = _.defaults({}, conf, { sleep: 1000 // Задержка между запусками }); this.logger = log4js.getLogger('worker-' + name); // Флаг останова воркера this.stopped = true; // Таймер для задержки между запусками loop мотода this.timer = null; // Состояние воркера this.state = null; } // Метод запуска воркера start() { this.logger.info('Start'); this.stopped = false; this.state = WorkerStates.STATE_IDLE; return this._startLoop(); } // Метод останова воркера stop() { this.logger.info('Stop'); this.stopped = true; if (this.state === WorkerStates.STATE_IDLE) { // Останавливаем таймер задержки if (this.timer) { clearTimeout(this.timer); this.timer = null; } this.state = WorkerStates.STATE_STOP; // Вызываем событие останова воркера this.emit('stop'); } } // Метод для выполнения бизнес задач loop() { return Promise.resolve(); } // Метод запуска и обработки loop метода _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { this.state = WorkerStates.STATE_IDLE; if (!this.stopped) { // Делаем повторный запуск по окончании работы loop метода this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } else { this.state = WorkerStates.STATE_STOP; // Вызываем событие останова воркера this.emit('stop'); } }); } } module.exports = Worker;
Код простейшего воркера может выглядеть так:
'use strict'; const Promise = require('bluebird'); const Worker = require('../components/worker'); class Sample extends Worker { loop() { this.logger.info("Loop method"); return Promise.resolve().delay(30000); } } module.exports = Sample;
Некоторые особенности:
loopметод предназначен для наследования в потомках и реализации бизнес задач. Возвращаемым значением этого метода должен бытьPromise.- По окончании работы
loopметода он запускается заново через указанное в настройках воркера время. - Воркер имеет три состояния:
- STATE_IDLE — во время паузы между запусками
loopметода. - STATE_WORK — во время работы
loopметода. - STATE_STOP — после останова воркера.
- STATE_IDLE — во время паузы между запусками
Компонент воркера обработки задач
Это основной компонент, предназначенный для параллельной обработки задач из таблицы MySQL.
'use strict'; const config = require('../config/config'); const _ = require('lodash'); const Promise = require('bluebird'); const Worker = require('./worker'); const WorkerStates = require('./worker_states'); const models = require('../models'); class TaskWorker extends Worker { constructor(name, conf) { super(name, conf); // Значения настроек по умолчанию this.conf = _.defaults({}, this.conf, { maxAttempts: 3, // Максимальное количество попыток запуска задачи delayRatio: 300000, // Базовое значение отсрочки запуска задачи count: 1, // Максимальное количество одновременно обрабатываемых задач queue: '', // Название очереди для получения задач update: 3000 // Интервал обновления статуса задачи }); // Счетчик одновременно обрабатываемых задач this.count = 0; } loop() { if (this.count < this.conf.count && !this.stopped) { // Получение очередной задачи return this._getTask().then(task => { if (task) { // Увеличение счетчика одновременно обрабатываемых задач this.count++; // Запуск периодического обновления статуса задачи let interval = setInterval(() => { return models.sequelize.transaction(t => { return task.touch({transaction: t}); }); }, this.conf.update); // Запуск метода обработки задачи this.handleTask(task.get({plain: true})).then(() => { // Завершение задачи return models.sequelize.transaction(t => { return task.complete({transaction: t}).then(() => { this.logger.info('Task completed:', task.id); }); }); }).catch(err => { // Задача не была выполнена - перезапуск задачи this.logger.warn('Handle error:', err); return this.delay(task).then(delay => { return models.sequelize.transaction(t => { return task.fail(delay, {transaction: t}).then(() => { this.logger.warn('Task failed:', task.id); }); }); }); }).finally(() => { clearInterval(interval); this.count--; }).done(); return null; } }); } else { return Promise.resolve(); } } // Метод обработки задачи handleTask() { return Promise.resolve(); } // Метод вычисления отсрочки задачи после неуспешного запуска delay(task) { return Promise.resolve().then(() => { return task.attempts * this.conf.delayRatio; }); } // Метод получения задачи для обработки _getTask() { return models.sequelize.transaction({autocommit: false}, t => { return models.Task.scope({ method: ['forWork', this.conf.queue, config.node_id] }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => { if (task) { return task.work(config.node_id, {transaction: t}); } }); }); } _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { if (this.count === 0) { this.state = WorkerStates.STATE_IDLE; } if (this.stopped && this.count === 0) { this.state = WorkerStates.STATE_STOP; this.emit('stop'); } else { this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } }); } } module.exports = TaskWorker;
Код простейшего воркера может выглядеть так:
'use strict'; const Promise = require('bluebird'); const TaskWorker = require('../components/task_worker'); class Sample extends TaskWorker { handleTask(task) { this.logger.info('Sample Task:', task); return Promise.resolve().delay(30000); } } module.exports = Sample;
Особенности работы компонента:
- Для получения задачи из базы данных используется конструкция
SELECT ... FOR UPDATEи последующийUPDATEзаписи в базе данных в одной транзакции с отключенным автокоммитом. Это позволяет получить эксклюзивный доступ к задаче даже при одновременных запросах с нескольких серверов. - Во время обработки задачи запускается периодический процесс обновления статуса задачи в базе данных. Это необходимо чтобы отличить долго работающую задачу от неожиданно завершенной задачи без обновления статуса.
- Статус обработанной задачи определяется статусом
Promiseвозвращаемого методомhandleTask. При успехе задача помечается как выполненная. В противном случае задача помечается как провальная и запускается с отсрочкой, задаваемой в методеdelay.
Модель работы с задачами
Для работы с моделями базы данных используется модуль sequelize. Все задачи находятся в таблице tasks. Таблица имеет следующую структуру:
| Поле | Тип | Описание |
|---|---|---|
id |
integer, autoincrement | ID задачи |
node_id |
integer, nullable | ID ноды, для которой предназначена задача |
queue |
string | Очередь задачи |
status |
enum | Статус задачи |
attempts |
integer | Количество попуток запуска задачи |
priority |
integer | Приоритет задачи |
body |
string | Тело задачи в JSON формате |
start_at |
datetime, nullable | Дата и время начала обработки задачи |
finish_at |
datetime, nullable | Дата и время окончания обработки задачи (аналог TTL) |
worker_node_id |
integer, nullable | ID ноды, которая начала обработку задачи |
worker_started_at |
datetime, nullable | Дата и время начала обработки задачи |
checked_at |
datetime, nullable | Дата и время обновления статуса работы задачи |
created_at |
datetime, nullable | Дата и время создания задачи |
updated_at |
datetime, nullable | Дата и время изменения задачи |
Задача может быть назначена как конкретной ноде, так и любой ноде в кластере. Это регулируется полем node_id во время создания задачи. Задачу можно запустить с отсрочкой (поле start_at) и с ограничением временем обработки (поле finish_at).
Различные воркеры работают с различными очередями задач, задаваемыми в поле queue. Приоритет обработки задается в поле priority (чем больше, тем выше приоритет). Количество перезапусков задачи сохраняется в поле attempts. В теле задачи (поле body) в JSON формате передаются параметры, необходимые воркеру.
Поле checked_at выполняет роль признака работающей задачи. Значение его все время меняется во время работы задачи. Если значение поля checked_at долго не менялось, а задача находится в статусе working, то задача считается проваленной и статус ее меняется на failure.
'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); };
Жизненный цикл задач
Все задачи проходят следующий жизненный цикл:
- Новая задача создается со статусом pending.
- Когда наступает время обработки задачи, ее получает первый свободный воркер, переводит в статус working и заполняет поля
worker_node_idиworker_started_at. - Во время обработки задачи воркер с некоторой периодичностью (по умолчанию каждые 10 секунд) обновляет поле
checked_atдля нотификации о корректной работе. - По окончании работы над задачей может быть несколько вариантов развития событий:
4.1. Если задача успешно завершилась, то задача переводится в статус done.
4.2. Если выполнение задачи провались, то задача переводится в статус failure, увеличивается количествоattemptsи происходит отсрочка запуска на заданное количество времени (вычисляется в методеdelayосновываясь на количестве попыток и настройкеdelayRatio).
В проекте так же есть встроенный модуль Manager, который запускается на каждой ноде и делает следующую обработку задач:
- Переводит зависшие задачи в статус failure.
- Переводит проваленные задачи в статус pending для новой обработки.
- Удаляет невыполненные задачи, срок запуска которых уже истек.
- Удаляет успешно завершенные задачи с отсрочкой в 1 час (может быть настроено).
- Удаляет проваленные завершенные задачи с израсходованным количеством попыток запуска с отсрочкой в 3 дня (может быть настроено).
Кроме того этот модуль работает с включением/отключением нод.
'use strict'; const _ = require('lodash'); const moment = require('moment'); const Promise = require('bluebird'); const Worker = require('../components/worker'); const models = require('../models'); const config = require('../config/config'); class Manager extends Worker { constructor(name, conf) { super(name, conf); this.conf = _.defaults({}, this.conf, { maxUpdate: 30000, // 30 seconds maxCompleted: 3600000, // 1 hour maxFailed: 259200000 // 3 days }); } loop() { return models.sequelize.transaction(t => { return Promise.resolve() .then(() => { return this._checkCurrentNode(t); }) .then(() => { return this._activateNodes(t); }) .then(() => { return this._pauseNodes(t); }) .then(() => { return this._restoreFrozenTasks(t); }) .then(() => { return this._restoreFailedTasks(t); }) .then(() => { return this._deleteDeadTasks(t); }) .then(() => { return this._deleteCompletedTasks(t); }) .then(() => { return this._deleteFailedTasks(t); }); }); } _checkCurrentNode(t) { return models.Node.findById(config.node_id, {transaction: t}).then(node => { if (node) { return node.check(); } }); } _activateNodes(t) { return models.Node.update({ is_active: true }, { where: { is_active: false, checked_at: { $gte: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Activate nodes:', count); } }); } _pauseNodes(t) { return models.Node.update({ is_active: false }, { where: { is_active: true, checked_at: { $lt: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Pause nodes:', count); } }); } _restoreFrozenTasks(t) { return models.Task.update({ status: 'failure', attempts: models.sequelize.literal('attempts + 1') }, { where: { status: 'working', checked_at: { $lt: moment().subtract(this.conf.maxUpdate).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore frozen tasks:', count); } }); } _restoreFailedTasks(t) { let where = [{status: 'failure'}]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.update({ status: 'pending', worker_node_id: null, worker_started_at: null }, { where: where, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore failure tasks:', count); } }); } _deleteDeadTasks(t) { return models.Task.destroy({ where: { status: 'pending', finish_at: { $lt: moment().toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete dead tasks:', count); } }); } _deleteCompletedTasks(t) { return models.Task.destroy({ where: { status: 'done', checked_at: { $lt: moment().subtract(this.conf.maxCompleted).toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete completed tasks:', count); } }); } _deleteFailedTasks(t) { let where = [ {status: 'failure'}, {checked_at: { $lt: moment().subtract(this.conf.maxFailed).toDate() }} ]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.destroy({ where: where, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete failed tasks:', count); } }); } _failedTasksConditions() { let conditions = []; _.each(config.workers, (worker) => { if (worker.queue) { let item = {queue: worker.queue}; if (worker.maxAttempts !== undefined) { item.attempts = { $lt: worker.maxAttempts }; } conditions.push(item); } }); return conditions; } } module.exports = Manager;
Выводы и планы на будущее
В целом получился неплохой и достаточно надежный инструмент для работы с фоновыми задачами, которым мы можем поделиться с сообществом. Основные идеи и принципы, используемые в этом проекте, мы выработали за несколько лет работы над различными проектами.
В планах развития проекта:
- Hotreload воркеров без перезапуска мастер процесса. Чтобы можно было обновлять код отдельных воркеров, не вмешиваясь в работу других.
- Добавление информации о прогрессе задачи (поле
progress) и добавление метода в модульTaskWorkerдля обновления этой информации. - Создание различных интерфейсов для работы с задачами и нодами:
- CLI
- Web
- API
- Создание базовых воркеров:
- Процессинг видео файлов.
- Репликация файлов между серверами для надежного хранения.
- Тесты для воркеров.
Буду рад конструктивной критике и отвечу на интересующие вопросы в комментариях.
