Как стать автором
Обновить

Иструменты Node.js разработчика. Очереди заданий (job queue)

Время на прочтение 7 мин
Количество просмотров 31K
При реализации бэка веб-приложений и мобильных приложений, даже самых простых, уже стало привычным использование таких инструментов как: базы данных, почтовый (smtp) сервер, redis-сервер. Набор используемых инструментов постоянно расширяется. Например, очереди сообщений, судя по количеству установок пакета amqplib (650 тыс. установок в неделю), используется наравне с реляционными базами данных (пакет mysql 460 тыс. установок в неделю и pg 800 тыс. установок в неделю).

Сегодня я хочу рассказать об очередях заданий (job queue), которые пока используются на порядок реже, хотя необходимость в них возникает, практически, во всех реальных проектах

Итак, очереди заданий позволяют асинхронно выполнить некоторую задачу, фактически, выполнить функцию с заданными входными параметрами и в установленное время.

В зависимости от параметров, задание может выполняться:

  • сразу после добавления в очередь заданий;
  • однократно в установленное время;
  • многократно по расписанию.

Очереди заданий позволяют передать выполняемому заданию параметры, отследить и повторно выполнить задания, закончившиеся с ошибкой, установить ограничение на количество одновременно выполняемых заданий.

Подавляющее большинство приложений на Node.js связаны с разработкой REST-API для веб-приложений и мобильных приложений. Сократить время выполнения REST-API — важно для комфортной работы пользователя с приложением. В то же время, вызов REST-API может инициировать длительные и/или ресурсоёмкие операции. Например, после совершения покупки необходимо отправить пользователю пуш-сообщение на мобильное приложение, или же отправить запрос о совершении покупки на REST-API CRM. Эти запросы можно выполнить асинхронно. Как сделать это правильно, если у Вас нет инструмента для работы с очередями заданий? Например, можно отправить сообщение в очередь сообщений, запустить worker, который будет читать эти сообщения и выполнять на основании этих сообщений необходимую работу.

Фактически, примерно это и делают очереди заданий. Однако, если присмотреться внимательно, то очереди заданий имеют несколько принципиальных отличий от очереди сообщений. Во-первых, в очередь сообщений кладут сообщения (статику), а очереди заданий подразумевают выполнение какой-то работы (вызов функции). Во-вторых, очередь заданий подразумевают наличие какого-то процессора (воркера), который будет выполнять заданную работу. При этом нужен дополнительный функционал. Количество процессоров-воркеров должно прозрачно масштабироваться в случае повышения нагрузки. С другой стороны необходимо ограничивать количество одновременно работающих заданий на одном процессоре-воркере, чтобы сгладить пиковые нагрузки и не допустить отказов в обслуживании. Это показывает что есть необходимость в инструменте, который мог бы запускать асинхронные задания, настраивая различные параметры, так же просто как мы делаем запрос по REST-API (а лучше если еще проще).

При помощи очередей сообщений относительно просто реализовать очередь заданий, которые выполняются немедленно после постановки задания в очередь. Но часто требуются выполнить задание однократно в установленное время или же по расписанию. Для этих задач широко используют ряд пакетов, которые реализуют логику работы cron в linux. Чтобы не быть голословным, скажу, что пакет node-cron имеет 480 тыс. установок в неделю, node-schedule — 170 тыс. установок в неделю.

Использовать node-cron это, конечно, удобнее, чем аскетичный setInterval(), но лично я сталкивался с целым рядом проблем при его использовании. Если выразить общий недостаток — это отсутствие контроля за количеством одновременно выполняемых заданий (это стимулирует пиковые нагрузки: повышение нагрузки замедляет работу заданий, замедление работы заданий увеличивает количество одновременно выполняемых заданий а это в свою очередь еще больше грузит систему), невозможность для повышения производительности запустить node-cron на нескольких ядрах (в этом случае все задания независимо выполняются на каждом ядре) и отсутствие средств для отслеживания и перезапуска заданий, закончившихся с ошибкой.

Я надеюсь, что показал, что необходимость в таком инструменте, как очередь заданий есть наравне с такими инструментами как базы данных. И такие средства появились, хотя еще недостаточно широко применяются. Перечислю наиболее популярные из них:

Имя пакета Количество установок в неделю Количество лайков
kue 29190 8753
bee-queue 29022 1431
agenda 25459 5488
bull 56232 5909

*) По состоянию на 25 на сентябрь 2021 года после возобновления поддержки проекта
.
Я сегодня буду рассматривать применение пакета bull, с которым работаю сам. Почему я выбрал именно этот пакет (хотя не навязываю свой выбор другим). На тот момент, когда я начал искать удобную реализацию очереди сообщений, проект bee-queue был уже остановлен. Реализация kue, по бенчмаркам приведенным в репозитарии bee-queue, сильно отставала от других реализаций и, кроме того, не содержала средств для запуска периодически выполняемых заданий. Проект agenda реализует очереди с сохранением в базе данных mongodb. Это для некоторых кейсов большой плюс, если нужно сверх-надежность при постановке заданий в очередь. Однако не только это решающий фактор. Я, естественно, испытывал все варианты библиотек на выносливость, генерируя большое количество заданий в очереди, и так и не смог добиться от agenda бесперебойной работы. При превышении какого-то количества заданий, agenda останавливалась и прекращала ставить задания в работу.

Поэтому я остановился на bull который реализует удобный API, при достаточном быстродействии с возможностью масштабирования, так как в качестве бэка пакет bull использует redis-сервер. В том числе, можно использовать кластер серверов redis.

При создании очереди очень важно выбрать оптимальные параметры очереди заданий. Параметров много, и значение некоторых из них дошло до меня не сразу. После многочисленных экспериментов я остановился на таких параметрах:

const Bull = require('bull');

const redis = {
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null,
  connectTimeout: 180000
  };

const defaultJobOptions = {
  removeOnComplete: true,
  removeOnFail: false,
};

const limiter = {
  max: 10000,
  duration: 1000,
  bounceBack: false,
};

const settings = {
  lockDuration: 600000, // Key expiration time for job locks.
  stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking).
  maxStalledCount: 2, // Max amount of times a stalled job will be re-processed.
  guardInterval: 5000, // Poll interval for delayed jobs and added jobs.
  retryProcessDelay: 30000, // delay before processing next job in case of internal error.
  drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs).
};

const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter });

module.exports = { bull };


В тривиальных случаях нет необходимости создавать много очередей, так как в каждой очереди можно задавать имена для разных заданий, и с каждым именем связывать свой процессор-воркер:

const { bull } = require('../bull');

bull.process('push:news', 1, `${__dirname}/push-news.js`);
bull.process('push:status', 2, `${__dirname}/push-status.js`);
...
bull.process('some:job', function(...args) { ... });

Я использую возможность, которая идет в bull «из коробки» — распараллеливать процессоры-воркеры на нескольких ядрах. Для этого вторым параметром задается количество ядер на которым будет запущен процессор-воркер, а в третьем параметре имя файла с определением функции обработки задания. Если такая фича не нужна, в качестве второго параметра можно просто передать callback-функцию.

Задание в очередь ставится вызовом метода add(), которому в параметрах передается имя очереди и объект, который в последующем будет передаваться обработчику задания. Например, в хуке ORM после создания записи с новой новостью, я могу асинхронно отправить всем клиентам пуш сообщение:

  afterCreate(instance) {
      bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options);
  }

Обработчик события принимает в параметрах объект задания с параметрами, переданными в метод add() и функцию done(), которую необходимо вызвать для подтверждения выполнения задания или же для того чтобы сообщить, что задание закончилось с ошибкой:

const { firebase: { admin } } = require('../firebase');
const { makePayload } = require('./makePayload');

module.exports = (job, done) => {
  const { id, title, message } = job.data;
  const data = {
    id: String(id),
    type: 'news',
  };
  const payloadRu = makePayload(title.ru, message.ru, data);
  const payloadEn = makePayload(title.en, message.en, data);
  return Promise.all([
    admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }),
    admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }),
  ])
    .then(response => done(null, response))
    .catch(done);
};

Для просмотра состояния очереди задания можно воспользоваться средством arena-bull:

const Arena = require('bull-arena');
const redis = {
    host: 'localhost',
    port: 6379,
    maxRetriesPerRequest: null,
    connectTimeout: 180000
  };

const arena = Arena({
  queues: [
    {
      name: 'my_gueue',
      hostId: 'My Queue',
      redis,
    },
  ],
},
{
  basePath: '/',
  disableListen: true,
});

module.exports = { arena };

И напоследок маленький лайфхак. Как я уже говорил, bull использует redis-сервер в качестве бэка. Вероятность того что при рестарте redis-сервера задания пропадут весьма мала. Но зная тот факт что сисадмины иногда могут просто «почистить кэш редиса», при этом удалив все задания в частности, я был обеспокоен прежде всего периодически выполняемыми заданиями, которые в этом случае остановились навсегда. В связи с этим я нашело возможность как возобновлять такие периодические задания:

const cron = '*/10 * * * * *';

const { bull } = require('./app/services/bull');

bull.getRepeatableJobs()
  .then(jobs => Promise.all(_.map(jobs, (job) => {
    const [name, cron] = job.key.split(/:{2,}/);
    return bull.removeRepeatable(name, { cron });
  })))
  .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }));

setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);


То есть задание сначала исключается из очереди, а затем ставится вновь, и все это (увы) по setInterval(). Собственно без такого вот лайфхака я бы возможно не решился юзать периодические таски на bull.

UPD1. bee-queue проект снова поддерживается.
UPD2. bull не имеет возможность, которая есть у koa и bee-queue — пописка на события конретного job. Итогда это бывает очень нужно для такого случая:
const job = queue.job({...});
job.on('success', function(result) {
   res.status(200).send(result);
})

UPD3. kue в настоящее время проект не поддерживается

apapacy@gmail.com
3 июля 2019 года
Теги:
Хабы:
+9
Комментарии 2
Комментарии Комментарии 2

Публикации

Истории

Работа

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн