Всем привет.

Я работаю в IT, и кроме дейликов, общения с окружающими не так уж много. Но иногда бывают мысли после какого-нибудь разговора вне работы, что надо было ответить как-то по‑другому, или почему я промолчал, не знал как ответить, в особенности каверзные или провокационные вопросы. Из этого вырисовывается проблема — как ответить нужное в нужный момент. В моём восприятии это как карате: если удар не натренирован, то он с большой долей вероятности будет плохой. Или другое сравнение: туз никогда не выпадет из рукава, если его туда заранее не положить. Поэтому нужно потренироваться.

Задача: потренироваться, чтобы речь гладко звучала, и не было неловкого смущения, задержек, а всё получалось на автомате. Варианты решения:

  1. Потренироваться на домашних, чтобы они задавали токсичные вопросы, рандомно их перебирая. Не мой вариант.

  2. Сделать приложение для прогулки, идёшь куда-нибудь, а тебе задаются токсичные вопросы, ты на них отвечаешь после чего валидируется ответ. По результату даются либо подсказки, если ответ некорректный, или задаётся следующий вопрос.

Требования: всё должно быть open-source и после скачивания запускаться локально без внешних сервисов.

Проработка идеи (основная логика): Реализация логики самой идеи — это больше психология: для чего нам нужен open-source психолог. Я нашел одного который говорит

«Все мои материалы в открытом доступе»

Нашёл у него материалы на тему общения с токсичными людьми. По этим двум параметрам идеально подходит Альберт Сафин. Возьмём небольшой список токсичных фраз-крючков, при необходимости его можно будет дополнить, генерируя их через LLM. По теории, пример ответа будет состоять из амортизирующей фразы, фразы-стратегии и фразы-захвата.

Тренировать можно всё
Тренировать можно всё

Выбираем архитектуру

Хочу микросервисы, так как из этого проекта можно будет сделать другой, переписав конфиги, — а это могут быть идеи для реализации в будущем. Open-source здесь Kubernetes.

Выбираем стек технологий (предварительно):

  1. Python — однозначно т. к. это мой любимый язык программирования.

  2. Kubernetes нам поможет в микросервисной архитектуре. Его использование необязательно для экспериментов.

  3. Linux — ну куда же без него.

Общее описание и проработка микросервисов:

Исходя из варианта 2 решения, нам нужно будет front на телефоне, то это будет kivy. Эх, танцев с бубном при сборке apk не избежать т. к. python плохо приспособлен под android, и есть проблемы с зависимостями при компилировании apk. Front будет запрашивать у бэка рандомную токсичную фразу и воспроизводить ее.

Для того, чтобы нам можно было используя интернет обращаться к домашнему хосту я использовал KeenDns — это бесплатный сервис доменных имён для роутеров Keenetic/Netcrize. В файле params указываем локально или нет будем связываться с хостом. Для доступа используя KeenDns, ставим галочку прямой доступ, выбираем доменное имя для маршрутизации и в пункте Удаленный доступ выбираем «Свободный доступ».

В приложении Kivy tсть 2 экрана и несколько кнопок: начать тренировку, завершить тренировку.

Если есть front, то должен быть и backend — так его и назовём.

Backend должен взять фразу из какого-нибудь источника — базы данных для перевода в аудио. Возьмём postgre, т.к. open-source фраза должна быть сгенерирована на каком-то микросервисе, т.е. текст должен быть переведён в голос Text-to-Speech, так и назовём микросервис TTS.

Использую модель Silero версия v5_5_ru результаты генерации аудио на v3_1_ru были хуже. Модель автоматически загрузится при первом запуске. В коде можно выбрать голос озвучивания, или передавать нужный голос при запросе. Но если постоянно генерировать, то будет большая нагрузка на этот микросервис, а фразы могут повторяться.

Надо будет где-то хранить фразы и выдавать их, чтобы генерировать по требованию. Как хранилище используем Redis т. к. он очень быстро отдаёт по ключу значение. Для уникальности ключей используем UUID.

Логирование я реализовал только в backend микросервисе, т. к. остальные микросервисы слишком простые чтобы туда добавлять ещё и логирования, все ошибки, приходящие от них, и время работы логируется в backend.

@router.get("/task/start")
async def start_task(db: Session = Depends(get_session)):
    """Начать новое задание"""  
    try:
        start = datetime.now()
        task_data = get_random_task(db)
        logger.info(f'create random task {datetime.now() - start}')
        return task_data
    except Exception as e:
        logger.error(f"❌ Ошибка: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/task/{task_id}/check")
async def check_answer(task_id: str, audio_file: UploadFile = File(...), db: Session = Depends(get_session)):
    """Проверить ответ пользователя"""
   
    try:
        task_info, task_uuid = get_task_by_id(db, task_id)
        if not task_info:
            raise HTTPException(status_code=404, detail=f"Задание {task_id} не найдено")
        
        logger.info(f"✅ Задание найдено: {task_info[:50]}...")
        audio_file = await audio_file.read()

        try:
            start = datetime.now()
            transcribed_text = audio_service.transcribe_bytes(audio_file)  # audio_file.read()
            logger.info(f"Transcirbe action_time: {datetime.now()-start} result:'{transcribed_text}'")
            
            start = datetime.now()
            response = requests.post(f'http://{params.PHRASE_COMPARE_HOST}:{params.PHRASE_COMPARE_PORT}/search_phrases', json={'text': transcribed_text})
            phrase_analis_result = response.json()
            logger.info(f"Phrase compare action_time: {datetime.now()-start} result:\n\t{transcribed_text} \n\t'{phrase_analis_result}'")

            start = datetime.now()
            audio_data, text_audio = utils.get_result(phrase_analis_result)
            logger.info(f"Get result action_time: {datetime.now()-start} result: {text_audio}")

            return {
                "task_id": task_uuid,
                "audio_data": base64.b64encode(audio_data).decode('utf-8'),
                "phrase_text": text_audio
            }

            
        finally:
            ...

    except HTTPException:
        logger.error(f"❌ Ошибка при проверке ответа: {e} {traceback.print_exc()}")
        raise
    except Exception as e:
        logger.error(f"❌ Ошибка при проверке ответа: {e} {traceback.print_exc()}")
        raise HTTPException(status_code=500, detail=f"Ошибка проверки: {str(e)} {traceback.print_exc()}")


Front воспроизводит аудио, записывает ответ и отправляет его на backend.

Backend должен принять аудио-сообщение, например, wav и расшифровать его — для расшифровки нужен отдельный микросервис, т.е. переводящий голос в текст если по-английски Speech-to-Text так и назовём STT. Нашёл интересное решение Whisper, и в проекте использовал его форк Faster Whisper что значительно ускорило работу микросервиса. По размеру модели остановил свой выбор на small, т. к. с более маленькими — быстрыми моделями качество было неприемлемым. Из базы нужно будет взять исходную фразу по ключу. Ключом возьмём UUID фразы, чтобы не повторялись. Сделаем UUID ключом для всех видов фраз.

@app.post("/transcribe", response_model=TranscribeResponse)
async def transcribe_audio(
    audio_file: UploadFile = File(...),
    language: str = "ru",
    model: str = "small"
):
    """Распознавание аудио из загруженного файла"""
    try:
        content = await audio_file.read()
        file_size = len(content)
        print(f"📦 Файл загружен в память: {file_size} bytes")
        result = await _transcribe_bytes(content, language)
        return result
            
    except Exception as e:
        print(f"❌ Ошибка распознавания: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


async def _transcribe_bytes(audio_bytes: bytes, language: str = "ru") -> dict:
    """Распознавание аудио из байтов (без сохранения на диск)"""
    start_time = time.time()
    model = get_whisper_model()
    
    if model is None:
        raise HTTPException(status_code=503, detail="Модель Faster Whisper недоступна")
    
    temp_file = None
    
    try:
        # Определяем расширение по сигнатуре файла
        suffix = ".wav"
        if audio_bytes[:4] == b'RIFF':
            suffix = ".wav"
        elif audio_bytes[:3] == b'ID3' or audio_bytes[:2] == b'\xff\xfb':
            suffix = ".mp3"
        elif audio_bytes[:4] == b'fLaC':
            suffix = ".flac"
        elif audio_bytes[:4] == b'OggS':
            suffix = ".ogg"
        
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(audio_bytes)
            temp_file = tmp.name
        
        # Настройки транскрибации для Faster Whisper
        transcribe_options = {
            "language": language if language != "auto" else None,
            "task": "transcribe",
            "beam_size": 5,
            "best_of": 5,
            "vad_filter": True,  # Фильтр голосовой активности
            "vad_parameters": dict(
                min_silence_duration_ms=500,
                threshold=0.5
            )
        }
        
        # Распознаем через Faster Whisper
        segments, info = model.transcribe(temp_file, **transcribe_options)
        
        # Собираем результат
        text_parts = []
        confidences = []
        
        for segment in segments:
            text_parts.append(segment.text)
            if segment.avg_logprob is not None:
                # Конвертируем logprob в confidence (0-1)
                confidences.append(round(segment.avg_logprob, 3))
        
        text = " ".join(text_parts).strip()
        detected_language = info.language
        duration = time.time() - start_time
        
        # Вычисляем среднюю уверенность
        confidence = None
        if confidences:
            confidence = round(sum(confidences) / len(confidences), 3)
        
        print(f"✅ Faster Whisper распознал: '{text}'")
        print(f"   Язык: {detected_language}, Уверенность: {confidence}, Время: {duration:.2f}с")
        print(f"   Вероятность языка: {info.language_probability:.3f}")
        
        return {
            "text": text,
            "status": "success",
            "engine": "faster_whisper",
            "model": model_size,
            "language": detected_language,
            "confidence": confidence
        }
        
    finally:
        # Удаляем временный файл
        if temp_file and os.path.exists(temp_file):
            try:
                os.unlink(temp_file)
            except:
                pass

Backend должен сопоставить полученную фразу и возможные варианты ответа т. е. нужен микросервис, сопоставляющий фразы, а желательно логику фраз. Как-то читал на habr статью про llm FREEDA. Для русского языка, по-моему, это самый подходящий вариант. Назовём этот микросервис phrases_compare. В phrases_compare для повышения качества сверки, добавлена сверка с неточным поиском посредствам thefuzz в сочетании с Freeda качество сверки фраз начинает быть приемлемым. Есть коэффициенты для настройки прохождения искомых фраз в файле params.

@app.post("/synthesize")
async def synthesize(request: TTSRequest):
    """ Синтез речи из текста через Silero"""
    print(f"🎵 Запрос синтеза: '{request.text[:50]}...' (speaker: {request.speaker})")
    
    if not request.text or not request.text.strip():
        raise HTTPException(status_code=400, detail="Текст не может быть пустым")
    
    model = get_silero_model()
    if model is None:
        raise HTTPException(status_code=503, detail="Модель Silero недоступна")
    
    try:
        speaker = request.speaker
        if not speaker:
            raise HTTPException(status_code=503, detail="Нужно выбрать диктора")
        
        audio_tensor = model.apply_tts(text=request.text,speaker=speaker, sample_rate=request.sample_rate)        
        audio_np = audio_tensor.cpu().numpy()
        audio_np = np.clip(audio_np * 32767, -32768, 32767).astype(np.int16)
        wav_buffer = io.BytesIO()
        
        with wave.open(wav_buffer, 'wb') as wav:
            wav.setnchannels(1)  # Моно
            wav.setsampwidth(2)  # 16-bit
            wav.setframerate(request.sample_rate)
            wav.writeframes(audio_np.tobytes())
        
        wav_bytes = wav_buffer.getvalue()
        
        print(f"✅ Синтез завершен: {len(wav_bytes)} bytes, {len(audio_np)/request.sample_rate:.2f} сек")
        
        return Response(
            content=wav_bytes,
            media_type="audio/wav",
            headers={
                "Content-Disposition": f"inline; filename=silero_{speaker}.wav",
                "X-Speaker": speaker,
                "X-Text-Length": str(len(request.text)),
                "X-Audio-Duration": str(len(audio_np) / request.sample_rate)
            }
        )
        
    except Exception as e:
        print(f"❌ Ошибка синтеза: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Ошибка синтеза: {str(e)}")

После сопоставления фраз backend должен отправить либо корректирующую фразу, либо отбивку, что всё ОК — а это тоже фраза. А значит, нам нужна будет ещё таблица в базе для стандартных фраз. Стандартные фразы и минимум фраз для работы приложения загружаются backend, если база пустая. В процессе проработки микросервисов и экспериментов по разделению фраз типа Крючок, Стратегия и Захват. Я добавлял ещё один микросервис между STT и phrases_compare основанный на qwen2.5, его результаты были лучшими по сравнению с ollama моделями gemma llama deepseek с различными числом параметров. В backend я оставил пример экспериментов с qwen2.5 модуль — PhrasSplitClient. Но по итогу лучшие результаты показал микросервис на основе сочетания FREEDA и thefuzz.

@app.post("/search_phrases")
async def analyze_toxicity(request: PhraseOneRequest):
    request = request.model_dump()
    result = {}
    result['amortization'] = thefuzz_freeda_compare('amortization', request['text'], model)
    result['strategy'] = thefuzz_freeda_compare('strategy', request['text'], model)
    result['capture'] = thefuzz_freeda_compare('capture', request['text'], model)
    return result
def thefuzz_freeda_compare(find_type, source_text, model):
    queries = get_concret_query(find_type)
    matches = process.extract(source_text, queries, limit=1, scorer=fuzz.ratio)
    good_matches = [m for m in matches if m[1]>params.MINIMAL_FUZZ_SCORE]
    if not good_matches:
         return None
    good_matches = [m[0] for m in good_matches]
    doc_embedding = model.encode([f"search_document: {source_text}"])[0]
    query_texts = [f"search_query: {q}" for q in good_matches]
    query_embeddings = model.encode(query_texts)
    similarities = np.dot(query_embeddings, doc_embedding) / (np.linalg.norm(query_embeddings, axis=1) * np.linalg.norm(doc_embedding))
    if similarities[0] and similarities [0] * 100 > params.MINIMAL_FREEDA_SCORE:
        # print(f'{find_type} RESULT {good_matches[0]}')
        return good_matches[0]
    return None

В итоге к исходному стеку технологий добавляются Postgre и Redis. В процессе отладки ещё прикрутил отправку логов в kafka из основного backend, и контейнеры-сайдкары для отправки логов.

Если захочется мониторить поды, то пригодятся еще Prometheus и grafana, но при запуске в связке minikube kubectl можно подключить аддоны. Т. к. у нас python, а не Java с миллионом параметров конфигурации и удобным подключением метрик через actuator, то вполне достаточно мониторить RAM и CPU, чтобы выбрать оптимальные размеры подов.

  • minikube addons enable metrics-server

  • minikube addons enable dashboard

Для получения списка адднонов: minikube addons list

Итоговый стек технологий

  1. Python - все микросервисы. Базой будет FastApi uvicorn, для фронта Kivy.

  2. Kubernetes - можно запуститься и без него для экспериментов.

  3. Linux - ну кудаже без него.

  4. Redis - хранение аудио.

  5. Postgre - хранение текстов фраз.

  6. Kafka - для хранения логов.

Для запуска проекта первым делом необходимо задать параметры postgre, redis и адрес хоста — они указываются в файлах params.py каждого микросервиса. Кроме этих параметров все остальные имеют значения по умолчанию. А для запуска Kubernetes передаются в ConfigMap находящемся в папке kuber. Для каждого микросервиса создан отдельный файл с необходимым набором абстракций. Файл rbac.yaml необходим для настройки сбора метрик из подов, если это будет необходимо.

apiVersion: v1
kind: ConfigMap
metadata:
  name: backend-config
  namespace: default
data:
  HOST: "0.0.0.0"
  PORT: "8000"
  PHRASE_COMPARE_HOST: '<адрес linux машины в локальной сети>'
  # PHRASE_COMPARE_HOST: "phrases-compare-service"
  PHRASE_COMPARE_PORT: "8004"
  REDIS_HOST: '<адрес linux машины в локальной сети>'
  REDIS_PORT: "6379"
  TTS_SERVER: '<адрес linux машины в локальной сети>'
  # TTS_SERVER: "tts-app-service"
  TTS_SERVER_PORT: "8001"
  STT_HOST: '<адрес linux машины в локальной сети>'
  # STT_HOST: "stt-app-service"
  STT_PORT: "8002"

Запустить sh файл из папки bash_commands. Предварительно нужно локально скачать модели для STT, TTS, phrase_compare- запустив каждую из них.

sh bash_commands/all_run.sh

Для первого локального запуска для запускаемого микросервиса достаточно установить зависимости.

pip install -r requirements.txt

Я чтобы не загрязнять системный python обычно пользуюсь

  • pyenv pyenv virtualenv 3.11.15 phrase_compare

  • pyenv local phrase_compare

  • pip install -r requirements.txt

И произвести запуск

python main.py или “nohup python main.py &” чтобы не привязываться терминалу

Микросервисы STT и TTS и phrases_compare при первом локальном запуске автоматически скачаются модели, при запуске с использованием k8s, они будут перекопированы в поды.

У каждого микросервиса есть свой dockerfile для сборки контейнера, основная цель которого установить зависимости переложить код и библиотеки для llm.

FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8001
CMD ["python", "main.py"]

Запуск Kubernetes и minikube

minikube start --driver=kvm2 --memory=40G --cpus=9 --container-runtime=containerd --disk-size=100g

Пример сборки одного образа в k8s (для сборки каждого образа есть sh файл в папке bash_commands).

  1. docker build ‑t tts‑app:latest <Абсолютный путь>/TTS/.

  2. minikube image load tts‑app:latest

  3. kubectl apply ‑f <Абсолютный путь>/kuber/tts.yaml

Абсолютный путь можно получить командой ‘pwd’, находясь в нужной директории

Микросервисы STT TTS phrases_compare работают на CPU. Хотелось бы поэкспериментировать с NPU от AMD, но пока ollama его не поддерживает. Верисия python для микросревисов 3.12… или 3.11… т. к. с версиями выше не поднимается STT из за особенности библиотек. Все версии зафиксированы в каждом микросервисе в requirements.txt для поднятия в k8s/или локального запуска.

Если нагрузить микросервисы чем-то типа gatling, то скорее всего первым будет отказывать STT, поэтому при необходимости лёгким изменением конфигов k8s можно сделать несколько реплик этого пода.

Репозиторий с исходным кодом в Git

Идеи, которые возможно реализовать, используя базу данного эксперимента:

Как-то на habr читал интересную статью про то, как llm общалась с мошенниками и в максимуме потратила 30 минут на 1 разговор, притворяясь дедом, постоянно путая цифры и рассказывая несвязанные истории, но кода в той статье не было. Что-то подобное можно повторить, если переиспользовать микросервисы STT TTS и ollama.

Считывание текста из аудио-записи в телеграм боте выполнение каких-либо действий на бэкэнде с возвратом аудио, тут удобнее будет через kafka писать в топики и напрямую к ним подключать микросервисы.

Различные переводчики с добавлением микросервиса перевода, или аудио-гид если на front сдалать увязку с GPS и в определённых местах TTS записывать аудио.

Написанию этой статьи послужил душевный порыв, т.к. образованию Python разработчик. В свободное от работы время люблю писать на python, в настоящее время работаю в IT, но должностные обязанности не подразумевают разработки, но душа требует. Спасибо за внимание.