Автор(ы):

  • Ши Цян, архитектор решений

  • Чжао Хэн, StarRocks TSC Member

Введение в RAG и векторные индексы

RAG (Retrieval-Augmented Generation — извлечение с последующей генерацией) сочетает поиск внешних знаний и генерацию ответов, компенсируя статичность знаний классических LLM и склонность к «галлюцинациям». Это повышает точность и опору на актуальные данные.

Стандартный конвейер RAG:

  1. Извлечение (Retrieval): поиск релевантного контента в внешних источниках (векторные базы, поисковые движки, традиционные БД).

  2. Генерация (Generation): объединение запроса пользователя и найденного контекста для LLM (например, GPT, LLaMA, DeepSeek), чтобы получить усиленный фактологичный ответ.

  • Изображения, документы, видео и аудио предварительно преобразуются в эмбеддинги и сохраняются в векторном хранилище. Эмбеддинги — это, как правило, высокоразмерные массивы float.

  • Для ускорения семантического поиска применяются векторные индексы (HNSW, IVFPQ) и ANN (Approximate Nearest Neighbor).

  • Такой подход уменьшает нагрузку высокоразмерных вычислений и снижает риск «галлюцинаций», делая ответы естественными и надёжными.

Подробнее о векторном поиске см. материалы по StarRocks Vector Index.

Типовые сценарии на базе StarRocks + DeepSeek

  • DeepSeek: генерация качественных эмбеддингов и ответов.

  • StarRocks: высокоэффективный векторный поиск и хранение.

Вместе они образуют основу для точных и масштабируемых AI‑решений.

1) Корпоративная база знаний

Подходит для:

  • Поиска по внутренним документам компании, FAQ

  • Вопрос‑ответ в специализированных доменах (право, финансы, медицина)

  • Поиска по коду, документации и архитектурным описаниям

Решение:

  1. Встраивание документов (DeepSeek): преобразование текстов в векторы.

  2. Хранение и индексирование (StarRocks): HNSW или IVFPQ с быстрым ANN‑поиском.

  3. RAG: запрос → эмбеддинг запроса → векторный поиск → получение контекста → генерация ответа DeepSeek с опорой на контекст.

2) AI‑поддержка и интеллектуальный вопрос‑ответ

Подходит для:

  • ��нтеллектуальных сервисов поддержки (банк, брокер, e‑commerce)

  • Профконсультаций (право, медицина)

  • Автоматического техподдержки/FAQ

Решение:

  1. Встраивание диалогов (DeepSeek): подготовка эмбеддингов для истории чатов и намерений.

  2. Хранение и индексирование (StarRocks): быстрый поиск похожих кейсов.

  3. RAG: объединение истории + базы знаний + DeepSeek для генерации точных ответов.

Пример:

  • Пользователь: «Как сменить номер телефона, привязанный к банковской карте?»

  • StarRocks извлекает 3 наиболее похожих случая.

  • DeepSeek, учитывая их + FAQ, формирует точный, контекстный ответ.

Демонстрация: архитектура и шаги

Состав системы:

  • DeepSeek: текстовые эмбеддинги и генерация ответов

  • StarRocks: хранение и быстрый ANN‑поиск по векторным данным (векторные индексы поддерживаются с версии 3.4+)

Процесс:

  1. Подготовка окружения — Ollama, StarRocks

  2. Векторизация данных — DeepSeek Embedding (например, размерность 3584)

  3. Хранение векторов — таблицы StarRocks

  4. ANN‑поиск — индексы IVFPQ/HNSW

  5. Усиление (Augmentation) — комбинирование извлечённого контента

  6. Генерация ответа — DeepSeek LLM

1. Подготовка окружения

1.1 Локальное развёртывание DeepSeek через Ollama

  • Скачать Ollama: https://ollama.com/

  • Установить и запустить модель DeepSeek (пример — deepseek-r1:7b):

# Скачает и запустит модель
ollama run deepseek-r1:7b

[изображение]

Совет: для использования облачного DeepSeek (официальный API) получите API Key на https://platform.deepseek.com (разделы «API Keys» или «Developer»), например: sk-xxxxxxxxxxxxxxxx.

Начало работы:

# Войти в интерактивный режим
ollama run deepseek-r1:7b

Оптимизация производительности (на один запуск):

export OLLAMA_GPU_LAYERS=35
export OLLAMA_CPU_THREADS=6
export OLLAMA_BATCH_SIZE=128
export OLLAMA_CONTEXT_SIZE=4096
ollama run deepseek-r1:7b

Вывод: напрямую задавая вопросы модели, вы получите общие ответы. Для точности и контекстности нужен RAG.

1.2 Подготовка StarRocks

Требуемая версия: 3.4+

Включение векторных индексов:

-- Динамически (до перезапуска FE)
ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true");

-- Постоянно: в fe.conf
-- enable_experimental_vector = true
-- затем перезапустить FE

Создание базы и таблицы хранения эмбеддингов:

CREATE DATABASE knowledge_base;

CREATE TABLE enterprise_knowledge (
    id  BIGINT AUTO_INCREMENT,
    content TEXT NOT NULL,
    embedding ARRAY<FLOAT> NOT NULL,
    INDEX vec_idx (embedding) USING VECTOR (
        "index_type" = "hnsw",
        "dim" = "3584",
        "metric_type" = "l2_distance",
        "M" = "16",
        "efconstruction" = "40"
    )
) ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);

Примечание: модель deepseek-r1:7b часто выдаёт эмбеддинги размерности 3584.

2. Преобразование текста в эмбеддинг

Тест локального Embedding API Ollama:

curl -X POST http://localhost:11434/api/embeddings \
  -d '{"model": "deepseek-r1:7b", "prompt": "Срок гарантии на продукт — один год."}'

3. Загрузка эмбеддингов в StarRocks

import pymysql
import requests

def get_embedding(text: str):
    url = "http://localhost:11434/api/embeddings"
    payload = {"model": "deepseek-r1:7b", "prompt": text}
    resp = requests.post(url, json=payload, timeout=30)
    resp.raise_for_status()
    return resp.json()["embedding"]

try:
    content = "Видение StarRocks — сделать аналитику данных более простой и гибкой."
    embedding = get_embedding(content)

    # Преобразуем Python-список в формат ARRAY<FLOAT> StarRocks
    embedding_str = "[" + ",".join(map(str, embedding)) + "]"

    conn = pymysql.connect(
        host="X.X.X.X",
        port=9030,
        user="root",
        password="sr123456",
        database="knowledge_base",
        autocommit=True,
    )
    with conn.cursor() as cur:
        sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)"
        cur.execute(sql, (content, embedding_str))
    print(f"Inserted: {content} with embedding {embedding[:5]}...")

except requests.RequestException as e:
    print(f"Embedding API error: {e}")
except pymysql.Error as e:
    print(f"Database error: {e}")

4. Извлечение знаний (ANN‑поиск и результат)

Важно: чтобы запрос задействовал векторный индекс, используйте приближённые функции и формат ORDER BY согласно документации StarRocks:

  • Функция:

    • l2_distance → approx_l2_distance

    • cosine_similarity → approx_cosine_similarity

  • Формат ORDER BY: ORDER BY approx_l2_distance(vector_column, constant_array)

  • constant_array — константный ARRAY той же размерности, что dim.

  • Обязателен LIMIT N.

  • В ORDER BY не должно быть других столбцов.

Пример на HNSW с параметром efsearch:

import pymysql
import requests
import json

def get_embedding(text: str):
    url = "http://localhost:11434/api/embeddings"
    payload = {"model": "deepseek-r1:7b", "prompt": text}
    resp = requests.post(url, json=payload, timeout=30)
    resp.raise_for_status()
    return resp.json()["embedding"]

def search_knowledge_base(query_embedding, topk=3, efsearch=128):
    # Преобразуем запрос в формат ARRAY<FLOAT>
    qv = "[" + ",".join(map(str, query_embedding)) + "]"
    sql = f"""
    SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
           content,
           approx_l2_distance(embedding, {qv}) AS distance
    FROM enterprise_knowledge
    ORDER BY approx_l2_distance(embedding, {qv}) ASC
    LIMIT {topk}
    """
    conn = pymysql.connect(
        host="X.X.X.X",
        port=9030,
        user="root",
        password="sr123456",
        database="knowledge_base"
    )
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return [r[0] for r in rows] if rows else []
    finally:
        conn.close()

# Пример использования
query = "Каково видение StarRocks?"
qe = get_embedding(query)
contexts = search_knowledge_base(qe, topk=3, efsearch=128)
print(contexts)

Проверка применения индекса:

EXPLAIN
SELECT
    content
FROM enterprise_knowledge
ORDER BY approx_l2_distance(embedding, [1,1,1,1,1]) ASC
LIMIT 3;

-- В OlapScanNode должно быть: VECTORINDEX: ON

Точный пересчёт (для повторного ранжирования top‑K):

-- допустим, получили список id из приближённого запроса
SELECT content, l2_distance(embedding, [1,1,1,1,1]) AS dist
FROM enterprise_knowledge
WHERE id IN (/* topK ids */)
ORDER BY dist ASC
LIMIT 3;

5. Подключение RAG‑усиления

5.1 Формирование Prompt и генерация ответа DeepSeek

import requests, json, re

def build_rag_prompt(query: str, retrieved_content: str) -> str:
    return f"""
[Системная инструкция] Вы — интеллектуальный ассистент. Отвечайте строго по контексту.
[Контекст знаний]
{retrieved_content}

[Вопрос пользователя]
{query}

Если данных недостаточно, объясните, чего не хватает.
"""

def clean_response(text: str) -> str:
    # Удаляем служебные блоки вида <think>...</think>
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

def generate_answer(prompt: str) -> str:
    url = "http://localhost:11434/api/generate"
    payload = {"model": "deepseek-r1:7b", "prompt": prompt}
    resp = requests.post(url, json=payload, timeout=120)
    resp.raise_for_status()

    full = ""
    for line in resp.text.splitlines():
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
            if "response" in obj:
                full += obj["response"]
            if obj.get("done", False):
                break
        except json.JSONDecodeError:
            continue
    return clean_response(full.strip())

5.2 Журнал процесса RAG

CREATE TABLE customer_service_log (
    id BIGINT AUTO_INCREMENT,
    user_id VARCHAR(50),
    question TEXT NOT NULL,
    question_embedding ARRAY<FLOAT> NOT NULL,
    retrieved_content TEXT,
    generated_answer TEXT,
    timestamp DATETIME NOT NULL,
    feedback TINYINT DEFAULT NULL
) ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");

6. Оптимизированная версия

6.1 Конвейер RAG «под ключ»

import pymysql, requests, json, logging, re
from datetime import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

def get_embedding(text: str):
    url = "http://localhost:11434/api/embeddings"
    payload = {"model": "deepseek-r1:7b", "prompt": text}
    r = requests.post(url, json=payload, timeout=30)
    r.raise_for_status()
    return r.json()["embedding"]

def search_knowledge_base(query_embedding, topk=3, efsearch=128):
    qv = "[" + ",".join(map(str, query_embedding)) + "]"
    sql = f"""
    SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
           content,
           approx_l2_distance(embedding, {qv}) AS distance
    FROM enterprise_knowledge
    ORDER BY approx_l2_distance(embedding, {qv}) ASC
    LIMIT {topk}
    """
    conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base")
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return "\n\n---\n\n".join([r[0] for r in rows]) if rows else ""
    finally:
        conn.close()

def build_rag_prompt(query: str, retrieved_content: str) -> str:
    return f"""
[Системная инструкция] Вы — корпоративный ассистент. Отвечайте кратко и по делу, строго по контексту.
[Контекст знаний]
{retrieved_content}

[Вопрос]
{query}
"""

def clean_response(text: str) -> str:
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

def generate_answer(prompt: str) -> str:
    url = "http://localhost:11434/api/generate"
    payload = {"model": "deepseek-r1:7b", "prompt": prompt}
    r = requests.post(url, json=payload, timeout=120)
    r.raise_for_status()
    full = ""
    for line in r.text.splitlines():
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
            if "response" in obj: full += obj["response"]
            if obj.get("done", False): break
        except json.JSONDecodeError:
            continue
    return clean_response(full.strip())

def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):
    emb = "[" + ",".join(map(str, question_embedding)) + "]"
    sql = """
    INSERT INTO customer_service_log
        (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)
    VALUES (%s, %s, %s, %s, %s, NOW())
    """
    conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True)
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer))
    finally:
        conn.close()

def rag_pipeline(user_id: str, query: str) -> str:
    try:
        qe = get_embedding(query)
        ctx = search_knowledge_base(qe, topk=3, efsearch=128)
        prompt = build_rag_prompt(query, ctx)
        ans = generate_answer(prompt)
        log_conversation(user_id, query, qe, ctx, ans)
        return ans
    except Exception as e:
        logging.exception("RAG pipeline failed")
        return "Ошибка обработки запроса."

if __name__ == "__main__":
    user_id = "user123"
    query = "Каково видение StarRocks?"
    print("Ответ:\n", rag_pipeline(user_id, query))

Суммарный конвейер:

  1. Ввод вопроса

  2. Эмбеддинг (DeepSeek)

  3. ANN‑поиск в enterprise_knowledge (StarRocks Vector Index)

  4. Формирование prompt (Augmentation)

  5. Генерация ответа (DeepSeek)

  6. Логирование (customer_service_log)

  7. Возврат результата пользователю

6.2 Веб‑интерфейс (минимальный)

<!DOCTYPE html>
<html lang="ru">
<head>
  <meta charset="UTF-8" />
  <title>Интеллектуальная система вопросов и ответов</title>
  <meta name="viewport" content="width=device-width,initial-scale=1" />
  <script>
    async function askQuestion() {
      const question = document.getElementById("question").value;
      const resp = await fetch("/ask", {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({ question })
      });
      const data = await resp.json();
      document.getElementById("answer").innerText = data.answer;
    }
  </script>
</head>
<body>
  <h1>Интеллектуальная система вопросов и ответов</h1>
  <input id="question" type="text" placeholder="Введите ваш вопрос" />
  <button onclick="askQuestion()">Спросить</button>
  <pre id="answer"></pre>
</body>
</html>

6.3 Полноценный бэкенд Q&A (Flask)

import pymysql, requests, json, logging, re
from flask import Flask, request, jsonify, render_template_string

app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

INDEX_HTML = """<!DOCTYPE html><html lang="ru"><head>
<meta charset="UTF-8"><title>Интеллектуальная система вопросов и ответов</title>
<meta name="viewport" content="width=device-width,initial-scale=1" />
<script>
async function askQuestion(){
  const q=document.getElementById('question').value;
  const r=await fetch('/ask',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({question:q})});
  const d=await r.json(); document.getElementById('answer').innerText=d.answer;
}
</script></head><body>
<h1>Интеллектуальная система вопросов и ответов</h1>
<input id="question" type="text" placeholder="Введите ваш вопрос" />
<button onclick="askQuestion()">Спросить</button>
<pre id="answer"></pre>
</body></html>"""

def get_embedding(text: str):
    url = "http://localhost:11434/api/embeddings"
    payload = {"model": "deepseek-r1:7b", "prompt": text}
    r = requests.post(url, json=payload, timeout=30)
    r.raise_for_status()
    return r.json()["embedding"]

def search_knowledge_base(query_embedding, topk=3, efsearch=128):
    qv = "[" + ",".join(map(str, query_embedding)) + "]"
    sql = f"""
    SELECT /*+ SET_VAR(ann_params='{{efsearch={efsearch}}}') */
           content,
           approx_l2_distance(embedding, {qv}) AS distance
    FROM enterprise_knowledge
    ORDER BY approx_l2_distance(embedding, {qv}) ASC
    LIMIT {topk}
    """
    conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base")
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return "\n\n---\n\n".join([r[0] for r in rows]) if rows else ""
    finally:
        conn.close()

def build_rag_prompt(query: str, retrieved_content: str) -> str:
    return f"""
[Системная инструкция] Вы — корпоративный ассистент. Отвечайте строго по контексту.
[Контекст]
{retrieved_content}

[Вопрос]
{query}
"""

def clean_response(text: str) -> str:
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

def generate_answer(prompt: str) -> str:
    url = "http://localhost:11434/api/generate"
    payload = {"model": "deepseek-r1:7b", "prompt": prompt}
    r = requests.post(url, json=payload, timeout=120)
    r.raise_for_status()
    full = ""
    for line in r.text.splitlines():
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
            if "response" in obj: full += obj["response"]
            if obj.get("done", False): break
        except json.JSONDecodeError:
            continue
    return clean_response(full.strip())

def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):
    emb = "[" + ",".join(map(str, question_embedding)) + "]"
    sql = """
    INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)
    VALUES (%s, %s, %s, %s, %s, NOW())
    """
    conn = pymysql.connect(host="X.X.X.X", port=9030, user="root", password="sr123456", database="knowledge_base", autocommit=True)
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (user_id, question, emb, retrieved_content, generated_answer))
    finally:
        conn.close()

def rag_pipeline(user_id: str, query: str) -> str:
    qe = get_embedding(query)
    ctx = search_knowledge_base(qe, topk=3, efsearch=128)
    prompt = build_rag_prompt(query, ctx)
    ans = generate_answer(prompt)
    log_conversation(user_id, query, qe, ctx, ans)
    return ans

@app.route("/")
def index():
    return render_template_string(INDEX_HTML)

@app.route("/ask", methods=["POST"])
def ask():
    data = request.get_json(force=True)
    question = data.get("question", "").strip()
    if not question:
        return jsonify({"answer": "Пожалуйста, введите вопрос."})
    answer = rag_pipeline("sr_01", question)
    return jsonify({"answer": f"Вопрос: {question}\n\nОтвет: {answer}"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9033, debug=True)

Примечания по настройке векторного индекса StarRocks (HNSW/IVFPQ)

  • index_type: hnsw | ivfpq

  • dim: размерность эмбеддинга (например, 3584)

  • metric_type: l2_distance | cosine_similarity

  • HNSW‑параметры:

    • M (двунаправленных связей на вершину, по умолчанию 16)

    • efconstruction (размер кандидатов при построении графа, по умолчанию 40)

    • efsearch (через /*+ SET_VAR(ann_params) */ при запросе; выше — точнее, медленнее)

  • IVFPQ‑параметры:

    • nlist (число кластеров)

    • nbits (точность PQ, кратно 8)

    • M_IVFPQ (число субвекторов; делитель dim)

    • Поисковые: nprobe, max_codes, polysemous_ht, range_search_confidence

Требования к запросам, чтобы индекс применился:

  • ORDER BY только по approx_l2_distance(колонка, константный_массив) или approx_cosine_similarity(...)

  • Константный массив — ARRAY с длиной = dim

  • Обязателен LIMIT N

  • Предикаты строить на той же функции и согласовывать с направлением сортировки (ASC для l2_distance, DESC для cosine_similarity)

Проверка:

EXPLAIN ... -- VECTORINDEX: ON в OlapScanNode

Справочные материалы