
В Node.js IPC (Inter-Process Communication) - это механизм, используемый для обмена данными между процессами. Начиная с версии 12.16.0 в модуле child_processes появилась поддержка режима advanced serialization для IPC. Однако иногда он может привести к проблемам с зависанием сообщений, что приводит к ошибкам и проблемам с функциональностью. В этой статье мы расскажем как решили эту проблему.
В нашей системе мониторинга и анализа PostgreSQL каждый коллектор представляет собой один мастер-процесс и несколько дочерних процессов-воркеров, используемых для распределения нагрузки. Воркеры собирают статистику и анализируют логи серверов PostgreSQL, сохраняя результаты в таблицах фактов. Словарные данные передаются по IPC мастер-процессу, который сохраняет их в таблицах словарей после удаления дубликатов (подробнее об архитектуре коллектора можно прочитать в статье).

Воркеры принимают логи в виде буферов и не преобразует их в строки во время парсинга, но при передаче словарных записей мастер-процессу производится сериализация с использованием JSON.stringify в режиме по умолчанию. Это приводит к конвертации буферов в строки:
const buf = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
const json = JSON.stringify(buf);
// здесь json - это строка:
'{"type":"Buffer","data":[1,2,3,4,5]}'
а затем десериализации на стороне получателя, в результате мастер получает объект с полями type и data вместо буфера и для получения исходного буфера требуется преобразование:
Buffer.from({"type":"Buffer","data":[1,2,3,4,5]})
// получим исходный <Buffer 01 02 03 04 05>
Таким образом, для передачи буфера через IPC требуется несколько преобразований на обеих сторонах — в строку в рабочем процессе и обратно в объект, а затем в буфер в мастер-процессе. Это слишком затратно для высоконагруженного коллектора, который мониторит тысячи серверов PostgreSQL. Кроме потерь времени ЦПУ, создаются новые строки и объекты, что потребляет больше памяти и увеличивает нагрузку на сборщик мусора.
Поэтому с появлением advanced режима сериализации мы перешли на него, чтобы избежать лишних преобразований. Однако, со временем стали замечать странную проблему - в произвольный момент передача данных через IPC прекращалась, при этом использование памяти мастер-процессом постоянно росло, приводя к падению по ошибке 'heap out of memory'.
В коде коллектора явных ошибок не нашли - использовались стандартные process.send и process.on('message') . Исследуя модуль child_process , выяснили, что между основным процессом и дочерним создается IPC-канал сообщений, использующий pipe-механизмы библиотеки libuv. Размер блока при передаче через pipe может составлять до 65536 байт. Формат сообщения включает заголовок из 4 байт с его общим размером, служебный заголовок, включающий номер версии, тип и длину сообщения, и само сообщение, длина которого кодируется как varint в формате base-128 и может быть 1 до 3 байт, поэтому служебный заголовок может занимать от 6 до 8 байт.

Например при отправке небольшого сообщения длиной 10 байт будет создан блок размером 20 байт, длина сообщения кодируется в 1 байте (0x0A):

При отправке сообщения с длиной больше 65524 байт используются несколько блоков, например сообщение длиной 100 000 байт будет передано в двух блоках:

здесь header занимает 8 байт, так как 100 000 в base-128 это 0xA0 0x8D 0x06 .
Сборку сообщения из нескольких блоков выполняет функция parseChannelMessages , в ней:
channel[kMessageBuffer] - массив, в который собираются поступающие блоки
channel[kMessageBufferSize] - общий размер всех блоков
в fullMessageSize хранится взятый из первых 4 байт размер сообщения
*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;
ArrayPrototypePush(channel[kMessageBuffer], readData);
channel[kMessageBufferSize] += readData.length;
// Index 0 should always be present because we just pushed data into it.
let messageBufferHead = channel[kMessageBuffer][0];
while (messageBufferHead.length >= 4) {
// We call `readUInt32BE` manually here, because this is faster than first converting
// it to a buffer and using `readUInt32BE` on that.
const fullMessageSize = (
messageBufferHead[0] << 24 |
messageBufferHead[1] << 16 |
messageBufferHead[2] << 8 |
messageBufferHead[3]
) + 4;
if (channel[kMessageBufferSize] < fullMessageSize) break;
const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
channel[kMessageBuffer][0] :
Buffer.concat(
channel[kMessageBuffer],
channel[kMessageBufferSize],
);
const deserializer = new ChildProcessDeserializer(
TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize),
);
messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
channel[kMessageBufferSize] = messageBufferHead.length;
channel[kMessageBuffer] =
channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];
deserializer.readHeader();
yield deserializer.readValue();
}
channel.buffering = channel[kMessageBufferSize] > 0;
}
При передаче нескольких сообщений подряд могут использоваться блоки максимального размера, при этом в одном блоке может быть несколько сообщений.
Например, при передаче двух сообщений длиной 65521 байт используются два блока, причем заголовок второго сообщения разделен между блоками:

В приемнике при этом выполняются следующие операции:
первый блок сохраняется в массиве channel[kMessageBuffer]
его размер больше 4 байт, поэтому запускается цикл обработки
определяется размер первого сообщения и сохраняется в fullMessageSize
сообщение вырезается из блока subarray(4, fullMessageSize) и десериализуется
хвост первого блока сохраняется в первом элементе channel[kMessageBuffer]
первое сообщение отправляется обработчику события 'message'
второй блок добавляется в массив channel[kMessageBuffer]
проверяется длина первого элемента channel[kMessageBuffer], в котором находится хвост первого блока размером всего 1 байт, поэтому проверка условия запуска цикла (>=4) не проходит и блок остается не обработанным
при поступлении новых блоков пункты 7 и 8 повторяются, при этом блоки остаются в массиве channel[kMessageBuffer] без обработки.
Таким образом, если заголовок сообщения оказался разделенным между блоками, то все последующие сообщения остаются без обработки, просто накапливаясь в очереди, что приводит к росту используемой памяти мастер-процессом и в итоге к ошибке 'heap out of memory'.
Очевидно, что для решения этой проблемы надо всего лишь добавить проверку перед п.1 - если длина первого элемента channel[kMessageBuffer] меньше 4 байт, то в ней находится начало заголовка пришедшего блока и их надо объединить:
if (channel[kMessageBufferSize] && channel[kMessageBuffer][0].length < 4) {
// Message length split into two buffers, so let's concatenate it.
channel[kMessageBuffer][0] = Buffer.concat([channel[kMessageBuffer][0], readData]);
} else {
ArrayPrototypePush(channel[kMessageBuffer], readData);
}
Мы оформили issue и предложили свой патч, который недавно приняли и он вошел в релизы 22.14.0 и 23.7.0 .
Для предыдущих версий можно исправить ошибку непосредственно в JavaScript, проверяя длину первого элемента channel[kMessageBuffer] и объединяя его с последующим блоком при необходимости, например по условию длительного отсутствия сообщений:
const kChannelHandle = Object.getOwnPropertySymbols(childProcess).find(s => s.description === 'kChannelHandle');
if (childProcess[kChannelHandle]) {
const kMessageBuffer = Object.getOwnPropertySymbols(childProcess[kChannelHandle]).find(s => s.description === 'kMessageBuffer');
let buf = childProcess[kChannelHandle][kMessageBuffer];
if (buf[0]?.length < 4) {
buf.unshift(Buffer.concat([buf.shift(), buf.shift()]));
}
}
здесь мы также проверяем длину первого элемента channel[kMessageBuffer] и если она меньше 4 байт, то объединяем первые два. Таким образом при поступлении новых блоков условие запуска цикла обработки будет выполняться и очередь из накопившихся блоков будет разобрана.
Проблема появилась в Node.js 16.17.0 вместе с этим PR и отсутствует в предыдущих версиях. Для проверки вашей версии можно использовать следующий скрипт:
const { spawn } = require('child_process');
const msgLen = 65521;
let cnt = 10;
if (process.argv[2] === 'child') {
const msg = Buffer.allocUnsafe(msgLen);
process.channel.ref();
(function send() {
if (cnt-- > 0) {
process.send(msg, send);
}
})();
} else {
const child = spawn(process.execPath, [__filename, 'child'], {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
serialization: 'advanced'
});
setTimeout(() => {
child.kill();
console.log('Test failed');
process.exit(1);
}, 1000);
child.on('message', (msg) => {
if (--cnt === 0) {
child.kill();
console.log('Test passed');
process.exit(0);
}
})
}
Аналогичный тест был добавлен в набор тестов Node.js, поэтому эта проблема не должна возникать в новых версиях.
