Как стать автором
Обновить
172.5
Тензор
Разработчик системы Saby

IPC в Node.js: решение проблемы с передачей данных

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров1.7K

В Node.js IPC (Inter-Process Communication) - это механизм, используемый для обмена данными между процессами. Начиная с версии 12.16.0 в модуле child_processes появилась поддержка режима advanced serialization для IPC. Однако иногда он может привести к проблемам с зависанием сообщений, что приводит к ошибкам и проблемам с функциональностью. В этой статье мы расскажем как решили эту проблему.

В нашей системе мониторинга и анализа PostgreSQL каждый коллектор представляет собой один мастер-процесс и несколько дочерних процессов-воркеров, используемых для распределения нагрузки. Воркеры собирают статистику и анализируют логи серверов PostgreSQL, сохраняя результаты в таблицах фактов. Словарные данные передаются по IPC мастер-процессу, который сохраняет их в таблицах словарей после удаления дубликатов (подробнее об архитектуре коллектора можно прочитать в статье).

Воркеры принимают логи в виде буферов и не преобразует их в строки во время парсинга, но при передаче словарных записей мастер-процессу производится сериализация с использованием JSON.stringify в режиме по умолчанию. Это приводит к конвертации буферов в строки:

const buf = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
const json = JSON.stringify(buf);

// здесь json - это строка:
'{"type":"Buffer","data":[1,2,3,4,5]}'

а затем десериализации на стороне получателя, в результате мастер получает объект с полями type и data вместо буфера и для получения исходного буфера требуется преобразование:

Buffer.from({"type":"Buffer","data":[1,2,3,4,5]}) 

// получим исходный <Buffer 01 02 03 04 05>

Таким образом, для передачи буфера через IPC требуется несколько преобразований на обеих сторонах — в строку в рабочем процессе и обратно в объект, а затем в буфер в мастер-процессе. Это слишком затратно для высоконагруженного коллектора, который мониторит тысячи серверов PostgreSQL. Кроме потерь времени ЦПУ, создаются новые строки и объекты, что потребляет больше памяти и увеличивает нагрузку на сборщик мусора.

Поэтому с появлением advanced режима сериализации мы перешли на него, чтобы избежать лишних преобразований. Однако, со временем стали замечать странную проблему - в произвольный момент передача данных через IPC прекращалась, при этом использование памяти мастер-процессом постоянно росло, приводя к падению по ошибке 'heap out of memory'.

В коде коллектора явных ошибок не нашли - использовались стандартные process.send и process.on('message') . Исследуя модуль child_process , выяснили, что между основным процессом и дочерним создается IPC-канал сообщений, использующий pipe-механизмы библиотеки libuv. Размер блока при передаче через pipe может составлять до 65536 байт. Формат сообщения включает заголовок из 4 байт с его общим размером, служебный заголовок, включающий номер версии, тип и длину сообщения, и само сообщение, длина которого кодируется как varint в формате base-128 и может быть 1 до 3 байт, поэтому служебный заголовок может занимать от 6 до 8 байт.

Например при отправке небольшого сообщения длиной 10 байт будет создан блок размером 20 байт, длина сообщения кодируется в 1 байте (0x0A):

При отправке сообщения с длиной больше 65524 байт используются несколько блоков, например сообщение длиной 100 000 байт будет передано в двух блоках:

здесь header занимает 8 байт, так как 100 000 в base-128 это 0xA0 0x8D 0x06 .

Сборку сообщения из нескольких блоков выполняет функция parseChannelMessages , в ней:

channel[kMessageBuffer] - массив, в который собираются поступающие блоки

channel[kMessageBufferSize] - общий размер всех блоков

в fullMessageSize хранится взятый из первых 4 байт размер сообщения

  *parseChannelMessages(channel, readData) {
    if (readData.length === 0) return;

    ArrayPrototypePush(channel[kMessageBuffer], readData);
    channel[kMessageBufferSize] += readData.length;

    // Index 0 should always be present because we just pushed data into it.
    let messageBufferHead = channel[kMessageBuffer][0];
    while (messageBufferHead.length >= 4) {
      // We call `readUInt32BE` manually here, because this is faster than first converting
      // it to a buffer and using `readUInt32BE` on that.
      const fullMessageSize = (
        messageBufferHead[0] << 24 |
        messageBufferHead[1] << 16 |
        messageBufferHead[2] << 8 |
        messageBufferHead[3]
      ) + 4;

      if (channel[kMessageBufferSize] < fullMessageSize) break;

      const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
        channel[kMessageBuffer][0] :
        Buffer.concat(
          channel[kMessageBuffer],
          channel[kMessageBufferSize],
        );

      const deserializer = new ChildProcessDeserializer(
        TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize),
      );

      messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
      channel[kMessageBufferSize] = messageBufferHead.length;
      channel[kMessageBuffer] =
        channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];

      deserializer.readHeader();
      yield deserializer.readValue();
    }

    channel.buffering = channel[kMessageBufferSize] > 0;
  }

При передаче нескольких сообщений подряд могут использоваться блоки максимального размера, при этом в одном блоке может быть несколько сообщений.

Например, при передаче двух сообщений длиной 65521 байт используются два блока, причем заголовок второго сообщения разделен между блоками:

В приемнике при этом выполняются следующие операции:

  1. первый блок сохраняется в массиве channel[kMessageBuffer]

  2. его размер больше 4 байт, поэтому запускается цикл обработки

  3. определяется размер первого сообщения и сохраняется в fullMessageSize

  4. сообщение вырезается из блока subarray(4, fullMessageSize) и десериализуется

  5. хвост первого блока сохраняется в первом элементе channel[kMessageBuffer]

  6. первое сообщение отправляется обработчику события 'message'

  7. второй блок добавляется в массив channel[kMessageBuffer]

  8. проверяется длина первого элемента channel[kMessageBuffer], в котором находится хвост первого блока размером всего 1 байт, поэтому проверка условия запуска цикла (>=4) не проходит и блок остается не обработанным

  9. при поступлении новых блоков пункты 7 и 8 повторяются, при этом блоки остаются в массиве channel[kMessageBuffer] без обработки.

Таким образом, если заголовок сообщения оказался разделенным между блоками, то все последующие сообщения остаются без обработки, просто накапливаясь в очереди, что приводит к росту используемой памяти мастер-процессом и в итоге к ошибке 'heap out of memory'.

Очевидно, что для решения этой проблемы надо всего лишь добавить проверку перед п.1 - если длина первого элемента channel[kMessageBuffer] меньше 4 байт, то в ней находится начало заголовка пришедшего блока и их надо объединить:

if (channel[kMessageBufferSize] && channel[kMessageBuffer][0].length < 4) {
  // Message length split into two buffers, so let's concatenate it.
  channel[kMessageBuffer][0] = Buffer.concat([channel[kMessageBuffer][0], readData]);
} else {
  ArrayPrototypePush(channel[kMessageBuffer], readData);
}

Мы оформили issue и предложили свой патч, который недавно приняли и он вошел в релизы 22.14.0 и 23.7.0 .

Для предыдущих версий можно исправить ошибку непосредственно в JavaScript, проверяя длину первого элемента channel[kMessageBuffer] и объединяя его с последующим блоком при необходимости, например по условию длительного отсутствия сообщений:

const kChannelHandle = Object.getOwnPropertySymbols(childProcess).find(s => s.description === 'kChannelHandle');
if (childProcess[kChannelHandle]) {
  const kMessageBuffer = Object.getOwnPropertySymbols(childProcess[kChannelHandle]).find(s => s.description === 'kMessageBuffer');
  let buf = childProcess[kChannelHandle][kMessageBuffer];
  if (buf[0]?.length < 4) {
    buf.unshift(Buffer.concat([buf.shift(), buf.shift()]));
  }
}

здесь мы также проверяем длину первого элемента channel[kMessageBuffer] и если она меньше 4 байт, то объединяем первые два. Таким образом при поступлении новых блоков условие запуска цикла обработки будет выполняться и очередь из накопившихся блоков будет разобрана.

Проблема появилась в Node.js 16.17.0 вместе с этим PR и отсутствует в предыдущих версиях. Для проверки вашей версии можно использовать следующий скрипт:

const { spawn } = require('child_process');

const msgLen = 65521;
let cnt = 10;

if (process.argv[2] === 'child') {
  const msg = Buffer.allocUnsafe(msgLen);
  process.channel.ref();
  (function send() {
    if (cnt-- > 0) {
      process.send(msg, send);
    }
  })();  
} else {
  const child = spawn(process.execPath, [__filename, 'child'], {
    stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
    serialization: 'advanced'
  });

  setTimeout(() => {
    child.kill();
    console.log('Test failed');
    process.exit(1);
  }, 1000);
  child.on('message', (msg) => {
    if (--cnt === 0) {
      child.kill();
      console.log('Test passed');
      process.exit(0);
    }
  })
}

Аналогичный тест был добавлен в набор тестов Node.js, поэтому эта проблема не должна возникать в новых версиях.

Breaking IPC is bad
Breaking IPC is bad

Теги:
Хабы:
+12
Комментарии1

Публикации

Информация

Сайт
saby.ru
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия