Доброго времени суток

У меня на работе возник спор между мной и дотнетчиками насчет потоков в новой версии Node.JS и необходимости их синхронизоровать. Для начала решили выбрать задачу о параллельной записи строк в файл. Тема с worker_threads горячая, прошу под кат.
Немного о самих потоках. Они являются экспериментальной технологией в Node.JS 10.5.0, и для того, чтобы иметь доступ к модулю «worker_threads», необходимо запускать наше Node.JS приложение с флагом "--experimental-worker". Я прописал этот флаг в start скрипте в файле package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" }
Теперь о самой логике. Главный поток порождает N рабочих потоков, все они пишут с каким-то интервалом в файл. В отличие от всех примеров, где главные и дочерние потоки стартуют с одного файла, я отделил потоки в отдельный, мне это кажется более чистым и элегантным.
Собственно, код.
Главный файл app.js — точка входа.
const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); }
Здесь мы просто создаем дочерние потоки используя класс Worker и указывая путь к стартовому файлу для потока './writer-worker-app/app.js'. При создании потока передаем самописный айдишник как данные workerData.
Стартовый файл для потока ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); }
Ну и простейший класс-логер: ./writer-worker-app/logger.js
const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log };
При запуске этого приложения мы все надеялись на то, что в итоге получим кашу в файле и дотнетчики закричат, как нужны блокировки с семафорами и прочими радостями параллельного исполнения. Но нет! В файле все строки идут не прерываясь, разве что в случайном порядке:
Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
Замечательный эксперимент, очередная маленькая победа Ноды :-) Моё предположение в том, что вся синхронизация происходит на уровне I\O потоков Ноды, но буду рад узнать в комментариях правильный вариант. На всякий случай мы проверили работу, используя не fs.appendFileSync, а fs.createWriteStream и метода stream.write.
Результат вышел такой же.
Но мы на этом не остановились.
Коллега предложил задачу о синхронизации потоков. Для нашего конкретного примера, пусть это будет задача последовательной записи в файл в порядке возврастания айдишников. Сначала пишет первый поток, потом второй, потом третий и так далее.
Для этого я ввёл еще один поток-Менеджер. Можно было обойтись главным, но мне так приятно создавать этих изолированных рабочих и выстраивать общение посредством сообщений. Прежде чем начать писать имплементацию потока-Менеджера, необходимо создать канал связи между ним и писателями-рабочими. Для этого был использован класс MessageChannel. Инстансы этого класса имеют два поля: port1 и port2, каждый из которых умеет слушать и отправлять сообщения другому посредством методов .on('message') и .postMessage(). Этот класс и был создан в рамках модуля «worker_threads» для коммуникации между потоками, потому что обычно при передачи объекта происходит просто его клонирование, и в изолированной среде выполнения потока он будет бесполезен.
Для коммуникации между 2 потоками мы каждому должны дать по порту.
Интересный факт: на 10.5.0 невозможно передать порт через конструктор воркера, необходимо это делать только через worker.postMessage(), причем обязательно указывая порт в transferList параметре!
Сам поток-менеджер будет отсылать команды потокам-писателям в порядке возрастания их идентификаторов, причем следующую команду он отправит только после получения ответа писателя об успешной операции.
Недо-UML-диаграмма приложения:

Наш видоизмененный главный файл ./app.js:
const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10);
Здесь мы сначала создаем воркеров, потом каждому отправляем порт для связи с менеджером (и только так, через конструктор это сделать невозможно).
Потом создаем поток-менеджер, отправляем ему список портов для связи с потоками-писателями.
Updated: эмпирическим путем я выяснил, что при работе с потоками лучше сначала дать им настояться (проинициализироваться как надо). По хорошему надо было слушать какие то ответы от потоков в стиле «Я готов!», но я решил пойти более легким путем.
Изменим и поведение потока-писателя, чтобы он отправлял сообщение только когда ему скажут, а также возвращал результат, когда операция записи закончена:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); }
Мы правильно проинициализировались от сообщение родительского потока, начали случать канал потока-менеджера, при получении команды сначала пишем в файл, потом отправляем результат. Нужно заметить, что в файл пишется синхронно, поэтому sendResult() вызывается сразу за sendMessage().
Всё, что осталось — написать имплементацию нашего умного менеджера
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); }
Получили список портов, упорядочили, для каждого порта установили колбек на респонз, ну и отправили команду первому. В самом колбеке ищем следующего писателя и отправляем команду ему. Чтобы не сильно напрягать систему, был установлен интервал между командами.
Вот и всё, наше многопоточное приложение с управлением потоков готово. Мы научились не просто порождать воркеры-потоки в Node.JS, но и создавать эффективные способы коммуникации между ними. На мой личный взгляд, архитектура изолированных потоков в Node.JS с ожиданием и отправкой сообщений более чем удобная и перспективная. Всем спасибо за внимание.
Весь исходный код может быть найден здесь.
UPDATE
Чтобы не вводить в заблуждение читателей, а также не давать лишних поводов написать, что я жульничаю с таймаутами, я проапдейтил статью и репозиторий.
Изменения:
1) удалены интервалы в первоначальных писателях, теперь по хардкору идет while(true)
2) добавлен --max-old-space-size=4096 флаг, просто на всякий случай, т.к. текущая имплементация потоков не очень стабильная и я надеюсь, что это как то поможет.
3) удалены интервалы отправки сообщений у потока-менеджера. теперь запись идёт нон-стоп.
4) добавлен таймаут при инициализации менеджера, почему — расписано выше.
TO DO:
1) добавить сообщения изменяемой длины или подсчет вызова логера — спасибо FANAT1242
2) добавить бенчмарк, сравнить работу первой и второй версии (сколько строк запишет за 10 секунд, к примеру)
UPDATE 2
1) Был изменен код логгирования: теперь каждое сообщение имеет разную длину.
2) Был изменен writer-worker-app/app.old.js: каждый поток пишет 1000 раз, потом завершается.
Это было сделано для проверки идей пользователя FANAT1242. Сообщения все равно не переписывают друг друга, строк в файле ровно 1000 * N потоков.
