Автор(ы):
Ши Цян, архитектор решений
Чжао Хэн, StarRocks TSC Member
Введение в RAG и векторные индексы
RAG (Retrieval-Augmented Generation — извлечение с последующей генерацией) сочетает поиск внешних знаний и генерацию ответов, компенсируя статичность знаний классических LLM и склонность к «галлюцинациям». Это повышает точность и опору на актуальные данные.
Стандартный конвейер RAG:
Извлечение (Retrieval): поиск релевантного контента в внешних источниках (векторные базы, поисковые движки, традиционные БД).
Генерация (Generation): объединение запроса пользователя и найденного контекста для LLM (например, GPT, LLaMA, DeepSeek), чтобы получить усиленный фактологичный ответ.

Изображения, документы, видео и аудио предварительно преобразуются в эмбеддинги и сохраняются в векторном хранилище. Эмбеддинги — это, как правило, высокоразмерные массивы float.
Для ускорения семантического поиска применяются векторные индексы (HNSW, IVFPQ) и ANN (Approximate Nearest Neighbor).
Такой подход уменьшает нагрузку высокоразмерных вычислений и снижает риск «галлюцинаций», делая ответы естественными и надёжными.
Подробнее о векторном поиске см. материалы по StarRocks Vector Index.
Типовые сценарии на базе StarRocks + DeepSeek
DeepSeek: генерация качественных эмбеддингов и ответов.
StarRocks: высокоэффективный векторный поиск и хранение.
Вместе они образуют основу для точных и масштабируемых AI‑решений.
1) Корпоративная база знаний
Подходит для:
Поиска по внутренним документам компании, FAQ
Вопрос‑ответ в специализированных доменах (право, финансы, медицина)
Поиска по коду, документации и архитектурным описаниям
Решение:
Встраивание документов (DeepSeek): преобразование текстов в векторы.
Хранение и индексирование (StarRocks): HNSW или IVFPQ с быстрым ANN‑поиском.
RAG: запрос → эмбеддинг запроса → векторный поиск → получение контекста → генерация ответа DeepSeek с опорой на контекст.
2) AI‑поддержка и интеллектуальный вопрос‑ответ
Подходит для:
Интеллектуальных сервисов поддержки (банк, брокер, e‑commerce)
Профконсультаций (право, медицина)
Автоматического техподдержки/FAQ
Решение:
Встраивание диалогов (DeepSeek): подготовка эмбеддингов для истории чатов и намерений.
Хранение и индексирование (StarRocks): быстрый поиск похожих кейсов.
RAG: объединение истории + базы знаний + DeepSeek для генерации точных ответов.
Пример:
Пользователь: «Как сменить номер телефона, привязанный к банковской карте?»
StarRocks извлекает 3 наиболее похожих случая.
DeepSeek, учитывая их + FAQ, формирует точный, контекстный ответ.
Демонстрация: архитектура и шаги
Состав системы:
DeepSeek: текстовые эмбеддинги и генерация ответов
StarRocks: хранение и быстрый ANN‑поиск по векторным данным (векторные индексы поддерживаются с версии 3.4+)
Процесс:
Подготовка окружения — Ollama, StarRocks
Векторизация данных — DeepSeek Embedding (например, размерность 3584)
Хранение векторов — таблицы StarRocks
ANN‑поиск — индексы IVFPQ/HNSW
Усиление (Augmentation) — комбинирование извлечённого контента
Генерация ответа — DeepSeek LLM
1. Подготовка окружения
1.1 Локальное развёртывание DeepSeek через Ollama
Скачать Ollama: https://ollama.com/
Установить и запустить модель DeepSeek (пример — deepseek-r1:7b):
# Скачает и запустит модель ollama run deepseek-r1:7b
[изображение]
Совет: для использования облачного DeepSeek (официальный API) получите API Key на https://platform.deepseek.com (разделы «API Keys» или «Developer»), например: sk-xxxxxxxxxxxxxxxx.
Начало работы:
# Войти в интерактивный режим ollama run deepseek-r1:7b
Оптимизация производительности (на один запуск):
export OLLAMA_GPU_LAYERS=35 export OLLAMA_CPU_THREADS=6 export OLLAMA_BATCH_SIZE=128 export OLLAMA_CONTEXT_SIZE=4096 ollama run deepseek-r1:7b

Вывод: напрямую задавая вопросы модели, вы получите общие ответы. Для точности и контекстности нужен RAG.
1.2 Подготовка StarRocks
Требуемая версия: 3.4+
Включение векторных индексов:
-- Динамически (до перезапуска FE) ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true"); -- Постоянно: в fe.conf -- enable_experimental_vector = true -- затем перезапустить FE
Создание базы и таблицы хранения эмбеддингов:
CREATE DATABASE knowledge_base; CREATE TABLE enterprise_knowledge ( id BIGINT AUTO_INCREMENT, content TEXT NOT NULL, embedding ARRAY<FLOAT> NOT NULL, INDEX vec_idx (embedding) USING VECTOR ( "index_type" = "hnsw", "dim" = "3584", "metric_type" = "l2_distance", "M" = "16", "efconstruction" = "40" ) ) ENGINE=OLAP PRIMARY KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ( "replication_num" = "1" );
Примечание: модель deepseek-r1:7b часто выдаёт эмбеддинги размерности 3584.
2. Преобразование текста в эмбеддинг
Тест локального Embedding API Ollama:
curl -X POST http://localhost:11434/api/embeddings \ -d '{"model": "deepseek-r1:7b", "prompt": "Срок гарантии на продукт — один год."}'
3. Загрузка эмбеддингов в StarRocks
import pymysql import requests def get_embedding(text: str): url = "http://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} resp = requests.post(url, json=payload, timeout=30) resp.raise_for_status() return resp.json()["embedding"] try: content = "Видение StarRocks — сделать аналитику данных более простой и гибкой." embedding = get_embedding(content) # Преобразуем Python-список в формат ARRAY<FLOAT> StarRocks embedding_str = "[" + ",".join(map(str, embedding)) + "]" conn = pymysql.connect( host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True, ) with conn.cursor() as cur: sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)" cur.execute(sql, (content, embedding_str)) print(f"Inserted: {content} with embedding {embedding[:5]}...") except requests.RequestException as e: print(f"Embedding API error: {e}") except pymysql.Error as e: print(f"Database error: {e}")

4. Извлечение знаний (ANN‑поиск и результат)
Важно: чтобы запрос задействовал векторный индекс, используйте приближённые функции и формат ORDER BY согласно документации StarRocks:
Функция:
l2_distance → approx_l2_distance
cosine_similarity → approx_cosine_similarity
Формат ORDER BY: ORDER BY approx_l2_distance(vector_column, constant_array)
constant_array — константный ARRAY той же размерности, что dim.
Обязателен LIMIT N.
В ORDER BY не должно быть других столбцов.
Пример на HNSW с параметром efsearch:
import pymysql import requests import json def get_embedding(text: str): url = "http://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} resp = requests.post(url, json=payload, timeout=30) resp.raise_for_status() return resp.json()["embedding"] def search_knowledge_base(query_embedding, topk=3, efsearch=128): # Преобразуем запрос в формат ARRAY<FLOAT> qv = "[" + ",".join(map(str, query_embedding)) + "]" sql = f""" SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */ content, approx_l2_distance(embedding, {qv}) AS distance FROM enterprise_knowledge ORDER BY approx_l2_distance(embedding, {qv}) ASC LIMIT {topk} """ conn = pymysql.connect( host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base" ) try: with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() return [r[0] for r in rows] if rows else [] finally: conn.close() # Пример использования query = "Каково видение StarRocks?" qe = get_embedding(query) contexts = search_knowledge_base(qe, topk=3, efsearch=128) print(contexts)
Проверка применения индекса:
EXPLAIN SELECT content FROM enterprise_knowledge ORDER BY approx_l2_distance(embedding, [1,1,1,1,1]) ASC LIMIT 3; -- В OlapScanNode должно быть: VECTORINDEX: ON
Точный пересчёт (для повторного ранжирования top‑K):
-- допустим, получили список id из приближённого запроса SELECT content, l2_distance(embedding, [1,1,1,1,1]) AS dist FROM enterprise_knowledge WHERE id IN (/* topK ids */) ORDER BY dist ASC LIMIT 3;

5. Подключение RAG‑усиления
5.1 Формирование Prompt и генерация ответа DeepSeek
import requests, json, re def build_rag_prompt(query: str, retrieved_content: str) -> str: return f""" [Системная инструкция] Вы — интеллектуальный ассистент. Отвечайте строго по контексту. [Контекст знаний] {retrieved_content} [Вопрос пользователя] {query} Если данных недостаточно, объясните, чего не хватает. """ def clean_response(text: str) -> str: # Удаляем служебные блоки вида <think>...</think> return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() def generate_answer(prompt: str) -> str: url = "http://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} resp = requests.post(url, json=payload, timeout=120) resp.raise_for_status() full = "" for line in resp.text.splitlines(): if not line.strip(): continue try: obj = json.loads(line) if "response" in obj: full += obj["response"] if obj.get("done", False): break except json.JSONDecodeError: continue return clean_response(full.strip())
5.2 Журнал процесса RAG
CREATE TABLE customer_service_log ( id BIGINT AUTO_INCREMENT, user_id VARCHAR(50), question TEXT NOT NULL, question_embedding ARRAY<FLOAT> NOT NULL, retrieved_content TEXT, generated_answer TEXT, timestamp DATETIME NOT NULL, feedback TINYINT DEFAULT NULL ) ENGINE=OLAP PRIMARY KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1");
6. Оптимизированная версия
6.1 Конвейер RAG «под ключ»
import pymysql, requests, json, logging, re from datetime import datetime logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def get_embedding(text: str): url = "http://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} r = requests.post(url, json=payload, timeout=30) r.raise_for_status() return r.json()["embedding"] def search_knowledge_base(query_embedding, topk=3, efsearch=128): qv = "[" + ",".join(map(str, query_embedding)) + "]" sql = f""" SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */ content, approx_l2_distance(embedding, {qv}) AS distance FROM enterprise_knowledge ORDER BY approx_l2_distance(embedding, {qv}) ASC LIMIT {topk} """ conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base") try: with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() return "\n\n---\n\n".join([r[0] for r in rows]) if rows else "" finally: conn.close() def build_rag_prompt(query: str, retrieved_content: str) -> str: return f""" [Системная инструкция] Вы — корпоративный ассистент. Отвечайте кратко и по делу, строго по контексту. [Контекст знаний] {retrieved_content} [Вопрос] {query} """ def clean_response(text: str) -> str: return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() def generate_answer(prompt: str) -> str: url = "http://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} r = requests.post(url, json=payload, timeout=120) r.raise_for_status() full = "" for line in r.text.splitlines(): if not line.strip(): continue try: obj = json.loads(line) if "response" in obj: full += obj["response"] if obj.get("done", False): break except json.JSONDecodeError: continue return clean_response(full.strip()) def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer): emb = "[" + ",".join(map(str, question_embedding)) + "]" sql = """ INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp) VALUES (%s, %s, %s, %s, %s, NOW()) """ conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True) try: with conn.cursor() as cur: cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer)) finally: conn.close() def rag_pipeline(user_id: str, query: str) -> str: try: qe = get_embedding(query) ctx = search_knowledge_base(qe, topk=3, efsearch=128) prompt = build_rag_prompt(query, ctx) ans = generate_answer(prompt) log_conversation(user_id, query, qe, ctx, ans) return ans except Exception as e: logging.exception("RAG pipeline failed") return "Ошибка обработки запроса." if __name__ == "__main__": user_id = "user123" query = "Каково видение StarRocks?" print("Ответ:\n", rag_pipeline(user_id, query))

Суммарный конвейер:
Ввод вопроса
Эмбеддинг (DeepSeek)
ANN‑поиск в enterprise_knowledge (StarRocks Vector Index)
Формирование prompt (Augmentation)
Генерация ответа (DeepSeek)
Логирование (customer_service_log)
Возврат результата пользователю
6.2 Веб‑интерфейс (минимальный)
<!DOCTYPE html> <html lang="ru"> <head> <meta charset="UTF-8" /> <title>Интеллектуальная система вопросов и ответов</title> <meta name="viewport" content="width=device-width,initial-scale=1" /> <script> async function askQuestion() { const question = document.getElementById("question").value; const resp = await fetch("/ask", { method: "POST", headers: {"Content-Type": "application/json"}, body: JSON.stringify({ question }) }); const data = await resp.json(); document.getElementById("answer").innerText = data.answer; } </script> </head> <body> <h1>Интеллектуальная система вопросов и ответов</h1> <input id="question" type="text" placeholder="Введите ваш вопрос" /> <button onclick="askQuestion()">Спросить</button> <pre id="answer"></pre> </body> </html>
6.3 Полноценный бэкенд Q&A (Flask)
import pymysql, requests, json, logging, re from flask import Flask, request, jsonify, render_template_string app = Flask(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") INDEX_HTML = """<!DOCTYPE html><html lang="ru"><head> <meta charset="UTF-8"><title>Интеллектуальная система вопросов и ответов</title> <meta name="viewport" content="width=device-width,initial-scale=1" /> <script> async function askQuestion(){ const q=document.getElementById('question').value; const r=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({question:q})}); const d=await r.json(); document.getElementById('answer').innerText=d.answer; } </script></head><body> <h1>Интеллектуальная система вопросов и ответов</h1> <input id="question" type="text" placeholder="Введите ваш вопрос" /> <button onclick="askQuestion()">Спросить</button> <pre id="answer"></pre> </body></html>""" def get_embedding(text: str): url = "http://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} r = requests.post(url, json=payload, timeout=30) r.raise_for_status() return r.json()["embedding"] def search_knowledge_base(query_embedding, topk=3, efsearch=128): qv = "[" + ",".join(map(str, query_embedding)) + "]" sql = f""" SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */ content, approx_l2_distance(embedding, {qv}) AS distance FROM enterprise_knowledge ORDER BY approx_l2_distance(embedding, {qv}) ASC LIMIT {topk} """ conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base") try: with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() return "\n\n---\n\n".join([r[0] for r in rows]) if rows else "" finally: conn.close() def build_rag_prompt(query: str, retrieved_content: str) -> str: return f""" [Системная инструкция] Вы — корпоративный ассистент. Отвечайте строго по контексту. [Контекст] {retrieved_content} [Вопрос] {query} """ def clean_response(text: str) -> str: return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip() def generate_answer(prompt: str) -> str: url = "http://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} r = requests.post(url, json=payload, timeout=120) r.raise_for_status() full = "" for line in r.text.splitlines(): if not line.strip(): continue try: obj = json.loads(line) if "response" in obj: full += obj["response"] if obj.get("done", False): break except json.JSONDecodeError: continue return clean_response(full.strip()) def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer): emb = "[" + ",".join(map(str, question_embedding)) + "]" sql = """ INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp) VALUES (%s, %s, %s, %s, %s, NOW()) """ conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True) try: with conn.cursor() as cur: cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer)) finally: conn.close() def rag_pipeline(user_id: str, query: str) -> str: qe = get_embedding(query) ctx = search_knowledge_base(qe, topk=3, efsearch=128) prompt = build_rag_prompt(query, ctx) ans = generate_answer(prompt) log_conversation(user_id, query, qe, ctx, ans) return ans @app.route("/") def index(): return render_template_string(INDEX_HTML) @app.route("/ask", methods=["POST"]) def ask(): data = request.get_json(force=True) question = data.get("question", "").strip() if not question: return jsonify({"answer": "Пожалуйста, введите вопрос."}) answer = rag_pipeline("sr_01", question) return jsonify({"answer": f"Вопрос: {question}\n\nОтвет: {answer}"}) if __name__ == "__main__": app.run(host="0.0.0.0", port=9033, debug=True)



Примечания по настройке векторного индекса StarRocks (HNSW/IVFPQ)
index_type: hnsw | ivfpq
dim: размерность эмбеддинга (например, 3584)
metric_type: l2_distance | cosine_similarity
HNSW‑параметры:
M (двунаправленных связей на вершину, по умолчанию 16)
efconstruction (размер кандидатов при построении графа, по умолчанию 40)
efsearch (через /*+ SET_VAR(ann_params) */ при запросе; выше — точнее, медленнее)
IVFPQ‑параметры:
nlist (число кластеров)
nbits (точность PQ, кратно 8)
M_IVFPQ (число субвекторов; делитель dim)
Поисковые: nprobe, max_codes, polysemous_ht, range_search_confidence
Требования к запросам, чтобы индекс применился:
ORDER BY только по approx_l2_distance(колонка, константный_массив) или approx_cosine_similarity(...)
Константный массив — ARRAY с длиной = dim
Обязателен LIMIT N
Предикаты строить на той же функции и согласовывать с направлением сортировки (ASC для l2_distance, DESC для cosine_similarity)
Проверка:
EXPLAIN ... -- VECTORINDEX: ON в OlapScanNode
Справочные материалы
Vector Index (StarRocks): https://docs.starrocks.io/zh/docs/table_design/indexes/vector_index/
DeepSeek: https://platform.deepseek.com
Ollama: https://ollama.com/
