Руководство «Быстрый старт по StarRocks Lakehouse» помогает быстро разобраться с технологиями Lakehouse (лейкхаус): ключевые особенности, уникальные преимущества, сценарии использования и то, как со StarRocks оперативно собрать решение. В финале — лучшие практики на основе реальных сценариев.
В этой статье представлены лучшие практики StarRocks Hive Catalog на примере прикладной задачи управления заказами.
Введение в Apache Hive
Apache Hive — распределённая, отказоустойчивая система хранилища данных, поддерживающая масштабную аналитику. Hive Metastore (HMS) предоставляет репозиторий метаданных, которые можно анализировать для принятия решений на основе данных; потому HMS — ключевой компонент во многих архитектурах озёр данных.
Hive построен поверх Apache Hadoop и через HDFS поддерживает управление данными в хранилищах S3, Azure Data Lake Storage (ADLS), Google Cloud Storage (GCS) и др. Hive позволяет с помощью SQL читать, записывать и управлять данными объёмов до петабайт.
Архитектура и ключевые особенности

HiveServer2 (HS2): поддержка параллельной работы множества клиентов и аутентификации; улучшенная поддержка JDBC и ODBC.
Hive Metastore Server (HMS): хранение метаданных таблиц и партиций; поддержка множества OSS‑инструментов (Spark, Presto и др.); важная часть озера данных.
Hive ACID: полная поддержка ACID для таблиц ORC; поддержка только вставок для других форматов.
Hive Iceberg: нативная поддержка таблиц Apache Iceberg через Hive StorageHandler; подходит для облачных высокопроизводительных сценариев.
Безопасность и наблюдаемость: поддержка Kerberos, интеграция с Apache Ranger и Apache Atlas.
Hive LLAP: интерактивные SQL‑запросы с низкой задержкой; оптимизация кэширования и ускорение запросов.
Оптимизация запросов: оптимизатор на основе стоимости (CBO) на базе Apache Calcite.
Преимущества Apache Hive
Функции DWH: базы данных, таблицы, партиции и др. упрощают управление и запросы.
Несколько движков выполнения: MapReduce, Tez, Spark — можно выбирать для нужной производительности.
Расширяемость: пользовательские функции (UDF), интеграция с инструментами экосистемы Hadoop — гибкость обработки.
Пакетная обработка: особенно подходит для крупномасштабной аналитики, отчётности и ETL.
Простая интеграция: Flume, Sqoop, Oozie и др. — расширение возможностей обработки больших данных.
Сценарии использования Apache Hive
Хранилище данных: преобразует данные Hadoop в SQL‑представление, предоставляя функции DWH.
Аналитика данных: HiveQL‑запросы, агрегирование и фильтрация для масштабной аналитики.
Интеллектуальный анализ данных (data mining): интеграция с ML‑инструментами для добычи знаний и поиска паттернов.
ETL‑операции: анализ логов и обработка исторических данных для оптимизации производительности и понимания поведения пользователей.
Офлайн‑обработка: подходит для офлайн‑сценариев обработки больших данных; пакетные движки поддерживают крупные задания.
Интеграция инструментов: бесшовная работа с Apache Spark, Mahout и др. для ускорения запросов и моделирования данных.
StarRocks Hive Catalog
Hive — классический движок на базе MapReduce, широко применяемый для пакетной и офлайн‑аналитики. В задачах аналитики в реальном времени ему часто не хватает производительности запросов и эффективности использования ресурсов.
StarRocks — это MPP‑СУБД, которая быстро обрабатывает сложные запросы на больших наборах данных, поддерживает аналитику в реальном времени и обеспечивает быстрый отклик — для сценариев, где важна мгновенная обратная связь.
StarRocks не только эффективно анализирует локально хранимые данные, но и выступает вычислительным движком для прямого анализа данных в озере (data lake). С помощью External Catalog пользователи могут без миграции данных запрашивать данные из Apache Hive, Apache Iceberg, Apache Hudi, Delta Lake и др. Поддерживаемые системы хранения: HDFS, S3, OSS; форматы — Parquet, ORC, CSV.
Благодаря StarRocks Hive Catalog достигается бесшовная интеграция StarRocks и Hive, объединяющая их сильные стороны. В аналитике по озеру данных StarRocks отвечает за вычисления и анализ, а озеро — за хранение, организацию и поддержку данных. Преимущества озера — открытые форматы хранения и гибкая схема (schema), что обеспечивает для BI/AI/ad‑hoc/отчётности единый источник истины (single source of truth). В роли вычислительного движка для озера StarRocks задействует векторизованный движок и CBO, существенно ускоряя аналитику.
Модель данных

Эволюция технической архитектуры
StarRocks Hive Catalog может эволюционировать так:
Прямые запросы к данным таблиц Hive →
Ускорение запросов к таблицам Hive с помощью Data Cache →
Ускорение с помощью Data Cache и асинхронных материализованных представлений.
Прямые запросы к результатам через StarRocks Hive Catalog: весь ETL выполняется в Hive; StarRocks через Hive Catalog запрашивает витрины DWD, DWS и ADS.
Запросы «на лету» с использованием StarRocks Hive Catalog + Data Cache: в Hive формируются только ODS и DWD; уровни DWS и ADS считаются на лету (on the fly) через Hive Catalog StarRocks.
Ускорение асинхронными материализованными представлениями: в Hive остаётся лишь ODS; уровни DWD и DWS строятся асинхронными MVs StarRocks; ADS запрашивается напрямую.

Быстрый старт
1. Базовая среда
Component | Version |
Zookeeper | 3.5.7 |
HDFS | 3.3.4 |
Hive | 3.1.2 |
StarRocks | 3.3.0 |
Node | Deployed Services | Machine Specifications |
StarRocks-BE | 8C32G 2T High-performance Cloud Disk | |
Zookeeper | ||
DataNode | ||
JournalNode | ||
NameNode | ||
StarRocks-FE | 8C64G 2T High-performance Cloud Disk | |
StarRocks-BE | ||
Zookeeper | ||
DataNode | ||
JournalNode | ||
NameNode | ||
HiveServer | ||
HiveMetaStore | ||
StarRocks-BE | 8C64G 1T High-performance Cloud Disk | |
Zookeeper | ||
DataNode | ||
JournalNode | ||
NameNode |
2. Создание таблиц в Hive
CREATE DATABASE orders; -- ODS: вспомогательная внешняя таблица для загрузки локально сгенерированных тестовых данных CREATE EXTERNAL TABLE IF NOT EXISTS ods_orders_text ( order_id STRING, user_id STRING, order_time STRING, product_id STRING, quantity INT, price DECIMAL(10, 2), order_status STRING ) COMMENT 'Таблица хранения операционных данных заказов' ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; CREATE TABLE IF NOT EXISTS ods_orders ( order_id INT, user_id INT, order_time STRING, product_id INT, quantity INT, price DOUBLE, order_status STRING ) COMMENT 'Таблица хранения операционных данных заказов' PARTITIONED BY (order_date STRING) STORED AS PARQUET; CREATE TABLE IF NOT EXISTS dim_products ( product_id INT, product_name STRING, category_id INT, price DECIMAL(10, 2), product_description STRING ) COMMENT 'Таблица измерений продуктов' STORED AS PARQUET; CREATE TABLE IF NOT EXISTS dim_categories ( category_id INT, category_name STRING, category_description STRING ) COMMENT 'Таблица измерений категорий' STORED AS PARQUET; -- DWD CREATE TABLE IF NOT EXISTS dwd_order_facts ( order_id STRING, user_id STRING, order_time STRING, product_id STRING, quantity INT, price DECIMAL(10, 2), order_status STRING, product_name STRING, category_id STRING, category_name STRING ) COMMENT 'Фактовая таблица заказов' PARTITIONED BY (order_date DATE) STORED AS PARQUET;
3. Генерация данных
3.1 Таблицы измерений
-- Вспомогательная таблица для генерации последовательности CREATE TABLE aux_order_data (seq_num INT);
#!/usr/bin/env python3 with open('aux_order_data.txt', 'w') as f: for i in range(1, 10000001): f.write("{}\n".format(i))
LOAD DATA LOCAL INPATH '/home/disk1/sr/aux_order_data.txt' INTO TABLE aux_order_data; INSERT INTO dim_products SELECT floor(RAND() * 10000) + 1 AS product_id, CONCAT('Название продукта', floor(RAND() * 10000) + 1) AS product_name, floor(RAND() * 1000) + 1 AS category_id, ROUND(100 + RAND() * 5000, 2) AS price, CONCAT('Описание продукта', floor(RAND() * 100)) AS product_description FROM aux_order_data a CROSS JOIN aux_order_data b LIMIT 10000; INSERT INTO dim_categories SELECT floor(RAND() * 1000) + 1 AS category_id, CONCAT('Название категории', floor(RAND() * 1000) + 1) AS category_name, CONCAT('Описание категории', floor(RAND() * 100)) AS category_description FROM aux_order_data a CROSS JOIN aux_order_data b LIMIT 1000;
3.2 Данные ODS
Отдельно формируем данные за 3–5 августа 2024 года.
#!/usr/bin/env python3 import random import time def generate_order_data(num_records): with open('ods_orders.txt', 'w') as f: for i in range(1, num_records + 1): order_id = i user_id = random.randint(1, 1000) # временные метки охватывают 3–5 августа order_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(random.randint(1722614400, 1722700800))) product_id = random.randint(1, 10000) quantity = random.randint(1, 10) price = round(random.uniform(10, 1000), 2) order_status = '已完成' if random.random() < 0.9 else '已取消' f.write(f"{order_id},{user_id},{order_time},{product_id},{quantity},{price},{order_status}\n") generate_order_data(10000000)
LOAD DATA LOCAL INPATH '/home/disk1/sr/ods_orders.txt' INTO TABLE ods_orders_text; INSERT OVERWRITE TABLE ods_orders PARTITION (order_date) SELECT CAST(order_id AS INT), CAST(user_id AS INT), order_time, CAST(product_id AS INT), CAST(quantity AS INT), CAST(price AS DOUBLE), order_status, substr(order_time, 1, 10) AS order_date FROM ods_orders_text;
3.3 Данные DWD
INSERT OVERWRITE TABLE dwd_order_facts PARTITION (order_date) SELECT o.order_id, o.user_id, o.order_time, o.product_id, o.quantity, o.price, COALESCE(o.order_status, 'UNKNOWN'), p.product_name, p.category_id, c.category_name, o.order_date FROM ods_orders o JOIN dim_products p ON o.product_id = p.product_id JOIN dim_categories c ON p.category_id = c.category_id WHERE o.price > 0;
3.4 Данные DWS
CREATE TABLE IF NOT EXISTS dws_order_aggregates ( user_id STRING, category_name STRING, order_date DATE, total_quantity INT, total_revenue DECIMAL(10, 2), total_orders INT ) COMMENT 'Агрегированная витрина заказов' STORED AS PARQUET; INSERT OVERWRITE TABLE dws_order_aggregates SELECT user_id, category_name, order_date, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue, COUNT(DISTINCT order_id) AS total_orders FROM dwd_order_facts WHERE order_status = '已完成' GROUP BY user_id, category_name, order_date;
3.5 Данные ADS
CREATE TABLE IF NOT EXISTS ads_product_order_report ( category_name STRING, report_date STRING, total_orders INT, total_quantity INT, total_revenue DECIMAL(10, 2) ) COMMENT 'Таблица отчёта по топ‑товарам' STORED AS PARQUET; WITH ranked_category_sales AS ( SELECT category_name, order_date, total_quantity, total_revenue, total_orders, ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank FROM dws_order_aggregates ) INSERT OVERWRITE TABLE ads_product_order_report SELECT category_name, order_date, total_quantity, total_revenue, total_orders FROM ranked_category_sales WHERE revenue_rank <= 10;
4. Подключение Hive Catalog
Скопируйте конфигурацию Hive/Hadoop и перезапустите службы StarRocks:
scp hive-site.xml hdfs-site.xml core-site.xml sr@node:/home/disk1/sr/fe/conf scp hdfs-site.xml core-site.xml sr@node:/home/disk1/sr/be/conf ./bin/stop_be.sh ./bin/start_be.sh --daemon ./bin/stop_fe.sh ./bin/start_fe.sh --daemon
5. Аналитика по озеру данных
5.1 Запрос результатов Hive через Hive Catalog
CREATE EXTERNAL CATALOG `hive_catalog_krb5_sr` PROPERTIES ( "hive.metastore.type" = "hive", "hive.metastore.uris" = "thrift://cs02.starrocks.com:9083", "type" = "hive" ); SET CATALOG hive_catalog_krb5_sr; USE orders; -- DWS SELECT * FROM dws_order_aggregates; -- ADS SELECT * FROM ads_product_order_report;
5.2 Запросы «на лету» с использованием StarRocks Hive Catalog + Data Cache
Включите Data Cache и при необходимости выполните предзагрузку кэша.
-- включить Data Cache на уровне сессии SET enable_scan_datacache = true;
В конфигурации BE (be.conf):
datacache_disk_path = /data2/datacache datacache_enable = true datacache_disk_size = 200G
Предзагрузка кэша:
CACHE SELECT * FROM hive_catalog_krb5_sr.orders.dwd_order_facts;
Онлайновые витрины:
SET CATALOG hive_catalog_krb5_sr; USE orders; -- DWS WITH dwd AS ( SELECT user_id, category_name, order_date, quantity, price, order_id, order_status FROM dwd_order_facts ) SELECT user_id, category_name, order_date, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue, COUNT(DISTINCT order_id) AS total_orders FROM dwd WHERE order_status = '已完成' GROUP BY user_id, category_name, order_date; -- ADS WITH dws_order_aggregates AS ( SELECT user_id, category_name, order_date, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue, COUNT(DISTINCT order_id) AS total_orders FROM dwd_order_facts WHERE order_status = '已完成' GROUP BY user_id, category_name, order_date ), ranked_category_sales AS ( SELECT category_name, order_date, total_quantity, total_revenue, total_orders, ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank FROM dws_order_aggregates ) SELECT category_name, order_date, total_quantity, total_revenue, total_orders FROM ranked_category_sales WHERE revenue_rank <= 10;
5.3 Ускорение за счёт асинхронных материализованных представлений
SET CATALOG default_catalog; USE orders; CREATE MATERIALIZED VIEW dwd_order_facts_mv PARTITION BY str2date(order_date, '%Y-%m-%d') DISTRIBUTED BY HASH(`order_id`) BUCKETS 12 PROPERTIES ("replication_num" = "3") REFRESH ASYNC START('2024-08-01 01:00:00') EVERY (interval 1 day) AS SELECT o.order_date, o.order_id, o.user_id, o.order_time, o.product_id, o.quantity, o.price, COALESCE(o.order_status, 'UNKNOWN') AS order_status, p.product_name, p.category_id, c.category_name FROM hive_catalog_krb5_sr.orders.ods_orders o JOIN hive_catalog_krb5_sr.orders.dim_products p ON o.product_id = p.product_id JOIN hive_catalog_krb5_sr.orders.dim_categories c ON p.category_id = c.category_id WHERE o.price > 0;
Автоматическое обнаружение новых партиций (пример с 6 августа):
#!/usr/bin/env python3 import random import time def generate_order_data(num_records): with open('ods_orders_0806.txt', 'w') as f: for i in range(1, num_records + 1): order_id = i user_id = random.randint(1, 1000) order_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(random.randint(1722873600, 1722959999))) product_id = random.randint(1, 10000) quantity = random.randint(1, 10) price = round(random.uniform(10, 1000), 2) order_status = '已完成' if random.random() < 0.9 else '已取消' f.write(f"{order_id},{user_id},{order_time},{product_id},{quantity},{price},{order_status}\n") generate_order_data(10000000)
На стороне Hive:
LOAD DATA LOCAL INPATH '/home/disk1/sr/ods_orders_0806.txt' INTO TABLE ods_orders_text; -- Добавляем новые данные с автоматическим формированием значения партиции INSERT OVERWRITE TABLE ods_orders PARTITION (order_date) SELECT CAST(order_id AS INT), CAST(user_id AS INT), order_time, CAST(product_id AS INT), CAST(quantity AS INT), CAST(price AS DOUBLE), order_status, substr(order_time, 1, 10) AS order_date FROM ods_orders_text WHERE substr(order_time, 1, 10) = '2024-08-06';
Обновление и проверка MV:
-- ручной запуск обновления REFRESH MATERIALIZED VIEW dwd_order_facts_mv; -- проверяем статус обновления (должен быть SUCCESS) SELECT * FROM information_schema.task_runs ORDER BY CREATE_TIME DESC LIMIT 1; -- проверяем, что MV видит новые данные SELECT order_date, COUNT(1) FROM dwd_order_facts_mv GROUP BY order_date;
DWS на основе MV:
SET CATALOG default_catalog; USE orders; SELECT user_id, category_name, order_date, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue, COUNT(DISTINCT order_id) AS total_orders FROM dwd_order_facts_mv WHERE order_status = '已完成' GROUP BY user_id, category_name, order_date;
ADS на основе MV:
SET CATALOG default_catalog; USE orders; CREATE MATERIALIZED VIEW dws_order_aggregates_mv PARTITION BY str2date(order_date, '%Y-%m-%d') DISTRIBUTED BY HASH(`user_id`) BUCKETS 12 PROPERTIES ("replication_num" = "3") REFRESH ASYNC START('2024-08-01 04:00:00') EVERY (interval 1 day) AS SELECT user_id, category_name, order_date, SUM(quantity) AS total_quantity, SUM(price * quantity) AS total_revenue, COUNT(DISTINCT order_id) AS total_orders FROM dwd_order_facts_mv WHERE order_status = '已完成' GROUP BY user_id, category_name, order_date; -- ручной запуск обновления REFRESH MATERIALIZED VIEW dws_order_aggregates_mv; -- проверяем статус обновления (должен быть SUCCESS) SELECT * FROM information_schema.task_runs ORDER BY CREATE_TIME DESC LIMIT 1; WITH ranked_category_sales AS ( SELECT category_name, order_date, total_quantity, total_revenue, total_orders, ROW_NUMBER() OVER (PARTITION BY order_date ORDER BY total_revenue DESC) AS revenue_rank FROM dws_order_aggregates_mv ) -- топ‑10 категорий SELECT category_name, order_date, total_quantity, total_revenue, total_orders FROM ranked_category_sales WHERE revenue_rank <= 10;
Итоги
В большинстве случаев связка StarRocks Hive Catalog + Data Cache отлично покрывает потребности аналитики по озеру данных.
Асинхронные материализованные представления StarRocks упрощают ETL‑процессы, снижают сложность бизнес‑логики и обеспечивают высокую производительность запросов.
