В архитектуре потоковой обработки данных Kafka, как высокопроизводительная очередь сообщений, обычно используется для агрегации данных, а StarRocks, как высокопроизводительная аналитическая СУБД, отвечает за хранение и анализ. С помощью Routine Load можно стабильно и эффективно загружать в StarRocks данные в форматах JSON и CSV из Kafka.
Недавно была решена проблема с загрузкой данных JSON из Kafka. Кратко ход диагностики:
Команда разработки данных сообщила, что после запуска задания Routine Load в тестовой среде данные не записываются в таблицу с первичным ключом.
По журналу ошибок (скриншот) обнаружилось, что данные импортируются как NULL, в то время как столбцы в таблице не допускают NULL.

Проверка содержимого топика показала, что сами данные корректны.
В конфигурации Routine Load отсутствовал параметр jsonpaths, из‑за чего поля вытягивались некорректно. После добавления корректного jsonpaths задание Routine Load стало работать штатно и данные в реальном времени начали записываться в таблицу с первичным ключом.
Ниже — сводная практика по Routine Load в StarRocks: шаги конфигурации и примеры.
I. Подготовка окружения
1.1 Базовые требования
Кластер Kafka: версия 0.10.2 и выше; кластер работает корректно, создан тестовый топик.
Кластер StarRocks: версия 2.0 и выше; включает узлы FE (Frontend) и BE (CN).
JDK: версия 1.8 и выше; переменные окружения настроены.
Сеть: кластер StarRocks должен иметь доступ к портам брокеров Kafka (по умолчанию 9092).
1.2 Подготовка топиков и данных
В Kafka создаются два топика: для JSON и для CSV.
# Создать топик для JSON-данных kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \ --topic user_behavior_json --partitions 3 --replication-factor 2 # Создать топик для CSV-данных kafka-topics.sh --create --bootstrap-server kafka-broker1:9092,kafka-broker2:9092 \ --topic order_info_csv --partitions 2 --replication-factor 2
II. Пример импорта данных JSON
2.1 Пример данных
В топике Kafka user_behavior_json хранятся данные действий пользователей (ID, тип действия, ID товара, timestamp и детальная информация):
{ "user_id": 10001, "behavior_type": "click", "product_id": "prod_001", "timestamp": 1685000000, "details": { "page": "home", "stay_time": 15, "device": "mobile" } } { "user_id": 10002, "behavior_type": "purchase", "product_id": "prod_002", "timestamp": 1685000100, "details": { "page": "product_detail", "stay_time": 60, "device": "pc" } }
2.2 Создание целевой таблицы в StarRocks (часто обновляемая таблица — модель с Primary Key)
В соответствии со структурой JSON создаётся таблица user_behavior. Вложенное поле details — типа STRUCT.
CREATE TABLE user_behavior ( user_id BIGINT COMMENT '用户ID', behavior_type STRING COMMENT '行为类型,如click、purchase等', product_id STRING COMMENT '商品ID', timestamp BIGINT COMMENT '时间戳', details STRUCT< page STRING COMMENT '页面名称', stay_time INT COMMENT '停留时间(秒)', device STRING COMMENT '设备类型' > COMMENT '详细信息' ) ENGINE = OLAP PRIMARY KEY (user_id, timestamp) DISTRIBUTED BY HASH(user_id) BUCKETS 8 PROPERTIES ( "replication_num" = "1", "storage_medium" = "SSD" );
2.3 Создание задания Routine Load для импорта JSON
Создаём задание Routine Load для импорта данных из топика user_behavior_json в таблицу user_behavior:
CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior COLUMNS( user_id, behavior_type, product_id, timestamp, details ) PROPERTIES ( "format" = "json", "jsonpaths" = "[ \"$.user_id\", \"$.behavior_type\", \"$.product_id\", \"$.timestamp\", \"$.details\" ]", "read_json_by_line" = "true", "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_error_number" = "1000", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092", "kafka_topic" = "user_behavior_json", "kafka_consumer_group" = "sr_json_consumer_group", "kafka_partitions" = "0,1,2", "kafka_offsets" = "OFFSET_BEGINNING" );
2.4 Описание параметров
format: формат данных — json.
jsonpaths: сопоставление полей JSON столбцам таблицы в порядке следования.
read_json_by_line: true — по одному объекту JSON на строку.
desired_concurrent_number: 3 — совпадает с числом партиций топика для повышения скорости.
max_batch_interval: максимальный интервал пакетной загрузки (сек).
kafka_offsets: начинать потребление с начала топика.
При необходимости для JSON-массивов используйте "strip_outer_array" = "true".
2.5 Проверка импорта JSON
SELECT user_id, behavior_type, product_id, timestamp, details.page, details.stay_time FROM user_behavior LIMIT 10;
Если запрос корректно возвращает данные, импорт JSON прошёл успешно.
III. Пример импорта данных CSV
3.1 Пример данных
В топике Kafka order_info_csv хранятся данные заказов. Поля разделены запятой:
order_001,10001,299.9,2023-06-25 10:30:00,paid order_002,10002,599.8,2023-06-25 11:15:00,pending order_003,10001,199.9,2023-06-25 14:20:00,paid
3.2 Создание целевой таблицы
CREATE TABLE order_info ( order_id STRING COMMENT '订单ID', user_id BIGINT COMMENT '用户ID', amount DECIMAL(10, 2) COMMENT '订单金额', order_time DATETIME COMMENT '订单时间', status STRING COMMENT '订单状态' ) ENGINE = OLAP PRIMARY KEY (order_id, user_id) DISTRIBUTED BY HASH(order_id) BUCKETS 6 PROPERTIES ( "replication_num" = "1" );
3.3 Создание задания Routine Load для CSV
CREATE ROUTINE LOAD db_name.csv_load_task ON order_info COLUMNS( order_id, user_id, amount, order_time, status ) PROPERTIES ( "format" = "csv", "column_separator" = ",", "line_delimiter" = "\n", "desired_concurrent_number" = "2", "max_batch_interval" = "15", "max_error_number" = "500", "skip_header" = "false" ) FROM KAFKA ( "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092", "kafka_topic" = "order_info_csv", "kafka_consumer_group" = "sr_csv_consumer_group", "kafka_partitions" = "0,1", "kafka_offsets" = "OFFSET_BEGINNING" );
3.4 Описание параметров
format: формат данных — csv.
column_separator: разделитель столбцов — «,».
line_delimiter: разделитель строк — «\n».
desired_concurrent_number: 2 — соответствует числу партиций топика.
skip_header: false — не пропускать первую строку.
3.5 Проверка импорта CSV
SELECT * FROM order_info LIMIT 10;
Если данные отображаются корректно, импорт CSV успешен.
IV. Управление и мониторинг Routine Load
4.1 Статус заданий
SHOW ROUTINE LOAD \G;
Команда выводит имя задания, состояние, прогресс импорта, ошибки и прочие детали.
4.2 Пауза, возобновление и остановка
PAUSE ROUTINE LOAD FOR json_load_task; PAUSE ROUTINE LOAD FOR csv_load_task; RESUME ROUTINE LOAD FOR json_load_task; RESUME ROUTINE LOAD FOR csv_load_task; -- Осторожно: удаляет задание STOP ROUTINE LOAD FOR json_load_task; STOP ROUTINE LOAD FOR csv_load_task;
4.3 Просмотр ошибочных данных
SHOW ROUTINE LOAD; SHOW ROUTINE LOAD TASK WHERE JobName = 'csv_load_task';

V. Замечания и рекомендации по оптимизации
5.1 Проверка формата данных
JSON: следите за корректностью синтаксиса; используйте jsonlint и аналогичные инструменты.
CSV: обеспечьте единообразный разделитель; если в значениях встречается запятая, обрамляйте значение кавычками.
5.2 Производительность
desired_concurrent_number: подбирайте согласно числу партиций топика (часто равно или кратно).
Баланс задержки/пропускной способности: регулируйте
max_batch_interval, а также при необходимости добавьтеmax_batch_rowsи/илиmax_batch_size.Ресурсы StarRocks: выделите узлам BE достаточно CPU и памяти, чтобы избежать узких мест.
В продуктивных кластерах используйте
"replication_num" = "3"и выше для надёжности.
5.3 Отказоустойчивость
Установите разумный
max_error_number, чтобы единичные ошибки не останавливали импорт.Регулярно мониторьте статус заданий и оперативно обрабатывайте сбои.
Используйте репликацию StarRocks для повышения отказоустойчивости.
VI. Итоги
Показано, как с помощью Routine Load импортировать в StarRocks данные форматов JSON и CSV из Kafka. Routine Load — эффективный и стабильный механизм непрерывной загрузки, удовлетворяющий требованиям потоковой обработки. На практике необходимо с учётом структуры данных и бизнес‑контекста корректно настраивать параметры заданий и обеспечивать мониторинг, чтобы гарантировать точность и эффективность импорта. Комбинация Kafka и StarRocks позволяет построить мощную платформу для обработки и анализа данных в реальном времени, поддерживающую бизнес‑принятие решений.
Как «расплющить» JSON полностью в плоскую двумерную таблицу
Если требу��тся не хранить вложенный STRUCT, а сразу маппить его поля в отдельные столбцы, используйте схему таблицы с «плоскими» колонками и соответствующие jsonpaths.
Создание таблицы (плоская схема)
CREATE TABLE user_behavior ( user_id BIGINT COMMENT '用户ID', behavior_type STRING COMMENT '行为类型,如click、purchase等', product_id STRING COMMENT '商品ID', timestamp BIGINT COMMENT '时间戳', details_page STRING COMMENT '页面名称', details_stay_time INT COMMENT '停留时间(秒)', details_device STRING COMMENT '设备类型' ) ENGINE = OLAP PRIMARY KEY (user_id, timestamp) DISTRIBUTED BY HASH(user_id) BUCKETS 8 PROPERTIES ( "replication_num" = "1", "storage_medium" = "SSD" );
Routine Load для плоского маппинга JSON → колонки
CREATE ROUTINE LOAD db_name.json_load_task ON user_behavior COLUMNS( user_id, behavior_type, product_id, timestamp, details_page, details_stay_time, details_device ) PROPERTIES ( "format" = "json", "jsonpaths" = "[ \"$.user_id\", \"$.behavior_type\", \"$.product_id\", \"$.timestamp\", \"$.details.page\", \"$.details.stay_time\", \"$.details.device\" ]", "read_json_by_line" = "true", "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_error_number" = "1000", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "kafka-broker1:9092,kafka-broker2:9092", "kafka_topic" = "user_behavior_json", "kafka_consumer_group" = "sr_json_consumer_group", "kafka_partitions" = "0,1,2", "kafka_offsets" = "OFFSET_BEGINNING" );
Ключевой момент — корректные jsonpaths, где вложенные поля обращаются через $.details.page, $.details.stay_time, $.details.device. Это обеспечивает полное «расплющивание» исходного JSON в плоские колонки таблицы. Если сообщения приходят как массивы объектов, добавьте свойство "strip_outer_array" = "true".
