Мужик приходит к переводчику английского:
— Слушай, как правильно перевести фразу “I don’t know”?
— Я не знаю.
— Вот, блин, никто не знает!

Привет, Хабр!

Присаживайтесь поудобней, заварите себе чайку, ибо я пишу немного затянуто и через правое ухо. Итак, Вы готовы? Отлично, тогда приступаем.

ВНИМАНИЕ! Информация, описанная ниже, написана исключительно в исследовательских целях и не предназначена для использования в корыстных целях!

Начну, пожалуй, с предыстории. Несколько лет назад завел себе домашнего питомца, королевского питона. Пуф вырос классным змеем и сих пор эта наглая колбаса является моим верным антидепрессантом. В итоге это привело к тому, что заинтересовался фильмами по змеям. Но, как назло, большинство фильмов идут на английском или испанском языке. Нет, не то, чтобы я не знал английского, будучи разработчиком, да и когда-то заканчивал языковую школу с углубленном изучением ин. языков, однако вечером, с пивасом и креветками смотреть английскую озвучку.... в общем, не каждому это по душе. И тут пришла в голову мысль, "а чтобы нам не использовать ИИ для перевода фильмов", к тому же множество компаний уже предлагают подобные решения. Но мне было ещё интересно изучить этот вопрос и пройти весь путь самим.

Сказано, сделано. Фильм за основу взял отсюда YouTube. И, как выяснится в последствии, это был очень сложный вариант, с кучей действующих лиц, шумами, африканским диалектом. Но "Такой Путь!". Как скачивать видосики с площадок, говорить не буду, лишь намекну - Pytubefix.

Окей, исходник получен. Нужно извлечь из него аудио. Тут нам на помощь приходит FFmpeg.
Для обнаружения прописал в PATH: C:\Users\ИМЯ_ПОЛЬЗОВАТЕЛЯ\AppData\Local\Microsoft\WinGet\Packages\Gyan.FFmpeg_Microsoft.Winget.Source_8wekyb3d8bbwe\ffmpeg-8.0.1-full_build\bin у вас вероятно будет что-то похожее.
Но т.к. я хочу в дальнейшем автоматизировать процесс, составил небольшой Python скрипт.

import subprocess
from pathlib import Path
from tqdm import tqdm

# ---------------- CONFIG ----------------
INPUT_DIR = Path(r"D:\Experiments\Audio\1__video_source")
OUTPUT_DIR = Path(r"D:\Experiments\Audio\2__audio_source")
INPUT_EXT = "mp4" 
RECURSIVE = True

FFMPEG = "ffmpeg"
TARGET_BITRATE_K = 320
OVERWRITE = False
# ---------------------------------------


def run(cmd: list[str]) -> None:
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if p.returncode != 0:
        raise RuntimeError(
            f"Command failed ({p.returncode}): {' '.join(cmd)}\n"
            f"STDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}"
        )


def main() -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    if TARGET_BITRATE_K > 320:
        print(f"[warn] MP3 max is 320k. Requested {TARGET_BITRATE_K}k -> clamping to 320k.")
        bitrate_k = 320
    elif TARGET_BITRATE_K < 8:
        bitrate_k = 8
    else:
        bitrate_k = TARGET_BITRATE_K

    glob_pat = f"**/*.{INPUT_EXT}" if RECURSIVE else f"*.{INPUT_EXT}"
    inputs = sorted(INPUT_DIR.glob(glob_pat))

    for src in tqdm(inputs, desc="Converting", unit="file"):
        rel = src.relative_to(INPUT_DIR)
        dst = (OUTPUT_DIR / rel).with_suffix(".mp3")
        dst.parent.mkdir(parents=True, exist_ok=True)

        if dst.exists() and not OVERWRITE:
            continue

        cmd = [
            FFMPEG, "-hide_banner",
            "-y" if OVERWRITE else "-n",
            "-i", str(src),
            "-vn",                         # no video
            "-c:a", "libmp3lame",          # MP3 encoder
            "-b:a", f"{bitrate_k}k",       # CBR (max 320k)
            str(dst),
        ]
        run(cmd)

if __name__ == "__main__":
    main()

Тут все просто, думаю разберётесь. Заполняем входную, выходную директорию и расширение нашего видео.

После этого у нас есть шумный источник аудио в mp3. Чтобы перевод был качественный, нам требуется разделить аудио на сторонние звуки/музыку и голос. Я долго искал решение, но лучше UVR не нашел. Возможно, кто-то знает лучше, но пока оставил его. Запускаем, в настройках качаем MDX-MDX23C-InstVoc-HQ. Выставляем MP3, SEGMENT 512, OVERLAP 16, GPU Conversion и "Полетели!". После обработки мы получим разделённые дорожки VOICE и INSTRUMENTAL. Инструментальную дорожку откладываем, она понадобится позже, при сведении звука. А по Voice пройдемся денойзером VR_Architecture-UVR-De-Echo-Normal с теми же настройками, только ставим Vocals Only для скорости.


Отлично, теперь мы имеем файл с чистой речью. Теперь нам нужно транскрибировать речь, чтобы получить тест с таймингами и репликами, которые мы будем переводить. Т.к. обычно у нас многоголосая озвучка, то нам также требуется выделить персон на аудио. Для этого нужно создать Person_НОМЕР директории и положить туда файлики mp3 с репликами персонажа. Для этого я использую WisperX. С ней можно много провозиться. Лично я не выдержал и поставил на WSL2 conda с python 3.12. Далее накатил WisperX и следом поставил PyTorch 2.7 + CUDA 12.8: pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128, игнорируя ошибки версий.

Опять же, накидал скрипт:

import os
# CUDNN
os.environ.setdefault("LD_LIBRARY_PATH", f"/lib/x86_64-linux-gnu:{os.environ.get('LD_LIBRARY_PATH','')}")

# HF
os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "60")
os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "120")

import csv
import logging
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm

# ---------------- CONFIG ----------------
INPUT_FILE = r"D:/Experiments/Audio/3_separated/1_1_videoplayback_(Vocals)_(No Echo).mp3"
OUTPUT_DIR = r"D:/Experiments/Audio/4_data"  

# CI/CD will substitute:
HF_TOKEN = r"..."
os.environ.setdefault("HF_TOKEN", HF_TOKEN)

# WhisperX (ASR+alignment)
WHISPER_MODEL = "large-v3"
LANGUAGE = "en"
DEVICE = "cuda"
COMPUTE_TYPE = "bfloat16"

BATCH_SIZE = 32

ALIGN_RETURN_CHAR_ALIGNMENTS = False

# Diarization constraints (optional)
MIN_SPEAKERS = None  # e.g. 2 if sure
MAX_SPEAKERS = None  # e.g. 2 if sure

# Purity & segmentation rules
MAX_WORD_GAP_SEC = 0.75
MIN_UTT_DUR_SEC = 0.25
MIN_WORDS = 1  # <-- твоя правка (реплики типа "What?")

# Safety: treat overlaps as "mixed" and drop those words
OTHER_SPK_OVERLAP_TOL = 0.03
PAD_SEC = 0.06  # padding around utterance; clamped not to overlap others

# Audio cutting
FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"

CUT_SR = 24000
CUT_CH = 1
CUT_MP3_BITRATE = "320k"  
OVERWRITE = False
LOG_LEVEL = logging.INFO

SEG_RE = re.compile(r"^seg_(\d{6})__.*\.mp3$", re.IGNORECASE)

# ---------------- DATA ----------------

@dataclass(frozen=True)
class Utterance:
    speaker: str
    start: float
    end: float
    text: str


# ---------------- PATH HELPERS ----------------

def to_wsl_path(p: str) -> str:
    """Accepts Windows-like path (D:/.. or D:\\..) and normalizes for WSL (/mnt/d/..)."""
    s = p.replace("\\", "/")
    m = re.match(r"^([A-Za-z]):/(.*)$", s)
    if m:
        drive = m.group(1).lower()
        rest = m.group(2)
        return f"/mnt/{drive}/{rest}"
    return s


# ---------------- SHELL ----------------

def run(cmd: List[str]) -> subprocess.CompletedProcess:
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if p.returncode != 0:
        raise RuntimeError(
            f"Command failed ({p.returncode}): {' '.join(cmd)}\n"
            f"STDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}"
        )
    return p

def fmt_hmsms(sec: float) -> str:
    if sec < 0:
        sec = 0.0
    ms = int(round(sec * 1000.0))
    hh = ms // (3600 * 1000)
    ms %= (3600 * 1000)
    mm = ms // (60 * 1000)
    ms %= (60 * 1000)
    ss = ms // 1000
    mmm = ms % 1000
    return f"{hh:02d}:{mm:02d}:{ss:02d}.{mmm:03d}"

def safe_name_time(a: float, b: float) -> str:
    return f"{a:010.3f}-{b:010.3f}".replace(" ", "")

def cut_mp3(src: Path, dst: Path, start: float, end: float) -> None:
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists() and not OVERWRITE:
        return
    dur = max(0.0, end - start)
    if dur <= 0.0:
        return
    run([
        FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n",
        "-ss", f"{start:.3f}",
        "-t", f"{dur:.3f}",
        "-i", str(src),
        "-vn",
        "-ac", str(CUT_CH),
        "-ar", str(CUT_SR),
        "-c:a", "libmp3lame",
        "-b:a", CUT_MP3_BITRATE,
        str(dst),
    ])

def concat_mp3(files: List[Path], out_mp3: Path) -> None:
    if out_mp3.exists() and not OVERWRITE:
        return
    out_mp3.parent.mkdir(parents=True, exist_ok=True)
    files = [f for f in files if f.exists()]
    if not files:
        return

    cmd = [FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n"]
    for f in files:
        cmd += ["-i", str(f)]
    inputs = "".join([f"[{i}:a]" for i in range(len(files))])
    cmd += [
        "-filter_complex", f"{inputs}concat=n={len(files)}:v=0:a=1[a]",
        "-map", "[a]",
        "-ac", str(CUT_CH),
        "-ar", str(CUT_SR),
        "-c:a", "libmp3lame",
        "-b:a", CUT_MP3_BITRATE,
        str(out_mp3),
    ]
    run(cmd)


# ---------------- DIARIZATION HELPERS ----------------

def diarize_to_df(diarization_obj) -> pd.DataFrame:
    """Normalize diarization output into DataFrame(start,end,speaker)."""
    if isinstance(diarization_obj, pd.DataFrame):
        df = diarization_obj.copy()
        colmap = {c.lower(): c for c in df.columns}
        startc = colmap.get("start")
        endc = colmap.get("end")
        speakerc = colmap.get("speaker")
        if not (startc and endc and speakerc):
            raise RuntimeError(f"Unknown diarization dataframe columns: {list(df.columns)}")
        return df.rename(columns={startc: "start", endc: "end", speakerc: "speaker"})[["start", "end", "speaker"]]

    # pyannote Annotation case
    rows = []
    try:
        for segment, _, label in diarization_obj.itertracks(yield_label=True):
            rows.append({"start": float(segment.start), "end": float(segment.end), "speaker": str(label)})
        return pd.DataFrame(rows, columns=["start", "end", "speaker"])
    except Exception as e:
        raise RuntimeError(f"Cannot normalize diarization output type: {type(diarization_obj)}") from e


def overlap_len(a0: float, a1: float, b0: float, b1: float) -> float:
    x0 = max(a0, b0)
    x1 = min(a1, b1)
    return max(0.0, x1 - x0)

def word_is_safe(word_start: float, word_end: float, spk: str, diar_df: pd.DataFrame) -> bool:
    """Safe if word doesn't significantly overlap any OTHER speaker segment."""
    if word_start is None or word_end is None:
        return False
    w0, w1 = float(word_start), float(word_end)
    if w1 <= w0:
        return False

    other = diar_df[diar_df["speaker"] != spk]
    if other.empty:
        return True

    for _, r in other.iterrows():
        ov = overlap_len(w0, w1, float(r["start"]), float(r["end"]))
        if ov > OTHER_SPK_OVERLAP_TOL:
            return False
    return True


# ---------------- MAIN PIPELINE ----------------

def build_utterances_from_words(result: dict, diar_df: pd.DataFrame) -> List[Utterance]:
    """Build contiguous single-speaker utterances from word-level labeled result."""
    utts: List[Utterance] = []

    words: List[Tuple[float, float, str, str]] = []
    for seg in result.get("segments", []):
        for w in (seg.get("words", []) or []):
            start = w.get("start")
            end = w.get("end")
            spk = w.get("speaker")
            text = (w.get("word", "") or "").strip()
            if spk is None or start is None or end is None or not text:
                continue
            words.append((float(start), float(end), str(spk), text))
    words.sort(key=lambda x: x[0])

    if not words:
        return utts

    cur_spk: Optional[str] = None
    cur_words: List[Tuple[float, float, str]] = []
    cur_start: Optional[float] = None
    cur_end: Optional[float] = None
    prev_end: Optional[float] = None

    def flush():
        nonlocal cur_spk, cur_words, cur_start, cur_end
        if not cur_words or cur_spk is None or cur_start is None or cur_end is None:
            cur_spk, cur_words, cur_start, cur_end = None, [], None, None
            return
        # naive spacing join
        text = ""
        for _, _, t in cur_words:
            if not text:
                text = t
            elif t.startswith(("-", "’", "'")):
                text += t
            else:
                text += " " + t
        if (cur_end - cur_start) >= MIN_UTT_DUR_SEC and len(cur_words) >= MIN_WORDS and text.strip():
            utts.append(Utterance(cur_spk, cur_start, cur_end, text.strip()))
        cur_spk, cur_words, cur_start, cur_end = None, [], None, None

    for (w0, w1, spk, t) in words:
        if not word_is_safe(w0, w1, spk, diar_df):
            flush()
            prev_end = w1
            continue

        if cur_spk is None:
            cur_spk = spk
            cur_start = w0
            cur_end = w1
            cur_words = [(w0, w1, t)]
            prev_end = w1
            continue

        gap = (w0 - (prev_end if prev_end is not None else w0))
        if spk != cur_spk or gap > MAX_WORD_GAP_SEC:
            flush()
            cur_spk = spk
            cur_start = w0
            cur_end = w1
            cur_words = [(w0, w1, t)]
            prev_end = w1
            continue

        cur_words.append((w0, w1, t))
        cur_end = w1
        prev_end = w1

    flush()

    # padding with clamping against other-speaker segments
    padded: List[Utterance] = []
    diar_sorted = diar_df.sort_values("start").reset_index(drop=True)
    for u in utts:
        s0 = max(0.0, u.start - PAD_SEC)
        e0 = u.end + PAD_SEC

        other = diar_sorted[diar_sorted["speaker"] != u.speaker]
        for _, r in other.iterrows():
            os0 = float(r["start"])
            oe0 = float(r["end"])
            if overlap_len(s0, e0, os0, oe0) > 0.0:
                # clamp to nearest edge
                if os0 < u.start:
                    s0 = max(s0, oe0)
                else:
                    e0 = min(e0, os0)

        if e0 > s0 and (e0 - s0) >= MIN_UTT_DUR_SEC:
            padded.append(Utterance(u.speaker, s0, e0, u.text))

    padded.sort(key=lambda x: x.start)
    return padded


def main() -> None:
    logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s | %(levelname)s | %(message)s")

    in_path = Path(to_wsl_path(INPUT_FILE))
    out_root = Path(to_wsl_path(OUTPUT_DIR))
    out_root.mkdir(parents=True, exist_ok=True)

    if not in_path.exists():
        raise FileNotFoundError(f"INPUT_FILE not found: {in_path}")

    stem = in_path.stem
    job_dir = out_root / stem
    job_dir.mkdir(parents=True, exist_ok=True)

    token = (HF_TOKEN or "").strip()
    if not token or "ECRYPTED" in token:
        raise RuntimeError(
            "HF_TOKEN placeholder not substituted. CI/CD must inject real HuggingFace token string."
        )

    import whisperx  # noqa
    from whisperx.diarize import DiarizationPipeline

    logging.info("Loading WhisperX model=%s device=%s compute_type=%s batch=%s",
                 WHISPER_MODEL, DEVICE, COMPUTE_TYPE, BATCH_SIZE)
    model = whisperx.load_model(WHISPER_MODEL, device=DEVICE, compute_type=COMPUTE_TYPE, language=LANGUAGE)

    logging.info("Loading audio: %s", in_path)
    audio = whisperx.load_audio(str(in_path))

    logging.info("Transcribing...")
    result = model.transcribe(audio, batch_size=BATCH_SIZE, language=LANGUAGE)

    logging.info("Loading alignment model...")
    align_model, metadata = whisperx.load_align_model(language_code=LANGUAGE, device=DEVICE)

    logging.info("Aligning...")
    result_aligned = whisperx.align(
        result["segments"],
        align_model,
        metadata,
        audio,
        DEVICE,
        return_char_alignments=ALIGN_RETURN_CHAR_ALIGNMENTS,
    )

    logging.info("Running diarization...")
    diarize_model = DiarizationPipeline(use_auth_token=token, device=DEVICE)
    diar_out = diarize_model(str(in_path), min_speakers=MIN_SPEAKERS, max_speakers=MAX_SPEAKERS)
    diar_df = diarize_to_df(diar_out)

    logging.info("Assigning speakers to words...")
    try:
        result_labeled = whisperx.assign_word_speakers(diar_df, result_aligned)
    except Exception:
        result_labeled = whisperx.assign_word_speakers(diar_out, result_aligned)

    logging.info("Building strictly single-speaker utterances...")
    utterances = build_utterances_from_words(result_labeled, diar_df)
    if not utterances:
        logging.warning("No utterances produced. Check diarization/token/inputs.")
        return

    # speaker label -> Person_XX by first appearance
    spk_order: Dict[str, int] = {}
    for u in utterances:
        if u.speaker not in spk_order:
            spk_order[u.speaker] = len(spk_order) + 1

    person_dirs: Dict[int, Path] = {}
    for spk, idx in spk_order.items():
        d = job_dir / f"Person_{idx:02d}"
        d.mkdir(parents=True, exist_ok=True)
        person_dirs[idx] = d

    csv_path = job_dir / "transcribe.csv"
    rows_out = []
    person_cuts: Dict[int, List[Path]] = {i: [] for i in person_dirs.keys()}

    for i, u in enumerate(tqdm(utterances, desc="Cutting", unit="utt"), 1):
        person_idx = spk_order[u.speaker]
        pdir = person_dirs[person_idx]

        start = max(0.0, u.start)
        end = max(start, u.end)
        tname = safe_name_time(start, end)
        cut_name = f"cut_{tname}.mp3"
        cut_path = pdir / cut_name

        cut_mp3(in_path, cut_path, start, end)
        person_cuts[person_idx].append(cut_path)

        rows_out.append({
            "Row": i,
            "Person": f"Person_{person_idx:02d}",
            "StartSec": f"{start:.3f}",
            "EndSec": f"{end:.3f}",
            "Start": fmt_hmsms(start),
            "End": fmt_hmsms(end),
            "Text": u.text,
            "File": f"{pdir.name}/{cut_name}",
        })

    with csv_path.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["Row", "Person", "StartSec", "EndSec", "Start", "End", "Text", "File"])
        w.writeheader()
        w.writerows(rows_out)

    logging.info("Creating full.mp3 per person...")
    for person_idx, files in person_cuts.items():
        files = [p for p in files if p.exists()]
        if not files:
            continue
        out_full = person_dirs[person_idx] / "full.mp3"
        concat_mp3(files, out_full)

    logging.info("Done. Output: %s", job_dir)


if __name__ == "__main__":
    main()

Код рассчитан на запуск из WSL2. Также вместо HF_TOKEN нужен токен на Hugging Face. Подробнее описа��о в WhisperX. Запускаем и получаем файлик transcribe.csv:

Row,Person,StartSec,EndSec,Start,End,Text,File
1,Person_01,5.168,47.631,00:00:05.168,00:00:47.631,"This is West Africa, a region known as the Sub-Saharan Savannah. This is home to the world's most popular pet snake, and over the next month, I'll be touring this region from Ghana to Togo and Benin to tell not only the story of how the ball python is living out here in the wild, but I'll also tell you the story on its economic impact in one of the world's most impoverished areas, and also how it has long influenced the culture here. I'm Dave Kaufman and these are my reptile adventures and this is the story of the world's most popular pet snake. This is the story of the ball python here in the wilds of Africa.",Person_01/cut_000005.168-000047.631.mp3
2,Person_01,130.769,135.577,00:02:10.769,00:02:15.577,"Our story begins in Ghana, a Commonwealth country situated in West Africa.",Person_01/cut_000130.769-000135.577.mp3

Также появятся директории с репликами персонажей cut_ТАЙМИНГ.mp3 и full.mp3 - склейка реплик, по которой будем дообучать модель.

Окей, теперь у нас есть текст. Надо его перевести. Тут есть много путей. Лично я - человек ленивый, поэтому выбрал более менее шуструю компактную модель в LM Studio (mistral-small-3.2-24b-instruct-2506). Можете взять свою модель или перевести в ручную. В LM я запустил сервер и увеличил число токенов.

Также, т.к. имеется проблема с передачей смысла, мы переводим в 2а захода, как профессиональные переводчики. Сперва строим карту перевода, а потом переводим. Это стабилизирует перевод определенных слов. Конечно, накидал скриптик:

import re
import time
from pathlib import Path
from typing import List, Dict, Optional

import pandas as pd
from tqdm import tqdm
import lmstudio as lms


# ---------------- CONFIG ----------------
INPUT_FILE = Path(r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)/transcribe.csv")

# Use the model identifier exactly as LM Studio shows it
MODEL_ID = "mistral-small-3.2-24b-instruct-2506"

# Load-time
CONTEXT_LENGTH = 40960
GPU_RATIO = 1.0

# Pass 1 (guide building)
GUIDE_CHUNK_CHARS = 16000
GUIDE_MAX_TOKENS = 1200
GUIDE_TEMPERATURE = 0.1
GUIDE_TOP_P = 0.9

# Pass 2 (row translation)
ROW_MAX_TOKENS = 512
ROW_TEMPERATURE = 0.15
ROW_TOP_P = 0.9

# Retries / robustness
ROW_MAX_TRIES = 4
SLEEP_BETWEEN_TRIES_SEC = 0.8
SAVE_PARTIAL_EVERY_N_ROWS = 25

# Disable client-side timeout for sync requests
lms.set_sync_api_timeout(None)


# ---------------- PROMPTS ----------------

SYSTEM_TRANSLATE = """Ты профессиональный переводчик-адаптер реплик для дубляжа.
Задача: перевести реплики с английского на русский и сохранить связность/единый стиль по всему файлу.

Критично по дубляжу:
- Русская реплика должна быть максимально близка по длительности к оригиналу:
  примерно сопоставимая длина и число слогов/гласных.
- Пиши естественно и легко, как для озвучки: разговорно-нейтрально, без канцелярита и тяжёлых конструкций.
- Предпочитай короткие фразы, активный залог, естественный порядок слов.
- Разрешено аккуратно перефразировать ради естественности, но без потери смысла/фактов.

Грамматика (строго):
- Следи за родом, числом, падежами, согласованием прилагательных/местоимений.
- Следи за управлением (предлоги/падежи), склоняй имена и термины по-русски.

Термины (строго):
- Любые варианты “ball python / ball pythons / royal python” переводятся ТОЛЬКО как «королевский питон».
  Допускаются только грамматические формы: «королевского питона», «королевским питоном» и т.п.
  Запрещены: «болл-питон», «питон-болл», «питон-шар», «шаровой питон» и т.д.
- "enclosure" -> «террариум»
- "shed" -> «линька»
- "morph" -> «морф»

Антигаллюцинации (строго):
- НЕ вставляй посторонние иностранные слова/наборы букв.
- НЕ допускай повторов и “слов-паразитов” вида «Ivoire ...», «... везде, ... повсюду» и похожих.
- Латиницу оставляй только если она реально есть в исходной реплике как имя/бренд/топоним; иначе переводить.

Вывод:
- Отвечай ТОЛЬКО переводом одной реплики. Без кавычек, без JSON, без пояснений и без markdown.
"""

SYSTEM_GUIDE = """Ты редактор дубляжного перевода и терминолог.
Тебе дают фрагменты полного транскрипта (ТОЛЬКО английские реплики из колонки Text).
Собери и поддерживай краткий Translation Guide (10–25 пунктов), чтобы перевод был единообразным.

Что включить:
- тон/стиль озвучки (разговорно-нейтральный), как делать фразы по-русски естественно;
- устойчивые переводы повторяющихся фраз/частых формулировок;
- обращения/местоимения (если видно);
- словарь терминов.

Строгие правила:
- “ball python / ball pythons / royal python” -> только «королевский питон» (со склонением).
  Запретить: «болл-питон», «питон-болл», «питон-шар», «шаровой питон» и т.п.
- Запретить любые посторонние латинские вставки и повторы вида «Ivoire ...».

Формат:
- Верни только обновлённый гайд (короткий список пунктов), без рассуждений и без заголовков.
"""


# ---------------- TEXT / VALIDATION ----------------

_RE_CODEBLOCK = re.compile(r"^\s*```.*?$|```\s*$", re.MULTILINE)
_RE_QUOTES = re.compile(r'^\s*[«"]|[»"]\s*$')

# Forbidden "ball python" Russian variants — we will forcibly normalize to "королевский питон"
# (Yes, it’s not perfect for падежи, но это лучше, чем “болл/шар”.)
_RE_FORBIDDEN_BALL = re.compile(
    r"(?iu)\b(болл[\s\-]?питон|питон[\s\-]?болл|питон[\s\-]?шар|шаровой\s+питон|шар\s*питон|питон\s*шар)\b"
)

# Catch notorious weirdness
_RE_IVOIRE = re.compile(r"(?iu)\bivoire\b")

# Detect unexpected latin spam (very rough): long-ish ascii words
_RE_LATIN_WORD = re.compile(r"\b[A-Za-z]{4,}\b")


def split_by_newlines(text: str, max_chars: int) -> List[str]:
    lines = text.splitlines()
    chunks: List[str] = []
    cur: List[str] = []
    cur_len = 0
    for ln in lines:
        add_len = len(ln) + 1
        if cur and cur_len + add_len > max_chars:
            chunks.append("\n".join(cur))
            cur = [ln]
            cur_len = len(ln)
        else:
            cur.append(ln)
            cur_len += add_len
    if cur:
        chunks.append("\n".join(cur))
    return chunks


def result_to_text(result) -> str:
    # In LM Studio Python docs, printing result prints content; also result may have .content.
    s = getattr(result, "content", None)
    if isinstance(s, str) and s.strip():
        return s
    return str(result)


def clean_model_output(s: str) -> str:
    # strip code fences, trim
    s = s.strip()
    s = _RE_CODEBLOCK.sub("", s).strip()

    # keep only first non-empty line if model emitted multiple paragraphs
    lines = [ln.strip() for ln in s.splitlines() if ln.strip()]
    if not lines:
        return ""
    s = lines[0]

    # strip surrounding quotes
    s = s.strip()
    s = s.strip("`")
    s = _RE_QUOTES.sub("", s).strip()
    return s


def enforce_terms_ru(s: str) -> str:
    # normalize forbidden russian variants to королевский питон
    s2 = _RE_FORBIDDEN_BALL.sub("королевский питон", s)
    return s2


def looks_bad_translation(src_en: str, ru: str) -> bool:
    if not ru:
        return True

    # obvious nonsense / repeated latin keyword
    if _RE_IVOIRE.search(ru):
        return True

    # latin words that are not likely needed (if they weren't in src)
    # allow if that word appears in src (e.g., brand place)
    latin = _RE_LATIN_WORD.findall(ru)
    if latin:
        src_lower = src_en.lower()
        for w in latin:
            if w.lower() not in src_lower:
                # Not in source: likely hallucinated latin insertion
                return True

    return False


# ---------------- LLM CALLS ----------------

def load_model():
    return lms.llm(MODEL_ID, config={
        "contextLength": CONTEXT_LENGTH,
        "gpu": {"ratio": GPU_RATIO},
    })


def build_translation_guide(model, full_text_only: str) -> str:
    chunks = split_by_newlines(full_text_only, GUIDE_CHUNK_CHARS)
    guide = ""

    for idx, chunk in enumerate(tqdm(chunks, desc="Building guide (feed full Text)", unit="chunk"), 1):
        chat = lms.Chat(SYSTEM_GUIDE)
        user = (
            f"Текущий гайд (может быть пустым):\n{guide}\n\n"
            f"Новый фрагмент транскрипта #{idx}/{len(chunks)}:\n"
            f"{chunk}\n\n"
            "Обнови гайд: 10–25 коротких пунктов, строго по делу. Верни только гайд."
        )
        chat.add_user_message(user)

        result = model.respond(chat, config={
            "temperature": GUIDE_TEMPERATURE,
            "topP": GUIDE_TOP_P,
            "maxTokens": GUIDE_MAX_TOKENS,
        })
        out = clean_model_output(result_to_text(result))

        # allow multi-line guide: use original cleaned (first-line policy would kill it),
        # so for guide we do a different cleaning: strip codefences, keep all lines.
        out_full = result_to_text(result).strip()
        out_full = _RE_CODEBLOCK.sub("", out_full).strip()
        out_full = out_full.strip("`").strip()

        if out_full:
            guide = out_full

    return guide.strip()


def translate_one_row(model, guide: str, row_id: int, src_en: str) -> str:
    """
    Returns RU translation (single line), with retries + post-fixes.
    """
    last = ""

    for attempt in range(1, ROW_MAX_TRIES + 1):
        chat = lms.Chat(SYSTEM_TRANSLATE)

        # Make the guide short-ish in prompt; don't paste huge wall
        user = (
            "Контекстный гайд для единообразия (используй, но не цитируй):\n"
            f"{guide}\n\n"
            "Переведи ОДНУ реплику EN->RU.\n"
            "Требования:\n"
            "- Только перевод, одной строкой.\n"
            "- Без кавычек, без JSON, без комментариев, без markdown.\n"
            "- Естественная речь для озвучки.\n"
            "- Род/падеж/склонения — внимательно.\n"
            "- “ball python/royal python” -> строго «королевский питон» (без болл/шар).\n"
            "- Никаких посторонних латинских вставок/повторов (типа Ivoire).\n\n"
            f"Row={row_id}\nText={src_en}"
        )
        chat.add_user_message(user)

        result = model.respond(chat, config={
            "temperature": ROW_TEMPERATURE if attempt == 1 else 0.05,  # become stricter on retries
            "topP": ROW_TOP_P,
            "maxTokens": ROW_MAX_TOKENS,
        })

        ru = clean_model_output(result_to_text(result))
        ru = enforce_terms_ru(ru)

        if not ru:
            last = ru
            time.sleep(SLEEP_BETWEEN_TRIES_SEC)
            continue

        # If bad: retry with even stricter instruction
        if looks_bad_translation(src_en, ru):
            last = ru
            time.sleep(SLEEP_BETWEEN_TRIES_SEC)
            continue

        return ru

    # final fallback: whatever we got last, normalized
    return enforce_terms_ru(clean_model_output(last)) or ""


def translate_rows(model, df: pd.DataFrame, guide: str, partial_path: Path) -> Dict[int, str]:
    row_to_ru: Dict[int, str] = {}

    # If partial exists, resume
    if partial_path.exists():
        try:
            p = pd.read_csv(partial_path, dtype=str, keep_default_na=False)
            if "Row" in p.columns and "Text" in p.columns:
                for _, r in p.iterrows():
                    try:
                        rid = int(str(r["Row"]).strip())
                    except Exception:
                        continue
                    txt = str(r["Text"]).strip()
                    if txt:
                        row_to_ru[rid] = txt
        except Exception:
            pass

    out_df = df.copy()
    done = 0

    for _, r in tqdm(df.iterrows(), total=len(df), desc="Translating rows", unit="row"):
        row_id = int(str(r["Row"]).strip())
        src = str(r["Text"])

        if row_id in row_to_ru and row_to_ru[row_id].strip():
            done += 1
            continue

        ru = translate_one_row(model, guide, row_id, src)
        if not ru:
            ru = src  # final fallback (better than blank)

        row_to_ru[row_id] = ru
        done += 1

        # periodic partial save
        if done % SAVE_PARTIAL_EVERY_N_ROWS == 0:
            out_df["Text"] = [row_to_ru.get(int(str(x).strip()), str(df.loc[df["Row"] == x, "Text"].values[0]))
                              for x in out_df["Row"].tolist()]
            out_df.to_csv(partial_path, index=False, encoding="utf-8-sig")

    # final save of partial
    out_df["Text"] = [row_to_ru.get(int(str(x).strip()), "") for x in out_df["Row"].tolist()]
    out_df.to_csv(partial_path, index=False, encoding="utf-8-sig")

    return row_to_ru


def main() -> None:
    if not INPUT_FILE.exists():
        raise FileNotFoundError(f"INPUT_FILE not found: {INPUT_FILE}")

    df = pd.read_csv(INPUT_FILE, dtype=str, keep_default_na=False)
    if "Row" not in df.columns or "Text" not in df.columns:
        raise RuntimeError("CSV must contain columns: Row, Text")

    # 1) FULL TEXT feed (only concatenated Text)
    full_text_only = "\n".join(df["Text"].astype(str).tolist())

    model = load_model()

    guide = build_translation_guide(model, full_text_only)
    guide_path = INPUT_FILE.with_name("translation_guide.txt")
    guide_path.write_text(guide, encoding="utf-8")

    partial_path = INPUT_FILE.with_name("transcribe_RUS.partial.csv")

    row_to_ru = translate_rows(model, df, guide, partial_path)

    out_df = df.copy()
    out_df["Text"] = [row_to_ru.get(int(str(x).strip()), "") for x in out_df["Row"].tolist()]

    out_path = INPUT_FILE.with_name("transcribe_RUS.csv")
    out_df.to_csv(out_path, index=False, encoding="utf-8-sig")

    print(f"OK: saved -> {out_path}")
    print(f"Guide saved -> {guide_path}")
    print(f"Partial snapshot -> {partial_path}")


if __name__ == "__main__":
    main()

Вот, так обработка получается значительно стабильней и можно не так сильно упираться в VRAM.

Итак, теперь у нас есть cut_*.mp3 с эмоциями и тембром, full.mp3 (если не хватит cut), translate_RUS. Можем приступать к озвучке.

Как я смог найти, есть 3 модели, которые нормально умеют с иностранного в русский: Misha24-10/F5-TTS_RUSSIAN, tensorbanana/xttsv2_banana, Chatterbox-Multilingual-TTS.
Сравнение первых двух можно посмотреть здесь: Дообучение модели F5-TTS на русский язык. Однако у всех была проблема с непонятными словами, когда они внезапно переставали говорить по-русски и несли тарабарщину. Да и нормально поджать акцент тоже не получилось. В итоге, остался на Chatterbox-Multilingual-TTS.

Она также вредничала на версию Python и прочее, поэтому установил в WSL2 с conda. Но в этот раз брал версию Python3.11. Иначе потом проблемы с установкой будут. Часть проблем поборол патчами, но тем не менее. Также имелась проблема с watermark, поэтому добавил параметры, чтобы отключить watermark для каждой части и добавить для целикового аудио. Но была ещё проблема. Ударения. Если вы послушаете иностранца, то ухо будет резать ударение, и здесь также. Для решения проблем добавил использование RUAccent.

import os
import re
import json
import shutil
import subprocess
import logging
from pathlib import Path
from typing import Any, List, Dict, Optional, Callable, Tuple

import pandas as pd
import soundfile as sf
from tqdm import tqdm
import torch

# ===================== CONFIG =====================
INPUT_DIR = r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)"
CSV_RU = "transcribe_RUS.csv"

OVERWRITE = True
OUT_MP3_BITRATE = "320k"

# Put ALL caches/tmp here (NO trash in INPUT_DIR)
CACHE_ROOT = r"/mnt/d/Experiments/TTS/_cache_chatterbox"
HF_DIRNAME = "hf"
TMP_DIRNAME = "tmp"
RUACCENT_DIRNAME = "ruaccent"

# NEW: two-phase chunk folders (under CACHE_ROOT)
CHUNKS_RAW_DIRNAME = "chunks_raw"
CHUNKS_DENOISED_DIRNAME = "chunks_denoised"
MANIFEST_NAME = "chunks_manifest.json"

FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"

# Reference audio rules
REF_SR = 24000
MIN_REF_SEC = 10.0
REF_MAX_SEC = 12.0

# ======= SETTINGS LIKE THE SPACE UI =======
EXAGGERATION = 0.50
TEMPERATURE = 0.80
CFG_WEIGHT = 0.05
SEED = 12345
# ==========================================

# Chunking: keep under 300 chars (Space truncates)
MAX_CHARS_PER_CHUNK = 280
MAX_WORDS_PER_CHUNK = 60
HARD_TEXT_LIMIT = 300

# Audio post
PEAK = 0.95
TAIL_PAD_SEC = 0.12
CROSSFADE_MS = 14
CHUNK_EDGE_FADE_MS = 4

# Phase B: trim silence at BOTH ends (after UVR)
TRIM_BOTH_EDGES = True
EDGE_SIL_THRESH = 0.010
EDGE_HOLD_SEC = 0.06         
EDGE_KEEP_PAD_SEC = 0.03

# Watermark handling
DISABLE_WATERMARK_DURING_CHUNKS = True
APPLY_WATERMARK_ON_FINAL = False

# RUAccent
USE_RUACCENT = True
RUACCENT_MODEL = "turbo3.1"
RUACCENT_USE_DICTIONARY = True
RUACCENT_TINY_MODE_FALLBACK = True

# Print stressed text
LOG_STRESSED_TEXT = True
PRINT_STRESSED_TEXT = True

CUSTOM_WORD_OVERRIDES: Dict[str, str] = {
    "регион": "регио+н",
    "змея": "змея+",
    "питон": "пито+н",
    "Квази": "Куа+зи",
    "Сахельская": "Сахе+льская",
}
# ==================================================

# ---- Force eager attention for transformers (voice ref path uses alignment analyzer) ----
os.environ.setdefault("TRANSFORMERS_ATTN_IMPLEMENTATION", "eager")

# Redirect caches
CACHE_ROOT_PATH = Path(CACHE_ROOT)
HF_CACHE = CACHE_ROOT_PATH / HF_DIRNAME
TMP_DIR = CACHE_ROOT_PATH / TMP_DIRNAME
RUACCENT_DIR = CACHE_ROOT_PATH / RUACCENT_DIRNAME
RAW_CHUNKS_DIR = CACHE_ROOT_PATH / CHUNKS_RAW_DIRNAME
DENOISED_CHUNKS_DIR = CACHE_ROOT_PATH / CHUNKS_DENOISED_DIRNAME
MANIFEST_PATH = CACHE_ROOT_PATH / MANIFEST_NAME

for p in (HF_CACHE, TMP_DIR, RUACCENT_DIR, RAW_CHUNKS_DIR, DENOISED_CHUNKS_DIR):
    p.mkdir(parents=True, exist_ok=True)

os.environ.setdefault("HF_HOME", str(HF_CACHE))
os.environ.setdefault("HUGGINGFACE_HUB_CACHE", str(HF_CACHE))
os.environ.setdefault("TORCH_HOME", str(HF_CACHE))
os.environ.setdefault("XDG_CACHE_HOME", str(HF_CACHE))

def force_transformers_eager_attention() -> None:
    """Force attn_implementation='eager' in transformers from_pretrained()."""
    try:
        from transformers.modeling_utils import PreTrainedModel
        orig_ptm = PreTrainedModel.from_pretrained

        @classmethod
        def patched_ptm_from_pretrained(cls, *args, **kwargs):
            kwargs.setdefault("attn_implementation", "eager")
            fn = orig_ptm.__func__ if hasattr(orig_ptm, "__func__") else orig_ptm
            return fn(cls, *args, **kwargs)

        PreTrainedModel.from_pretrained = patched_ptm_from_pretrained  # type: ignore[assignment]

        try:
            from transformers.models.auto.auto_factory import _BaseAutoModelClass  # type: ignore
            orig_auto = _BaseAutoModelClass.from_pretrained

            @classmethod
            def patched_auto_from_pretrained(cls, pretrained_model_name_or_path, *args, **kwargs):
                kwargs.setdefault("attn_implementation", "eager")
                fn = orig_auto.__func__ if hasattr(orig_auto, "__func__") else orig_auto
                return fn(cls, pretrained_model_name_or_path, *args, **kwargs)

            _BaseAutoModelClass.from_pretrained = patched_auto_from_pretrained  # type: ignore[assignment]
        except Exception:
            pass

        logging.info("Transformers patched: forcing attn_implementation='eager' in from_pretrained()")
    except Exception as e:
        logging.warning("Could not patch transformers eager-attn (continuing): %s", e)


force_transformers_eager_attention()

from chatterbox.mtl_tts import ChatterboxMultilingualTTS


# ===================== INTERNAL RU STRESS DISABLE (Chatterbox) =====================
def disable_chatterbox_internal_ru_stress() -> None:
    patched = 0
    try:
        import chatterbox.models.tokenizers.tokenizer as tokmod

        if hasattr(tokmod, "_russian_stresser"):
            try:
                tokmod._russian_stresser = None
                patched += 1
                logging.info("Disabled internal RU stress: tokenizer._russian_stresser -> None")
            except Exception as e:
                logging.warning("Failed patch _russian_stresser: %s", e)

        if hasattr(tokmod, "add_russian_stress"):
            try:
                tokmod.add_russian_stress = lambda text: text  # type: ignore[assignment]
                patched += 1
                logging.info("Disabled internal RU stress: tokenizer.add_russian_stress -> identity")
            except Exception as e:
                logging.warning("Failed patch add_russian_stress: %s", e)

    except Exception as e:
        logging.warning("Could not import tokenizer to disable stress: %s", e)

    if patched == 0:
        logging.warning("Internal RU stress patch: nothing patched (version mismatch). Continue anyway.")


# ===================== WATERMARK PATCH =====================
def patch_disable_watermark(model: Any) -> Optional[Callable[[Any, int], Any]]:
    wm = getattr(model, "watermarker", None)
    if wm is None:
        logging.warning("Watermarker not found on model (nothing to patch).")
        return None
    orig = getattr(wm, "apply_watermark", None)
    if not callable(orig):
        logging.warning("model.watermarker.apply_watermark not callable (nothing to patch).")
        return None

    def _identity(wav, sample_rate: int):  # noqa: ANN001
        return wav

    try:
        wm.apply_watermark = _identity  # type: ignore[assignment]
        logging.info("Patched watermark: apply_watermark -> identity (disabled during chunk generation).")
        return orig
    except Exception as e:
        logging.warning("Failed to patch watermark: %s", e)
        return None


# ===================== HELPERS =====================
def to_wsl_path(p: str) -> str:
    s = p.replace("\\", "/")
    m = re.match(r"^([A-Za-z]):/(.*)$", s)
    if m:
        return f"/mnt/{m.group(1).lower()}/{m.group(2)}"
    return s


def ensure_tool(name: str) -> None:
    if shutil.which(name) is None:
        raise RuntimeError(f"Required tool not found in PATH: {name}")


def run(cmd: list[str]) -> subprocess.CompletedProcess:
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if p.returncode != 0:
        raise RuntimeError(f"Command failed ({p.returncode}): {' '.join(cmd)}\nSTDERR:\n{p.stderr}")
    return p


def get_duration_sec(media: Path) -> float:
    p = run([
        FFPROBE, "-hide_banner", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=nw=1:nk=1",
        str(media),
    ])
    try:
        return float(p.stdout.strip())
    except Exception:
        return 0.0


def prep_text_ru(s: str) -> str:
    s = re.sub(r"\s+", " ", (s or "").replace("\r", " ").replace("\n", " ")).strip()
    if not s:
        return s
    if s[-1] not in ".!?…":
        s += "."
    return s


VOWELS_RU = "АЕЁИОУЫЭЮЯаеёиоуыэюя"
COMBINING_ACUTE = "\u0301"
VISIBLE_ALT = "ˊ"


def normalize_stress_marks(text: str) -> str:
    if not text:
        return text
    text = re.sub(rf"{re.escape(VISIBLE_ALT)}([{VOWELS_RU}])", rf"\1{COMBINING_ACUTE}", text)
    text = re.sub(rf"([{VOWELS_RU}]){re.escape(VISIBLE_ALT)}", rf"\1{COMBINING_ACUTE}", text)
    text = re.sub(rf"\+([{VOWELS_RU}])", rf"\1{COMBINING_ACUTE}", text)
    text = re.sub(rf"([{VOWELS_RU}])\+", rf"\1{COMBINING_ACUTE}", text)
    text = text.replace("+", "").replace(VISIBLE_ALT, "")
    text = re.sub(rf"{COMBINING_ACUTE}+", COMBINING_ACUTE, text)
    return text


def sanitize_audio(wav_t: torch.Tensor, peak: float = 0.98) -> torch.Tensor:
    wav_t = wav_t.detach().cpu().to(torch.float32)

    if wav_t.ndim == 2:
        if wav_t.shape[0] == 1:
            wav_t = wav_t[0]
        elif wav_t.shape[1] == 1:
            wav_t = wav_t[:, 0]
        else:
            wav_t = wav_t.mean(dim=0)

    wav_t = torch.nan_to_num(wav_t, nan=0.0, posinf=0.0, neginf=0.0)

    m = float(wav_t.abs().max().item()) if wav_t.numel() else 0.0
    if m > 0.0 and m > peak:
        wav_t = wav_t * (peak / m)
    return wav_t.clamp(-1.0, 1.0)


def fade_edges(wav: torch.Tensor, sr: int, fade_ms: float) -> torch.Tensor:
    if wav.numel() == 0:
        return wav
    n = int(round((fade_ms / 1000.0) * sr))
    if n <= 0:
        return wav
    if wav.numel() < n * 2:
        return wav
    ramp = torch.linspace(0.0, 1.0, steps=n, dtype=wav.dtype)
    wav = wav.clone()
    wav[:n] = wav[:n] * ramp
    wav[-n:] = wav[-n:] * ramp.flip(0)
    return wav


def pad_tail(wav: torch.Tensor, sr: int, tail_sec: float) -> torch.Tensor:
    n = int(round(max(0.0, tail_sec) * sr))
    if n <= 0:
        return wav
    return torch.cat([wav, torch.zeros(n, dtype=wav.dtype)], dim=0)


def crossfade_concat(chunks: List[torch.Tensor], sr: int, fade_ms: float) -> torch.Tensor:
    if not chunks:
        return torch.zeros(0, dtype=torch.float32)
    if len(chunks) == 1:
        return chunks[0]
    fade = int(round((fade_ms / 1000.0) * sr))
    fade = max(1, fade)

    out = chunks[0]
    for nxt in chunks[1:]:
        if out.numel() < fade or nxt.numel() < fade:
            out = torch.cat([out, nxt], dim=0)
            continue
        a = out[-fade:]
        b = nxt[:fade]
        ramp = torch.linspace(0.0, 1.0, steps=fade, dtype=out.dtype)
        mix = a * (1.0 - ramp) + b * ramp
        out = torch.cat([out[:-fade], mix, nxt[fade:]], dim=0)
    return out


def audio_to_ref_wav(src_audio: Path, out_wav: Path, sr: int, clip_sec: float) -> Path:
    out_wav.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        FFMPEG, "-hide_banner", "-y",
        "-i", str(src_audio),
        "-vn",
        "-ac", "1",
        "-ar", str(sr),
    ]
    if clip_sec and clip_sec > 0:
        cmd += ["-t", f"{clip_sec:.3f}"]
    cmd += ["-c:a", "pcm_s16le", str(out_wav)]
    run(cmd)
    return out_wav


def write_wav_tensor_pcm16(wav_t: torch.Tensor, sr: int, dst_wav: Path) -> None:
    dst_wav.parent.mkdir(parents=True, exist_ok=True)
    wav_t = sanitize_audio(wav_t, peak=PEAK)
    sf.write(str(dst_wav), wav_t.numpy(), sr, subtype="PCM_16")


def write_mp3_from_wav_tensor(wav_t: torch.Tensor, sr: int, dst_mp3: Path, tmp_wav: Path) -> None:
    dst_mp3.parent.mkdir(parents=True, exist_ok=True)
    tmp_wav.parent.mkdir(parents=True, exist_ok=True)

    wav_t = sanitize_audio(wav_t, peak=PEAK)
    sf.write(str(tmp_wav), wav_t.numpy(), sr, subtype="PCM_16")

    run([
        FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n",
        "-i", str(tmp_wav),
        "-vn",
        "-ac", "1",
        "-ar", str(sr),
        "-c:a", "libmp3lame",
        "-b:a", OUT_MP3_BITRATE,
        str(dst_mp3),
    ])


def call_generate_space_style(model: Any, text: str, language_id: str, audio_prompt_path: Optional[str]) -> torch.Tensor:
    text = (text or "")[:HARD_TEXT_LIMIT]
    kwargs = {
        "exaggeration": float(EXAGGERATION),
        "temperature": float(TEMPERATURE),
        "cfg_weight": float(CFG_WEIGHT),
    }
    if audio_prompt_path:
        kwargs["audio_prompt_path"] = audio_prompt_path
    return model.generate(text, language_id=language_id, **kwargs)


def split_ru_into_chunks(text: str, max_chars: int, max_words: int) -> List[str]:
    t = prep_text_ru(text)
    if not t:
        return []

    parts = re.split(r"([.!?…]+)\s*", t)
    sents: List[str] = []
    for i in range(0, len(parts), 2):
        s = (parts[i] or "").strip()
        p = (parts[i + 1] or "").strip() if i + 1 < len(parts) else ""
        if s:
            sents.append((s + p).strip())
    if not sents:
        sents = [t]

    chunks: List[str] = []
    cur = ""
    cur_words = 0

    def flush():
        nonlocal cur, cur_words
        if cur.strip():
            chunks.append(cur.strip())
        cur = ""
        cur_words = 0

    for s in sents:
        w = len(s.split())
        if (len(cur) + 1 + len(s) <= max_chars) and (cur_words + w <= max_words):
            cur = (cur + " " + s).strip()
            cur_words += w
        else:
            flush()
            if len(s) > max_chars or w > max_words:
                words = s.split()
                buf: List[str] = []
                for word in words:
                    buf.append(word)
                    if len(" ".join(buf)) >= max_chars or len(buf) >= max_words:
                        chunks.append(" ".join(buf).strip())
                        buf = []
                if buf:
                    chunks.append(" ".join(buf).strip())
            else:
                cur = s
                cur_words = w
    flush()

    out: List[str] = []
    for c in chunks:
        c = c.strip()
        if c and c[-1] not in ".!?…":
            c += "."
        out.append(c)
    return out


# ===================== MANUAL OVERRIDES =====================
def _case_like(src: str, repl: str) -> str:
    if not src:
        return repl
    if src.isupper():
        return repl.upper()
    if src[0].isupper():
        return repl[0].upper() + repl[1:]
    return repl


def apply_manual_overrides(text: str, overrides: Dict[str, str]) -> str:
    if not text or not overrides:
        return text

    norm_overrides: Dict[str, str] = {}
    for k, v in overrides.items():
        k = (k or "").strip()
        v = (v or "").strip()
        if not k or not v:
            continue
        norm_overrides[k] = normalize_stress_marks(v)

    keys = sorted(norm_overrides.keys(), key=len, reverse=True)
    out = text
    for k in keys:
        v = norm_overrides[k]
        pattern = re.compile(
            rf"(?<![A-Za-zА-Яа-яЁё0-9\-]){re.escape(k)}(?![A-Za-zА-Яа-яЁё0-9\-])",
            re.IGNORECASE,
        )

        def _sub(m: re.Match) -> str:
            return _case_like(m.group(0), v)

        out = pattern.sub(_sub, out)
    return out


# ===================== RUACCENT =====================
class RUAccentHelper:
    def __init__(self) -> None:
        self.impl = None
        self.ready = False

    def load(self) -> None:
        if not USE_RUACCENT:
            return
        try:
            from ruaccent import RUAccent
            self.impl = RUAccent()

            device = "CPU"
            try:
                import onnxruntime as ort
                if "CUDAExecutionProvider" in ort.get_available_providers():
                    device = "CUDA"
            except Exception:
                pass

            try:
                self.impl.load(
                    omograph_model_size=RUACCENT_MODEL,
                    use_dictionary=RUACCENT_USE_DICTIONARY,
                    tiny_mode=False,
                    device=device,
                    workdir=str(RUACCENT_DIR),
                )
                self.ready = True
                logging.info("RUAccent loaded (device=%s, model=%s, dict=%s)", device, RUACCENT_MODEL, RUACCENT_USE_DICTIONARY)
                return
            except Exception as e:
                logging.warning("RUAccent load (dict mode) failed: %s", e)

            if RUACCENT_TINY_MODE_FALLBACK:
                self.impl.load(
                    omograph_model_size=RUACCENT_MODEL,
                    use_dictionary=False,
                    tiny_mode=True,
                    device=device,
                    workdir=str(RUACCENT_DIR),
                )
                self.ready = True
                logging.info("RUAccent loaded in tiny_mode (device=%s, model=%s)", device, RUACCENT_MODEL)
                return

        except Exception as e:
            logging.warning("RUAccent unavailable; continuing without stress: %s", e)

        self.impl = None
        self.ready = False

    def apply(self, text_ru: str) -> str:
        if not self.ready or self.impl is None:
            return text_ru
        try:
            t = self.impl.process_all(text_ru)
            return normalize_stress_marks(t)
        except Exception as e:
            logging.warning("RUAccent failed; fallback no-stress: %s", e)
            return text_ru


# ===================== PHASE B: load denoised chunks + trim both edges =====================
def _to_mono_1d(x: torch.Tensor) -> torch.Tensor:
    if x.ndim == 2:
        # soundfile returns (frames, channels)
        if x.shape[1] == 1:
            x = x[:, 0]
        else:
            x = x.mean(dim=1)
    return x


def load_wav_any(path: Path) -> Tuple[torch.Tensor, int]:
    data, sr = sf.read(str(path), always_2d=False, dtype="float32")
    t = torch.from_numpy(data).to(torch.float32)
    t = _to_mono_1d(t)
    return t, int(sr)


def ffmpeg_resample_to(path_in: Path, path_out: Path, sr: int) -> None:
    path_out.parent.mkdir(parents=True, exist_ok=True)
    run([
        FFMPEG, "-hide_banner", "-y",
        "-i", str(path_in),
        "-vn",
        "-ac", "1",
        "-ar", str(sr),
        "-c:a", "pcm_s16le",
        str(path_out),
    ])


def load_wav_and_force_sr(path: Path, target_sr: int, work_tmp: Path) -> torch.Tensor:
    wav, sr = load_wav_any(path)
    if sr == target_sr:
        return sanitize_audio(wav, peak=PEAK)

    tmp = work_tmp / f"rs_{path.stem}.wav"
    ffmpeg_resample_to(path, tmp, target_sr)
    wav2, sr2 = load_wav_any(tmp)
    try:
        tmp.unlink(missing_ok=True)
    except Exception:
        pass
    if sr2 != target_sr:
        logging.warning("Resample mismatch: got %s (expected %s) for %s", sr2, target_sr, path.name)
    return sanitize_audio(wav2, peak=PEAK)


def trim_silence_both_ends(wav: torch.Tensor, sr: int, thr: float, hold_sec: float, keep_pad_sec: float) -> torch.Tensor:
    """
    Trim silence from both beginning and end using peak threshold.
    We scan using a small window to be robust to tiny spikes.
    """
    if wav.numel() == 0:
        return wav

    x = wav.abs()
    hold = max(1, int(round(hold_sec * sr)))
    pad = max(0, int(round(keep_pad_sec * sr)))

    # start
    start = 0
    i = 0
    while i + hold < x.numel() and float(x[i:i+hold].max()) < thr:
        i += hold
    start = max(0, i - pad)

    # end
    end = x.numel()
    j = x.numel()
    while j - hold > 0 and float(x[j-hold:j].max()) < thr:
        j -= hold
    end = min(x.numel(), j + pad)

    if end <= start:
        return wav  # don't destroy audio if threshold too aggressive
    return wav[start:end]


def find_best_match_by_token(root: Path, token: str) -> Optional[Path]:
    """
    Match denoised files by token anywhere in filename: *{token}*.wav (recursive).
    Pick newest by mtime.
    """
    cand = list(root.rglob(f"*{token}*.wav"))
    if not cand:
        return None
    cand.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return cand[0]


# ===================== MAIN =====================
def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
    ensure_tool(FFMPEG)
    ensure_tool(FFPROBE)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    in_dir = Path(to_wsl_path(INPUT_DIR))
    csv_ru = in_dir / CSV_RU
    if not csv_ru.exists():
        raise FileNotFoundError(f"Missing {CSV_RU}: {csv_ru}")

    df = pd.read_csv(csv_ru, dtype=str, keep_default_na=False)
    need_cols = {"Text", "File"}
    if not need_cols.issubset(set(df.columns)):
        raise RuntimeError(f"{CSV_RU} must contain columns {sorted(need_cols)}; got: {list(df.columns)}")

    logging.info("Using CACHE_ROOT=%s", CACHE_ROOT_PATH)
    logging.info("RAW_CHUNKS_DIR=%s", RAW_CHUNKS_DIR)
    logging.info("DENOISED_CHUNKS_DIR=%s", DENOISED_CHUNKS_DIR)
    logging.info("Loading Chatterbox Multilingual (device=%s)...", device)
    model = ChatterboxMultilingualTTS.from_pretrained(device=device)

    disable_chatterbox_internal_ru_stress()

    orig_apply_watermark = None
    if DISABLE_WATERMARK_DURING_CHUNKS:
        orig_apply_watermark = patch_disable_watermark(model)

    sr = int(getattr(model, "sr", REF_SR))

    if SEED and int(SEED) != 0:
        torch.manual_seed(int(SEED))
        if device == "cuda":
            torch.cuda.manual_seed_all(int(SEED))

    ruaccent = RUAccentHelper()
    ruaccent.load()

    # ---------------- PHASE A: generate chunk WAVs ----------------
    manifest: List[dict] = []
    logging.info("PHASE A: generating WAV chunks into %s ...", RAW_CHUNKS_DIR)

    for idx, r in tqdm(df.iterrows(), total=len(df), desc="PHASE A | gen chunks", unit="row"):
        rel = str(r["File"]).strip().replace("\\", "/")
        raw_text = str(r["Text"])

        if not rel or not raw_text.strip():
            continue

        cut_mp3 = in_dir / rel
        if not cut_mp3.exists():
            logging.warning("Missing source cut (skip): %s", cut_mp3)
            continue

        dst_mp3 = cut_mp3.with_name(cut_mp3.stem + "_RUS.mp3")

        # Reference selection: cut too short -> full.mp3
        cut_dur = get_duration_sec(cut_mp3)
        ref_src = cut_mp3
        if cut_dur > 0 and cut_dur < MIN_REF_SEC:
            full_mp3 = cut_mp3.parent / "full.mp3"
            if full_mp3.exists():
                ref_src = full_mp3
                logging.info(
                    "Row %d: cut=%.2fs < %.2fs -> using full.mp3 as reference (clip to %.2fs)",
                    idx + 1, cut_dur, MIN_REF_SEC, REF_MAX_SEC
                )

        # Text pipeline (stress)
        text_ru = prep_text_ru(raw_text)
        has_manual = ("+" in text_ru) or (VISIBLE_ALT in text_ru) or (COMBINING_ACUTE in text_ru)

        if has_manual:
            text_ru = normalize_stress_marks(text_ru)
        else:
            text_ru = ruaccent.apply(text_ru)

        text_ru = apply_manual_overrides(text_ru, CUSTOM_WORD_OVERRIDES)
        text_ru = normalize_stress_marks(text_ru)

        if LOG_STRESSED_TEXT:
            logging.info("Row %d | %s | STRESSED_TEXT: %s", idx + 1, rel, text_ru)
        if PRINT_STRESSED_TEXT:
            print(f"Row {idx+1} | {rel} | {text_ru}", flush=True)

        chunks = split_ru_into_chunks(text_ru, MAX_CHARS_PER_CHUNK, MAX_WORDS_PER_CHUNK)
        if not chunks:
            continue

        ref_wav = TMP_DIR / f"ref_{idx:06d}.wav"

        try:
            audio_to_ref_wav(ref_src, ref_wav, sr=sr, clip_sec=REF_MAX_SEC)

            chunk_tokens: List[str] = []
            for ci, c in enumerate(chunks):
                token = f"cbx_r{idx+1:06d}_c{ci:02d}"
                out_wav = RAW_CHUNKS_DIR / f"{token}.wav"
                chunk_tokens.append(token)

                if out_wav.exists() and not OVERWRITE:
                    continue  # keep existing

                # generate (Space-style)
                w = call_generate_space_style(
                    model,
                    text=c,
                    language_id="ru",
                    audio_prompt_path=str(ref_wav),
                ).squeeze(0)

                # store RAW chunk for UVR (no trimming here, minimal touching)
                w = sanitize_audio(w, peak=PEAK)
                write_wav_tensor_pcm16(w, sr=sr, dst_wav=out_wav)

            manifest.append({
                "row_index_1based": int(idx + 1),
                "rel_file": rel,
                "dst_mp3": str(dst_mp3),
                "sr": int(sr),
                "chunk_tokens": chunk_tokens,
            })

        except Exception as e:
            logging.exception("Chunk generation failed for row %d (%s): %s", idx + 1, cut_mp3.name, e)

        finally:
            try:
                ref_wav.unlink(missing_ok=True)
            except Exception:
                pass

    # write manifest
    MANIFEST_PATH.parent.mkdir(parents=True, exist_ok=True)
    MANIFEST_PATH.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
    logging.info("PHASE A complete. Manifest saved: %s", MANIFEST_PATH)

    # ---------------- STOP: user must denoise manually ----------------
    msg = f"""
================================================================================
MANUAL DENOISE REQUIRED (UVR)
--------------------------------------------------------------------------------
1) Take WAV chunks from:
   {RAW_CHUNKS_DIR}

2) Denoise them manually in UVR and PUT RESULTS into:
   {DENOISED_CHUNKS_DIR}

IMPORTANT:
- Output filenames MAY have prefix/suffix, that's OK.
- But each file name MUST still contain its token, like:
    cbx_r000123_c00
  anywhere in the filename, for matching.

Example acceptable UVR output names:
  cbx_r000001_c00.wav
  DENOISED__cbx_r000001_c00__whatever.wav

When you're done, type Y and press Enter to continue assembly.
================================================================================
"""
    print(msg, flush=True)
    ans = input("Type 'Y' to continue (anything else = exit): ").strip().upper()
    if ans != "Y":
        logging.info("Exit requested (no assembly performed).")
        return

    # ---------------- PHASE B: assemble mp3 from denoised chunks ----------------
    logging.info("PHASE B: loading denoised WAV chunks from %s and assembling mp3 ...", DENOISED_CHUNKS_DIR)

    # reload manifest (so you can re-run Phase B later without Phase A if desired)
    try:
        manifest2 = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
    except Exception as e:
        raise RuntimeError(f"Failed to read manifest {MANIFEST_PATH}: {e}")

    for item in tqdm(manifest2, total=len(manifest2), desc="PHASE B | assemble", unit="row"):
        dst_mp3 = Path(item["dst_mp3"])
        sr_item = int(item.get("sr", sr))
        tokens: List[str] = list(item.get("chunk_tokens", []))
        rel = str(item.get("rel_file", ""))

        if dst_mp3.exists() and not OVERWRITE:
            continue

        wav_chunks: List[torch.Tensor] = []
        for token in tokens:
            den = find_best_match_by_token(DENOISED_CHUNKS_DIR, token)
            if den is None:
                rawp = RAW_CHUNKS_DIR / f"{token}.wav"
                if rawp.exists():
                    logging.warning("DENOISED missing for token=%s (using RAW): %s", token, rawp.name)
                    den = rawp
                else:
                    logging.warning("Missing BOTH denoised+raw chunk for token=%s (skip row %s)", token, rel)
                    wav_chunks = []
                    break

            w = load_wav_and_force_sr(den, target_sr=sr_item, work_tmp=TMP_DIR)

            if TRIM_BOTH_EDGES:
                w = trim_silence_both_ends(
                    w, sr_item,
                    thr=EDGE_SIL_THRESH,
                    hold_sec=EDGE_HOLD_SEC,
                    keep_pad_sec=EDGE_KEEP_PAD_SEC,
                )

            w = fade_edges(w, sr_item, CHUNK_EDGE_FADE_MS)
            wav_chunks.append(w)

        if not wav_chunks:
            continue

        out = crossfade_concat(wav_chunks, sr=sr_item, fade_ms=CROSSFADE_MS)
        out = pad_tail(out, sr_item, TAIL_PAD_SEC)

        if APPLY_WATERMARK_ON_FINAL and callable(orig_apply_watermark):
            try:
                out_np = out.detach().cpu().numpy()
                out_np = orig_apply_watermark(out_np, sr_item)
                out = torch.from_numpy(out_np).to(torch.float32)
            except Exception as e:
                logging.warning("Final watermark apply failed (continuing without): %s", e)

        tmp_wav = TMP_DIR / f"final_{Path(rel).stem}.wav"
        try:
            write_mp3_from_wav_tensor(out, sr=sr_item, dst_mp3=dst_mp3, tmp_wav=tmp_wav)
        except Exception as e:
            logging.exception("MP3 write failed for %s: %s", dst_mp3.name, e)
        finally:
            try:
                tmp_wav.unlink(missing_ok=True)
            except Exception:
                pass

    logging.info("Done. Generated *_RUS.mp3 next to each cut_*.mp3")
    logging.info("Raw chunks kept in: %s", RAW_CHUNKS_DIR)
    logging.info("Denoised chunks expected in: %s", DENOISED_CHUNKS_DIR)
    logging.info("Manifest: %s", MANIFEST_PATH)


if __name__ == "__main__":
    main()

Однако, оказалось, что при генерации, Chatterbox любит на конце вставлять металлический шум, как на электростанции. Чтобы побороть эту проблему, добавил два этапа, между которыми надо пройтись UVR по файлам из директории. Код вам все напишет.

В итоге, мы получим нормальную русскую речь. Останется только её объединить. Набросал простенький скрипт:

import csv
import logging
import shutil
import subprocess
import tempfile
import wave
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List

from tqdm import tqdm


# ===================== CONFIG =====================
INPUT_DIR = Path(r"D:\Experiments\Audio\4_data")
CSV_NAME = "transcribe_RUS.csv"

FFMPEG = "ffmpeg"

# output encoding
OUT_MP3_BITRATE = "320k"
OVERWRITE = True

# timeline/render
TARGET_SR = 24000          # 24k хорошо подходит под TTS-кусочки, быстрее и меньше
SAMPLE_WIDTH_BYTES = 2     # s16le
CHANNELS = 1               # mono

# overlap handling
MIN_GAP_SEC = 0.02         # минимальная пауза между кусками (после overlap сдвигаем к time_cursor+MIN_GAP)

# gap handling (optional): можно чуть подтянуть следующий сегмент назад, чтобы сделать паузу короче
MAX_SHIFT_BACK_SEC = 2.00  # максимум на сколько можно сдвинуть сегмент РАНЬШЕ его StartSec (0 = выключить)

# optional: clamp unreasonable gaps (0 = off)
MAX_SILENCE_SEC = 0.0      # например 30.0 чтобы ограничить тишину между сегментами (при этом сегмент будет сдвинут раньше)


# ===================== DATA =====================
@dataclass
class Segment:
    start: float
    end: float
    src_rus_mp3: Path


# ===================== UTILS =====================
def ensure_tool(name: str) -> None:
    if shutil.which(name) is None:
        raise RuntimeError(f"Required tool not found in PATH: {name}")

def safe_float(x: str, default: float = 0.0) -> float:
    try:
        return float(str(x).strip())
    except Exception:
        return default

def write_silence(wf: wave.Wave_write, silence_samples: int) -> None:
    if silence_samples <= 0:
        return
    wf.writeframesraw(b"\x00" * (silence_samples * SAMPLE_WIDTH_BYTES * CHANNELS))

def normalize_file_to_rus(file_field: str) -> str:
    """
    File в CSV обычно: Person_01/cut_....mp3
    Нам нужен:        Person_01/cut_...._RUS.mp3
    """
    s = str(file_field).strip().replace("\\", "/")
    if not s:
        return s
    low = s.lower()
    if low.endswith("_rus.mp3"):
        return s
    if low.endswith(".mp3"):
        return s[:-4] + "_RUS.mp3"
    return s + "_RUS.mp3"

def person_key(person: str) -> str:
    return str(person).strip()

def list_audio_dirs(root: Path) -> List[Path]:
    # audio dir = папка где есть transcribe_RUS.csv
    out: List[Path] = []
    if not root.exists():
        return out
    for p in root.iterdir():
        if p.is_dir() and (p / CSV_NAME).exists():
            out.append(p)
    return sorted(out)

def read_segments(audio_dir: Path) -> Dict[str, List[Segment]]:
    """
    Возвращает { "Person_01": [Segment...], ... }
    """
    csv_path = audio_dir / CSV_NAME
    by_person: Dict[str, List[Segment]] = {}

    with csv_path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            person = person_key(row.get("Person", ""))
            if not person:
                continue

            start = safe_float(row.get("StartSec", "0"), 0.0)
            end = safe_float(row.get("EndSec", "0"), 0.0)
            if end <= start:
                # для сортировки/валидности нам нужен хотя бы порядок, но длительность НЕ используем для обрезки
                continue

            file_field = row.get("File", "")
            rel_rus = normalize_file_to_rus(file_field)
            if not rel_rus:
                continue

            src = audio_dir / rel_rus
            if not src.exists():
                # иногда файл лежит просто в папке Person_XX без префикса Person_XX/...
                bn = Path(rel_rus).name
                alt = audio_dir / person / bn
                if alt.exists():
                    src = alt
                else:
                    logging.warning("Missing segment (skip): %s", src)
                    continue

            by_person.setdefault(person, []).append(Segment(start=max(0.0, start), end=end, src_rus_mp3=src))

    for k in list(by_person.keys()):
        by_person[k].sort(key=lambda s: (s.start, s.end))
    return by_person

def encode_wav_to_mp3(wav_path: Path, mp3_path: Path) -> None:
    mp3_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        FFMPEG, "-hide_banner", "-loglevel", "error",
        "-y" if OVERWRITE else "-n",
        "-i", str(wav_path),
        "-vn",
        "-ac", str(CHANNELS),
        "-ar", str(TARGET_SR),
        "-c:a", "libmp3lame",
        "-b:a", OUT_MP3_BITRATE,
        str(mp3_path),
    ]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if p.returncode != 0:
        raise RuntimeError(f"ffmpeg encode failed: {mp3_path}\n{p.stderr.decode(errors='ignore')}")

def decode_and_stream_to_wav(mp3_path: Path, wf: wave.Wave_write, sr: int) -> int:
    """
    Потоково декодирует mp3 -> raw PCM s16le (mono) и пишет в wav.
    Возвращает количество записанных samples.
    """
    cmd = [
        FFMPEG,
        "-hide_banner",
        "-loglevel", "error",
        "-i", str(mp3_path),
        "-vn",
        "-ac", str(CHANNELS),
        "-ar", str(sr),
        "-f", "s16le",
        "pipe:1",
    ]

    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    assert p.stdout is not None

    bytes_written = 0
    rem = b""
    chunk_size = 256 * 1024

    while True:
        chunk = p.stdout.read(chunk_size)
        if not chunk:
            break
        if rem:
            chunk = rem + chunk
            rem = b""

        # выровнять по sample boundary (2 bytes * channels)
        frame_bytes = SAMPLE_WIDTH_BYTES * CHANNELS
        cut = (len(chunk) // frame_bytes) * frame_bytes
        if cut == 0:
            rem = chunk
            continue

        wf.writeframesraw(chunk[:cut])
        bytes_written += cut
        rem = chunk[cut:]

    # если остался хвост не кратный sample-границе — просто игнор (микрохвост, лучше чем краш)
    _, err = p.communicate()
    if p.returncode != 0:
        raise RuntimeError(f"ffmpeg decode failed: {mp3_path}\n{(err or b'').decode(errors='ignore')}")

    samples = bytes_written // (SAMPLE_WIDTH_BYTES * CHANNELS)
    return int(samples)

def build_person_timeline(person: str, segs: List[Segment], out_mp3: Path, tmp_dir: Path) -> None:
    """
    Сборка таймлайна БЕЗ ОБРЕЗКИ:
      - ставим сегмент примерно около его StartSec
      - при overlap сдвигаем вперёд до time_cursor + MIN_GAP_SEC
      - при больших паузах можно слегка сдвинуть НАЗАД (MAX_SHIFT_BACK_SEC)
      - сегмент пишем целиком (как декодировался), time_cursor двигаем по фактической длительности
    """
    if not segs:
        return

    tmp_wav = tmp_dir / f"{person}_tmp.wav"

    with wave.open(str(tmp_wav), "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(SAMPLE_WIDTH_BYTES)
        wf.setframerate(TARGET_SR)

        time_cursor = 0.0

        for s in segs:
            min_start = time_cursor + MIN_GAP_SEC

            # Базовая цель — s.start, но:
            # 1) если overlap — двигаем вперёд (min_start)
            # 2) если есть пауза — можем чуть подтянуть назад (не больше MAX_SHIFT_BACK_SEC)
            target = s.start

            if target < min_start:
                start = min_start
            else:
                if MAX_SHIFT_BACK_SEC > 0.0:
                    start = max(min_start, target - MAX_SHIFT_BACK_SEC)
                else:
                    start = target

            # optional clamp huge silences by shifting earlier a lot (off by default)
            gap = start - time_cursor
            if MAX_SILENCE_SEC > 0.0 and gap > MAX_SILENCE_SEC:
                gap = MAX_SILENCE_SEC
                start = time_cursor + gap

            gap_samples = int(round(max(0.0, gap) * TARGET_SR))
            write_silence(wf, gap_samples)

            # пишем сегмент целиком (без trim/pad)
            written_samples = decode_and_stream_to_wav(s.src_rus_mp3, wf, TARGET_SR)
            dur_sec = written_samples / float(TARGET_SR) if written_samples > 0 else 0.0

            time_cursor = start + dur_sec

        wf.writeframes(b"")  # finalize header sizes

    encode_wav_to_mp3(tmp_wav, out_mp3)

    try:
        tmp_wav.unlink(missing_ok=True)
    except Exception:
        pass


# ===================== MAIN =====================
def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
    ensure_tool(FFMPEG)

    root = INPUT_DIR
    audio_dirs = list_audio_dirs(root)
    if not audio_dirs:
        logging.warning("No audio dirs found in: %s (expected subdirs with %s)", root, CSV_NAME)
        return

    for audio_dir in tqdm(audio_dirs, desc="Audio dirs", unit="dir"):
        try:
            by_person = read_segments(audio_dir)
            if not by_person:
                continue

            with tempfile.TemporaryDirectory(prefix="_tmp_join_", dir=str(audio_dir)) as tdir:
                tdir_p = Path(tdir)

                for person, segs in tqdm(by_person.items(), desc=f"{audio_dir.name}", unit="person", leave=False):
                    out_mp3 = audio_dir / f"{person}.mp3"
                    build_person_timeline(person, segs, out_mp3, tdir_p)

        except Exception as e:
            logging.exception("Failed dir %s: %s", audio_dir, e)

    logging.info("Done.")


if __name__ == "__main__":
    main()

На выходе получим русскую речь по персонам, а не в виде cut_. Осталось объединить дорожки. И тут нам пригодится Instrumental дорожка, которую переименовываем в Person_99.mp3 и запускаем финальную сборку.

import csv
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional


# ===================== CONFIG =====================
INPUT_DIR_AUDIO = Path(r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)")
VIDEO_FILE = Path(r"D:/Experiments/Audio/1__video_source/videoplayback.mp4")

FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"

AUDIO_SR = 48000
AUDIO_BITRATE = "320k"          # финальный AAC битрейт в MP4
MIX_LIMIT = 0.95                # лимитер, чтобы уменьшить клиппинг при миксе
LANG_TAG = "rus"                # тег языка аудио дорожки в контейнере

OUT_MP4_NAME = "output.mp4"
OUT_SRT_NAME = "output.srt"
# ==================================================


def run(cmd: List[str]) -> None:
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace")
    if p.returncode != 0:
        raise RuntimeError(f"Command failed ({p.returncode}):\n{' '.join(cmd)}\n\n{p.stdout}")


def ffprobe_duration_seconds(path: Path) -> float:
    # Возвращает длительность в секундах (float)
    cmd = [
        FFPROBE,
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        str(path),
    ]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace")
    if p.returncode != 0:
        raise RuntimeError(f"ffprobe failed:\n{p.stderr}")
    data = json.loads(p.stdout)
    dur = data.get("format", {}).get("duration", None)
    if dur is None:
        raise RuntimeError("ffprobe: couldn't read duration")
    return float(dur)


def sec_to_srt_time(t: float) -> str:
    # HH:MM:SS,mmm
    if t < 0:
        t = 0.0
    ms_total = int(round(t * 1000.0))
    hh = ms_total // 3_600_000
    ms_total -= hh * 3_600_000
    mm = ms_total // 60_000
    ms_total -= mm * 60_000
    ss = ms_total // 1000
    ms = ms_total - ss * 1000
    return f"{hh:02d}:{mm:02d}:{ss:02d},{ms:03d}"


def parse_time_hhmmss_mmm(s: str) -> Optional[float]:
    # "00:00:05.168" -> seconds
    s = (s or "").strip()
    if not s:
        return None
    try:
        parts = s.split(":")
        if len(parts) != 3:
            return None
        hh = int(parts[0])
        mm = int(parts[1])
        ss_part = parts[2]
        if "." in ss_part:
            ss_str, ms_str = ss_part.split(".", 1)
            ss = int(ss_str)
            ms = int(ms_str.ljust(3, "0")[:3])
        elif "," in ss_part:
            ss_str, ms_str = ss_part.split(",", 1)
            ss = int(ss_str)
            ms = int(ms_str.ljust(3, "0")[:3])
        else:
            ss = int(ss_part)
            ms = 0
        return hh * 3600.0 + mm * 60.0 + ss + (ms / 1000.0)
    except Exception:
        return None


def read_transcribe_csv(csv_path: Path) -> List[Dict[str, Any]]:
    rows: List[Dict[str, Any]] = []
    with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        for r in reader:
            rows.append(r)
    return rows


def write_srt_from_csv(rows: List[Dict[str, Any]], srt_path: Path) -> int:
    """
    Поддерживает поля:
    - Start / End (как HH:MM:SS.mmm)
    - или StartSec / EndSec (float)
    - Text
    """
    out_lines: List[str] = []
    idx = 1

    for r in rows:
        text = (r.get("Text") or "").strip()
        if not text:
            continue

        # Вытаскиваем start/end
        start = parse_time_hhmmss_mmm(r.get("Start", ""))
        end = parse_time_hhmmss_mmm(r.get("End", ""))

        if start is None:
            try:
                start = float((r.get("StartSec") or "").strip())
            except Exception:
                start = None
        if end is None:
            try:
                end = float((r.get("EndSec") or "").strip())
            except Exception:
                end = None

        if start is None or end is None or end <= start:
            continue

        # SRT не любит переносы внутри поля — делаем аккуратно
        text = text.replace("\r", "").replace("\n", " ").strip()

        out_lines.append(str(idx))
        out_lines.append(f"{sec_to_srt_time(start)} --> {sec_to_srt_time(end)}")
        out_lines.append(text)
        out_lines.append("")  # пустая строка
        idx += 1

    srt_path.write_text("\n".join(out_lines), encoding="utf-8")
    return idx - 1


def build_mix_audio(person_files: List[Path], out_audio_m4a: Path, target_dur: float) -> None:
    """
    Сводит N дорожек в одну:
    - ресемпл to 48k
    - amix
    - лимитер
    - async ресемпл
    - pad/trim до длительности видео
    """
    if not person_files:
        raise RuntimeError("No Person_*.mp3 found to mix.")

    cmd: List[str] = [FFMPEG, "-hide_banner", "-y"]

    # inputs
    for p in person_files:
        cmd += ["-i", str(p)]

    # filter_complex
    # Подготовка каждой дорожки: stereo + 48k + сброс таймстампов
    prep = []
    labels = []
    for i in range(len(person_files)):
        a = f"a{i}"
        labels.append(f"[{a}]")
        prep.append(
            f"[{i}:a]"
            f"aresample={AUDIO_SR},aformat=sample_fmts=fltp:channel_layouts=stereo,"
            f"asetpts=PTS-STARTPTS"
            f"[{a}]"
        )

    # amix + limiter + async + pad/trim
    # pad_dur считает от текущей точки; затем atrim гарантирует ровно target_dur
    dur_str = f"{target_dur:.6f}"
    mix = (
        "".join(labels)
        + f"amix=inputs={len(person_files)}:dropout_transition=0:normalize=0,"
          f"alimiter=limit={MIX_LIMIT},"
          f"aresample=async=1:first_pts=0,"
          f"apad,"
          f"atrim=0:{dur_str}"
          f"[aout]"
    )

    filter_complex = ";".join(prep + [mix])

    cmd += [
        "-filter_complex", filter_complex,
        "-map", "[aout]",
        "-c:a", "aac",
        "-b:a", AUDIO_BITRATE,
        "-ar", str(AUDIO_SR),
        "-ac", "2",
        str(out_audio_m4a),
    ]

    run(cmd)


def mux_video_audio(video: Path, audio: Path, out_mp4: Path) -> None:
    # Видео копируем как есть, аудио копируем (оно уже AAC в m4a)
    cmd = [
        FFMPEG, "-hide_banner", "-y",
        "-i", str(video),
        "-i", str(audio),
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "copy",
        "-movflags", "+faststart",
        "-metadata:s:a:0", f"language={LANG_TAG}",
        str(out_mp4)
    ]
    run(cmd)


def main() -> None:
    if not INPUT_DIR_AUDIO.exists():
        raise FileNotFoundError(f"INPUT_DIR_AUDIO not found: {INPUT_DIR_AUDIO}")
    if not VIDEO_FILE.exists():
        raise FileNotFoundError(f"VIDEO_FILE not found: {VIDEO_FILE}")

    csv_path = INPUT_DIR_AUDIO / "transcribe_RUS.csv"
    if not csv_path.exists():
        raise FileNotFoundError(f"transcribe_RUS.csv not found: {csv_path}")

    person_files = sorted(INPUT_DIR_AUDIO.glob("Person_*.mp3"))
    if not person_files:
        raise RuntimeError(f"No Person_*.mp3 in {INPUT_DIR_AUDIO}")

    out_dir = VIDEO_FILE.parent
    out_mp4 = out_dir / OUT_MP4_NAME
    out_srt = out_dir / OUT_SRT_NAME
    tmp_audio = out_dir / "_mixed_audio_tmp.m4a"

    # 1) subtitles
    rows = read_transcribe_csv(csv_path)
    nsubs = write_srt_from_csv(rows, out_srt)
    print(f"[OK] Wrote subtitles: {out_srt} ({nsubs} cues)")

    # 2) duration from video
    dur = ffprobe_duration_seconds(VIDEO_FILE)
    print(f"[OK] Video duration: {dur:.3f} sec")

    # 3) mix audio exactly to video duration
    build_mix_audio(person_files, tmp_audio, target_dur=dur)
    print(f"[OK] Mixed audio: {tmp_audio}")

    # 4) mux
    mux_video_audio(VIDEO_FILE, tmp_audio, out_mp4)
    print(f"[OK] Output video: {out_mp4}")

    # cleanup temp
    try:
        tmp_audio.unlink(missing_ok=True)
    except Exception:
        pass


if __name__ == "__main__":
    main()

В итоге получаем видео. Да, есть огрехи на каждом этапе, но начало проложено. И для этого не требуется заплатить ни копейки. Конечно, это лишь эксперимент, но это был долгий и интересный путь. Всем спасибо за внимание!

Ну и на последок, с получившимся результатом можете ознакомиться на VK Video.