Автор(ы):
Ши Цян, архитектор решений
Чжао Хэн, StarRocks TSC Member
Введение в RAG и векторные индексы
RAG (Retrieval-Augmented Generation — извлечение с последующей генерацией) сочетает поиск внешних знаний и генерацию ответов, компенсируя статичность знаний классических LLM и склонность к «галлюцинациям». Это повышает точность и опору на актуальные данные.
Стандартный конвейер RAG:
Извлечение (Retrieval): поиск релевантного контента в внешних источниках (векторные базы, поисковые движки, традиционные БД).
Генерация (Generation): объединение запроса пользователя и найденного контекста для LLM (например, GPT, LLaMA, DeepSeek), чтобы получить усиленный фактологичный ответ.

Изображения, документы, видео и аудио предварительно преобразуются в эмбеддинги и сохраняются в векторном хранилище. Эмбеддинги — это, как правило, высокоразмерные массивы float.
Для ускорения семантического поиска применяются векторные индексы (HNSW, IVFPQ) и ANN (Approximate Nearest Neighbor).
Такой подход уменьшает нагрузку высокоразмерных вычислений и снижает риск «галлюцинаций», делая ответы естественными и надёжными.
Подробнее о векторном поиске см. материалы по StarRocks Vector Index.
Типовые сценарии на базе StarRocks + DeepSeek
DeepSeek: генерация качественных эмбеддингов и ответов.
StarRocks: высокоэффективный векторный поиск и хранение.
Вместе они образуют основу для точных и масштабируемых AI‑решений.
1) Корпоративная база знаний
Подходит для:
Поиска по внутренним документам компании, FAQ
Вопрос‑ответ в специализированных доменах (право, финансы, медицина)
Поиска по коду, документации и архитектурным описаниям
Решение:
Встраивание документов (DeepSeek): преобразование текстов в векторы.
Хранение и индексирование (StarRocks): HNSW или IVFPQ с быстрым ANN‑поиском.
RAG: запрос → эмбеддинг запроса → векторный поиск → получение контекста → генерация ответа DeepSeek с опорой на контекст.
2) AI‑поддержка и интеллектуальный вопрос‑ответ
Подходит для:
��нтеллектуальных сервисов поддержки (банк, брокер, e‑commerce)
Профконсультаций (право, медицина)
Автоматического техподдержки/FAQ
Решение:
Встраивание диалогов (DeepSeek): подготовка эмбеддингов для истории чатов и намерений.
Хранение и индексирование (StarRocks): быстрый поиск похожих кейсов.
RAG: объединение истории + базы знаний + DeepSeek для генерации точных ответов.
Пример:
Пользователь: «Как сменить номер телефона, привязанный к банковской карте?»
StarRocks извлекает 3 наиболее похожих случая.
DeepSeek, учитывая их + FAQ, формирует точный, контекстный ответ.
Демонстрация: архитектура и шаги
Состав системы:
DeepSeek: текстовые эмбеддинги и генерация ответов
StarRocks: хранение и быстрый ANN‑поиск по векторным данным (векторные индексы поддерживаются с версии 3.4+)
Процесс:
Подготовка окружения — Ollama, StarRocks
Векторизация данных — DeepSeek Embedding (например, размерность 3584)
Хранение векторов — таблицы StarRocks
ANN‑поиск — индексы IVFPQ/HNSW
Усиление (Augmentation) — комбинирование извлечённого контента
Генерация ответа — DeepSeek LLM
1. Подготовка окружения
1.1 Локальное развёртывание DeepSeek через Ollama
Скачать Ollama: https://ollama.com/
Установить и запустить модель DeepSeek (пример — deepseek-r1:7b):
# Скачает и запустит модель
ollama run deepseek-r1:7b
[изображение]
Совет: для использования облачного DeepSeek (официальный API) получите API Key на https://platform.deepseek.com (разделы «API Keys» или «Developer»), например: sk-xxxxxxxxxxxxxxxx.
Начало работы:
# Войти в интерактивный режим
ollama run deepseek-r1:7b
Оптимизация производительности (на один запуск):
export OLLAMA_GPU_LAYERS=35
export OLLAMA_CPU_THREADS=6
export OLLAMA_BATCH_SIZE=128
export OLLAMA_CONTEXT_SIZE=4096
ollama run deepseek-r1:7b

Вывод: напрямую задавая вопросы модели, вы получите общие ответы. Для точности и контекстности нужен RAG.
1.2 Подготовка StarRocks
Требуемая версия: 3.4+
Включение векторных индексов:
-- Динамически (до перезапуска FE)
ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true");
-- Постоянно: в fe.conf
-- enable_experimental_vector = true
-- затем перезапустить FE
Создание базы и таблицы хранения эмбеддингов:
CREATE DATABASE knowledge_base;
CREATE TABLE enterprise_knowledge (
id BIGINT AUTO_INCREMENT,
content TEXT NOT NULL,
embedding ARRAY<FLOAT> NOT NULL,
INDEX vec_idx (embedding) USING VECTOR (
"index_type" = "hnsw",
"dim" = "3584",
"metric_type" = "l2_distance",
"M" = "16",
"efconstruction" = "40"
)
) ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
Примечание: модель deepseek-r1:7b часто выдаёт эмбеддинги размерности 3584.
2. Преобразование текста в эмбеддинг
Тест локального Embedding API Ollama:
curl -X POST http://localhost:11434/api/embeddings \
-d '{"model": "deepseek-r1:7b", "prompt": "Срок гарантии на продукт — один год."}'
3. Загрузка эмбеддингов в StarRocks
import pymysql
import requests
def get_embedding(text: str):
url = "http://localhost:11434/api/embeddings"
payload = {"model": "deepseek-r1:7b", "prompt": text}
resp = requests.post(url, json=payload, timeout=30)
resp.raise_for_status()
return resp.json()["embedding"]
try:
content = "Видение StarRocks — сделать аналитику данных более простой и гибкой."
embedding = get_embedding(content)
# Преобразуем Python-список в формат ARRAY<FLOAT> StarRocks
embedding_str = "[" + ",".join(map(str, embedding)) + "]"
conn = pymysql.connect(
host="X.X.X.X",
port=9030,
user="root",
password="sr123456",
database="knowledge_base",
autocommit=True,
)
with conn.cursor() as cur:
sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)"
cur.execute(sql, (content, embedding_str))
print(f"Inserted: {content} with embedding {embedding[:5]}...")
except requests.RequestException as e:
print(f"Embedding API error: {e}")
except pymysql.Error as e:
print(f"Database error: {e}")

4. Извлечение знаний (ANN‑поиск и результат)
Важно: чтобы запрос задействовал векторный индекс, используйте приближённые функции и формат ORDER BY согласно документации StarRocks:
Функция:
l2_distance → approx_l2_distance
cosine_similarity → approx_cosine_similarity
Формат ORDER BY: ORDER BY approx_l2_distance(vector_column, constant_array)
constant_array — константный ARRAY той же размерности, что dim.
Обязателен LIMIT N.
В ORDER BY не должно быть других столбцов.
Пример на HNSW с параметром efsearch:
import pymysql
import requests
import json
def get_embedding(text: str):
url = "http://localhost:11434/api/embeddings"
payload = {"model": "deepseek-r1:7b", "prompt": text}
resp = requests.post(url, json=payload, timeout=30)
resp.raise_for_status()
return resp.json()["embedding"]
def search_knowledge_base(query_embedding, topk=3, efsearch=128):
# Преобразуем запрос в формат ARRAY<FLOAT>
qv = "[" + ",".join(map(str, query_embedding)) + "]"
sql = f"""
SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
content,
approx_l2_distance(embedding, {qv}) AS distance
FROM enterprise_knowledge
ORDER BY approx_l2_distance(embedding, {qv}) ASC
LIMIT {topk}
"""
conn = pymysql.connect(
host="X.X.X.X",
port=9030,
user="root",
password="sr123456",
database="knowledge_base"
)
try:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
return [r[0] for r in rows] if rows else []
finally:
conn.close()
# Пример использования
query = "Каково видение StarRocks?"
qe = get_embedding(query)
contexts = search_knowledge_base(qe, topk=3, efsearch=128)
print(contexts)
Проверка применения индекса:
EXPLAIN
SELECT
content
FROM enterprise_knowledge
ORDER BY approx_l2_distance(embedding, [1,1,1,1,1]) ASC
LIMIT 3;
-- В OlapScanNode должно быть: VECTORINDEX: ON
Точный пересчёт (для повторного ранжирования top‑K):
-- допустим, получили список id из приближённого запроса
SELECT content, l2_distance(embedding, [1,1,1,1,1]) AS dist
FROM enterprise_knowledge
WHERE id IN (/* topK ids */)
ORDER BY dist ASC
LIMIT 3;

5. Подключение RAG‑усиления
5.1 Формирование Prompt и генерация ответа DeepSeek
import requests, json, re
def build_rag_prompt(query: str, retrieved_content: str) -> str:
return f"""
[Системная инструкция] Вы — интеллектуальный ассистент. Отвечайте строго по контексту.
[Контекст знаний]
{retrieved_content}
[Вопрос пользователя]
{query}
Если данных недостаточно, объясните, чего не хватает.
"""
def clean_response(text: str) -> str:
# Удаляем служебные блоки вида <think>...</think>
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
def generate_answer(prompt: str) -> str:
url = "http://localhost:11434/api/generate"
payload = {"model": "deepseek-r1:7b", "prompt": prompt}
resp = requests.post(url, json=payload, timeout=120)
resp.raise_for_status()
full = ""
for line in resp.text.splitlines():
if not line.strip():
continue
try:
obj = json.loads(line)
if "response" in obj:
full += obj["response"]
if obj.get("done", False):
break
except json.JSONDecodeError:
continue
return clean_response(full.strip())
5.2 Журнал процесса RAG
CREATE TABLE customer_service_log (
id BIGINT AUTO_INCREMENT,
user_id VARCHAR(50),
question TEXT NOT NULL,
question_embedding ARRAY<FLOAT> NOT NULL,
retrieved_content TEXT,
generated_answer TEXT,
timestamp DATETIME NOT NULL,
feedback TINYINT DEFAULT NULL
) ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");
6. Оптимизированная версия
6.1 Конвейер RAG «под ключ»
import pymysql, requests, json, logging, re
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def get_embedding(text: str):
url = "http://localhost:11434/api/embeddings"
payload = {"model": "deepseek-r1:7b", "prompt": text}
r = requests.post(url, json=payload, timeout=30)
r.raise_for_status()
return r.json()["embedding"]
def search_knowledge_base(query_embedding, topk=3, efsearch=128):
qv = "[" + ",".join(map(str, query_embedding)) + "]"
sql = f"""
SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
content,
approx_l2_distance(embedding, {qv}) AS distance
FROM enterprise_knowledge
ORDER BY approx_l2_distance(embedding, {qv}) ASC
LIMIT {topk}
"""
conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base")
try:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
return "\n\n---\n\n".join([r[0] for r in rows]) if rows else ""
finally:
conn.close()
def build_rag_prompt(query: str, retrieved_content: str) -> str:
return f"""
[Системная инструкция] Вы — корпоративный ассистент. Отвечайте кратко и по делу, строго по контексту.
[Контекст знаний]
{retrieved_content}
[Вопрос]
{query}
"""
def clean_response(text: str) -> str:
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
def generate_answer(prompt: str) -> str:
url = "http://localhost:11434/api/generate"
payload = {"model": "deepseek-r1:7b", "prompt": prompt}
r = requests.post(url, json=payload, timeout=120)
r.raise_for_status()
full = ""
for line in r.text.splitlines():
if not line.strip():
continue
try:
obj = json.loads(line)
if "response" in obj: full += obj["response"]
if obj.get("done", False): break
except json.JSONDecodeError:
continue
return clean_response(full.strip())
def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):
emb = "[" + ",".join(map(str, question_embedding)) + "]"
sql = """
INSERT INTO customer_service_log
(user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)
VALUES (%s, %s, %s, %s, %s, NOW())
"""
conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True)
try:
with conn.cursor() as cur:
cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer))
finally:
conn.close()
def rag_pipeline(user_id: str, query: str) -> str:
try:
qe = get_embedding(query)
ctx = search_knowledge_base(qe, topk=3, efsearch=128)
prompt = build_rag_prompt(query, ctx)
ans = generate_answer(prompt)
log_conversation(user_id, query, qe, ctx, ans)
return ans
except Exception as e:
logging.exception("RAG pipeline failed")
return "Ошибка обработки запроса."
if __name__ == "__main__":
user_id = "user123"
query = "Каково видение StarRocks?"
print("Ответ:\n", rag_pipeline(user_id, query))

Суммарный конвейер:
Ввод вопроса
Эмбеддинг (DeepSeek)
ANN‑поиск в enterprise_knowledge (StarRocks Vector Index)
Формирование prompt (Augmentation)
Генерация ответа (DeepSeek)
Логирование (customer_service_log)
Возврат результата пользователю
6.2 Веб‑интерфейс (минимальный)
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8" />
<title>Интеллектуальная система вопросов и ответов</title>
<meta name="viewport" content="width=device-width,initial-scale=1" />
<script>
async function askQuestion() {
const question = document.getElementById("question").value;
const resp = await fetch("/ask", {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({ question })
});
const data = await resp.json();
document.getElementById("answer").innerText = data.answer;
}
</script>
</head>
<body>
<h1>Интеллектуальная система вопросов и ответов</h1>
<input id="question" type="text" placeholder="Введите ваш вопрос" />
<button onclick="askQuestion()">Спросить</button>
<pre id="answer"></pre>
</body>
</html>
6.3 Полноценный бэкенд Q&A (Flask)
import pymysql, requests, json, logging, re
from flask import Flask, request, jsonify, render_template_string
app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
INDEX_HTML = """<!DOCTYPE html><html lang="ru"><head>
<meta charset="UTF-8"><title>Интеллектуальная система вопросов и ответов</title>
<meta name="viewport" content="width=device-width,initial-scale=1" />
<script>
async function askQuestion(){
const q=document.getElementById('question').value;
const r=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({question:q})});
const d=await r.json(); document.getElementById('answer').innerText=d.answer;
}
</script></head><body>
<h1>Интеллектуальная система вопросов и ответов</h1>
<input id="question" type="text" placeholder="Введите ваш вопрос" />
<button onclick="askQuestion()">Спросить</button>
<pre id="answer"></pre>
</body></html>"""
def get_embedding(text: str):
url = "http://localhost:11434/api/embeddings"
payload = {"model": "deepseek-r1:7b", "prompt": text}
r = requests.post(url, json=payload, timeout=30)
r.raise_for_status()
return r.json()["embedding"]
def search_knowledge_base(query_embedding, topk=3, efsearch=128):
qv = "[" + ",".join(map(str, query_embedding)) + "]"
sql = f"""
SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
content,
approx_l2_distance(embedding, {qv}) AS distance
FROM enterprise_knowledge
ORDER BY approx_l2_distance(embedding, {qv}) ASC
LIMIT {topk}
"""
conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base")
try:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall()
return "\n\n---\n\n".join([r[0] for r in rows]) if rows else ""
finally:
conn.close()
def build_rag_prompt(query: str, retrieved_content: str) -> str:
return f"""
[Системная инструкция] Вы — корпоративный ассистент. Отвечайте строго по контексту.
[Контекст]
{retrieved_content}
[Вопрос]
{query}
"""
def clean_response(text: str) -> str:
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
def generate_answer(prompt: str) -> str:
url = "http://localhost:11434/api/generate"
payload = {"model": "deepseek-r1:7b", "prompt": prompt}
r = requests.post(url, json=payload, timeout=120)
r.raise_for_status()
full = ""
for line in r.text.splitlines():
if not line.strip():
continue
try:
obj = json.loads(line)
if "response" in obj: full += obj["response"]
if obj.get("done", False): break
except json.JSONDecodeError:
continue
return clean_response(full.strip())
def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):
emb = "[" + ",".join(map(str, question_embedding)) + "]"
sql = """
INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)
VALUES (%s, %s, %s, %s, %s, NOW())
"""
conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True)
try:
with conn.cursor() as cur:
cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer))
finally:
conn.close()
def rag_pipeline(user_id: str, query: str) -> str:
qe = get_embedding(query)
ctx = search_knowledge_base(qe, topk=3, efsearch=128)
prompt = build_rag_prompt(query, ctx)
ans = generate_answer(prompt)
log_conversation(user_id, query, qe, ctx, ans)
return ans
@app.route("/")
def index():
return render_template_string(INDEX_HTML)
@app.route("/ask", methods=["POST"])
def ask():
data = request.get_json(force=True)
question = data.get("question", "").strip()
if not question:
return jsonify({"answer": "Пожалуйста, введите вопрос."})
answer = rag_pipeline("sr_01", question)
return jsonify({"answer": f"Вопрос: {question}\n\nОтвет: {answer}"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9033, debug=True)



Примечания по настройке векторного индекса StarRocks (HNSW/IVFPQ)
index_type: hnsw | ivfpq
dim: размерность эмбеддинга (например, 3584)
metric_type: l2_distance | cosine_similarity
HNSW‑параметры:
M (двунаправленных связей на вершину, по умолчанию 16)
efconstruction (размер кандидатов при построении графа, по умолчанию 40)
efsearch (через /*+ SET_VAR(ann_params) */ при запросе; выше — точнее, медленнее)
IVFPQ‑параметры:
nlist (число кластеров)
nbits (точность PQ, кратно 8)
M_IVFPQ (число субвекторов; делитель dim)
Поисковые: nprobe, max_codes, polysemous_ht, range_search_confidence
Требования к запросам, чтобы индекс применился:
ORDER BY только по approx_l2_distance(колонка, константный_массив) или approx_cosine_similarity(...)
Константный массив — ARRAY с длиной = dim
Обязателен LIMIT N
Предикаты строить на той же функции и согласовывать с направлением сортировки (ASC для l2_distance, DESC для cosine_similarity)
Проверка:
EXPLAIN ... -- VECTORINDEX: ON в OlapScanNode
Справочные материалы
Vector Index (StarRocks): https://docs.starrocks.io/zh/docs/table_design/indexes/vector_index/
DeepSeek: https://platform.deepseek.com
Ollama: https://ollama.com/
