В архитектуре потоковой обработки данных Kafka, как высокопроизводительная очередь сообщений, обычно используется для агрегации данных, а StarRocks, как высокопроизводительная аналитическая СУБД, отвечает за хранение и анализ. С помощью Routine Load можно стабильно и эффективно загружать в StarRocks данные в форматах JSON и CSV из Kafka.
Недавно была решена проблема с загрузкой данных JSON из Kafka. Кратко ход диагностики:
Команда разработки данных сообщила, что после запуска задания Routine Load в тестовой среде данные не записываются в таблицу с первичным ключом.
По журналу ошибок (скриншот) обнаружилось, что данные импортируются как NULL, в то время как столбцы в таблице не допускают NULL.

Проверка содержимого топика показала, что сами данные корректны.
В конфигурации Routine Load отсутствовал параметр jsonpaths, из‑за чего поля вытягивались некорректно. После добавления корректного jsonpaths задание Routine Load стало работать штатно и данные в реальном времени начали записываться в таблицу с первичным ключом.
Ниже — сводная практика по Routine Load в StarRocks: шаги конфигурации и примеры.
I. Подготовка окружения
1.1 Базовые требования
Кластер Kafka: версия 0.10.2 и выше; кластер работает корректно, создан тестовый топик.
Кластер StarRocks: версия 2.0 и выше; включает узлы FE (Frontend) и BE (CN).
JDK: версия 1.8 и выше; переменные окружения настроены.
Сеть: кластер StarRocks должен иметь доступ к портам брокеров Kafka (по умолчанию 9092).
1.2 Подготовка топиков и данных
В Kafka создаются два топика: для JSON и для CSV.
# Создать топик для JSON-данных
kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \
--topic user_behavior_json --partitions 3 --replication-factor 2
# Создать топик для CSV-данных
kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \
--topic order_info_csv --partitions 2 --replication-factor 2
II. Пример импорта данных JSON
2.1 Пример данных
В топике Kafka user_behavior_json хранятся данные действий пользователей (ID, тип действия, ID товара, timestamp и детальная информация):
{
"user_id": 10001,
"behavior_type": "click",
"product_id": "prod_001",
"timestamp": 1685000000,
"details": {
"page": "home",
"stay_time": 15,
"device": "mobile"
}
}
{
"user_id": 10002,
"behavior_type": "purchase",
"product_id": "prod_002",
"timestamp": 1685000100,
"details": {
"page": "product_detail",
"stay_time": 60,
"device": "pc"
}
}
2.2 Создание целевой таблицы в StarRocks (часто обновляемая таблица — модель с Primary Key)
В соответствии со структурой JSON создаётся таблица user_behavior. Вложенное поле details — типа STRUCT.
CREATE TABLE user_behavior (
user_id BIGINT COMMENT '用户ID',
behavior_type STRING COMMENT '行为类型,如click、purchase等',
product_id STRING COMMENT '商品ID',
timestamp BIGINT COMMENT '时间戳',
details STRUCT<
page STRING COMMENT '页面名称',
stay_time INT COMMENT '停留时间(秒)',
device STRING COMMENT '设备类型'
> COMMENT '详细信息'
) ENGINE = OLAP
PRIMARY KEY (user_id, timestamp)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES (
"replication_num" = "1",
"storage_medium" = "SSD"
);
2.3 Создание задания Routine Load для импорта JSON
Создаём задание Routine Load для импорта данных из топика user_behavior_json в таблицу user_behavior:
CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior
COLUMNS(
user_id,
behavior_type,
product_id,
timestamp,
details
)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[
\"$.user_id\",
\"$.behavior_type\",
\"$.product_id\",
\"$.timestamp\",
\"$.details\"
]",
"read_json_by_line" = "true",
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
"kafka_topic" = "user_behavior_json",
"kafka_consumer_group" = "sr_json_consumer_group",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "OFFSET_BEGINNING"
);
2.4 Описание параметров
format: формат данных — json.
jsonpaths: сопоставление полей JSON столбцам таблицы в порядке следования.
read_json_by_line: true — по одному объекту JSON на строку.
desired_concurrent_number: 3 — совпадает с числом партиций топика для повышения скорости.
max_batch_interval: максимальный интервал пакетной загрузки (сек).
kafka_offsets: начинать потребление с начала топика.
При необходимости для JSON-массивов используйте "strip_outer_array" = "true".
2.5 Проверка импорта JSON
SELECT user_id, behavior_type, product_id, timestamp, details.page, details.stay_time
FROM user_behavior
LIMIT 10;
Если запрос корректно возвращает данные, импорт JSON прошёл успешно.
III. Пример импорта данных CSV
3.1 Пример данных
В топике Kafka order_info_csv хранятся данные заказов. Поля разделены запятой:
order_001,10001,299.9,2023-06-25 10:30:00,paid
order_002,10002,599.8,2023-06-25 11:15:00,pending
order_003,10001,199.9,2023-06-25 14:20:00,paid
3.2 Создание целевой таблицы
CREATE TABLE order_info (
order_id STRING COMMENT '订单ID',
user_id BIGINT COMMENT '用户ID',
amount DECIMAL(10, 2) COMMENT '订单金额',
order_time DATETIME COMMENT '订单时间',
status STRING COMMENT '订单状态'
) ENGINE = OLAP
PRIMARY KEY (order_id, user_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 6
PROPERTIES (
"replication_num" = "1"
);
3.3 Создание задания Routine Load для CSV
CREATE ROUTINE LOAD db_name.csv_load_task ON order_info
COLUMNS(
order_id,
user_id,
amount,
order_time,
status
)
PROPERTIES (
"format" = "csv",
"column_separator" = ",",
"line_delimiter" = "\n",
"desired_concurrent_number" = "2",
"max_batch_interval" = "15",
"max_error_number" = "500",
"skip_header" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
"kafka_topic" = "order_info_csv",
"kafka_consumer_group" = "sr_csv_consumer_group",
"kafka_partitions" = "0,1",
"kafka_offsets" = "OFFSET_BEGINNING"
);
3.4 Описание параметров
format: формат данных — csv.
column_separator: разделитель столбцов — «,».
line_delimiter: разделитель строк — «\n».
desired_concurrent_number: 2 — соответствует числу партиций топика.
skip_header: false — не пропускать первую строку.
3.5 Проверка импорта CSV
SELECT * FROM order_info LIMIT 10;
Если данные отображаются корректно, импорт CSV успешен.
IV. Управление и мониторинг Routine Load
4.1 Статус заданий
SHOW ROUTINE LOAD \G;
Команда выводит имя задания, состояние, прогресс импорта, ошибки и прочие детали.
4.2 Пауза, возобновление и остановка
PAUSE ROUTINE LOAD FOR json_load_task;
PAUSE ROUTINE LOAD FOR csv_load_task;
RESUME ROUTINE LOAD FOR json_load_task;
RESUME ROUTINE LOAD FOR csv_load_task;
-- Осторожно: удаляет задание
STOP ROUTINE LOAD FOR json_load_task;
STOP ROUTINE LOAD FOR csv_load_task;
4.3 Просмотр ошибочных данных
SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD TASK WHERE JobName = 'csv_load_task';
V. Замечания и рекомендации по оптимизации
5.1 Проверка формата данных
JSON: следите за корректностью синтаксиса; используйте jsonlint и аналогичные инструменты.
CSV: обеспечьте единообразный разделитель; если в значениях встречается запятая, обрамляйте значение кавычками.
5.2 Производительность
desired_concurrent_number: подбирайте согласно числу партиций топика (часто равно или кратно).
Баланс задержки/пропускной способности: регулируйте
max_batch_interval, а также при необходимости добавьтеmax_batch_rowsи/илиmax_batch_size.Ресурсы StarRocks: выделите узлам BE достаточно CPU и памяти, чтобы избежать узких мест.
В продуктивных кластерах используйте
"replication_num" = "3"и выше для надёжности.
5.3 Отказоустойчивость
Установите разумный
max_error_number, чтобы единичные ошибки не останавливали импорт.Регулярно ��ониторьте статус заданий и оперативно обрабатывайте сбои.
Используйте репликацию StarRocks для повышения отказоустойчивости.
VI. Итоги
Показано, как с помощью Routine Load импортировать в StarRocks данные форматов JSON и CSV из Kafka. Routine Load — эффективный и стабильный механизм непрерывной загрузки, удовлетворяющий требованиям потоковой обработки. На практике необходимо с учётом структуры данных и бизнес‑контекста корректно настраивать параметры заданий и обеспечивать мониторинг, чтобы гарантировать точность и эффективность импорта. Комбинация Kafka и StarRocks позволяет построить мощную платформу для обработки и анализа данных в реальном времени, поддерживающую бизнес‑принятие решений.
Как «расплющить» JSON полностью в плоскую двумерную таблицу
Если требуется не хранить вложенный STRUCT, а сразу маппить его поля в отдельные столбцы, используйте схему таблицы с «плоскими» колонками и соответствующие jsonpaths.
Создание таблицы (плоская схема)
CREATE TABLE user_behavior (
user_id BIGINT COMMENT '用户ID',
behavior_type STRING COMMENT '行为类型,如click、purchase等',
product_id STRING COMMENT '商品ID',
timestamp BIGINT COMMENT '时间戳',
details_page STRING COMMENT '页面名称',
details_stay_time INT COMMENT '停留时间(秒)',
details_device STRING COMMENT '设备类型'
) ENGINE = OLAP
PRIMARY KEY (user_id, timestamp)
DISTRIBUTED BY HASH(user_id) BUCKETS 8
PROPERTIES (
"replication_num" = "1",
"storage_medium" = "SSD"
);
Routine Load для плоского маппинга JSON → колонки
CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior
COLUMNS(
user_id,
behavior_type,
product_id,
timestamp,
details_page,
details_stay_time,
details_device
)
PROPERTIES (
"format" = "json",
"jsonpaths" = "[
\"$.user_id\",
\"$.behavior_type\",
\"$.product_id\",
\"$.timestamp\",
\"$.details.page\",
\"$.details.stay_time\",
\"$.details.device\"
]",
"read_json_by_line" = "true",
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092",
"kafka_topic" = "user_behavior_json",
"kafka_consumer_group" = "sr_json_consumer_group",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "OFFSET_BEGINNING"
);
Ключевой момент — корректные jsonpaths, где вложенные поля обращаются через $.details.page, $.details.stay_time, $.details.device. Это обеспечивает полное «расплющивание» исходного JSON в плоские колонки таблицы. Если сообщения приходят как массивы объектов, добавьте свойство "strip_outer_array" = "true".
