Комментарии 8
Все очень круто, но можно ли раскрыть содержимое "./Pow2Buffer"
В тексте есть ссылка на предыдущую статью, откуда взята реализация: https://habr.com/ru/company/tensor/blog/688182/
Всё очень здорово, но если финальное содержимое той статьи вставить в ./Pow2Buffer
То при запуске вашего "Полный код и результаты тестов"
Выбивает вот таким:
generated: 289 ms
node:internal/event_target:969
process.nextTick(() => { throw err; });
^
TypeError [Error]: require(...) is not a constructor
at Object.<anonymous> (/app/sample-07.js:141:21)
at Module._compile (node:internal/modules/cjs/loader:1126:14)
at Object.Module._extensions..js (node:internal/modules/cjs/loader:1180:10)
at Module.load (node:internal/modules/cjs/loader:1004:32)
at Function.Module._load (node:internal/modules/cjs/loader:839:12)
at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
at MessagePort.<anonymous> (node:internal/main/worker_thread:196:24)
at MessagePort.[nodejs.internal.kHybridDispatch] (node:internal/event_target:694:20)
at MessagePort.exports.emitMessage (node:internal/per_context/messageport:23:28)
Emitted 'error' event on Worker instance at:
at Worker.[kOnErrorMessage] (node:internal/worker:290:10)
at Worker.[kOnMessage] (node:internal/worker:301:37)
at MessagePort.<anonymous> (node:internal/worker:202:57)
at MessagePort.[nodejs.internal.kHybridDispatch] (node:internal/event_target:694:20)
at MessagePort.exports.emitMessage (node:internal/per_context/messageport:23:28)
Поэтому и написал в самом начале - пожалуйста листинг Pow2Buffer
TypeError [Error]: require(...) is not a constructor
at Object.<anonymous> (/app/sample-07.js:141:21)
Очень интересно и очень сложно. На потоке-координаторе я сломался.
Что, куда, откуда? port1 и port2 и там и тут. Какой порт на что подписан? И вообще потерял что такое parentPort :) Читать очень сложно. На чтение только этой части ушло почти 1,5 часа.
Тем не менее всё интересно, спасибо!
В итоге, я надеюсь, что остался с примерно правильным пониманием:
сначала создаётся координатор, с ним создаётся канал для сигналов (и какой-то части данных). Координатору добавляется св-во "порт сигналов".
затем создаются обычные рабочие, с каждым создаётся канал для сигналов (и какой-то части данных). Также добавляется св-во "порт сигналов".
затем между ними создаётся канал данных. И через доступные порты сигналов (из св-тв объекта), созданные ранее, отправляются порты данных. В каждый по одному.
дальше клубок подписок. Рабочий, получив по сигнальному порту информацю о порте данных сразу его подписывает на обработчик. И магическим образом сразу что-то сигнализирует куда-то.
координатор же, получив по сигнальному порту информацию о порте данных, сначала записывает его в пул. А потом уже подписывает на него обработчик. И тоже магически что-то отправляет по сигнальному порту кому-то.
потом появляется "парент порт". Это обработчик вызовов из основнова потока. куда мы кидаем пары {id, message}. Координатор в случае получения задания должен сразу попытаться его кому-то назначить (видимо, не дожидаясь готовности потоков), а если некому - сохранить сообщение в очереди.
Начнем по порядку...
{port1, port2} = new MessageChannel();
Канал - это просто некоторая абстракция над куском разделяемой памяти, у которой есть два симметричных конца-"порта" и буфер-очередь (см. схему "Структура MessageChannel"). Один поток "владеет" одним портом, другой - другим. Соответственно, оба могут как писать, так и читать оттуда, причем и по событию
port.on('message', ...)
, и самостоятельноreceiveMessageOnPort(port)
.{parentPort} = require('node:worker_threads');
- это уже заранее доступный в потоке такой же "конец" канала связи с main-потоком.Сначала мы создаем 16 одинаковых рабочих потоков - они константны для всех прогоняемых тестов. И для каждого из них организуем служебный канал (чтобы не дергать как раз обычный - через
parentPort
) для общения между ним и main-потоком. Один его конец оставляем в main-потоке какworker.signalPort
, а другой черезworkerData
передаем в поток-координатор.По этому каналу мы передадим в поток данные о портах других каналов.
В рамках теста на N потоков мы создаем поток-координатор с таким же служебным каналом
coordinator.signalPort
.Затем мы для каждого из N рабочих потоков мы в main создаем свой канал - один конец передаем через
signalPort
в координатор, а второй - в рабочий поток. На этом main и про этот свежесозданный канал, и проsignalPort
, "забывает", и больше от него ничего не хочет.Дальше main передает через
coordinator.postMessage
сообщение в поток-координатор, тот его получает вparentPort.on('message', ...)
и складывает в очередь.Как только кто-то из рабочих потоков по переданному в начале порту сигнализирует координатору, что он свободен, координатор передает сообщение из очереди ему.
Рабочий поток результат обработки через
parentPort.postMessage
передает прямо в main, минуя координатор. Ну и сигнализируем в координатор "я свободен".
Приручаем многопоточность в Node.js (часть 2/5: очередь, каналы и координатор)