Всем привет! Я Виктор Кугай, руководитель команды разработки спецпроектов в Тинькофф. Мы создаем геймификационные проекты, основанные на данных, чтобы познакомить пользователей с экосистемой компании и повысить узнаваемость бренда.

Расскажу, как с помощью Node.js Streams и механизма Back Pressure протокола TCP реализовать пакетную обработку сотен гигабайтов данных на машинах с жестким лимитом памяти.

Как мы передаем по сети большие объемы данных

Спецпроекты помогают пользователям в игровой форме вовлечься в использование экосистемы компании. Например, пользователь переводит в благотворительный фонд X рублей в месяц или выполняет какое-то другое действие в приложении Тинькофф и получает за это экстрабонусы.

Для долгосрочных спецпроектов отлично подходит потоковая обработка данных с помощью Apache Kafka. Например, для реализации игровых механик наша команда обрабатывает более 150 000 000 событий в сутки суммарным объемом свыше 250 ГБ.

Для краткосрочных спецпроектов — сроком жизни несколько месяцев — создание новой потоковой интеграции сопряжено с рисками. Например, настройка интеграции потребует больше ресурсов и времени, чем мы ожидали, и мы не успеем подготовить надежную интеграцию к старту проекта.

В этом случае на помощь приходит сценарий пакетной обработки данных:

  1. Платформа для совместной обработки данных Helicopter по расписанию инициирует агрегацию данных в Data Warehouse.

  2. Процесс отправляет CSV-файл с составным содержимым формата multipart/form-data по HTTP.

  3. Node.js-сервис валидирует входящие данные и сохраняет в PostgreSQL базу данных спецпроекта.

Мы используем формат multipart/form-data для передачи составных данных по сети, потому что во многих языках программирования есть его поддержка, а реализация не меняется в зависимости от языка.

Например, в JavaScript для загрузки составных данных используется объект FormData. В браузере отправить составные данные по сети можно с помощью объекта XMLHttpRequest и Fetch API, а в Node.js — с помощью модуля http/https, который используется в библиотеке axios, или модуля net в библиотеке undici.

const body = new FormData();
body.append('file', file);
fetch(url, {method: 'POST', body});

В python поддержка multipart/form-data встроена в модуль requests и дополнительных преобразований не требуется, поэтому multipart/form-data удобно использовать при написании скриптов переливки данных.

import requests

requests.post(
  url,
  files={'file', file}
)

Загружать составные данные несложно, но что произойдет, если игнорировать контроль потока при обработке данных? Потребление памяти превысит лимиты, и приложение упадет вот с такой ошибкой OOMKilled:

$ docker inspect --format='{{json .State}}' <container>

{
    "Status": "exited",
    "Running": false,
    "Paused": false,
    "Restarting": false,
    "OOMKilled": true,
    "Dead": false,
    "Pid": 0,
    "ExitCode": 137,
    "Error": "",
    "StartedAt": "2024-03-11T15:30:38.222462386Z",
    "FinishedAt": "2024-03-11T15:31:24.086041716Z"
}

Контроль потока при обработке составного содержимого сетевых запросов в Node.js

Пример ручного контроля потока с помощью подписки на событие data:

const HIGH_WATER_MARK = 1_048_576; // ~1MB

let buffer = Buffer.from([]);

readable.on('data', (data) => {
  buffer = Buffer.concat([buffer, data]);
  
  if (buffer.byteLength > HIGH_WATER_MARK) {
    readable.pause();
    // ...
    // === Обработать данные ===
    // ...
    buffer = Buffer.from([]);
    readable.resume();
  }
});

В Node.js v10 добавили поддержку асинхронной обработки данных с помощью цикла for await … of:

for await (const data of readable) {
  // ...
  // === Обработать данные ===
  // ...
}

Верхнеуровневый алгоритм обработки данных с помощью цикла for await … of такой:

  1. Пакеты данных аккумулируются в буфере.

  2. Общий размер данных в буфере достигает лимита highWaterMark.

  3. Потребление данных приостанавливается.

  4. Все непрочитанные пакеты объединяются.

  5. Данные поступают в цикл for await … of.

  6. Данные обрабатываются в теле цикла.

  7. Буфер освобождается.

  8. Потребление данных продолжается.

В случае с асинхронными итераторами данные сначала накапливаются в буфере до порогового значения highWaterMark. Затем потребление данных из потока приостанавливается, и только после завершения обработки оно продолжается. Ручное управление потоком с помощью readable.pause() и readable.resume() игнорируется.

В случае с подпиской на событие data за факт обработки данных принимается вызов функции-подписчика, поэтому полезная работа выполняется конкурентно, что может привести к переполнению памяти. Для контроля переполнения памяти нужно вручную ставить обработку данных на паузу с помощью readable.pause() и readable.resume().

Параметр highWaterMark конфигурирует пороговое значение буфера для всех типов стримов в Node.js. Для Duplex- и Transform-стримов предусмотрено два независимых буфера. Значение highWaterMark не является строгим лимитом памяти и носит рекомендательный характер, то есть реальная утилизация памяти буфером может существенно превышать значение highWaterMark.

Параметр highWaterMark имеет два режима конфигурации: в байтах и в символах. 

В байтах параметр конфигурируется по умолчанию, а в символах — при явном указании кодировки стрима с помощью stream.setEncoding(...). Один символ обычно занимает один байт (latin1, ascii), но может занимать несколько байт, поэтому конфигурация highWaterMark в символах не эквивалентна конфигурации в байтах.

В документации Node.js конфигурация setEncodingупоминается в самом последнем абзаце — его легко пропустить.

Отличия в обработке составного содержимого сетевых запросов в Node.js

При подписке на событие data обработчик вызывается каждый раз при чтении следующего TCP-пакета. Параметр highWaterMark не влияет на размер TCP-пакета, поэтому использовать подписку на событие data не очень удобно. Во-первых, необходимо вручную контролировать поток с помощью readable.pause() и readable.resume(). Во-вторых, данные обрабатываются слишком часто. Например, пакетами по 64 килобайта, потому что это максимальный размер пакета в протоколе IPv4.

Обработка данных с помощью асинхронных итераторов буферизует TCP-пакеты до порогового значения highWaterMark. Затем пакеты объединяются в один чанк и отправляются в тело цикла. Размер чанка в цикле зависит от значения highWaterMark, поэтому можно управлять размерами чанков более гибко. Важно помнить, что highWaterMark — это не жесткий лимит, а всего лишь пороговое значение, которое может быть превышено или проигнорировано.

Механизм Back Pressure при обработке сетевых запросов в Node.js

Node.js использует возможности библиотеки libuv для контроля сетевых соединений, которая с помощью механизма окон протокола TCP регулирует пропускную способность потока.

Как только размер данных в буфере превышает пороговое значение highWaterMark, библиотека libuv уменьшает размер TCP-окна до нуля и тем самым сигнализирует, что отправку данных нужно приостановить.

Параметры TCP-сессии, которые имеют ключевое значение для управления потоком:

  • Max Segment Size — максимальный размер сегмента для передачи по сети. Нужен, чтобы отправитель и получатель не передавали слишком большие пакеты друг другу.

  • Window Scale — коэффициент для расчета актуального размера окна. Нужен, чтобы экспоненциально увеличивать размер окна.

  • Window — размер окна. Нужен, чтобы подсказывать отправителю количество данных, которые потребитель может обработать.

Верхнеуровневый алгоритм управления потоком с помощью окон в протоколе TCP выглядит так:

  1. Клиент и сервер обмениваются параметрами для установки TCP-соединения.

  2. Клиент передает на сервер пакеты размером не больше Max Segment Size.

  3. Сервер буферизует пакеты, подтверждает их получение и уведомляет об уменьшении окна.

  4. Клиент приостанавливает отправку пакетов, когда окно становится недостаточным для отправки следующего пакета.

  5. Сервер обрабатывает пакеты из буфера и уведомляет клиента об увеличении окна.

  6. Клиент продолжает отправку данных.

Если клиент игнорирует размер окна и продолжает отправлять пакеты, лишние пакеты будут отброшены.

Отправка потоковых данных на практике

Рассмотрим пример отправки потоковых данных на сервер с контролем переполнения памяти и использованием утилиты tcpdump для логирования TCP-пакетов. Будем использовать только встроенные в Node.js модули node:stream и node:http, чтобы исключить влияние фреймворков на Readable-стрим.

1. Реализуем исходящий поток данных. Создаем Readable Stream, который генерирует данные чанками по 5 МБ, и отправляем с помощью функции request из модуля node:http.

/* client.js */

const {request} = require('node:http');
const {Readable} = require('node:stream');

const STREAM_SIZE_IN_BYTES = 104_857_600; // ~100MB
const STREAM_CHUNK_SIZE_IN_BYTES = 5_242_880; // ~5MB

main().catch((error) => {
  console.log(error);
  process.exit(1);
});

async function main() {
  await new Promise((resolve) => {
    let counter = 0;

    const readableStream = new Readable({
      read() {
        if (counter < STREAM_SIZE_IN_BYTES) {
          this.push(Buffer.alloc(STREAM_SIZE_IN_BYTES));
          counter += STREAM_CHUNK_SIZE_IN_BYTES;
        } else {
          this.push(null);
        }
      },
    });

    const options = {
      path: '/',
      method: 'POST',
      protocol: 'http:',
      hostname: 'localhost',
      port: '3000',
    };

    const req = request(options, (res) => {
      res.on('end', () => {
        resolve(null);
      });
    });

    readableStream.pipe(req);
  });
}

2. Реализуем обработку входящего потока. HighWaterMark указываем 1 МБ, чтобы убедиться, что управление потоком работает. Каждая итерация цикла использует задержку, чтобы имитировать сложную работу, которая выполняется в теле цикла.

/* server.js */

const {createServer} = require('node:http');
const {PassThrough} = require('node:stream');

const HIGH_WATER_MARK = 1_048_576; // ~1MB

const MAX_DELAY_IN_MS = 5_000;
const MIN_DELAY_IN_MS = 3_000;

main().catch((error) => {
  console.log(error);
});

async function main() {
  await new Promise((resolve) => {
    const server = createServer(async (req) => {
      const passThrough = new PassThrough({
        highWaterMark: HIGH_WATER_MARK,
      });

      req.pipe(passThrough);

      for await (const chunk of passThrough) {
        console.log(`Обработано ${chunk.toString().length} байт`);

        const randomTimeoutMs =
          Math.random() * (MAX_DELAY_IN_MS - MIN_DELAY_IN_MS) + MIN_DELAY_IN_MS;

        await new Promise((resolve) => setTimeout(resolve, randomTimeoutMs));
      }
    });

    server.listen(3000, () => {
      console.log(`Сервер успешно запущен`);
      resolve(null);
    });
  });
}

3. Собираем Docker-контейнер и конфигурируем ресурсы в файле Docker Compose для лимитирования памяти. Лимит памяти ставим 50 МБ, из которых ~20 МБ закрепляются за Node.js.

FROM node:18.16-alpine3.18

COPY build ./build

EXPOSE 3000

CMD ["node", "./build/server.js"]
version: '3'
name: node-streams
services:
  server:
    build:
      dockerfile: ./Dockerfile
    ports:
      - 3000:3000
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 50M
        reservations:
          cpus: '0.5'
          memory: 50M

4. Запускаем контейнер и отправляем данные. Видим, что сервер обрабатывает чанки размером примерно 1 МБ. Так происходит, потому что highWaterMark не лимит, а всего лишь пороговое значение.

node-streams-server-1  | Сервер успешно запущен
node-streams-server-1  | Обработано 32671 байт
node-streams-server-1  | Обработано 1081344 байт
node-streams-server-1  | Обработано 1114112 байт
node-streams-server-1  | Обработано 1114112 байт
node-streams-server-1  | Обработано 1114112 байт

5. Запускаем утилиту tcpdump, чтобы подробно отследить процесс управления потоком на уровне протокола TCP. Параметр -i — это интерфейс localhost, port — порт, который слушает утилита.

$ sudo tcpdump -i lo0 port 3000

6. Запускаем пример повторно. Сначала стороны обмениваются параметрами для установки TCP-сессии. Max Message Size (mss) — 16,324 байта, коэффициент Window Scale (wscale) — 6.

15:47:10.004647 IP localhost.54892 > localhost.hbci: Flags [S], seq 2664759488, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 792746651 ecr 0,sackOK,eol], length 0
15:47:10.005743 IP localhost.hbci > localhost.54892: Flags [S.], seq 4116912290, ack 2664759489, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 840270532 ecr 792746651,sackOK,eol], length 0

7. Клиент отправляет данные пакетами по 16332 байта, а получатель подтверждает получение пакетов. Номер последовательности (seq) указывает на диапазон данных, например 1:16313 и далее 16313:32625. Получатель подтверждает, что пакеты получены, например ack: 16313. Размер окна при этом постоянно уменьшается.

15:47:10.006046 IP localhost.54892 > localhost.hbci: Flags [.], seq 1:16333, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006047 IP localhost.54892 > localhost.hbci: Flags [.], seq 16333:32665, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006087 IP localhost.hbci > localhost.54892: Flags [.], ack 16333, win 6124, options [nop,nop,TS val 840270534 ecr 792746653], length 0
15:47:10.006099 IP localhost.hbci > localhost.54892: Flags [.], ack 32665, win 5869, options [nop,nop,TS val 840270534 ecr 792746653], length 0

8. Так продолжается, пока размер окна не становится слишком маленьким для отправки следующего пакета (win: 255). Отправитель приостанавливает отправку и ждет, когда размер окна восстановится. Значение параметра win — это не значение в байтах. Чуть ниже расскажу, как рассчитать актуальный размер окна.

15:47:10.031553 IP localhost.hbci > localhost.54892: Flags [.], ack 1159573, win 510, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0

9. Через три секунды приходит подтверждение, что размер окна восстановлен и можно продолжить отправку данных. Отправка продолжается.

15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:13.033733 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 4338, options [nop,nop,TS val 840270561 ecr 792746678], length 0
15:47:13.033850 IP localhost.54892 > localhost.hbci: Flags [.], seq 1175905:1192237, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332
15:47:13.033853 IP localhost.54892 > localhost.hbci: Flags [.], seq 1192237:1208569, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332

Чтобы получить размер актуального окна, нужно к значению параметра win применить формулу расчета актуального размера окна. Когда создавали протокол TCP, разработчики решили, что размер окна не должен превышать 65,535 байта. Для размера окна выделили параметр Window в заголовке сообщения. Значение параметра не должно превышать 16 бит (2^16 === 65,535).

В конце 90-х стало очевидно, что текущего размера окна недостаточно, поэтому в стандарт TCP добавили атрибут Window Scale Option, чтобы не ломать обратную совместимость протокола. При установке соединения хосты обмениваются параметрами Window Scale и рассчитывают актуальный размер окна согласно формуле Window Size * (2 ^ Window Scale), таким образом максимальный размер окна увеличился с 0,06 МБ (65,535 байта) до ~1 ГБ и не потребовалось менять размер заголовка Window.

Собираем все вместе

В Node.js при обработке большого количества данных, которые передаются по сети, можно и нужно контролировать объем потребляемой памяти. Для этого при обработке Readable Stream необходимо использовать цикл for await … of. Можно использовать подписку на событие data, но придется вручную ставить поток на паузу с помощью readable.pause().

Под капотом Node.js использует механизм Back Pressure, основанный на механизме окон протокола TCP, а библиотека libuv управляет состоянием TCP-соединения. Размер окна зависит от свободного места в буфере. Если размер данных в буфере превысил значение highWaterMark, то потребление данных приостанавливается и продолжается, когда буфер очищается.

В начале я описывал интеграцию для передачи составных данных с помощью HTTP. Благодаря простому и удобному механизму для контроля потока в Node.js наша команда надежно обрабатывает сотни гигабайтов данных на машинах с лимитом памяти в 256 МБ. Главное — использовать цикл for await … of для обработки данных и корректно сконфигурировать highWaterMark в соответствии с выделенными ресурсами.

Если вы повышали надежность обработки данных в Node.js — делитесь своими практиками в комментариях!