При реализации бэка веб-приложений и мобильных приложений, даже самых простых, уже стало привычным использование таких инструментов как: базы данных, почтовый (smtp) сервер, redis-сервер. Набор используемых инструментов постоянно расширяется. Например, очереди сообщений, судя по количеству установок пакета amqplib (650 тыс. установок в неделю), используется наравне с реляционными базами данных (пакет mysql 460 тыс. установок в неделю и pg 800 тыс. установок в неделю).
Сегодня я хочу рассказать об очередях заданий (job queue), которые пока используются на порядок реже, хотя необходимость в них возникает, практически, во всех реальных проектах
Итак, очереди заданий позволяют асинхронно выполнить некоторую задачу, фактически, выполнить функцию с заданными входными параметрами и в установленное время.
В зависимости от параметров, задание может выполняться:
Очереди заданий позволяют передать выполняемому заданию параметры, отследить и повторно выполнить задания, закончившиеся с ошибкой, установить ограничение на количество одновременно выполняемых заданий.
Подавляющее большинство приложений на Node.js связаны с разработкой REST-API для веб-приложений и мобильных приложений. Сократить время выполнения REST-API — важно для комфортной работы пользователя с приложением. В то же время, вызов REST-API может инициировать длительные и/или ресурсоёмкие операции. Например, после совершения покупки необходимо отправить пользователю пуш-сообщение на мобильное приложение, или же отправить запрос о совершении покупки на REST-API CRM. Эти запросы можно выполнить асинхронно. Как сделать это правильно, если у Вас нет инструмента для работы с очередями заданий? Например, можно отправить сообщение в очередь сообщений, запустить worker, который будет читать эти сообщения и выполнять на основании этих сообщений необходимую работу.
Фактически, примерно это и делают очереди заданий. Однако, если присмотреться внимательно, то очереди заданий имеют несколько принципиальных отличий от очереди сообщений. Во-первых, в очередь сообщений кладут сообщения (статику), а очереди заданий подразумевают выполнение какой-то работы (вызов функции). Во-вторых, очередь заданий подразумевают наличие какого-то процессора (воркера), который будет выполнять заданную работу. При этом нужен дополнительный функционал. Количество процессоров-воркеров должно прозрачно масштабироваться в случае повышения нагрузки. С другой стороны необходимо ограничивать количество одновременно работающих заданий на одном процессоре-воркере, чтобы сгладить пиковые нагрузки и не допустить отказов в обслуживании. Это показывает что есть необходимость в инструменте, который мог бы запускать асинхронные задания, настраивая различные параметры, так же просто как мы делаем запрос по REST-API (а лучше если еще проще).
При помощи очередей сообщений относительно просто реализовать очередь заданий, которые выполняются немедленно после постановки задания в очередь. Но часто требуются выполнить задание однократно в установленное время или же по расписанию. Для этих задач широко используют ряд пакетов, которые реализуют логику работы cron в linux. Чтобы не быть голословным, скажу, что пакет node-cron имеет 480 тыс. установок в неделю, node-schedule — 170 тыс. установок в неделю.
Использовать node-cron это, конечно, удобнее, чем аскетичный setInterval(), но лично я сталкивался с целым рядом проблем при его использовании. Если выразить общий недостаток — это отсутствие контроля за количеством одновременно выполняемых заданий (это стимулирует пиковые нагрузки: повышение нагрузки замедляет работу заданий, замедление работы заданий увеличивает количество одновременно выполняемых заданий а это в свою очередь еще больше грузит систему), невозможность для повышения производительности запустить node-cron на нескольких ядрах (в этом случае все задания независимо выполняются на каждом ядре) и отсутствие средств для отслеживания и перезапуска заданий, закончившихся с ошибкой.
Я надеюсь, что показал, что необходимость в таком инструменте, как очередь заданий есть наравне с такими инструментами как базы данных. И такие средства появились, хотя еще недостаточно широко применяются. Перечислю наиболее популярные из них:
*) По состоянию на 25 на сентябрь 2021 года после возобновления поддержки проекта
.
Я сегодня буду рассматривать применение пакета bull, с которым работаю сам. Почему я выбрал именно этот пакет (хотя не навязываю свой выбор другим). На тот момент, когда я начал искать удобную реализацию очереди сообщений, проект bee-queue был уже остановлен. Реализация kue, по бенчмаркам приведенным в репозитарии bee-queue, сильно отставала от других реализаций и, кроме того, не содержала средств для запуска периодически выполняемых заданий. Проект agenda реализует очереди с сохранением в базе данных mongodb. Это для некоторых кейсов большой плюс, если нужно сверх-надежность при постановке заданий в очередь. Однако не только это решающий фактор. Я, естественно, испытывал все варианты библиотек на выносливость, генерируя большое количество заданий в очереди, и так и не смог добиться от agenda бесперебойной работы. При превышении какого-то количества заданий, agenda останавливалась и прекращала ставить задания в работу.
Поэтому я остановился на bull который реализует удобный API, при достаточном быстродействии с возможностью масштабирования, так как в качестве бэка пакет bull использует redis-сервер. В том числе, можно использовать кластер серверов redis.
При создании очереди очень важно выбрать оптимальные параметры очереди заданий. Параметров много, и значение некоторых из них дошло до меня не сразу. После многочисленных экспериментов я остановился на таких параметрах:
В тривиальных случаях нет необходимости создавать много очередей, так как в каждой очереди можно задавать имена для разных заданий, и с каждым именем связывать свой процессор-воркер:
Я использую возможность, которая идет в bull «из коробки» — распараллеливать процессоры-воркеры на нескольких ядрах. Для этого вторым параметром задается количество ядер на которым будет запущен процессор-воркер, а в третьем параметре имя файла с определением функции обработки задания. Если такая фича не нужна, в качестве второго параметра можно просто передать callback-функцию.
Задание в очередь ставится вызовом метода add(), которому в параметрах передается имя очереди и объект, который в последующем будет передаваться обработчику задания. Например, в хуке ORM после создания записи с новой новостью, я могу асинхронно отправить всем клиентам пуш сообщение:
Обработчик события принимает в параметрах объект задания с параметрами, переданными в метод add() и функцию done(), которую необходимо вызвать для подтверждения выполнения задания или же для того чтобы сообщить, что задание закончилось с ошибкой:
Для просмотра состояния очереди задания можно воспользоваться средством arena-bull:
И напоследок маленький лайфхак. Как я уже говорил, bull использует redis-сервер в качестве бэка. Вероятность того что при рестарте redis-сервера задания пропадут весьма мала. Но зная тот факт что сисадмины иногда могут просто «почистить кэш редиса», при этом удалив все задания в частности, я был обеспокоен прежде всего периодически выполняемыми заданиями, которые в этом случае остановились навсегда. В связи с этим я нашело возможность как возобновлять такие периодические задания:
То есть задание сначала исключается из очереди, а затем ставится вновь, и все это (увы) по setInterval(). Собственно без такого вот лайфхака я бы возможно не решился юзать периодические таски на bull.
UPD1. bee-queue проект снова поддерживается.
UPD2. bull не имеет возможность, которая есть у koa и bee-queue — пописка на события конретного job. Итогда это бывает очень нужно для такого случая:
UPD3. kue в настоящее время проект не поддерживается
apapacy@gmail.com
3 июля 2019 года
Сегодня я хочу рассказать об очередях заданий (job queue), которые пока используются на порядок реже, хотя необходимость в них возникает, практически, во всех реальных проектах
Итак, очереди заданий позволяют асинхронно выполнить некоторую задачу, фактически, выполнить функцию с заданными входными параметрами и в установленное время.
В зависимости от параметров, задание может выполняться:
- сразу после добавления в очередь заданий;
- однократно в установленное время;
- многократно по расписанию.
Очереди заданий позволяют передать выполняемому заданию параметры, отследить и повторно выполнить задания, закончившиеся с ошибкой, установить ограничение на количество одновременно выполняемых заданий.
Подавляющее большинство приложений на Node.js связаны с разработкой REST-API для веб-приложений и мобильных приложений. Сократить время выполнения REST-API — важно для комфортной работы пользователя с приложением. В то же время, вызов REST-API может инициировать длительные и/или ресурсоёмкие операции. Например, после совершения покупки необходимо отправить пользователю пуш-сообщение на мобильное приложение, или же отправить запрос о совершении покупки на REST-API CRM. Эти запросы можно выполнить асинхронно. Как сделать это правильно, если у Вас нет инструмента для работы с очередями заданий? Например, можно отправить сообщение в очередь сообщений, запустить worker, который будет читать эти сообщения и выполнять на основании этих сообщений необходимую работу.
Фактически, примерно это и делают очереди заданий. Однако, если присмотреться внимательно, то очереди заданий имеют несколько принципиальных отличий от очереди сообщений. Во-первых, в очередь сообщений кладут сообщения (статику), а очереди заданий подразумевают выполнение какой-то работы (вызов функции). Во-вторых, очередь заданий подразумевают наличие какого-то процессора (воркера), который будет выполнять заданную работу. При этом нужен дополнительный функционал. Количество процессоров-воркеров должно прозрачно масштабироваться в случае повышения нагрузки. С другой стороны необходимо ограничивать количество одновременно работающих заданий на одном процессоре-воркере, чтобы сгладить пиковые нагрузки и не допустить отказов в обслуживании. Это показывает что есть необходимость в инструменте, который мог бы запускать асинхронные задания, настраивая различные параметры, так же просто как мы делаем запрос по REST-API (а лучше если еще проще).
При помощи очередей сообщений относительно просто реализовать очередь заданий, которые выполняются немедленно после постановки задания в очередь. Но часто требуются выполнить задание однократно в установленное время или же по расписанию. Для этих задач широко используют ряд пакетов, которые реализуют логику работы cron в linux. Чтобы не быть голословным, скажу, что пакет node-cron имеет 480 тыс. установок в неделю, node-schedule — 170 тыс. установок в неделю.
Использовать node-cron это, конечно, удобнее, чем аскетичный setInterval(), но лично я сталкивался с целым рядом проблем при его использовании. Если выразить общий недостаток — это отсутствие контроля за количеством одновременно выполняемых заданий (это стимулирует пиковые нагрузки: повышение нагрузки замедляет работу заданий, замедление работы заданий увеличивает количество одновременно выполняемых заданий а это в свою очередь еще больше грузит систему), невозможность для повышения производительности запустить node-cron на нескольких ядрах (в этом случае все задания независимо выполняются на каждом ядре) и отсутствие средств для отслеживания и перезапуска заданий, закончившихся с ошибкой.
Я надеюсь, что показал, что необходимость в таком инструменте, как очередь заданий есть наравне с такими инструментами как базы данных. И такие средства появились, хотя еще недостаточно широко применяются. Перечислю наиболее популярные из них:
| Имя пакета | Количество установок в неделю | Количество лайков |
|---|---|---|
| kue | 29190 | 8753 |
| bee-queue | 29022 | 1431 |
| agenda | 25459 | 5488 |
| bull | 56232 | 5909 |
*) По состоянию на 25 на сентябрь 2021 года после возобновления поддержки проекта
.
Я сегодня буду рассматривать применение пакета bull, с которым работаю сам. Почему я выбрал именно этот пакет (хотя не навязываю свой выбор другим). На тот момент, когда я начал искать удобную реализацию очереди сообщений, проект bee-queue был уже остановлен. Реализация kue, по бенчмаркам приведенным в репозитарии bee-queue, сильно отставала от других реализаций и, кроме того, не содержала средств для запуска периодически выполняемых заданий. Проект agenda реализует очереди с сохранением в базе данных mongodb. Это для некоторых кейсов большой плюс, если нужно сверх-надежность при постановке заданий в очередь. Однако не только это решающий фактор. Я, естественно, испытывал все варианты библиотек на выносливость, генерируя большое количество заданий в очереди, и так и не смог добиться от agenda бесперебойной работы. При превышении какого-то количества заданий, agenda останавливалась и прекращала ставить задания в работу.
Поэтому я остановился на bull который реализует удобный API, при достаточном быстродействии с возможностью масштабирования, так как в качестве бэка пакет bull использует redis-сервер. В том числе, можно использовать кластер серверов redis.
При создании очереди очень важно выбрать оптимальные параметры очереди заданий. Параметров много, и значение некоторых из них дошло до меня не сразу. После многочисленных экспериментов я остановился на таких параметрах:
const Bull = require('bull'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const defaultJobOptions = { removeOnComplete: true, removeOnFail: false, }; const limiter = { max: 10000, duration: 1000, bounceBack: false, }; const settings = { lockDuration: 600000, // Key expiration time for job locks. stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking). maxStalledCount: 2, // Max amount of times a stalled job will be re-processed. guardInterval: 5000, // Poll interval for delayed jobs and added jobs. retryProcessDelay: 30000, // delay before processing next job in case of internal error. drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs). }; const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter }); module.exports = { bull };
В тривиальных случаях нет необходимости создавать много очередей, так как в каждой очереди можно задавать имена для разных заданий, и с каждым именем связывать свой процессор-воркер:
const { bull } = require('../bull'); bull.process('push:news', 1, `${__dirname}/push-news.js`); bull.process('push:status', 2, `${__dirname}/push-status.js`); ... bull.process('some:job', function(...args) { ... });
Я использую возможность, которая идет в bull «из коробки» — распараллеливать процессоры-воркеры на нескольких ядрах. Для этого вторым параметром задается количество ядер на которым будет запущен процессор-воркер, а в третьем параметре имя файла с определением функции обработки задания. Если такая фича не нужна, в качестве второго параметра можно просто передать callback-функцию.
Задание в очередь ставится вызовом метода add(), которому в параметрах передается имя очереди и объект, который в последующем будет передаваться обработчику задания. Например, в хуке ORM после создания записи с новой новостью, я могу асинхронно отправить всем клиентам пуш сообщение:
afterCreate(instance) { bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options); }
Обработчик события принимает в параметрах объект задания с параметрами, переданными в метод add() и функцию done(), которую необходимо вызвать для подтверждения выполнения задания или же для того чтобы сообщить, что задание закончилось с ошибкой:
const { firebase: { admin } } = require('../firebase'); const { makePayload } = require('./makePayload'); module.exports = (job, done) => { const { id, title, message } = job.data; const data = { id: String(id), type: 'news', }; const payloadRu = makePayload(title.ru, message.ru, data); const payloadEn = makePayload(title.en, message.en, data); return Promise.all([ admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }), admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }), ]) .then(response => done(null, response)) .catch(done); };
Для просмотра состояния очереди задания можно воспользоваться средством arena-bull:
const Arena = require('bull-arena'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const arena = Arena({ queues: [ { name: 'my_gueue', hostId: 'My Queue', redis, }, ], }, { basePath: '/', disableListen: true, }); module.exports = { arena };
И напоследок маленький лайфхак. Как я уже говорил, bull использует redis-сервер в качестве бэка. Вероятность того что при рестарте redis-сервера задания пропадут весьма мала. Но зная тот факт что сисадмины иногда могут просто «почистить кэш редиса», при этом удалив все задания в частности, я был обеспокоен прежде всего периодически выполняемыми заданиями, которые в этом случае остановились навсегда. В связи с этим я нашело возможность как возобновлять такие периодические задания:
const cron = '*/10 * * * * *'; const { bull } = require('./app/services/bull'); bull.getRepeatableJobs() .then(jobs => Promise.all(_.map(jobs, (job) => { const [name, cron] = job.key.split(/:{2,}/); return bull.removeRepeatable(name, { cron }); }))) .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } })); setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);
То есть задание сначала исключается из очереди, а затем ставится вновь, и все это (увы) по setInterval(). Собственно без такого вот лайфхака я бы возможно не решился юзать периодические таски на bull.
UPD1. bee-queue проект снова поддерживается.
UPD2. bull не имеет возможность, которая есть у koa и bee-queue — пописка на события конретного job. Итогда это бывает очень нужно для такого случая:
const job = queue.job({...}); job.on('success', function(result) { res.status(200).send(result); })
UPD3. kue в настоящее время проект не поддерживается
apapacy@gmail.com
3 июля 2019 года
