После 10 лет внедрения BI-систем (Qlik Sense, Power BI, Data Lens) я понял одну вещь: дашборд — это не решение. Это данные для решения. А между данными и решением — пропасть, которую преодолевает человек.

В этой статье покажу, как построил RAG-систему с чат-интерфейсом для базы из 600 000 записей техники из Федресурса. Без философии — только архитектура, код и грабли.

Проблема: почему дашборды не работают

Типичный сценарий. Аналитик открывает дашборд с данными о технике в лизинге. Нужно найти топ-10 компаний с бензовозами в Московской области.

Что происходит:

  1. Ищет нужный дашборд (их 15 штук)

  2. Выбирает правильные фильтры

  3. Понимает, что нужного среза нет

  4. Идёт к разработчику

  5. Ждёт доработку

Время: от часов до дней. И это для простого вопроса.

Ключевая проблема: чтобы найти ответ в BI, нужно знать, как задать вопрос системе. А пользователи хотят задавать вопросы на человеческом языке.

Архитектура решения

Архитектура системы: Telegram Бот → FastAPI Шлюз → LLM Роутер → PostgreSQL/Qdrant/Redis
Архитектура системы: Telegram Бот → FastAPI Шлюз → LLM Роутер → PostgreSQL/Qdrant/Redis

Компоненты

1. LLM Router — определяет тип запроса и маршрутизирует к нужному обработчику:

  • Точный запрос → SQL Generator

  • Семантический поиск → RAG

  • Агрегация → Aggregator

2. SQL Generator — генерирует SQL по естественному языку. Использую Claude API с few-shot примерами.

3. RAG Search — семантический поиск по векторной базе. Для случаев, когда пользователь не знает точных названий.

4. Aggregator — предрасчитанные агрегаты в Redis для быстрых ответов на частые вопросы.

Реализация SQL Generator

Самая интересная часть. Нужно научить LLM генерировать корректный SQL для конкретной схемы.

Схема данных (упрощённо)

CREATE TABLE vehicles (
    id SERIAL PRIMARY KEY,
    vin VARCHAR(17),
    vehicle_type VARCHAR(100),    -- "бензовоз", "автовоз", "экскаватор"
    brand VARCHAR(100),
    model VARCHAR(100),
    year_manufacture INTEGER,
    leasing_company VARCHAR(200),
    leasing_start DATE,
    leasing_end DATE,
    region VARCHAR(100),
    inn VARCHAR(12),
    company_name VARCHAR(300)
);

-- Индексы для частых запросов
CREATE INDEX idx_vehicle_type ON vehicles(vehicle_type);
CREATE INDEX idx_region ON vehicles(region);
CREATE INDEX idx_leasing_company ON vehicles(leasing_company);

Промпт для генерации SQL

SYSTEM_PROMPT = """Ты — SQL-генератор для базы техники в лизинге.

Схема таблицы vehicles:
- id: первичный ключ
- vin: VIN-номер
- vehicle_type: тип техники (бензовоз, автовоз, экскаватор, погрузчик...)
- brand, model: марка и модель
- year_manufacture: год выпуска
- leasing_company: лизинговая компания
- leasing_start, leasing_end: даты лизинга
- region: регион Р��
- inn, company_name: данные лизингополучателя

Правила:
1. Генерируй ТОЛЬКО SELECT-запросы
2. Всегда добавляй LIMIT 100, если не указано иное
3. Для топ-N используй ORDER BY ... DESC LIMIT N
4. Регионы: "Москва", "Московская область", "Санкт-Петербург"...
5. Возвращай только SQL без объяснений

Примеры:
Вопрос: Сколько всего записей?
SQL: SELECT COUNT(*) as total FROM vehicles;

Вопрос: Топ-5 компаний по количеству экскаваторов
SQL: SELECT company_name, COUNT(*) as cnt FROM vehicles WHERE vehicle_type = 'экскаватор' GROUP BY company_name ORDER BY cnt DESC LIMIT 5;

Вопрос: Бензовозы в Московской области
SQL: SELECT * FROM vehicles WHERE vehicle_type = 'бензовоз' AND region = 'Московская область' LIMIT 100;
"""

Код генератора

import anthropic
from typing import Optional
import re

class SQLGenerator:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.model = "claude-sonnet-4-20250514"

    def generate(self, question: str) -> Optional[str]:
        response = self.client.messages.create(
            model=self.model,
            max_tokens=500,
            system=SYSTEM_PROMPT,
            messages=[{"role": "user", "content": question}]
        )

        sql = response.content[0].text.strip()

        # Валидация: только SELECT
        if not sql.upper().startswith("SELECT"):
            return None

        # Защита от инъекций: проверяем на опасные конструкции
        dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE", ";--"]
        if any(d in sql.upper() for d in dangerous):
            return None

        return sql

    def execute_safe(self, sql: str, conn) -> list:
        """Выполнение с таймаутом и лимитами"""
        # Принудительно добавляем LIMIT, если его нет
        if "LIMIT" not in sql.upper():
            sql = sql.rstrip(";") + " LIMIT 100;"

        with conn.cursor() as cur:
            cur.execute("SET statement_timeout = '5s';")
            cur.execute(sql)
            return cur.fetchall()

Проблема #1: галлюцинации в SQL

LLM иногда генерирует несуществующие колонки или таблицы. Решение — валидация перед выполнением:

def validate_sql(sql: str, schema: dict) -> tuple[bool, str]:
    """Проверяем, что все упомянутые колонки существуют"""
    # Извлекаем имена колонок из SQL
    # Упрощённая версия, в проде используем sqlparse
    words = re.findall(r'\b([a-z_]+)\b', sql.lower())

    valid_columns = set(schema.get("vehicles", []))
    valid_columns.update(["select", "from", "where", "and", "or", "count",
                          "group", "by", "order", "desc", "asc", "limit", "as"])

    unknown = [w for w in words if w not in valid_columns and not w.isdigit()]

    if unknown:
        return False, f"Неизвестные колонки: {unknown}"
    return True, ""

RAG для семантического поиска

Пользователь пишет "грузовики для нефти". В базе это "бензовоз" или "автоцистерна". SQL-генератор не справится — нужен семантический поиск.

Индексация

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import openai

client = QdrantClient(host="localhost", port=6333)
openai_client = openai.OpenAI()

def create_vehicle_index():
    # Создаём коллекцию
    client.create_collection(
        collection_name="vehicle_types",
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
    )

    # Типы техники с синонимами
    vehicle_types = [
        {"type": "бензовоз", "synonyms": "автоцистерна, топливозаправщик, грузовик для нефти, цистерна для топлива"},
        {"type": "автовоз", "synonyms": "автомобилевоз, транспортировщик машин, перевозчик авто"},
        {"type": "экскаватор", "synonyms": "землеройная машина, копатель, ковшовый экскаватор"},
        # ... ещё 50+ типов
    ]

    points = []
    for i, vt in enumerate(vehicle_types):
        text = f"{vt['type']}: {vt['synonyms']}"
        embedding = get_embedding(text)
        points.append(PointStruct(
            id=i,
            vector=embedding,
            payload={"vehicle_type": vt["type"]}
        ))

    client.upsert(collection_name="vehicle_types", points=points)

def get_embedding(text: str) -> list[float]:
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

Поиск

def find_vehicle_type(query: str) -> str:
    """Находим тип техники по семантике запроса"""
    query_vector = get_embedding(query)

    results = client.search(
        collection_name="vehicle_types",
        query_vector=query_vector,
        limit=1
    )

    if results and results[0].score > 0.7:
        return results[0].payload["vehicle_type"]
    return None

Интеграция в пайплайн

class QueryProcessor:
    def __init__(self):
        self.sql_gen = SQLGenerator()
        self.rag = RAGSearch()

    def process(self, question: str) -> str:
        # 1. Пробуем найти типы техники через RAG
        enriched_question = self.enrich_with_rag(question)

        # 2. Генерируем SQL
        sql = self.sql_gen.generate(enriched_question)

        if not sql:
            return "Не удалось понять запрос. Попробуйте переформулировать."

        # 3. Валидируем
        valid, error = validate_sql(sql, SCHEMA)
        if not valid:
            return f"Ошибка в запросе: {error}"

        # 4. Выполняем
        results = self.sql_gen.execute_safe(sql, self.conn)

        # 5. Форматируем ответ
        return self.format_response(results, question)

    def enrich_with_rag(self, question: str) -> str:
        """Заменяем синонимы на точные названия"""
        # Ищем упоминания техники
        vehicle_type = self.rag.find_vehicle_type(question)
        if vehicle_type:
            # Добавляем контекст для LLM
            return f"{question}\n[Тип техники в базе: {vehicle_type}]"
        return question
Путь запроса: от вопроса пользователя до ответа в чат
Путь запроса: от вопроса пользователя до ответа в чат

Проблема #2: медленные запросы

600K записей — не big data, но без оптимизации запросы выполняются секунды.

Решение: предрасчитанные агрегаты

# Ежедневный пересчёт популярных агрег��тов
AGGREGATES = {
    "top_companies_by_type": """
        SELECT vehicle_type, company_name, COUNT(*) as cnt
        FROM vehicles
        GROUP BY vehicle_type, company_name
    """,
    "vehicles_by_region": """
        SELECT region, vehicle_type, COUNT(*) as cnt
        FROM vehicles
        GROUP BY region, vehicle_type
    """,
    "leasing_companies_stats": """
        SELECT leasing_company, COUNT(*) as total,
               COUNT(DISTINCT company_name) as clients
        FROM vehicles
        GROUP BY leasing_company
    """
}

def precompute_aggregates():
    """Запускается по cron раз в сутки"""
    redis = Redis()

    for name, sql in AGGREGATES.items():
        with conn.cursor() as cur:
            cur.execute(sql)
            results = cur.fetchall()
            redis.set(f"agg:{name}", json.dumps(results), ex=86400)

Router для быстрых запросов

def route_query(question: str) -> str:
    """Определяем, можно ли ответить из кэша"""

    # Паттерны для быстрых ответов
    patterns = {
        r"топ.+компани.+по количеству": "top_companies_by_type",
        r"сколько.+в.+регион": "vehicles_by_region",
        r"лизинговые компании|рейтинг лизинг": "leasing_companies_stats"
    }

    for pattern, agg_name in patterns.items():
        if re.search(pattern, question.lower()):
            cached = redis.get(f"agg:{agg_name}")
            if cached:
                return filter_cached_results(json.loads(cached), question)

    # Если нет в кэше — генерируем SQL
    return None

Форматирование ответа

LLM генерирует человекочитаемый ответ из табличных данных:

def format_response(results: list, question: str, sql: str) -> str:
    if not results:
        return "По вашему запросу ничего не найдено."

    # Для коротких результатов — текстовый ответ
    if len(results) <= 10:
        prompt = f"""Вопрос пользователя: {question}

Данные из базы (SQL: {sql}):
{json.dumps(results, ensure_ascii=False, indent=2)}

Сформулируй краткий ответ на русском языке.
Если это топ — пронумеруй. Укажи ключевые цифры."""

        response = anthropic.Anthropic().messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text

    # Для длинных — предлагаем экспорт
    return f"Найдено {len(results)} записей. Показать первые 10 или экспортировать в Excel?"

Результаты

Сравнение подходов: BI-дашборд требует обучения и времени, AI-чат доступен сразу
Сравнение подходов: BI-дашборд требует обучения и времени, AI-чат доступен сразу

До (BI-дашборд):

  • Время на ответ: 5-30 минут

  • Требуется обучение: да, существенное

  • Доступность: только аналитики

После (AI-чат):

  • Время на ответ: 3-15 секунд

  • Требуется обучение: нет

  • Доступность: любой сотрудник с доступом в Telegram

Метрики за месяц

  • 1,200 запросов обработано

  • 94% успешно отвечены без участия человека

  • 6% потребовали уточнения или ручной обработки

Типичные запросы

Пример диалога: запрос на естественном языке → структурированный ответ за 3 секунды
Пример диалога: запрос на естественном языке → структурированный ответ за 3 секунды
"Сколько экскаваторов в лизинге у Сбербанк Лизинг?"
→ 3 сек, точный ответ из SQL

"Компании с грузовиками для перевозки нефтепродуктов в Татарстане"
→ 5 сек, RAG + SQL

"Динамика лизинга спецтехники за 2024-2025"
→ 2 сек, из кэша агрегатов

Грабли и выводы

1. LLM галлюцинирует названия колонок. Решение: строгая валидация SQL перед выполнением.

2. Пользователи не знают точных терминов. Решение: RAG для семантического обогащения запроса.

3. Медленные аналитические запросы. Решение: предрасчитанные агрегаты + pattern matching для роутинга.

4. Сложно отлаживать. Решение: логирование всего пайплайна (вопрос → обогащённый вопрос → SQL → результат).

5. Пользователи ожидают магии. Решение: честные ограничения в интерфейсе. "Я могу искать технику по типу, региону, компании. Для сложной аналитики обратитесь к аналитику."


Стек

  • Backend: Python 3.12, FastAPI

  • LLM: Claude API (Claude Sonnet 4 для SQL), OpenAI Embeddings (text-embedding-3-small)

  • Векторная БД: Qdrant 1.12

  • Основная БД: PostgreSQL 17

  • Кэш: Redis 7

  • Frontend: Telegram Bot API (aiogram 3.x)

Код отдельных компонентов доступен в статье. Полная архитектура — под NDA клиента.


Если интересна тема text-to-SQL и RAG для бизнес-данных — пишите в комментариях, какие аспекты раскрыть подробнее.