Как стать автором
Обновить
96.19
Тензор
Разработчик системы СБИС

Приручаем многопоточность в Node.js (часть 5/5: автомасштабирование под нагрузку)

Время на прочтение19 мин
Количество просмотров7.7K

В прошлых частях цикла мы:

  • рассмотрели базовые концепты работы с многопоточностью в JavaScript на примере среды Node.js;

  • научились формировать общую очередь и каналы обмена данными и сигналами, чтобы более эффективно управлять загрузкой потоков;

  • использовали разделяемую память и Atomics-операции как самое быстрое средство обмена большими блоками данных;

  • и создали отдельный поток-координатор, чтобы устранить негативное влияние синхронного кода в основном потоке исполнения на загрузку потоков вспомогательных.

В сегодняшней, заключительной, части я продемонстрирую, как все эти механики вместе позволяют сделать эффективный микросервис, автоматически подстраивающийся под изменения входящей нагрузки.

В данном случае эффективность - это не про максимально возможную скорость обработки каждой отдельной задачи, а про сбалансированное использование аппаратных ресурсов с учетом тех ограничений, на которые мы готовы пойти. Особенно актуально это для различных "облачных" размещений, где оплата идет за фактически потребленные CPU и RAM.

Передача контента сообщений

В предыдущей части мы научились быстро передавать блоки двоичных данных через разделяемую память в обрабатывающие их потоки с помощью потока-координатора, избавившего нас от ожидания исполнения синхронного кода в основном потоке:

Передача данных в поток в схеме с координатором
Передача данных в поток в схеме с координатором

Но, как правило, обрабатываемые сообщения состоят не только из двоичного контента, но и из некоторого объекта, который хочется иметь в доступе по итогу процессинга, а вот передавать его туда целиком вовсе незачем.

Доработаем схему из прошлой статьи, вынеся функционал промежуточного хранения сообщений и формирования единого блока двоичных данных в класс Coordinator, а управления обрабатывающими потоками и очередью - в WorkersPool:

Схема управления потоками
Схема управления потоками

В коде эта схема может выглядеть примерно так:

if (isMainThread) {
  const {Coordinator} = require(...);
  const coordinator = new Coordinator(__filename, ...);
  coordinator
    .on('online', () => {
      // отправка данных
      coordinator.postMessage(message);
    })
    .on('message', (result, message) => { // увязанные [по id] сообщение и результат
      // обработка полученного результата в паре с исходным сообщением
    });
}
else {
  const {workerType, port} = workerData;
  switch (workerType) {
    case 'coordinator':
      const {WorkersPool} = require(__filename, ...);
      const pool = new WorkersPool(...);
      break;
    case 'worker':
      const {shared} = workerData;
      // магия обработки данных в разделяемой памяти
      port.postMessage({id, ...});
      break;
  }
}

Представление данных

Для "увязки" обрабатываемых двоичных данные и исходного сообщения мы можем использовать автоинкрементный id. Но только лишь его - мало, чтобы корректно передать данные в поток.

Создавая блок разделяемой памяти SharedArrayBuffer, мы должны заранее предусмотреть максимальный размер возможного контента. Но работать-то мы можем с разными данными - то есть получается, что передавать размер нам надо тоже, иначе мы не сможем их правильно "выцепить" из разделяемой памяти.

Получается, что каждый отправляемый в поток блок данных должен иметь префикс, состоящий из служебного id и длины самих данных - для этого нам достаточно иметь Uint32Array из двух ячеек.

Но для "склейки" этих данных с двоичными данными из сообщения нам понадобится его Uint8Array-проекция и функция преобразования dataFunc, которая может быть примерно такой:

message => { // функция получения двоичных данных
  // объединяем префикс и данные
  const buf = Buffer.concat([ui8prefix, ...message.part]);
  // записываем в префикс результирующую длину
  buf.writeUInt32LE(buf.length - 8, 4); // Uint32Array.BYTES_PER_ELEMENT
  // возвращаем итоговый двоичный контент
  return buf.buffer;
}

В свою очередь, на стороне обрабатывающего потока позиция id воспринимается не только как Int32-значение (уже знаковое!), но и как ячейка для ожидания блокировки:

const {shared} = workerData;

// [shared] = {lock:int32} + {size:uint32} + {data:uint8[]}
const lock = new Int32Array(shared, 0, 1);
const size = new Uint32Array(shared, Int32Array.BYTES_PER_ELEMENT, 1);
const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT + Uint32Array.BYTES_PER_ELEMENT);
// ...
  const id = lock[0];
  const messageData = data.subarray(0, size[0]);
Разное представление одних данных
Разное представление одних данных

Передача данных

Давайте детализируем схему выше с точки зрения путей обмена этими данными:

Движение данных в схеме с координатором
Движение данных в схеме с координатором
  1. в основном потоке приложения у нас появляется сообщение message, содержащее некоторый набор двоичных данных

  2. мы передаем его в Coordinator, где оно по автоинкрементному id заносится в промежуточное Map-хранилище messages

  3. с помощью вспомогательной функции dataFunc мы из сообщения извлекаем все необходимые двоичные данные и "склеиваем" их с префиксом, содержащим id и размер данных

  4. результирующий data-буфер мы по ссылке передаем в поток-координатор

  5. в координаторе данные должны или пройти через очередь (если потоки заняты), или сразу копируются в разделяемую память свободного потока

  6. поток, получая уведомление о новых данных, обрабатывает их и отправляет прямо в основной поток, ...

  7. ... где они по id связываются с исходным сообщением и передаются в .emit

Класс Coordinator
class Coordinator extends Worker {
  #messages = new Map(); // хранилище всех обрабатываемых сообщений

  #p32 = new Int32Array(3);                // префикс данных = [THREAD_FREE/id, id/THREAD_FREE, dataSize]
  #p8  = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
  #data = [this.#p8]; // массив для склейки частей в двоичный блок
  #message2data;      // функция получения двоичных данных из сообщения и помещения в массив

  #data2buffer() {
    // объединяем префикс и данные
    const buf = Buffer.concat(this.#data);
    // записываем в префикс результирующую длину
    buf.writeUInt32LE(buf.length - 12, 8); // Int32Array.BYTES_PER_ELEMENT * 3
    // восстанавливаем состояние массива
    this.#data.length = 1;
    // возвращаем итоговый двоичный контент
    return buf;
  };

  constructor(filename, options) {
    // сигнальный канал
    const {port1, port2} = new MessageChannel();

    /* доинициализируем необходимые опции
      {
        workerData : {
          workerType : 'coordinator'
        , port       : port2
        }
      , transferList : [port2]
      }
    */
    ((options ??= {}).workerData ??= {}).workerType = 'coordinator';
    ((options ??= {}).workerData ??= {}).port = port2;
    ((options ??= {}).transferList ??= []).push(port2);
    super(filename, options);

    const {dataField, dataArray} = options;
    this.#p32[0] = THREAD_FREE;
    this.#p32[1] = 0; // ID

    this.#message2data = dataField
      ? message => this.#data.push(dataField(message))
      : message => this.#data.push(...dataArray(message));

    const messages = this.#messages;
    port1.on('message', ({threadId, port}) => {
      // ассоциируем открывшийся порт с конкретным потоком
      port.threadId = threadId;
      this.emit('port.open', port);

      port
        .on('message', result => {
          // из результата обработки по ID получаем исходное сообщение ...
          const message = messages.get(result.id);
          if (message) {
            messages.delete(result.id);
            // ... и передаем вместе с результатом
            this.emit('message', result, message);
          }
        })
        .on('close', () => {
          // при закрытии порта - отписываемся от него
          this.emit('port.close', port);
          port.removeAllListeners();
        });
    });
  }

  // передача двоичных данных в поток
  postMessage(message) {
    // сохраняем объект сообщения в хранилище
    this.#messages.set(this.#p32[1], message);
    // добавляем в общий массив для склейки одно или несколько полей
    this.#message2data(message);
    // формируем целевой контейнер данных
    const buffer = this.#data2buffer();
    // передаем двоичный контент с префиксом в поток по ссылке
    super.postMessage(buffer, [buffer.buffer]);
    // id = (id + 1) % 0x10000000
    this.#p32[1]++;
    this.#p32[1] &= 0x0FFFFFFF;
  }
}

На что тут стоит обратить внимание:

  • для связи между объектом Coordinator в основном потоке и потоком-координатором мы подняли служебный MessageChannel, состоящий из пары портов, как это было описано во второй части серии;

  • именно по этому каналу поток-координатор передает нам порты для порождаемых им обрабатывающих потоков;

  • по этим-то портам рабочие потоки и сбрасывают нам результаты своей деятельности прямо в основной поток.

Работа с портами MessageChannel
Работа с портами MessageChannel

Разумное количество потоков

Так, с передачей данных в обрабатывающие потоки и обратно в основной поток - разобрались. Но сколько таких потоков нам необходимо иметь вообще?

Сначала обратим внимание на тот факт, что некоторых ресурсов стоит уже само создание потока. Если же оно включает в себя какую-то "тяжелую" инициализационную подготовку вроде прегенерации кэша внутри потока, то его старт может кратковременно занимать логическое ядро CPU даже на все 100%.

Отсюда следует четыре достаточно простых вывода:

  • потоки стоит порождать/убивать как можно реже;

  • не стоит это делать одновременно - то есть допустима некоторая задержка на создание/уничтожение потока;

  • как минимум, один поток обработки должен существовать всегда;

  • потоков должно быть не больше, чем CPU-ядер.

Раз уж мы упомянули тот факт, что внутри потока могут формироваться собственные данные или кэш, то становится выгодно иметь лишь минимально необходимое число потоков, насколько это вообще возможно, чтобы занимать как можно меньше памяти и улучшать долюcache hit.

Как упоминалось выше, для решения наших задач будет логично выделить класс WorkersPool, работающий внутри потока-координатора, который и будет осуществлять все управление потоками и очередью.

Критерий "необходимости"

Но как понять, что существующего количества потоков уже стало недостаточно, и все-таки необходимо создать еще один, несмотря на все сопутствующие издержки?

Фактически, нас интересует лишь один параметр работы нашего сервиса - чтобы все задачи обрабатывались достаточно быстро. А любая поступившая задача может быть либо сразу отдана на исполнение какому-то из потоков, либо поставлена в очередь.

Очередь, кстати, возьмем все ту же, из прошлой статьи, на основе кольцевого буфера:

this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

Сама длина очереди "в штуках" нам не сильно о чем-то говорит, ведь мы не оцениваем ни скорость обработки, ни "вес" каждой отдельной задачи. Но если мы задумаемся, что нахождение задачи в очереди в течение 1 миллисекунды нас совсем не напрягает, а "зависание" в течение 1 секунды не устраивает совсем, то где-то на этом интервале [1ms .. 1s] найдем комфортное значение, сколько мы готовы достаточно безболезненно позволить задаче находиться в очереди - скажем, это будет 100ms.

То есть пока время пребывания самой старой задачи в очереди не перевалило за это значение, выгоднее еще немного подождать, чем стартовать еще один поток.

Чтобы не заниматься этой оценкой на каждой операции, повесим эту проверку на таймер прямо в конструкторе класса.

Класс WorkersPool (каркас)
class WorkersPool {
  #options; // кэш аргументов конструктора

  #mainWorker;              // основной рабочий поток
  #workersPool = [];        // пул свободных дополнительных потоков
  #workersSet  = new Set(); // полный набор всех активных потоков
  #workersRemain;           // потоков еще доступно к созданию

  #queue;      // очередь на кольцевом буфере

  #checking;   // признак активности проверки состояния

  #queueMore1; // "длина" очереди превышает возможности 1 потока
  #queueMoreW; //                                   ... всех существующих потоков

  #checkQueue(workers) {
    // если в очереди задач уже больше, чем потоков,
    // ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
    return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
  }

  constructor(workerFile, options) {
    options['workerFile'] ??= workerFile;
    const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
    this.#options = options;

    this.#workersRemain = poolSize;

    // очередь на кольцевом буфере
    this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

    // главный вспомогательный поток существует сразу и всегда
    this.#mainWorker = this.#createWorker();
    
    // ...
    
    // периодическая проверка очереди и пула дополнительных потоков
    const pool = this.#workersPool;
    setInterval(() => {
      // пора ли порождать еще один поток?
      this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
      // пора ли отдавать задачи дополнительным потокам?
      this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
      // закрытие простаивающих дополнительных потоков
      // ...
    }, intervalCheck);
  }

  #createWorkerIfPossible() {
    // проверяем возможность и необходимость (по состоянмю очереди) запуска потока
    if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
      this.#createWorker();
    }
  }

  #createWorker() {
    this.#workersRemain--;
    this.#workersRemain = -this.#workersRemain; // wrap flag
    // ...

    worker.on('online', () => {
      // продолжим пробовать стартовать следующий после паузы на timeoutSpawn
      setTimeout(() => {
        this.#workersRemain = -this.#workersRemain; // wrap flag
        this.#createWorkerIfPossible();
      }, this.#options['timeoutSpawn']);
    });

    return worker;
  }

  #destroyWorker(worker) {
    // убираем поток из общего набора
    this.#workersSet.delete(worker);
    // завершаем сам поток
    worker.terminate();
    // исключаем воркер из пула, если он там был
    const idx = this.#workersPool.indexOf(worker);
    if (idx >= 0) {
      this.#workersPool.splice(idx, 1);
    }
    // увеличиваем количество доступных к запуску
    this.#workersRemain++;
  }
}

Тут мы использовали #workersRemain одновременно и как счетчик доступных к запуску, и как признак (отрицательное значение) наличия запускающегося в текущий момент потока.

Второе замечание касается использования для пула свободных именно массива, а не Set, поскольку нам важен приоритет потоков при раздаче им заданий на обработку.

Принцип распределения сообщений

Старшему досталась мельница, среднему – осел, ну а младшему пришлось взять себе кота.

[Шарль Перро, "Кот в сапогах"]

С учетом возможности существования собственного кэша в каждом из потоков, нам выгоднее использовать его как можно эффективнее - то есть отправлять задачи в поток, который существует дольше.

В идеале, таким потоком должен всегда оказываться главный рабочий поток, который, как мы договорились выше, существует всегда.

Но он может быть занят выполнением текущей задачи - тогда задача должна быть направлена в очередь, либо первому свободному потоку, если "длина" очереди уже заведомо превосходит возможности первого потока.

Если же ни одного такого потока не нашлось, и общая длина очереди превышает возможности всех потоков за нормативное время порождения потока, то пора задуматься о порождении нового потока:

this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
  this.#createWorkerIfPossible();
  this.#checking = null;
}, this.#options['intervalCheck']));

Такая конструкция не только заставляет ждать intervalCheck для попытки запуска потока, но и блокирует все остальные попытки на этом интервале.

Получение данных из очередей

На самом деле, когда мы говорим про очередь, надо не забывать, что их, фактически, две.

Когда у нас случается parentPort.on('message', message => ...), это означает, что нам отправили не только конкретное сообщение, но и что-то еще за ним могли успеть положить в очередь порта, которую мы можем сразу извлечь через receiveMessageOnPort, избегнув повторных вызов обработчика, как рассказывалось во второй части.

Получается вот такая нетривиальная схема доставки данных до процессинга в потоке:

"Вычерпываем" очередь порта при любой возможности
"Вычерпываем" очередь порта при любой возможности
.on('message') и .pulse()
  constructor(workerFile, options) {
    // ...
    
    // основная точка приема сообщений из main-потока
    parentPort.on('message', message => {
      // флаг возможности и необходимости продолжать отправку
      let processed;
      const {ready, _pulse} = this.#mainWorker;
      if (ready) { // если основной рабочий поток свободен
        processed = _pulse(message);  // ... отдаем сообщение ему
        message = undefined;          // ... и обнуляем
      }
      // продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
      // ... и еще кто-то остался кто-то свободный
      while (!processed) {
        const worker = this.#queueMore1 && this.#workersPool.pop();
        if (worker) {
          processed = worker._pulse(message);
          message &&= undefined;
        }
        else {
          // перекладываем всю очередь порта в свою очередь с меткой времени
          const now = Date.now();
          message ??= receiveMessageOnPort(parentPort)?.message;
          while (message) {
            this.#queue.push(message);
            message.ts = now;
            message = receiveMessageOnPort(parentPort)?.message;
          }
          // отложенная проверка необходимости запуска нового потока
          this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
            this.#createWorkerIfPossible();
            this.#checking = null;
          }, this.#options['intervalCheck']));
          return;
        }
      }
    });
    // ...

  #pulseWorker(worker, message) {
    worker.ready = false; // busy
    // если нам передали сообщение - пытаемся отдать его потоку
    let processed = !message || worker._send(message);
    // пока он говорит "уже все сразу обработал"...
    while (processed) {
      // извлекаем следующее сообщение из очереди порта основного потока
      const recv = receiveMessageOnPort(parentPort);
      // ... или своей локальной очереди
      message = recv?.message ?? this.#queue.shift();
      if (!message) {
        if (worker !== this.#mainWorker) {
          // когда все очереди закончились - возвращаем дополнительный поток в пул свободных
          const pool = this.#workersPool;
          !pool.includes(worker) && pool.push(worker);
          // фиксируем момент последней активности потока из пула
          worker.activity = Date.now();
        }
        worker.ready = true; // free
        return true;
      }
      // передаем данные в поток
      processed = worker._send(message);
    }
  };

Сама схема передачи в поток "по блокировке" та же самая, что мы использовали в прошлой части.

"Моя фамилия - Итого"

Осталось только свести все части воедино:

Полный код классов и тестового приложения, и пара слов про обработку строк

Для простоты изложения выше опущена часть существенных деталей:

  • ячеек для ожидания нотификации должно быть две - "туда" и "обратно", иначе они иначе ожидание в самом потоке координаторе может быть прервано нотификацией из него же

  • одну из ячеек мы используем в качестве сигнала об окончании записи буфера, что не является атомарной операцией само по себе, поэтому в потоке можно начать читать начало уже нового буфера, а "хвост" - еще от старого

const {
  Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
, receiveMessageOnPort
, threadId
} = require('node:worker_threads');

const THREAD_FREE = -1;

const hrtime = process.hrtime.bigint;

class WorkersPool {
  #options; // кэш аргументов конструктора

  #mainWorker;              // основной рабочий поток
  #workersPool = [];        // пул свободных дополнительных потоков
  #workersSet  = new Set(); // полный набор всех активных потоков
  #workersRemain;           // потоков еще доступно к созданию

  #queue;      // очередь на кольцевом буфере

  #checking;   // признак активности проверки состояния

  #queueMore1; // "длина" очереди превышает возможности 1 потока
  #queueMoreW; //                                   ... всех существующих потоков

  #checkQueue(workers) {
    // если в очереди задач уже больше, чем потоков,
    // ... и самая старая задача висит дольше, чем обработали бы все активные потоки за время старта нового
    return this.#queue.length > workers && this.#queue[0].ts + this.#options['timeoutSpawn'] * workers < Date.now();
  }

  constructor(workerFile, options) {
    options['workerFile'] ??= workerFile;
    const {poolSize, queuePowMin, queuePowMax, timeoutIdle, intervalCheck} = options;
    this.#options = options;

    this.#workersRemain = poolSize;

    // очередь на кольцевом буфере
    this.#queue = new (require('./Pow2Buffer'))(queuePowMin, queuePowMax);

    // главный вспомогательный поток существует сразу и всегда
    this.#mainWorker = this.#createWorker();

    // основная точка приема сообщений из main-потока
    parentPort.on('message', message => {
      // флаг возможности и необходимости продолжать отправку
      let processed;
      const {ready, _pulse} = this.#mainWorker;
      if (ready) { // если основной рабочий поток свободен
        processed = _pulse(message);  // ... отдаем сообщение ему
        message = undefined;          // ... и обнуляем
      }
      // продолжаем "дергать" потоки, пока хоть какая-то очередь (порта или своя) непуста
      // ... и еще кто-то остался кто-то свободный
      while (!processed) {
        const worker = this.#queueMore1 && this.#workersPool.pop();
        if (worker) {
          processed = worker._pulse(message);
          message &&= undefined;
        }
        else {
          // перекладываем всю очередь порта в свою очередь с меткой времени
          const now = Date.now();
          message ??= receiveMessageOnPort(parentPort)?.message;
          while (message) {
            this.#queue.push(message);
            message.ts = now;
            message = receiveMessageOnPort(parentPort)?.message;
          }
          // отложенная проверка необходимости запуска нового потока
          this.#queueMoreW && this.#workersRemain > 0 && (this.#checking ??= setTimeout(() => {
            this.#createWorkerIfPossible();
            this.#checking = null;
          }, this.#options['intervalCheck']));
          return;
        }
      }
    });

    // периодическая проверка очереди и пула дополнительных потоков
    const pool = this.#workersPool;
    setInterval(() => {
      // пора ли порождать еще один поток?
      this.#queueMoreW = this.#checkQueue(this.#workersSet.size);
      // пора ли отдавать задачи дополнительным потокам?
      this.#queueMore1 = this.#queueMoreW || this.#checkQueue(1);
      // закрытие простаивающих дополнительных потоков
      if (pool.length) {
        const deadline = Date.now() - timeoutIdle;
        for (const {activity, _destroy} of pool) {
          activity < deadline && _destroy();
        }
      }
    }, intervalCheck);
  }

  #createWorkerIfPossible() {
    // проверяем возможность и необходимость (по состоянмю очереди) запуска потока
    if (this.#workersRemain > 0 && this.#checkQueue(this.#workersSet.size)) {
      this.#createWorker();
    }
  }

  #createWorker() {
    this.#workersRemain--;
    this.#workersRemain = -this.#workersRemain; // wrap flag

    const shared = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 3 + this.#options['dataSize']);
    const data = new Uint8Array(shared);
    const lock = new Int32Array(shared, 0, 2); // lock/id - первые 4 байта data
    lock[0] = THREAD_FREE;
    lock[1] = THREAD_FREE;

    // ждем, пока поток не проинициализируется и не сменит состояние, чтобы его "подергать"
    let worker;
    Atomics.waitAsync(lock, 1, THREAD_FREE)
      .value
      .then(() => worker._pulse());
    
    const {port1, port2} = new MessageChannel();
    worker = new Worker(
      this.#options['workerFile']
    , {
        workerData : {
          workerType : 'worker'
        , shared
        , port       : port2 // передаем порт в рабочий поток
        }
      , transferList : [port2]
      }
    );

    // отправляем порт в основной поток, в Coordinator
    this.#options['port'].postMessage(
      {
        threadId : worker.threadId
      , port     : port1
      }
    , [port1]
    );

    this.#workersSet.add(worker);

    worker.id = worker.threadId;
    worker.port = port2;
    worker.lock = lock;
    worker.data = data;

    worker._pulse   = this.#pulseWorker.bind(this, worker);
    worker._wait    = this.#waitWorker.bind(this, worker);
    worker._destroy = this.#destroyWorker.bind(this, worker);
    worker._send    = this.#sendMessage.bind(this, worker);

    worker.on('online', () => {
      // продолжим пробовать стартовать следующий после паузы на timeoutSpawn
      setTimeout(() => {
        this.#workersRemain = -this.#workersRemain; // wrap flag
        this.#createWorkerIfPossible();
      }, this.#options['timeoutSpawn']);
    });

    return worker;
  }

  #pulseWorker(worker, message) {
    worker.ready = false; // busy
    // если нам передали сообщение - пытаемся отдать его потоку
    let processed = !message || worker._send(message);
    // пока он говорит "уже все сразу обработал"...
    while (processed) {
      // извлекаем следующее сообщение из очереди порта основного потока
      const recv = receiveMessageOnPort(parentPort);
      // ... или своей локальной очереди
      message = recv?.message ?? this.#queue.shift();
      if (!message) {
        if (worker !== this.#mainWorker) {
          // когда все очереди закончились - возвращаем дополнительный поток в пул свободных
          const pool = this.#workersPool;
          !pool.includes(worker) && pool.push(worker);
          // фиксируем момент последней активности потока из пула
          worker.activity = Date.now();
        }
        worker.ready = true; // free
        return true;
      }
      // передаем данные в поток
      processed = worker._send(message);
    }
  };

  #sendMessage(worker, message) {
    const {lock, data, _wait} = worker;
    // записываем из ui8-проекции в разделяемую память вместе с id
    data.set(message);
    // id - это первые 4 байта data
    const id = lock[1];
    // копируем id вместо THREAD_FREE как сигнал окончания записи всего буфера
    lock[0] = id;
    // уведомляем поток
    Atomics.notify(lock, 0, 1);
    // уходим в ожидание
    return _wait(id);
  }

  #waitWorker(worker, id) {
    // ждем, пока поток не обработает и не позовет нас
    const {value} = Atomics.waitAsync(worker.lock, 1, id);
    if (value === 'not-equal') {
      // если он сразу успел обработать, то в него можно и дальше писать
      return true;
    }
    else {
      const {lock, _wait, _pulse} = worker;
      // если не сразу - ждем, пока не обработается "наш" вызов
      value.then(() => lock[1] != THREAD_FREE ? _wait(id) : lock[0] == id && _pulse());
    }
  }

  #destroyWorker(worker) {
    // убираем поток из общего набора
    this.#workersSet.delete(worker);
    // закрываем обе стороны ассоциированного MessageChannel
    worker.port.close();
    // завершаем сам поток
    worker.terminate();
    // исключаем воркер из пула, если он там был
    const idx = this.#workersPool.indexOf(worker);
    if (idx >= 0) {
      this.#workersPool.splice(idx, 1);
    }
    // увеличиваем количество доступных к запуску
    this.#workersRemain++;
  }
}

class Coordinator extends Worker {
  #messages = new Map(); // хранилище всех обрабатываемых сообщений

  #p32 = new Int32Array(3);                // префикс данных = [THREAD_FREE/id, id/THREAD_FREE, dataSize]
  #p8  = new Uint8Array(this.#p32.buffer); // ... и его uint8-проекция
  #data = [this.#p8]; // массив для склейки частей в двоичный блок
  #message2data;      // функция получения двоичных данных из сообщения и помещения в массив

  #data2buffer() {
    // объединяем префикс и данные
    const buf = Buffer.concat(this.#data);
    // записываем в префикс результирующую длину
    buf.writeUInt32LE(buf.length - 12, 8); // Int32Array.BYTES_PER_ELEMENT * 3
    // восстанавливаем состояние массива
    this.#data.length = 1;
    // возвращаем итоговый двоичный контент
    return buf;
  };

  constructor(filename, options) {
    // сигнальный канал
    const {port1, port2} = new MessageChannel();

    /* доинициализируем необходимые опции
      {
        workerData : {
          workerType : 'coordinator'
        , port       : port2
        }
      , transferList : [port2]
      }
    */
    ((options ??= {}).workerData ??= {}).workerType = 'coordinator';
    ((options ??= {}).workerData ??= {}).port = port2;
    ((options ??= {}).transferList ??= []).push(port2);
    super(filename, options);

    const {dataField, dataArray} = options;
    this.#p32[0] = THREAD_FREE;
    this.#p32[1] = 0; // ID

    this.#message2data = dataField
      ? message => this.#data.push(dataField(message))
      : message => this.#data.push(...dataArray(message));

    const messages = this.#messages;
    port1.on('message', ({threadId, port}) => {
      // ассоциируем открывшийся порт с конкретным потоком
      port.threadId = threadId;
      this.emit('port.open', port);

      port
        .on('message', result => {
          // из результата обработки по ID получаем исходное сообщение ...
          const message = messages.get(result.id);
          if (message) {
            messages.delete(result.id);
            // ... и передаем вместе с результатом
            this.emit('message', result, message);
          }
        })
        .on('close', () => {
          // при закрытии порта - отписываемся от него
          this.emit('port.close', port);
          port.removeAllListeners();
        });
    });
  }

  // передача двоичных данных в поток
  postMessage(message) {
    // сохраняем объект сообщения в хранилище
    this.#messages.set(this.#p32[1], message);
    // добавляем в общий массив для склейки одно или несколько полей
    this.#message2data(message);
    // формируем целевой контейнер данных
    const buffer = this.#data2buffer();
    // передаем двоичный контент с префиксом в поток по ссылке
    super.postMessage(buffer, [buffer.buffer]);
    // id = (id + 1) % 0x10000000
    this.#p32[1]++;
    this.#p32[1] &= 0x0FFFFFFF;
  }
}

const taskSize = 1 << 16;

if (isMainThread) {
  const {randomBytes} = require('node:crypto');
  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  console.log(Date.now(), 'Main : online');
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log(Date.now(), 'Main : generated');

  const hashes = messages.map(() => undefined);
  let remain;

  const coordinator = new Coordinator(
    __filename
  , {
      dataField : message => message.data // двоичные данные лежат в одном поле
    }
  );
  coordinator
    .on('online', () => {
      console.log(Date.now(), 'Coordinator : online');
      // получаем список всех сообщений
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);
      remain = fns.length;

      fns.forEach((fn, id) => {
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        coordinator.postMessage({id, fn, data});
      });
      console.log(Date.now(), 'Main : all send');
    })
    .on('port.open', port => {
      console.log(Date.now(), `Coordinator : dataPort open  = +1 worker [${port.threadId}]`);
    })
    .on('port.close', port => {
      console.log(Date.now(), `Coordinator : dataPort close = -1 worker [${port.threadId}]`);
    })
    .on('message', (result, message) => {
      hashes[message.id] = result.hash;
      if (!--remain) {
        console.log(Date.now(), 'Main : all recv');
        // подождем, пока завершатся все желающие потоки
        setTimeout(() => {
          process.exit();
        }, 1000);
      }
    });
}
else {
  const {workerType, port} = workerData;
  switch (workerType) {
    case 'coordinator':
      // в потоке-координаторе нет активности, кроме управления пулом рабочих потоков
      const pool = new WorkersPool(
        __filename
      , {
          poolSize      : require('node:os').cpus().length // по количеству CPU-ядер
        , dataSize      : taskSize // предельный размер данных
        , port
        , queuePowMin   : 8  //   256
        , queuePowMax   : 16 // 65536
        , intervalCheck : 10
        , timeoutSpawn  : 100
        , timeoutIdle   : 10
        }
      );
      break;
    case 'worker':
      const {createHash} = require('node:crypto');

      const {shared} = workerData;

      // [shared] = {id/lockIn:int32} + {state/lockOut:int32} + {size:int32} + {data:uint8[]}
      const lock = new Int32Array(shared, 0, 3);
      const data = new Uint8Array(shared, Int32Array.BYTES_PER_ELEMENT * 3);

      const processMessage = () => {
        const [id, , size] = lock;
        // забираем данные и производим обработку
        const hash = createHash('sha256').update(
          data.subarray(0, size) // входящий контент
        );

        // уведомляем координатор о своей доступности
        lock[1] = THREAD_FREE;
        Atomics.notify(lock, 1, 1);

        // отправляем результат
        port.postMessage({id, hash : hash.digest('hex')});
        
        // ... и возвращаемся к ожиданию блокировки
        wait(id);
      };

      const wait = id => {
        const {value} = Atomics.waitAsync(lock, 0, id);
        if (value === 'not-equal') {
          // если значение изменилось на новое, то поток уже обработал задачу, и реагируем сразу
          // ... но разрываем цепочку вызовов, чтобы избежать переполнения стека
          const _id = lock[0];
          _id == THREAD_FREE ? wait(_id) : process.nextTick(processMessage);
        }
        else {
          // иначе ждем разрешения Promise блокировки с другим ID
          value.then(() => {
            const _id = lock[0];
            _id == id || _id == THREAD_FREE ? wait(_id) : processMessage();
          });
        }
      };

      // "освобождаем" блокировку и первично уведомляем координатора о своей готовности
      lock[0] = THREAD_FREE;
      lock[1] = THREAD_FREE;
      Atomics.notify(lock, 1, 1);
      wait(THREAD_FREE);

      // подвешиваем поток в бесконечное ожидание
      port.on('message', () => {});
      break;
  }
}

Если ваша обработка в том или ином виде требует копирования данных из разделяемого буфера (например, преобразование в строку), то вы можете освободить блокировку сразу после него.

В этом случае поток продолжит обработку строки, а координатор уже сможет положить в буфер свежие данные.

Если вы все сделали правильно, то должны увидеть примерно такой вывод:

1666250935654 Main : online
1666250940036 Main : generated
1666250940132 Coordinator : online
1666250941187 Main : all send
1666250941195 Coordinator : dataPort open  = +1 worker [2]
1666250941196 Coordinator : dataPort open  = +1 worker [3]
1666250941196 Coordinator : dataPort open  = +1 worker [4]
1666250941225 Coordinator : dataPort close = -1 worker [3]
1666250941244 Main : all recv
1666250941262 Coordinator : dataPort close = -1 worker [4]

В данном случае было запущено одновременно до 3 рабочих потоков: главный #2 и два вспомогательных #3 и #4. Один из них успел завершиться даже раньше, чем мы получили все результаты, а второй - чуть погодя.

На этом цикл статей про многопоточность в JavaScript/Node.js я завершаю, а вы прочитайте предыдущие части - не пожалеете!


Теги:
Хабы:
Всего голосов 15: ↑15 и ↓0+15
Комментарии6

Публикации

Информация

Сайт
sbis.ru
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия