В архитектуре потоковой обработки данных Kafka, как высокопроизводительная очередь сообщений, обычно используется для агрегации данных, а StarRocks, как высокопроизводительная аналитическая СУБД, отвечает за хранение и анализ. С помощью Routine Load можно стабильно и эффективно загружать в StarRocks данные в форматах JSON и CSV из Kafka.

Недавно была решена проблема с загрузкой данных JSON из Kafka. Кратко ход диагностики:

  1. Команда разработки данных сообщила, что после запуска задания Routine Load в тестовой среде данные не записываются в таблицу с первичным ключом.

  2. По журналу ошибок (скриншот) обнаружилось, что данные импортируются как NULL, в то время как столбцы в таблице не допускают NULL.

  3. Проверка содержимого топика показала, что сами данные корректны.

  4. В конфигурации Routine Load отсутствовал параметр jsonpaths, из‑за чего поля вытягивались некорректно. После добавления корректного jsonpaths задание Routine Load стало работать штатно и данные в реальном времени начали записываться в таблицу с первичным ключом.

Ниже — сводная практика по Routine Load в StarRocks: шаги конфигурации и примеры.


I. Подготовка окружения

1.1 Базовые требования

  • Кластер Kafka: версия 0.10.2 и выше; кластер работает корректно, создан тестовый топик.

  • Кластер StarRocks: версия 2.0 и выше; включает узлы FE (Frontend) и BE (CN).

  • JDK: версия 1.8 и выше; переменные окружения настроены.

  • Сеть: кластер StarRocks должен иметь доступ к портам брокеров Kafka (по умолчанию 9092).

1.2 Подготовка топиков и данных

В Kafka создаются два топика: для JSON и для CSV.

# Создать топик для JSON-данных
kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \
  --topic user_behavior_json --partitions 3 --replication-factor 2

# Создать топик для CSV-данных
kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \
  --topic order_info_csv --partitions 2 --replication-factor 2

II. Пример импорта данных JSON

2.1 Пример данных

В топике Kafka user_behavior_json хранятся данные действий пользователей (ID, тип действия, ID товара, timestamp и детальная информация):

{
  "user_id": 10001,
  "behavior_type": "click",
  "product_id": "prod_001",
  "timestamp": 1685000000,
  "details": {
    "page": "home",
    "stay_time": 15,
    "device": "mobile"
  }
}
{
  "user_id": 10002,
  "behavior_type": "purchase",
  "product_id": "prod_002",
  "timestamp": 1685000100,
  "details": {
    "page": "product_detail",
    "stay_time": 60,
    "device": "pc"
  }
}

2.2 Создание целевой таблицы в StarRocks (часто обновляемая таблица — модель с Primary Key)

В соответствии со структурой JSON создаётся таблица user_behavior. Вложенное поле details — типа STRUCT.

CREATE TABLE user_behavior (
    user_id BIGINT COMMENT '用户ID',
    behavior_type STRING COMMENT '行为类型,如click、purchase等',
    product_id STRING COMMENT '商品ID',
    timestamp BIGINT COMMENT '时间戳',
    details STRUCT<
        page STRING COMMENT '页面名称',
        stay_time INT COMMENT '停留时间(秒)',
        device STRING COMMENT '设备类型'
    > COMMENT '详细信息'
) ENGINE = OLAP
PRIMARY KEY (user_id, timestamp)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES (
    "replication_num" = "1",
    "storage_medium" = "SSD"
);

2.3 Создание задания Routine Load для импорта JSON

Создаём задание Routine Load для импорта данных из топика user_behavior_json в таблицу user_behavior:

CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior
COLUMNS(
    user_id,
    behavior_type,
    product_id,
    timestamp,
    details
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[
        \"$.user_id\",
        \"$.behavior_type\",
        \"$.product_id\",
        \"$.timestamp\",
        \"$.details\"
    ]",
    "read_json_by_line" = "true",
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_error_number" = "1000",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
    "kafka_topic" = "user_behavior_json",
    "kafka_consumer_group" = "sr_json_consumer_group",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING"
);

2.4 Описание параметров

  • format: формат данных — json.

  • jsonpaths: сопоставление полей JSON столбцам таблицы в порядке следования.

  • read_json_by_line: true — по одному объекту JSON на строку.

  • desired_concurrent_number: 3 — совпадает с числом партиций топика для повышения скорости.

  • max_batch_interval: максимальный интервал пакетной загрузки (сек).

  • kafka_offsets: начинать потребление с начала топика.

При необходимости для JSON-массивов используйте "strip_outer_array" = "true".

2.5 Проверка импорта JSON

SELECT user_id, behavior_type, product_id, timestamp, details.page, details.stay_time
FROM user_behavior
LIMIT 10;

Если запрос корректно возвращает данные, импорт JSON прошёл успешно.


III. Пример импорта данных CSV

3.1 Пример данных

В топике Kafka order_info_csv хранятся данные заказов. Поля разделены запятой:

order_001,10001,299.9,2023-06-25 10:30:00,paid
order_002,10002,599.8,2023-06-25 11:15:00,pending
order_003,10001,199.9,2023-06-25 14:20:00,paid

3.2 Создание целевой таблицы

CREATE TABLE order_info (
    order_id STRING COMMENT '订单ID',
    user_id BIGINT COMMENT '用户ID',
    amount DECIMAL(10, 2) COMMENT '订单金额',
    order_time DATETIME COMMENT '订单时间',
    status STRING COMMENT '订单状态'
) ENGINE = OLAP
PRIMARY KEY (order_id, user_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 6
PROPERTIES (
    "replication_num" = "1"
);

3.3 Создание задания Routine Load для CSV

CREATE ROUTINE LOAD db_name.csv_load_task ON order_info
COLUMNS(
    order_id,
    user_id,
    amount,
    order_time,
    status
)
PROPERTIES (
    "format" = "csv",
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "desired_concurrent_number" = "2",
    "max_batch_interval" = "15",
    "max_error_number" = "500",
    "skip_header" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
    "kafka_topic" = "order_info_csv",
    "kafka_consumer_group" = "sr_csv_consumer_group",
    "kafka_partitions" = "0,1",
    "kafka_offsets" = "OFFSET_BEGINNING"
);

3.4 Описание параметров

  • format: формат данных — csv.

  • column_separator: разделитель столбцов — «,».

  • line_delimiter: разделитель строк — «\n».

  • desired_concurrent_number: 2 — соответствует числу партиций топика.

  • skip_header: false — не пропускать первую строку.

3.5 Проверка импорта CSV

SELECT * FROM order_info LIMIT 10;

Если данные отображаются корректно, импорт CSV успешен.


IV. Управление и мониторинг Routine Load

4.1 Статус заданий

SHOW ROUTINE LOAD \G;

Команда выводит имя задания, состояние, прогресс импорта, ошибки и прочие детали.

4.2 Пауза, возобновление и остановка

PAUSE ROUTINE LOAD FOR json_load_task;
PAUSE ROUTINE LOAD FOR csv_load_task;

RESUME ROUTINE LOAD FOR json_load_task;
RESUME ROUTINE LOAD FOR csv_load_task;

-- Осторожно: удаляет задание
STOP ROUTINE LOAD FOR json_load_task;
STOP ROUTINE LOAD FOR csv_load_task;

4.3 Просмотр ошибочных данных

SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD TASK WHERE JobName = 'csv_load_task';

V. Замечания и рекомендации по оптимизации

5.1 Проверка формата данных

  • JSON: следите за корректностью синтаксиса; используйте jsonlint и аналогичные инструменты.

  • CSV: обеспечьте единообразный разделитель; если в значениях встречается запятая, обрамляйте значение кавычками.

5.2 Производительность

  • desired_concurrent_number: подбирайте согласно числу партиций топика (часто равно или кратно).

  • Баланс задержки/пропускной способности: регулируйте max_batch_interval, а также при необходимости добавьте max_batch_rows и/или max_batch_size.

  • Ресурсы StarRocks: выделите узлам BE достаточно CPU и памяти, чтобы избежать узких мест.

  • В продуктивных кластерах используйте "replication_num" = "3" и выше для надёжности.

5.3 Отказоустойчивость

  • Установите разумный max_error_number, чтобы единичные ошибки не останавливали импорт.

  • Регулярно ��ониторьте статус заданий и оперативно обрабатывайте сбои.

  • Используйте репликацию StarRocks для повышения отказоустойчивости.


VI. Итоги

Показано, как с помощью Routine Load импортировать в StarRocks данные форматов JSON и CSV из Kafka. Routine Load — эффективный и стабильный механизм непрерывной загрузки, удовлетворяющий требованиям потоковой обработки. На практике необходимо с учётом структуры данных и бизнес‑контекста корректно настраивать параметры заданий и обеспечивать мониторинг, чтобы гарантировать точность и эффективность импорта. Комбинация Kafka и StarRocks позволяет построить мощную платформу для обработки и анализа данных в реальном времени, поддерживающую бизнес‑принятие решений.


Как «расплющить» JSON полностью в плоскую двумерную таблицу

Если требуется не хранить вложенный STRUCT, а сразу маппить его поля в отдельные столбцы, используйте схему таблицы с «плоскими» колонками и соответствующие jsonpaths.

Создание таблицы (плоская схема)

CREATE TABLE user_behavior (
    user_id BIGINT COMMENT '用户ID',
    behavior_type STRING COMMENT '行为类型,如click、purchase等',
    product_id STRING COMMENT '商品ID',
    timestamp BIGINT COMMENT '时间戳',
    details_page STRING COMMENT '页面名称',
    details_stay_time INT COMMENT '停留时间(秒)',
    details_device STRING COMMENT '设备类型'
) ENGINE = OLAP
PRIMARY KEY (user_id, timestamp)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES (
    "replication_num" = "1",
    "storage_medium" = "SSD"
);

Routine Load для плоского маппинга JSON → колонки

CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior
COLUMNS(
    user_id,
    behavior_type,
    product_id,
    timestamp,
    details_page,
    details_stay_time,
    details_device
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[
        \"$.user_id\",
        \"$.behavior_type\",
        \"$.product_id\",
        \"$.timestamp\",
        \"$.details.page\",
        \"$.details.stay_time\",
        \"$.details.device\"
    ]",
    "read_json_by_line" = "true",
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_error_number" = "1000",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
    "kafka_topic" = "user_behavior_json",
    "kafka_consumer_group" = "sr_json_consumer_group",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING"
);

Ключевой момент — корректные jsonpaths, где вложенные поля обращаются через $.details.page, $.details.stay_time, $.details.device. Это обеспечивает полное «расплющивание» исходного JSON в плоские колонки таблицы. Если сообщения приходят как массивы объектов, добавьте свойство "strip_outer_array" = "true".