Как стать автором
Обновить

Приручаем многопоточность в Node.js (часть 2/5: очередь, каналы и координатор)

Время на прочтение 16 мин
Количество просмотров 7.8K
Всего голосов 14: ↑14 и ↓0 +14
Комментарии 8

Комментарии 8

Все очень круто, но можно ли раскрыть содержимое "./Pow2Buffer"

Всё очень здорово, но если финальное содержимое той статьи вставить в ./Pow2Buffer
То при запуске вашего "Полный код и результаты тестов"

Выбивает вот таким:

generated: 289 ms

node:internal/event_target:969
  process.nextTick(() => { throw err; });
                           ^
TypeError [Error]: require(...) is not a constructor
    at Object.<anonymous> (/app/sample-07.js:141:21)
    at Module._compile (node:internal/modules/cjs/loader:1126:14)
    at Object.Module._extensions..js (node:internal/modules/cjs/loader:1180:10)
    at Module.load (node:internal/modules/cjs/loader:1004:32)
    at Function.Module._load (node:internal/modules/cjs/loader:839:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
    at MessagePort.<anonymous> (node:internal/main/worker_thread:196:24)
    at MessagePort.[nodejs.internal.kHybridDispatch] (node:internal/event_target:694:20)
    at MessagePort.exports.emitMessage (node:internal/per_context/messageport:23:28)
Emitted 'error' event on Worker instance at:
    at Worker.[kOnErrorMessage] (node:internal/worker:290:10)
    at Worker.[kOnMessage] (node:internal/worker:301:37)
    at MessagePort.<anonymous> (node:internal/worker:202:57)
    at MessagePort.[nodejs.internal.kHybridDispatch] (node:internal/event_target:694:20)
    at MessagePort.exports.emitMessage (node:internal/per_context/messageport:23:28)

Дык... в статье код класса, а если хотите его размещать в отдельном файле, то не забывайте дописатьmodule.exports = Pow2Buffer.

Очень интересно и очень сложно. На потоке-координаторе я сломался.

Что, куда, откуда? port1 и port2 и там и тут. Какой порт на что подписан? И вообще потерял что такое parentPort :) Читать очень сложно. На чтение только этой части ушло почти 1,5 часа.

Тем не менее всё интересно, спасибо!

В итоге, я надеюсь, что остался с примерно правильным пониманием:

  1. сначала создаётся координатор, с ним создаётся канал для сигналов (и какой-то части данных). Координатору добавляется св-во "порт сигналов".

  2. затем создаются обычные рабочие, с каждым создаётся канал для сигналов (и какой-то части данных). Также добавляется св-во "порт сигналов".

  3. затем между ними создаётся канал данных. И через доступные порты сигналов (из св-тв объекта), созданные ранее, отправляются порты данных. В каждый по одному.

  4. дальше клубок подписок. Рабочий, получив по сигнальному порту информацю о порте данных сразу его подписывает на обработчик. И магическим образом сразу что-то сигнализирует куда-то.

  5. координатор же, получив по сигнальному порту информацию о порте данных, сначала записывает его в пул. А потом уже подписывает на него обработчик. И тоже магически что-то отправляет по сигнальному порту кому-то.

  6. потом появляется "парент порт". Это обработчик вызовов из основнова потока. куда мы кидаем пары {id, message}. Координатор в случае получения задания должен сразу попытаться его кому-то назначить (видимо, не дожидаясь готовности потоков), а если некому - сохранить сообщение в очереди.

Начнем по порядку...

  1. {port1, port2} = new MessageChannel();

    Канал - это просто некоторая абстракция над куском разделяемой памяти, у которой есть два симметричных конца-"порта" и буфер-очередь (см. схему "Структура MessageChannel"). Один поток "владеет" одним портом, другой - другим. Соответственно, оба могут как писать, так и читать оттуда, причем и по событию port.on('message', ...), и самостоятельно receiveMessageOnPort(port).

  2. {parentPort} = require('node:worker_threads'); - это уже заранее доступный в потоке такой же "конец" канала связи с main-потоком.

  3. Сначала мы создаем 16 одинаковых рабочих потоков - они константны для всех прогоняемых тестов. И для каждого из них организуем служебный канал (чтобы не дергать как раз обычный - через parentPort) для общения между ним и main-потоком. Один его конец оставляем в main-потоке как worker.signalPort , а другой через workerData передаем в поток-координатор.

    По этому каналу мы передадим в поток данные о портах других каналов.

  4. В рамках теста на N потоков мы создаем поток-координатор с таким же служебным каналом coordinator.signalPort .

  5. Затем мы для каждого из N рабочих потоков мы в main создаем свой канал - один конец передаем через signalPort в координатор, а второй - в рабочий поток. На этом main и про этот свежесозданный канал, и про signalPort, "забывает", и больше от него ничего не хочет.

  6. Дальше main передает через coordinator.postMessage сообщение в поток-координатор, тот его получает в parentPort.on('message', ...) и складывает в очередь.

  7. Как только кто-то из рабочих потоков по переданному в начале порту сигнализирует координатору, что он свободен, координатор передает сообщение из очереди ему.

  8. Рабочий поток результат обработки через parentPort.postMessage передает прямо в main, минуя координатор. Ну и сигнализируем в координатор "я свободен".

Зарегистрируйтесь на Хабре , чтобы оставить комментарий