Как стать автором
Обновить

Играем с потоками в Node.JS 10.5.0

Время на прочтение6 мин
Количество просмотров8.8K

Доброго времени суток



У меня на работе возник спор между мной и дотнетчиками насчет потоков в новой версии Node.JS и необходимости их синхронизоровать. Для начала решили выбрать задачу о параллельной записи строк в файл. Тема с worker_threads горячая, прошу под кат.

Немного о самих потоках. Они являются экспериментальной технологией в Node.JS 10.5.0, и для того, чтобы иметь доступ к модулю «worker_threads», необходимо запускать наше Node.JS приложение с флагом "--experimental-worker". Я прописал этот флаг в start скрипте в файле package.json:
{
  "name": "worker-test",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "start": "node --max-old-space-size=4096 --experimental-worker app.js "
  },
  "author": "",
  "license": "ISC"
}

Теперь о самой логике. Главный поток порождает N рабочих потоков, все они пишут с каким-то интервалом в файл. В отличие от всех примеров, где главные и дочерние потоки стартуют с одного файла, я отделил потоки в отдельный, мне это кажется более чистым и элегантным.

Собственно, код.

Главный файл app.js — точка входа.

const { Worker } = require('worker_threads');
const path = require('path');

const WORKERS_NUMBER = 100;

console.log('Hello from main!');

for (var i = 1; i <= WORKERS_NUMBER ; i++) {
  const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}

Здесь мы просто создаем дочерние потоки используя класс Worker и указывая путь к стартовому файлу для потока './writer-worker-app/app.js'. При создании потока передаем самописный айдишник как данные workerData.

Стартовый файл для потока ./writer-worker-app/app.js:

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');

const id = workerData.id;

console.log(`Worker ${id} initializad.`);

while (true) {
  sendMessage();
}

function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}


Ну и простейший класс-логер: ./writer-worker-app/logger.js
const fs = require('fs');

function log(message) {
  return fs.appendFileSync('./my-file.txt', message);
}

module.exports = {
  log
};

При запуске этого приложения мы все надеялись на то, что в итоге получим кашу в файле и дотнетчики закричат, как нужны блокировки с семафорами и прочими радостями параллельного исполнения. Но нет! В файле все строки идут не прерываясь, разве что в случайном порядке:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

Замечательный эксперимент, очередная маленькая победа Ноды :-) Моё предположение в том, что вся синхронизация происходит на уровне I\O потоков Ноды, но буду рад узнать в комментариях правильный вариант. На всякий случай мы проверили работу, используя не fs.appendFileSync, а fs.createWriteStream и метода stream.write.

Результат вышел такой же.

Но мы на этом не остановились.


Коллега предложил задачу о синхронизации потоков. Для нашего конкретного примера, пусть это будет задача последовательной записи в файл в порядке возврастания айдишников. Сначала пишет первый поток, потом второй, потом третий и так далее.

Для этого я ввёл еще один поток-Менеджер. Можно было обойтись главным, но мне так приятно создавать этих изолированных рабочих и выстраивать общение посредством сообщений. Прежде чем начать писать имплементацию потока-Менеджера, необходимо создать канал связи между ним и писателями-рабочими. Для этого был использован класс MessageChannel. Инстансы этого класса имеют два поля: port1 и port2, каждый из которых умеет слушать и отправлять сообщения другому посредством методов .on('message') и .postMessage(). Этот класс и был создан в рамках модуля «worker_threads» для коммуникации между потоками, потому что обычно при передачи объекта происходит просто его клонирование, и в изолированной среде выполнения потока он будет бесполезен.

Для коммуникации между 2 потоками мы каждому должны дать по порту.

Интересный факт: на 10.5.0 невозможно передать порт через конструктор воркера, необходимо это делать только через worker.postMessage(), причем обязательно указывая порт в transferList параметре!

Сам поток-менеджер будет отсылать команды потокам-писателям в порядке возрастания их идентификаторов, причем следующую команду он отправит только после получения ответа писателя об успешной операции.

Недо-UML-диаграмма приложения:


Наш видоизмененный главный файл ./app.js:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');

const WORKERS_NUMBER = 100;

console.log('Main app initialized and started.');

const workersMeta = [];

for (var i = 1; i <= WORKERS_NUMBER; i++) {
  const channel = new MessageChannel();
  const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
  workersMeta.push({ id: i, worker, channel });
}

workersMeta.forEach(({ worker, channel }) => {
  worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})

setTimeout(() => {
  const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
  const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
  orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));

  console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);

Здесь мы сначала создаем воркеров, потом каждому отправляем порт для связи с менеджером (и только так, через конструктор это сделать невозможно).

Потом создаем поток-менеджер, отправляем ему список портов для связи с потоками-писателями.
Updated: эмпирическим путем я выяснил, что при работе с потоками лучше сначала дать им настояться (проинициализироваться как надо). По хорошему надо было слушать какие то ответы от потоков в стиле «Я готов!», но я решил пойти более легким путем.

Изменим и поведение потока-писателя, чтобы он отправлял сообщение только когда ему скажут, а также возвращал результат, когда операция записи закончена:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');

const id = workerData.id;

console.log(`Worker ${id} initializad.`);

parentPort.on('message', value => {
  const orchestratorPort = value.orchestratorPort;
  orchestratorPort.on('message', data => {
    if (data.command == 'write') {
      console.log(`Worker ${id} received write command`);
      sendMessage();
      sendResult(orchestratorPort);
    }
  });
  console.log(`Worker ${id} started.`);
});


function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}

function sendResult(port) {
  port.postMessage({ id, status: 'completed' });
}

Мы правильно проинициализировались от сообщение родительского потока, начали случать канал потока-менеджера, при получении команды сначала пишем в файл, потом отправляем результат. Нужно заметить, что в файл пишется синхронно, поэтому sendResult() вызывается сразу за sendMessage().

Всё, что осталось — написать имплементацию нашего умного менеджера
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');

console.log('Orchestrator initialized.')

let workerPorts;

parentPort.on('message', (value) => {
  workerPorts = value.workerPorts;
  workerPorts.forEach(wp => wp.port.on('message', handleResponse));
  console.log('Orchestrator started.');
  sendCommand(workerPorts[0]);
});

function handleResponse(status) {
  const responseWorkerId = status.id;
  let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
  if (!nextWorker) {
    nextWorker = workerPorts[0];
  }
  sendCommand(nextWorker);
}

function sendCommand(worker) {
  worker.port.postMessage({ command: 'write' });
}

Получили список портов, упорядочили, для каждого порта установили колбек на респонз, ну и отправили команду первому. В самом колбеке ищем следующего писателя и отправляем команду ему. Чтобы не сильно напрягать систему, был установлен интервал между командами.

Вот и всё, наше многопоточное приложение с управлением потоков готово. Мы научились не просто порождать воркеры-потоки в Node.JS, но и создавать эффективные способы коммуникации между ними. На мой личный взгляд, архитектура изолированных потоков в Node.JS с ожиданием и отправкой сообщений более чем удобная и перспективная. Всем спасибо за внимание.

Весь исходный код может быть найден здесь.

UPDATE


Чтобы не вводить в заблуждение читателей, а также не давать лишних поводов написать, что я жульничаю с таймаутами, я проапдейтил статью и репозиторий.
Изменения:
1) удалены интервалы в первоначальных писателях, теперь по хардкору идет while(true)
2) добавлен --max-old-space-size=4096 флаг, просто на всякий случай, т.к. текущая имплементация потоков не очень стабильная и я надеюсь, что это как то поможет.
3) удалены интервалы отправки сообщений у потока-менеджера. теперь запись идёт нон-стоп.
4) добавлен таймаут при инициализации менеджера, почему — расписано выше.

TO DO:
1) добавить сообщения изменяемой длины или подсчет вызова логера — спасибо FANAT1242
2) добавить бенчмарк, сравнить работу первой и второй версии (сколько строк запишет за 10 секунд, к примеру)

UPDATE 2


1) Был изменен код логгирования: теперь каждое сообщение имеет разную длину.
2) Был изменен writer-worker-app/app.old.js: каждый поток пишет 1000 раз, потом завершается.

Это было сделано для проверки идей пользователя FANAT1242. Сообщения все равно не переписывают друг друга, строк в файле ровно 1000 * N потоков.
Теги:
Хабы:
+5
Комментарии14

Публикации

Изменить настройки темы

Истории

Работа

Ближайшие события

Weekend Offer в AliExpress
Дата20 – 21 апреля
Время10:00 – 20:00
Место
Онлайн
Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн