Pull to refresh

Реализация демонов на Node.js

Reading time12 min
Views18K

Не так давно мне выпала честь сделать реализацию демонов (воркеров) на Node.js для использования во всех проектах, разрабатываемых нашей компанией. Мы часто разрабатываем проекты, где необходимо процессить и распределенно хранить видео файлы на нескольких серверах, так что нам нужно иметь готовый инструмент для этого.


Постановка задачи:


  • Разработка должна быть на Node.js. Мы давно используем эту платформу для разработки всех наших проектов, так что здесь это оправданный выбор.
  • Все ноды в кластере должны быть равнозначными. Не должно быть специального управляющего звена или мастера. В противном случае остановка мастера может привести к останову всего кластера.
  • Задачи должны находиться в таблице MySQL. Это намного гибче и информативнее, чем использовать какой-нибудь MQ. Всегда можно получить доступ ко всем задачам, оценить очередь, переназначить их на другую ноду и т.д.
  • Каждый воркер должен уметь обрабатывать несколько задач одновременно.

Сразу привожу ссылку на GitHub того, что получилось: (https://github.com/pipll/node-daemons).


Запуск воркеров


Каждый воркер это отдельный процесс Node.js. Для создания процессов воркеров используется встроенный модуль cluster. Он также контролирует падения воркеров и заново их запускает.


Код запуска воркеров
'use strict';

const config = require('./config/config');

const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
const models = require('./models');

// Таймер для проверки количества воркеров и остановки мастера
let shutdownInterval = null;

if (cluster.isMaster) {
    // Запускаем воркеры
    _.each(config.workers, (conf, name) => {
        if (conf.enabled) {
            startWorker(name);
        }
    });
} else {
    // Инициализируем воркер
    let name = process.env.WORKER_NAME;
    let WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
    let worker = null;
    if (WorkerClass) {
        worker = new WorkerClass(name, config.workers[name]);
        // Запускаем воркер
        worker.start();
        // Подписываемся на событие, когда воркер остановлен
        worker.on('stop', () => {
            process.exit();
        });
    }
    // Подписываемся на события от мастера
    process.on('message', message => {
        if ('shutdown' === message) {
            if (worker) {
                worker.stop();
            } else {
                process.exit();
            }
        }
    });
}

// Shutdown
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);

// Метод запуска воркера
function startWorker(name) {
    let worker = cluster.fork({WORKER_NAME: name}).on('online', () => {
        logger.info('Start %s worker #%d.', name, worker.id);
    }).on('exit', status => {
        // Когда воркер был остановлен
        if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) {
            // Если воркер был остановлен контролируемо, то ни чего не делаем
            logger.info('Worker %s #%d was killed.', name, worker.id);
        } else {
            // Если воркер был остановлен неконтролируемо, то запускаем его еще раз
            logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
            startWorker(name);
        }
    });
}

// Метод остановки кластера
function shutdownCluster() {
    if (cluster.isMaster) {
        clearInterval(shutdownInterval);
        if (_.size(cluster.workers) > 0) {
            // Посылаем сигнал останова каждому воркеру
            logger.info('Shutdown workers:', _.size(cluster.workers));
            _.each(cluster.workers, worker => {
                try {
                    worker.send('shutdown');
                } catch (err) {
                    logger.warn('Cannot send shutdown message to worker:', err);
                }
            });
            // Ожидаем останов всех воркеров
            shutdownInterval = setInterval(() => {
                if (_.size(cluster.workers) === 0) {
                    process.exit();
                }
            }, config.shutdownInterval);
        } else {
            process.exit();
        }
    }
}

Я хотел бы обратить внимание на несколько моментов:


  • Для запуска воркера используется fork процесса, при этом в переменной окружения WORKER_NAME передается название запускаемого воркера.
  • Когда воркер неконтролируемо завершает работу, мы перезапускаем его.
  • Чтобы контролируемо остановить воркеры, мы посылаем им сигнал shutdown. Воркер реагирует на это событие и после окончания выполнения задачи делает process.exit().
  • Мастер наблюдает за количеством воркеров с помощью setInterval и когда все воркеры будут остановлены делает process.exit().

Компонент базового воркера


Этот компонент предназначен для выполнения периодических процессов и не работает с очередью задач.


Код базового воркера
'use strict';

const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');

class Worker extends EventEmitter {

    constructor(name, conf) {
        super();
        this.name = name;
        // Значения настроек по умолчанию
        this.conf = _.defaults({}, conf, {
            sleep: 1000 // Задержка между запусками
        });
        this.logger = log4js.getLogger('worker-' + name);
        // Флаг останова воркера
        this.stopped = true;
        // Таймер для задержки между запусками loop мотода
        this.timer = null;
        // Состояние воркера
        this.state = null;
    }

    // Метод запуска воркера
    start() {
        this.logger.info('Start');
        this.stopped = false;
        this.state = WorkerStates.STATE_IDLE;
        return this._startLoop();
    }

    // Метод останова воркера
    stop() {
        this.logger.info('Stop');
        this.stopped = true;
        if (this.state === WorkerStates.STATE_IDLE) {
            // Останавливаем таймер задержки
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            this.state = WorkerStates.STATE_STOP;
            // Вызываем событие останова воркера
            this.emit('stop');
        }
    }

    // Метод для выполнения бизнес задач
    loop() {
        return Promise.resolve();
    }

    // Метод запуска и обработки loop метода
    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        return this.loop().catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            this.state = WorkerStates.STATE_IDLE;
            if (!this.stopped) {
                // Делаем повторный запуск по окончании работы loop метода
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            } else {
                this.state = WorkerStates.STATE_STOP;
                // Вызываем событие останова воркера
                this.emit('stop');
            }
        });
    }

}

module.exports = Worker;

Код простейшего воркера может выглядеть так:


'use strict';

const Promise = require('bluebird');
const Worker = require('../components/worker');

class Sample extends Worker {

    loop() {
        this.logger.info("Loop method");
        return Promise.resolve().delay(30000);
    }

}

module.exports = Sample;

Некоторые особенности:


  • loop метод предназначен для наследования в потомках и реализации бизнес задач. Возвращаемым значением этого метода должен быть Promise.
  • По окончании работы loop метода он запускается заново через указанное в настройках воркера время.
  • Воркер имеет три состояния:
    • STATE_IDLE — во время паузы между запусками loop метода.
    • STATE_WORK — во время работы loop метода.
    • STATE_STOP — после останова воркера.

Компонент воркера обработки задач


Это основной компонент, предназначенный для параллельной обработки задач из таблицы MySQL.


Код воркера обработки задач
'use strict';

const config = require('../config/config');

const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');

class TaskWorker extends Worker {

    constructor(name, conf) {
        super(name, conf);
        // Значения настроек по умолчанию
        this.conf = _.defaults({}, this.conf, {
            maxAttempts: 3, // Максимальное количество попыток запуска задачи
            delayRatio: 300000, // Базовое значение отсрочки запуска задачи
            count: 1, // Максимальное количество одновременно обрабатываемых задач
            queue: '', // Название очереди для получения задач
            update: 3000 // Интервал обновления статуса задачи
        });
        // Счетчик одновременно обрабатываемых задач
        this.count = 0;
    }

    loop() {
        if (this.count < this.conf.count && !this.stopped) {
            // Получение очередной задачи
            return this._getTask().then(task => {
                if (task) {
                    // Увеличение счетчика одновременно обрабатываемых задач
                    this.count++;
                    // Запуск периодического обновления статуса задачи
                    let interval = setInterval(() => {
                        return models.sequelize.transaction(t => {
                            return task.touch({transaction: t});
                        });
                    }, this.conf.update);
                    // Запуск метода обработки задачи
                    this.handleTask(task.get({plain: true})).then(() => {
                        // Завершение задачи
                        return models.sequelize.transaction(t => {
                            return task.complete({transaction: t}).then(() => {
                                this.logger.info('Task completed:', task.id);
                            });
                        });
                    }).catch(err => {
                        // Задача не была выполнена - перезапуск задачи
                        this.logger.warn('Handle error:', err);
                        return this.delay(task).then(delay => {
                            return models.sequelize.transaction(t => {
                                return task.fail(delay, {transaction: t}).then(() => {
                                    this.logger.warn('Task failed:', task.id);
                                });
                            });
                        });
                    }).finally(() => {
                        clearInterval(interval);
                        this.count--;
                    }).done();
                    return null;
                }
            });
        } else {
            return Promise.resolve();
        }
    }

    // Метод обработки задачи
    handleTask() {
        return Promise.resolve();
    }

    // Метод вычисления отсрочки задачи после неуспешного запуска
    delay(task) {
        return Promise.resolve().then(() => {
            return task.attempts * this.conf.delayRatio;
        });
    }

    // Метод получения задачи для обработки
    _getTask() {
        return models.sequelize.transaction({autocommit: false}, t => {
            return models.Task.scope({
                method: ['forWork', this.conf.queue, config.node_id]
            }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
                if (task) {
                    return task.work(config.node_id, {transaction: t});
                }
            });
        });
    }

    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        return this.loop().catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            if (this.count === 0) {
                this.state = WorkerStates.STATE_IDLE;
            }
            if (this.stopped && this.count === 0) {
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            } else {
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            }
        });
    }

}

module.exports = TaskWorker;

Код простейшего воркера может выглядеть так:


'use strict';

const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');

class Sample extends TaskWorker {

    handleTask(task) {
        this.logger.info('Sample Task:', task);
        return Promise.resolve().delay(30000);
    }

}

module.exports = Sample;

Особенности работы компонента:


  • Для получения задачи из базы данных используется конструкция SELECT ... FOR UPDATE и последующий UPDATE записи в базе данных в одной транзакции с отключенным автокоммитом. Это позволяет получить эксклюзивный доступ к задаче даже при одновременных запросах с нескольких серверов.
  • Во время обработки задачи запускается периодический процесс обновления статуса задачи в базе данных. Это необходимо чтобы отличить долго работающую задачу от неожиданно завершенной задачи без обновления статуса.
  • Статус обработанной задачи определяется статусом Promise возвращаемого методом handleTask. При успехе задача помечается как выполненная. В противном случае задача помечается как провальная и запускается с отсрочкой, задаваемой в методе delay.

Модель работы с задачами


Для работы с моделями базы данных используется модуль sequelize. Все задачи находятся в таблице tasks. Таблица имеет следующую структуру:


Поле Тип Описание
id integer, autoincrement ID задачи
node_id integer, nullable ID ноды, для которой предназначена задача
queue string Очередь задачи
status enum Статус задачи
attempts integer Количество попуток запуска задачи
priority integer Приоритет задачи
body string Тело задачи в JSON формате
start_at datetime, nullable Дата и время начала обработки задачи
finish_at datetime, nullable Дата и время окончания обработки задачи (аналог TTL)
worker_node_id integer, nullable ID ноды, которая начала обработку задачи
worker_started_at datetime, nullable Дата и время начала обработки задачи
checked_at datetime, nullable Дата и время обновления статуса работы задачи
created_at datetime, nullable Дата и время создания задачи
updated_at datetime, nullable Дата и время изменения задачи

Задача может быть назначена как конкретной ноде, так и любой ноде в кластере. Это регулируется полем node_id во время создания задачи. Задачу можно запустить с отсрочкой (поле start_at) и с ограничением временем обработки (поле finish_at).


Различные воркеры работают с различными очередями задач, задаваемыми в поле queue. Приоритет обработки задается в поле priority (чем больше, тем выше приоритет). Количество перезапусков задачи сохраняется в поле attempts. В теле задачи (поле body) в JSON формате передаются параметры, необходимые воркеру.


Поле checked_at выполняет роль признака работающей задачи. Значение его все время меняется во время работы задачи. Если значение поля checked_at долго не менялось, а задача находится в статусе working, то задача считается проваленной и статус ее меняется на failure.


Код модели работы с задачами
'use strict';

const moment = require('moment');

module.exports = function(sequelize, Sequelize) {

    return sequelize.define('Task', {
        id: {
            type: Sequelize.INTEGER,
            primaryKey: true,
            autoIncrement: true
        },
        node_id: {
            type: Sequelize.INTEGER
        },
        queue: {
            type: Sequelize.STRING,
            allowNull: false
        },
        status: {
            type: Sequelize.ENUM,
            values: ['pending', 'working', 'done', 'failure'],
            defaultValue: 'pending',
            allowNull: false
        },
        attempts: {
            type: Sequelize.INTEGER,
            defaultValue: 0,
            allowNull: false
        },
        priority: {
            type: Sequelize.INTEGER,
            defaultValue: 10,
            allowNull: false
        },
        body: {
            type: Sequelize.TEXT,
            set: function(body) {
                return this.setDataValue('body', JSON.stringify(body));
            },
            get: function() {
                try {
                    return JSON.parse(this.getDataValue('body'));
                } catch (e) {
                    return null;
                }
            }
        },
        start_at: {
            type: Sequelize.DATE
        },
        finish_at: {
            type: Sequelize.DATE
        },
        worker_node_id: {
            type: Sequelize.INTEGER
        },
        worker_started_at: {
            type: Sequelize.DATE
        },
        checked_at: {
            type: Sequelize.DATE
        }
    }, {
        tableName: 'tasks',
        freezeTableName: true,
        underscored: true,

        scopes: {
            forWork: function(queue, node_id) {
                return {
                    where: {
                        node_id: {
                            $or: [
                                null,
                                node_id
                            ]
                        },
                        queue: queue,
                        status: 'pending',
                        start_at: {
                            $or: [
                                null,
                                {
                                    $lt: moment().toDate()
                                }
                            ]
                        },
                        finish_at: {
                            $or: [
                                null,
                                {
                                    $gte: moment().toDate()
                                }
                            ]
                        }
                    },
                    order: [
                        ['priority', 'DESC'],
                        ['attempts', 'ASC'],
                        [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC']
                    ]
                };
            }
        },

        instanceMethods: {
            fail: function(delay, options) {
                this.start_at = delay ? moment().add(delay, 'ms').toDate() : null;
                this.attempts = sequelize.literal('attempts + 1');
                this.status = 'failure';
                return this.save(options);
            },
            complete: function(options) {
                this.status = 'done';
                return this.save(options);
            },
            work: function(node_id, options) {
                this.status = 'working';
                this.worker_node_id = node_id;
                this.worker_started_at = moment().toDate();
                return this.save(options);
            },
            check: function(options) {
                this.checked_at = moment().toDate();
                return this.save(options);
            }
        }
    });

};

Жизненный цикл задач


Все задачи проходят следующий жизненный цикл:


  1. Новая задача создается со статусом pending.
  2. Когда наступает время обработки задачи, ее получает первый свободный воркер, переводит в статус working и заполняет поля worker_node_id и worker_started_at.
  3. Во время обработки задачи воркер с некоторой периодичностью (по умолчанию каждые 10 секунд) обновляет поле checked_at для нотификации о корректной работе.
  4. По окончании работы над задачей может быть несколько вариантов развития событий:
    4.1. Если задача успешно завершилась, то задача переводится в статус done.
    4.2. Если выполнение задачи провались, то задача переводится в статус failure, увеличивается количество attempts и происходит отсрочка запуска на заданное количество времени (вычисляется в методе delay основываясь на количестве попыток и настройке delayRatio).

В проекте так же есть встроенный модуль Manager, который запускается на каждой ноде и делает следующую обработку задач:


  1. Переводит зависшие задачи в статус failure.
  2. Переводит проваленные задачи в статус pending для новой обработки.
  3. Удаляет невыполненные задачи, срок запуска которых уже истек.
  4. Удаляет успешно завершенные задачи с отсрочкой в 1 час (может быть настроено).
  5. Удаляет проваленные завершенные задачи с израсходованным количеством попыток запуска с отсрочкой в 3 дня (может быть настроено).

Кроме того этот модуль работает с включением/отключением нод.


Код модуля менеджера
'use strict';

const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');

class Manager extends Worker {

    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            maxUpdate: 30000,       // 30 seconds
            maxCompleted: 3600000,  // 1 hour
            maxFailed: 259200000    // 3 days
        });
    }

    loop() {
        return models.sequelize.transaction(t => {
            return Promise.resolve()
                .then(() => {
                    return this._checkCurrentNode(t);
                })
                .then(() => {
                    return this._activateNodes(t);
                })
                .then(() => {
                    return this._pauseNodes(t);
                })
                .then(() => {
                    return this._restoreFrozenTasks(t);
                })
                .then(() => {
                    return this._restoreFailedTasks(t);
                })
                .then(() => {
                    return this._deleteDeadTasks(t);
                })
                .then(() => {
                    return this._deleteCompletedTasks(t);
                })
                .then(() => {
                    return this._deleteFailedTasks(t);
                });
        });
    }

    _checkCurrentNode(t) {
        return models.Node.findById(config.node_id, {transaction: t}).then(node => {
            if (node) {
                return node.check();
            }
        });
    }

    _activateNodes(t) {
        return models.Node.update({
            is_active: true
        }, {
            where: {
                is_active: false,
                checked_at: {
                    $gte: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Activate nodes:', count);
            }
        });
    }

    _pauseNodes(t) {
        return models.Node.update({
            is_active: false
        }, {
            where: {
                is_active: true,
                checked_at: {
                    $lt: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Pause nodes:', count);
            }
        });
    }

    _restoreFrozenTasks(t) {
        return models.Task.update({
            status: 'failure',
            attempts: models.sequelize.literal('attempts + 1')
        }, {
            where: {
                status: 'working',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxUpdate).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore frozen tasks:', count);
            }
        });
    }

    _restoreFailedTasks(t) {
        let where = [{status: 'failure'}];
        let conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }

        return models.Task.update({
            status: 'pending',
            worker_node_id: null,
            worker_started_at: null
        }, {
            where: where,
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore failure tasks:', count);
            }
        });
    }

    _deleteDeadTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'pending',
                finish_at: {
                    $lt: moment().toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete dead tasks:', count);
            }
        });
    }

    _deleteCompletedTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'done',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxCompleted).toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete completed tasks:', count);
            }
        });
    }

    _deleteFailedTasks(t) {
        let where = [
            {status: 'failure'},
            {checked_at: {
                $lt: moment().subtract(this.conf.maxFailed).toDate()
            }}
        ];
        let conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }
        return models.Task.destroy({
            where: where,
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete failed tasks:', count);
            }
        });
    }

    _failedTasksConditions() {
        let conditions = [];
        _.each(config.workers, (worker) => {
            if (worker.queue) {
                let item = {queue: worker.queue};
                if (worker.maxAttempts !== undefined) {
                    item.attempts = {
                        $lt: worker.maxAttempts
                    };
                }
                conditions.push(item);
            }
        });
        return conditions;
    }

}

module.exports = Manager;

Выводы и планы на будущее


В целом получился неплохой и достаточно надежный инструмент для работы с фоновыми задачами, которым мы можем поделиться с сообществом. Основные идеи и принципы, используемые в этом проекте, мы выработали за несколько лет работы над различными проектами.


В планах развития проекта:


  • Hotreload воркеров без перезапуска мастер процесса. Чтобы можно было обновлять код отдельных воркеров, не вмешиваясь в работу других.
  • Добавление информации о прогрессе задачи (поле progress) и добавление метода в модуль TaskWorker для обновления этой информации.
  • Создание различных интерфейсов для работы с задачами и нодами:
    • CLI
    • Web
    • API
  • Создание базовых воркеров:
    • Процессинг видео файлов.
    • Репликация файлов между серверами для надежного хранения.
  • Тесты для воркеров.

Буду рад конструктивной критике и отвечу на интересующие вопросы в комментариях.

Tags:
Hubs:
Total votes 7: ↑3 and ↓4-1
Comments26

Articles