Мужик приходит к переводчику английского:
— Слушай, как правильно перевести фразу “I don’t know”?
— Я не знаю.
— Вот, блин, никто не знает!
Привет, Хабр!
Присаживайтесь поудобней, заварите себе чайку, ибо я пишу немного затянуто и через правое ухо. Итак, Вы готовы? Отлично, тогда приступаем.
ВНИМАНИЕ! Информация, описанная ниже, написана исключительно в исследовательских целях и не предназначена для использования в корыстных целях!
Начну, пожалуй, с предыстории. Несколько лет назад завел себе домашнего питомца, королевского питона. Пуф вырос классным змеем и сих пор эта наглая колбаса является моим верным антидепрессантом. В итоге это привело к тому, что заинтересовался фильмами по змеям. Но, как назло, большинство фильмов идут на английском или испанском языке. Нет, не то, чтобы я не знал английского, будучи разработчиком, да и когда-то заканчивал языковую школу с углубленном изучением ин. языков, однако вечером, с пивасом и креветками смотреть английскую озвучку.... в общем, не каждому это по душе. И тут пришла в голову мысль, "а чтобы нам не использовать ИИ для перевода фильмов", к тому же множество компаний уже предлагают подобные решения. Но мне было ещё интересно изучить этот вопрос и пройти весь путь самим.

Сказано, сделано. Фильм за основу взял отсюда YouTube. И, как выяснится в последствии, это был очень сложный вариант, с кучей действующих лиц, шумами, африканским диалектом. Но "Такой Путь!". Как скачивать видосики с площадок, говорить не буду, лишь намекну - Pytubefix.
Окей, исходник получен. Нужно извлечь из него аудио. Тут нам на помощь приходит FFmpeg.
Для обнаружения прописал в PATH: C:\Users\ИМЯ_ПОЛЬЗОВАТЕЛЯ\AppData\Local\Microsoft\WinGet\Packages\Gyan.FFmpeg_Microsoft.Winget.Source_8wekyb3d8bbwe\ffmpeg-8.0.1-full_build\bin у вас вероятно будет что-то похожее.
Но т.к. я хочу в дальнейшем автоматизировать процесс, составил небольшой Python скрипт.
import subprocess
from pathlib import Path
from tqdm import tqdm
# ---------------- CONFIG ----------------
INPUT_DIR = Path(r"D:\Experiments\Audio\1__video_source")
OUTPUT_DIR = Path(r"D:\Experiments\Audio\2__audio_source")
INPUT_EXT = "mp4"
RECURSIVE = True
FFMPEG = "ffmpeg"
TARGET_BITRATE_K = 320
OVERWRITE = False
# ---------------------------------------
def run(cmd: list[str]) -> None:
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if p.returncode != 0:
raise RuntimeError(
f"Command failed ({p.returncode}): {' '.join(cmd)}\n"
f"STDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}"
)
def main() -> None:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
if TARGET_BITRATE_K > 320:
print(f"[warn] MP3 max is 320k. Requested {TARGET_BITRATE_K}k -> clamping to 320k.")
bitrate_k = 320
elif TARGET_BITRATE_K < 8:
bitrate_k = 8
else:
bitrate_k = TARGET_BITRATE_K
glob_pat = f"**/*.{INPUT_EXT}" if RECURSIVE else f"*.{INPUT_EXT}"
inputs = sorted(INPUT_DIR.glob(glob_pat))
for src in tqdm(inputs, desc="Converting", unit="file"):
rel = src.relative_to(INPUT_DIR)
dst = (OUTPUT_DIR / rel).with_suffix(".mp3")
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.exists() and not OVERWRITE:
continue
cmd = [
FFMPEG, "-hide_banner",
"-y" if OVERWRITE else "-n",
"-i", str(src),
"-vn", # no video
"-c:a", "libmp3lame", # MP3 encoder
"-b:a", f"{bitrate_k}k", # CBR (max 320k)
str(dst),
]
run(cmd)
if __name__ == "__main__":
main()Тут все просто, думаю разберётесь. Заполняем входную, выходную директорию и расширение нашего видео.
После этого у нас есть шумный источник аудио в mp3. Чтобы перевод был качественный, нам требуется разделить аудио на сторонние звуки/музыку и голос. Я долго искал решение, но лучше UVR не нашел. Возможно, кто-то знает лучше, но пока оставил его. Запускаем, в настройках качаем MDX-MDX23C-InstVoc-HQ. Выставляем MP3, SEGMENT 512, OVERLAP 16, GPU Conversion и "Полетели!". После обработки мы получим разделённые дорожки VOICE и INSTRUMENTAL. Инструментальную дорожку откладываем, она понадобится позже, при сведении звука. А по Voice пройдемся денойзером VR_Architecture-UVR-De-Echo-Normal с теми же настройками, только ставим Vocals Only для скорости.

Отлично, теперь мы имеем файл с чистой речью. Теперь нам нужно транскрибировать речь, чтобы получить тест с таймингами и репликами, которые мы будем переводить. Т.к. обычно у нас многоголосая озвучка, то нам также требуется выделить персон на аудио. Для этого нужно создать Person_НОМЕР директории и положить туда файлики mp3 с репликами персонажа. Для этого я использую WisperX. С ней можно много провозиться. Лично я не выдержал и поставил на WSL2 conda с python 3.12. Далее накатил WisperX и следом поставил PyTorch 2.7 + CUDA 12.8: pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128, игнорируя ошибки версий.
Опять же, накидал скрипт:
import os
# CUDNN
os.environ.setdefault("LD_LIBRARY_PATH", f"/lib/x86_64-linux-gnu:{os.environ.get('LD_LIBRARY_PATH','')}")
# HF
os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "60")
os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "120")
import csv
import logging
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
from tqdm import tqdm
# ---------------- CONFIG ----------------
INPUT_FILE = r"D:/Experiments/Audio/3_separated/1_1_videoplayback_(Vocals)_(No Echo).mp3"
OUTPUT_DIR = r"D:/Experiments/Audio/4_data"
# CI/CD will substitute:
HF_TOKEN = r"..."
os.environ.setdefault("HF_TOKEN", HF_TOKEN)
# WhisperX (ASR+alignment)
WHISPER_MODEL = "large-v3"
LANGUAGE = "en"
DEVICE = "cuda"
COMPUTE_TYPE = "bfloat16"
BATCH_SIZE = 32
ALIGN_RETURN_CHAR_ALIGNMENTS = False
# Diarization constraints (optional)
MIN_SPEAKERS = None # e.g. 2 if sure
MAX_SPEAKERS = None # e.g. 2 if sure
# Purity & segmentation rules
MAX_WORD_GAP_SEC = 0.75
MIN_UTT_DUR_SEC = 0.25
MIN_WORDS = 1 # <-- твоя правка (реплики типа "What?")
# Safety: treat overlaps as "mixed" and drop those words
OTHER_SPK_OVERLAP_TOL = 0.03
PAD_SEC = 0.06 # padding around utterance; clamped not to overlap others
# Audio cutting
FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"
CUT_SR = 24000
CUT_CH = 1
CUT_MP3_BITRATE = "320k"
OVERWRITE = False
LOG_LEVEL = logging.INFO
SEG_RE = re.compile(r"^seg_(\d{6})__.*\.mp3$", re.IGNORECASE)
# ---------------- DATA ----------------
@dataclass(frozen=True)
class Utterance:
speaker: str
start: float
end: float
text: str
# ---------------- PATH HELPERS ----------------
def to_wsl_path(p: str) -> str:
"""Accepts Windows-like path (D:/.. or D:\\..) and normalizes for WSL (/mnt/d/..)."""
s = p.replace("\\", "/")
m = re.match(r"^([A-Za-z]):/(.*)$", s)
if m:
drive = m.group(1).lower()
rest = m.group(2)
return f"/mnt/{drive}/{rest}"
return s
# ---------------- SHELL ----------------
def run(cmd: List[str]) -> subprocess.CompletedProcess:
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if p.returncode != 0:
raise RuntimeError(
f"Command failed ({p.returncode}): {' '.join(cmd)}\n"
f"STDOUT:\n{p.stdout}\nSTDERR:\n{p.stderr}"
)
return p
def fmt_hmsms(sec: float) -> str:
if sec < 0:
sec = 0.0
ms = int(round(sec * 1000.0))
hh = ms // (3600 * 1000)
ms %= (3600 * 1000)
mm = ms // (60 * 1000)
ms %= (60 * 1000)
ss = ms // 1000
mmm = ms % 1000
return f"{hh:02d}:{mm:02d}:{ss:02d}.{mmm:03d}"
def safe_name_time(a: float, b: float) -> str:
return f"{a:010.3f}-{b:010.3f}".replace(" ", "")
def cut_mp3(src: Path, dst: Path, start: float, end: float) -> None:
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.exists() and not OVERWRITE:
return
dur = max(0.0, end - start)
if dur <= 0.0:
return
run([
FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n",
"-ss", f"{start:.3f}",
"-t", f"{dur:.3f}",
"-i", str(src),
"-vn",
"-ac", str(CUT_CH),
"-ar", str(CUT_SR),
"-c:a", "libmp3lame",
"-b:a", CUT_MP3_BITRATE,
str(dst),
])
def concat_mp3(files: List[Path], out_mp3: Path) -> None:
if out_mp3.exists() and not OVERWRITE:
return
out_mp3.parent.mkdir(parents=True, exist_ok=True)
files = [f for f in files if f.exists()]
if not files:
return
cmd = [FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n"]
for f in files:
cmd += ["-i", str(f)]
inputs = "".join([f"[{i}:a]" for i in range(len(files))])
cmd += [
"-filter_complex", f"{inputs}concat=n={len(files)}:v=0:a=1[a]",
"-map", "[a]",
"-ac", str(CUT_CH),
"-ar", str(CUT_SR),
"-c:a", "libmp3lame",
"-b:a", CUT_MP3_BITRATE,
str(out_mp3),
]
run(cmd)
# ---------------- DIARIZATION HELPERS ----------------
def diarize_to_df(diarization_obj) -> pd.DataFrame:
"""Normalize diarization output into DataFrame(start,end,speaker)."""
if isinstance(diarization_obj, pd.DataFrame):
df = diarization_obj.copy()
colmap = {c.lower(): c for c in df.columns}
startc = colmap.get("start")
endc = colmap.get("end")
speakerc = colmap.get("speaker")
if not (startc and endc and speakerc):
raise RuntimeError(f"Unknown diarization dataframe columns: {list(df.columns)}")
return df.rename(columns={startc: "start", endc: "end", speakerc: "speaker"})[["start", "end", "speaker"]]
# pyannote Annotation case
rows = []
try:
for segment, _, label in diarization_obj.itertracks(yield_label=True):
rows.append({"start": float(segment.start), "end": float(segment.end), "speaker": str(label)})
return pd.DataFrame(rows, columns=["start", "end", "speaker"])
except Exception as e:
raise RuntimeError(f"Cannot normalize diarization output type: {type(diarization_obj)}") from e
def overlap_len(a0: float, a1: float, b0: float, b1: float) -> float:
x0 = max(a0, b0)
x1 = min(a1, b1)
return max(0.0, x1 - x0)
def word_is_safe(word_start: float, word_end: float, spk: str, diar_df: pd.DataFrame) -> bool:
"""Safe if word doesn't significantly overlap any OTHER speaker segment."""
if word_start is None or word_end is None:
return False
w0, w1 = float(word_start), float(word_end)
if w1 <= w0:
return False
other = diar_df[diar_df["speaker"] != spk]
if other.empty:
return True
for _, r in other.iterrows():
ov = overlap_len(w0, w1, float(r["start"]), float(r["end"]))
if ov > OTHER_SPK_OVERLAP_TOL:
return False
return True
# ---------------- MAIN PIPELINE ----------------
def build_utterances_from_words(result: dict, diar_df: pd.DataFrame) -> List[Utterance]:
"""Build contiguous single-speaker utterances from word-level labeled result."""
utts: List[Utterance] = []
words: List[Tuple[float, float, str, str]] = []
for seg in result.get("segments", []):
for w in (seg.get("words", []) or []):
start = w.get("start")
end = w.get("end")
spk = w.get("speaker")
text = (w.get("word", "") or "").strip()
if spk is None or start is None or end is None or not text:
continue
words.append((float(start), float(end), str(spk), text))
words.sort(key=lambda x: x[0])
if not words:
return utts
cur_spk: Optional[str] = None
cur_words: List[Tuple[float, float, str]] = []
cur_start: Optional[float] = None
cur_end: Optional[float] = None
prev_end: Optional[float] = None
def flush():
nonlocal cur_spk, cur_words, cur_start, cur_end
if not cur_words or cur_spk is None or cur_start is None or cur_end is None:
cur_spk, cur_words, cur_start, cur_end = None, [], None, None
return
# naive spacing join
text = ""
for _, _, t in cur_words:
if not text:
text = t
elif t.startswith(("-", "’", "'")):
text += t
else:
text += " " + t
if (cur_end - cur_start) >= MIN_UTT_DUR_SEC and len(cur_words) >= MIN_WORDS and text.strip():
utts.append(Utterance(cur_spk, cur_start, cur_end, text.strip()))
cur_spk, cur_words, cur_start, cur_end = None, [], None, None
for (w0, w1, spk, t) in words:
if not word_is_safe(w0, w1, spk, diar_df):
flush()
prev_end = w1
continue
if cur_spk is None:
cur_spk = spk
cur_start = w0
cur_end = w1
cur_words = [(w0, w1, t)]
prev_end = w1
continue
gap = (w0 - (prev_end if prev_end is not None else w0))
if spk != cur_spk or gap > MAX_WORD_GAP_SEC:
flush()
cur_spk = spk
cur_start = w0
cur_end = w1
cur_words = [(w0, w1, t)]
prev_end = w1
continue
cur_words.append((w0, w1, t))
cur_end = w1
prev_end = w1
flush()
# padding with clamping against other-speaker segments
padded: List[Utterance] = []
diar_sorted = diar_df.sort_values("start").reset_index(drop=True)
for u in utts:
s0 = max(0.0, u.start - PAD_SEC)
e0 = u.end + PAD_SEC
other = diar_sorted[diar_sorted["speaker"] != u.speaker]
for _, r in other.iterrows():
os0 = float(r["start"])
oe0 = float(r["end"])
if overlap_len(s0, e0, os0, oe0) > 0.0:
# clamp to nearest edge
if os0 < u.start:
s0 = max(s0, oe0)
else:
e0 = min(e0, os0)
if e0 > s0 and (e0 - s0) >= MIN_UTT_DUR_SEC:
padded.append(Utterance(u.speaker, s0, e0, u.text))
padded.sort(key=lambda x: x.start)
return padded
def main() -> None:
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s | %(levelname)s | %(message)s")
in_path = Path(to_wsl_path(INPUT_FILE))
out_root = Path(to_wsl_path(OUTPUT_DIR))
out_root.mkdir(parents=True, exist_ok=True)
if not in_path.exists():
raise FileNotFoundError(f"INPUT_FILE not found: {in_path}")
stem = in_path.stem
job_dir = out_root / stem
job_dir.mkdir(parents=True, exist_ok=True)
token = (HF_TOKEN or "").strip()
if not token or "ECRYPTED" in token:
raise RuntimeError(
"HF_TOKEN placeholder not substituted. CI/CD must inject real HuggingFace token string."
)
import whisperx # noqa
from whisperx.diarize import DiarizationPipeline
logging.info("Loading WhisperX model=%s device=%s compute_type=%s batch=%s",
WHISPER_MODEL, DEVICE, COMPUTE_TYPE, BATCH_SIZE)
model = whisperx.load_model(WHISPER_MODEL, device=DEVICE, compute_type=COMPUTE_TYPE, language=LANGUAGE)
logging.info("Loading audio: %s", in_path)
audio = whisperx.load_audio(str(in_path))
logging.info("Transcribing...")
result = model.transcribe(audio, batch_size=BATCH_SIZE, language=LANGUAGE)
logging.info("Loading alignment model...")
align_model, metadata = whisperx.load_align_model(language_code=LANGUAGE, device=DEVICE)
logging.info("Aligning...")
result_aligned = whisperx.align(
result["segments"],
align_model,
metadata,
audio,
DEVICE,
return_char_alignments=ALIGN_RETURN_CHAR_ALIGNMENTS,
)
logging.info("Running diarization...")
diarize_model = DiarizationPipeline(use_auth_token=token, device=DEVICE)
diar_out = diarize_model(str(in_path), min_speakers=MIN_SPEAKERS, max_speakers=MAX_SPEAKERS)
diar_df = diarize_to_df(diar_out)
logging.info("Assigning speakers to words...")
try:
result_labeled = whisperx.assign_word_speakers(diar_df, result_aligned)
except Exception:
result_labeled = whisperx.assign_word_speakers(diar_out, result_aligned)
logging.info("Building strictly single-speaker utterances...")
utterances = build_utterances_from_words(result_labeled, diar_df)
if not utterances:
logging.warning("No utterances produced. Check diarization/token/inputs.")
return
# speaker label -> Person_XX by first appearance
spk_order: Dict[str, int] = {}
for u in utterances:
if u.speaker not in spk_order:
spk_order[u.speaker] = len(spk_order) + 1
person_dirs: Dict[int, Path] = {}
for spk, idx in spk_order.items():
d = job_dir / f"Person_{idx:02d}"
d.mkdir(parents=True, exist_ok=True)
person_dirs[idx] = d
csv_path = job_dir / "transcribe.csv"
rows_out = []
person_cuts: Dict[int, List[Path]] = {i: [] for i in person_dirs.keys()}
for i, u in enumerate(tqdm(utterances, desc="Cutting", unit="utt"), 1):
person_idx = spk_order[u.speaker]
pdir = person_dirs[person_idx]
start = max(0.0, u.start)
end = max(start, u.end)
tname = safe_name_time(start, end)
cut_name = f"cut_{tname}.mp3"
cut_path = pdir / cut_name
cut_mp3(in_path, cut_path, start, end)
person_cuts[person_idx].append(cut_path)
rows_out.append({
"Row": i,
"Person": f"Person_{person_idx:02d}",
"StartSec": f"{start:.3f}",
"EndSec": f"{end:.3f}",
"Start": fmt_hmsms(start),
"End": fmt_hmsms(end),
"Text": u.text,
"File": f"{pdir.name}/{cut_name}",
})
with csv_path.open("w", encoding="utf-8", newline="") as f:
w = csv.DictWriter(f, fieldnames=["Row", "Person", "StartSec", "EndSec", "Start", "End", "Text", "File"])
w.writeheader()
w.writerows(rows_out)
logging.info("Creating full.mp3 per person...")
for person_idx, files in person_cuts.items():
files = [p for p in files if p.exists()]
if not files:
continue
out_full = person_dirs[person_idx] / "full.mp3"
concat_mp3(files, out_full)
logging.info("Done. Output: %s", job_dir)
if __name__ == "__main__":
main()
Код рассчитан на запуск из WSL2. Также вместо HF_TOKEN нужен токен на Hugging Face. Подробнее описа��о в WhisperX. Запускаем и получаем файлик transcribe.csv:
Row,Person,StartSec,EndSec,Start,End,Text,File
1,Person_01,5.168,47.631,00:00:05.168,00:00:47.631,"This is West Africa, a region known as the Sub-Saharan Savannah. This is home to the world's most popular pet snake, and over the next month, I'll be touring this region from Ghana to Togo and Benin to tell not only the story of how the ball python is living out here in the wild, but I'll also tell you the story on its economic impact in one of the world's most impoverished areas, and also how it has long influenced the culture here. I'm Dave Kaufman and these are my reptile adventures and this is the story of the world's most popular pet snake. This is the story of the ball python here in the wilds of Africa.",Person_01/cut_000005.168-000047.631.mp3
2,Person_01,130.769,135.577,00:02:10.769,00:02:15.577,"Our story begins in Ghana, a Commonwealth country situated in West Africa.",Person_01/cut_000130.769-000135.577.mp3Также появятся директории с репликами персонажей cut_ТАЙМИНГ.mp3 и full.mp3 - склейка реплик, по которой будем дообучать модель.
Окей, теперь у нас есть текст. Надо его перевести. Тут есть много путей. Лично я - человек ленивый, поэтому выбрал более менее шуструю компактную модель в LM Studio (mistral-small-3.2-24b-instruct-2506). Можете взять свою модель или перевести в ручную. В LM я запустил сервер и увеличил число токенов.

Также, т.к. имеется проблема с передачей смысла, мы переводим в 2а захода, как профессиональные переводчики. Сперва строим карту перевода, а потом переводим. Это стабилизирует перевод определенных слов. Конечно, накидал скриптик:
import re
import time
from pathlib import Path
from typing import List, Dict, Optional
import pandas as pd
from tqdm import tqdm
import lmstudio as lms
# ---------------- CONFIG ----------------
INPUT_FILE = Path(r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)/transcribe.csv")
# Use the model identifier exactly as LM Studio shows it
MODEL_ID = "mistral-small-3.2-24b-instruct-2506"
# Load-time
CONTEXT_LENGTH = 40960
GPU_RATIO = 1.0
# Pass 1 (guide building)
GUIDE_CHUNK_CHARS = 16000
GUIDE_MAX_TOKENS = 1200
GUIDE_TEMPERATURE = 0.1
GUIDE_TOP_P = 0.9
# Pass 2 (row translation)
ROW_MAX_TOKENS = 512
ROW_TEMPERATURE = 0.15
ROW_TOP_P = 0.9
# Retries / robustness
ROW_MAX_TRIES = 4
SLEEP_BETWEEN_TRIES_SEC = 0.8
SAVE_PARTIAL_EVERY_N_ROWS = 25
# Disable client-side timeout for sync requests
lms.set_sync_api_timeout(None)
# ---------------- PROMPTS ----------------
SYSTEM_TRANSLATE = """Ты профессиональный переводчик-адаптер реплик для дубляжа.
Задача: перевести реплики с английского на русский и сохранить связность/единый стиль по всему файлу.
Критично по дубляжу:
- Русская реплика должна быть максимально близка по длительности к оригиналу:
примерно сопоставимая длина и число слогов/гласных.
- Пиши естественно и легко, как для озвучки: разговорно-нейтрально, без канцелярита и тяжёлых конструкций.
- Предпочитай короткие фразы, активный залог, естественный порядок слов.
- Разрешено аккуратно перефразировать ради естественности, но без потери смысла/фактов.
Грамматика (строго):
- Следи за родом, числом, падежами, согласованием прилагательных/местоимений.
- Следи за управлением (предлоги/падежи), склоняй имена и термины по-русски.
Термины (строго):
- Любые варианты “ball python / ball pythons / royal python” переводятся ТОЛЬКО как «королевский питон».
Допускаются только грамматические формы: «королевского питона», «королевским питоном» и т.п.
Запрещены: «болл-питон», «питон-болл», «питон-шар», «шаровой питон» и т.д.
- "enclosure" -> «террариум»
- "shed" -> «линька»
- "morph" -> «морф»
Антигаллюцинации (строго):
- НЕ вставляй посторонние иностранные слова/наборы букв.
- НЕ допускай повторов и “слов-паразитов” вида «Ivoire ...», «... везде, ... повсюду» и похожих.
- Латиницу оставляй только если она реально есть в исходной реплике как имя/бренд/топоним; иначе переводить.
Вывод:
- Отвечай ТОЛЬКО переводом одной реплики. Без кавычек, без JSON, без пояснений и без markdown.
"""
SYSTEM_GUIDE = """Ты редактор дубляжного перевода и терминолог.
Тебе дают фрагменты полного транскрипта (ТОЛЬКО английские реплики из колонки Text).
Собери и поддерживай краткий Translation Guide (10–25 пунктов), чтобы перевод был единообразным.
Что включить:
- тон/стиль озвучки (разговорно-нейтральный), как делать фразы по-русски естественно;
- устойчивые переводы повторяющихся фраз/частых формулировок;
- обращения/местоимения (если видно);
- словарь терминов.
Строгие правила:
- “ball python / ball pythons / royal python” -> только «королевский питон» (со склонением).
Запретить: «болл-питон», «питон-болл», «питон-шар», «шаровой питон» и т.п.
- Запретить любые посторонние латинские вставки и повторы вида «Ivoire ...».
Формат:
- Верни только обновлённый гайд (короткий список пунктов), без рассуждений и без заголовков.
"""
# ---------------- TEXT / VALIDATION ----------------
_RE_CODEBLOCK = re.compile(r"^\s*```.*?$|```\s*$", re.MULTILINE)
_RE_QUOTES = re.compile(r'^\s*[«"]|[»"]\s*$')
# Forbidden "ball python" Russian variants — we will forcibly normalize to "королевский питон"
# (Yes, it’s not perfect for падежи, но это лучше, чем “болл/шар”.)
_RE_FORBIDDEN_BALL = re.compile(
r"(?iu)\b(болл[\s\-]?питон|питон[\s\-]?болл|питон[\s\-]?шар|шаровой\s+питон|шар\s*питон|питон\s*шар)\b"
)
# Catch notorious weirdness
_RE_IVOIRE = re.compile(r"(?iu)\bivoire\b")
# Detect unexpected latin spam (very rough): long-ish ascii words
_RE_LATIN_WORD = re.compile(r"\b[A-Za-z]{4,}\b")
def split_by_newlines(text: str, max_chars: int) -> List[str]:
lines = text.splitlines()
chunks: List[str] = []
cur: List[str] = []
cur_len = 0
for ln in lines:
add_len = len(ln) + 1
if cur and cur_len + add_len > max_chars:
chunks.append("\n".join(cur))
cur = [ln]
cur_len = len(ln)
else:
cur.append(ln)
cur_len += add_len
if cur:
chunks.append("\n".join(cur))
return chunks
def result_to_text(result) -> str:
# In LM Studio Python docs, printing result prints content; also result may have .content.
s = getattr(result, "content", None)
if isinstance(s, str) and s.strip():
return s
return str(result)
def clean_model_output(s: str) -> str:
# strip code fences, trim
s = s.strip()
s = _RE_CODEBLOCK.sub("", s).strip()
# keep only first non-empty line if model emitted multiple paragraphs
lines = [ln.strip() for ln in s.splitlines() if ln.strip()]
if not lines:
return ""
s = lines[0]
# strip surrounding quotes
s = s.strip()
s = s.strip("`")
s = _RE_QUOTES.sub("", s).strip()
return s
def enforce_terms_ru(s: str) -> str:
# normalize forbidden russian variants to королевский питон
s2 = _RE_FORBIDDEN_BALL.sub("королевский питон", s)
return s2
def looks_bad_translation(src_en: str, ru: str) -> bool:
if not ru:
return True
# obvious nonsense / repeated latin keyword
if _RE_IVOIRE.search(ru):
return True
# latin words that are not likely needed (if they weren't in src)
# allow if that word appears in src (e.g., brand place)
latin = _RE_LATIN_WORD.findall(ru)
if latin:
src_lower = src_en.lower()
for w in latin:
if w.lower() not in src_lower:
# Not in source: likely hallucinated latin insertion
return True
return False
# ---------------- LLM CALLS ----------------
def load_model():
return lms.llm(MODEL_ID, config={
"contextLength": CONTEXT_LENGTH,
"gpu": {"ratio": GPU_RATIO},
})
def build_translation_guide(model, full_text_only: str) -> str:
chunks = split_by_newlines(full_text_only, GUIDE_CHUNK_CHARS)
guide = ""
for idx, chunk in enumerate(tqdm(chunks, desc="Building guide (feed full Text)", unit="chunk"), 1):
chat = lms.Chat(SYSTEM_GUIDE)
user = (
f"Текущий гайд (может быть пустым):\n{guide}\n\n"
f"Новый фрагмент транскрипта #{idx}/{len(chunks)}:\n"
f"{chunk}\n\n"
"Обнови гайд: 10–25 коротких пунктов, строго по делу. Верни только гайд."
)
chat.add_user_message(user)
result = model.respond(chat, config={
"temperature": GUIDE_TEMPERATURE,
"topP": GUIDE_TOP_P,
"maxTokens": GUIDE_MAX_TOKENS,
})
out = clean_model_output(result_to_text(result))
# allow multi-line guide: use original cleaned (first-line policy would kill it),
# so for guide we do a different cleaning: strip codefences, keep all lines.
out_full = result_to_text(result).strip()
out_full = _RE_CODEBLOCK.sub("", out_full).strip()
out_full = out_full.strip("`").strip()
if out_full:
guide = out_full
return guide.strip()
def translate_one_row(model, guide: str, row_id: int, src_en: str) -> str:
"""
Returns RU translation (single line), with retries + post-fixes.
"""
last = ""
for attempt in range(1, ROW_MAX_TRIES + 1):
chat = lms.Chat(SYSTEM_TRANSLATE)
# Make the guide short-ish in prompt; don't paste huge wall
user = (
"Контекстный гайд для единообразия (используй, но не цитируй):\n"
f"{guide}\n\n"
"Переведи ОДНУ реплику EN->RU.\n"
"Требования:\n"
"- Только перевод, одной строкой.\n"
"- Без кавычек, без JSON, без комментариев, без markdown.\n"
"- Естественная речь для озвучки.\n"
"- Род/падеж/склонения — внимательно.\n"
"- “ball python/royal python” -> строго «королевский питон» (без болл/шар).\n"
"- Никаких посторонних латинских вставок/повторов (типа Ivoire).\n\n"
f"Row={row_id}\nText={src_en}"
)
chat.add_user_message(user)
result = model.respond(chat, config={
"temperature": ROW_TEMPERATURE if attempt == 1 else 0.05, # become stricter on retries
"topP": ROW_TOP_P,
"maxTokens": ROW_MAX_TOKENS,
})
ru = clean_model_output(result_to_text(result))
ru = enforce_terms_ru(ru)
if not ru:
last = ru
time.sleep(SLEEP_BETWEEN_TRIES_SEC)
continue
# If bad: retry with even stricter instruction
if looks_bad_translation(src_en, ru):
last = ru
time.sleep(SLEEP_BETWEEN_TRIES_SEC)
continue
return ru
# final fallback: whatever we got last, normalized
return enforce_terms_ru(clean_model_output(last)) or ""
def translate_rows(model, df: pd.DataFrame, guide: str, partial_path: Path) -> Dict[int, str]:
row_to_ru: Dict[int, str] = {}
# If partial exists, resume
if partial_path.exists():
try:
p = pd.read_csv(partial_path, dtype=str, keep_default_na=False)
if "Row" in p.columns and "Text" in p.columns:
for _, r in p.iterrows():
try:
rid = int(str(r["Row"]).strip())
except Exception:
continue
txt = str(r["Text"]).strip()
if txt:
row_to_ru[rid] = txt
except Exception:
pass
out_df = df.copy()
done = 0
for _, r in tqdm(df.iterrows(), total=len(df), desc="Translating rows", unit="row"):
row_id = int(str(r["Row"]).strip())
src = str(r["Text"])
if row_id in row_to_ru and row_to_ru[row_id].strip():
done += 1
continue
ru = translate_one_row(model, guide, row_id, src)
if not ru:
ru = src # final fallback (better than blank)
row_to_ru[row_id] = ru
done += 1
# periodic partial save
if done % SAVE_PARTIAL_EVERY_N_ROWS == 0:
out_df["Text"] = [row_to_ru.get(int(str(x).strip()), str(df.loc[df["Row"] == x, "Text"].values[0]))
for x in out_df["Row"].tolist()]
out_df.to_csv(partial_path, index=False, encoding="utf-8-sig")
# final save of partial
out_df["Text"] = [row_to_ru.get(int(str(x).strip()), "") for x in out_df["Row"].tolist()]
out_df.to_csv(partial_path, index=False, encoding="utf-8-sig")
return row_to_ru
def main() -> None:
if not INPUT_FILE.exists():
raise FileNotFoundError(f"INPUT_FILE not found: {INPUT_FILE}")
df = pd.read_csv(INPUT_FILE, dtype=str, keep_default_na=False)
if "Row" not in df.columns or "Text" not in df.columns:
raise RuntimeError("CSV must contain columns: Row, Text")
# 1) FULL TEXT feed (only concatenated Text)
full_text_only = "\n".join(df["Text"].astype(str).tolist())
model = load_model()
guide = build_translation_guide(model, full_text_only)
guide_path = INPUT_FILE.with_name("translation_guide.txt")
guide_path.write_text(guide, encoding="utf-8")
partial_path = INPUT_FILE.with_name("transcribe_RUS.partial.csv")
row_to_ru = translate_rows(model, df, guide, partial_path)
out_df = df.copy()
out_df["Text"] = [row_to_ru.get(int(str(x).strip()), "") for x in out_df["Row"].tolist()]
out_path = INPUT_FILE.with_name("transcribe_RUS.csv")
out_df.to_csv(out_path, index=False, encoding="utf-8-sig")
print(f"OK: saved -> {out_path}")
print(f"Guide saved -> {guide_path}")
print(f"Partial snapshot -> {partial_path}")
if __name__ == "__main__":
main()
Вот, так обработка получается значительно стабильней и можно не так сильно упираться в VRAM.
Итак, теперь у нас есть cut_*.mp3 с эмоциями и тембром, full.mp3 (если не хватит cut), translate_RUS. Можем приступать к озвучке.
Как я смог найти, есть 3 модели, которые нормально умеют с иностранного в русский: Misha24-10/F5-TTS_RUSSIAN, tensorbanana/xttsv2_banana, Chatterbox-Multilingual-TTS.
Сравнение первых двух можно посмотреть здесь: Дообучение модели F5-TTS на русский язык. Однако у всех была проблема с непонятными словами, когда они внезапно переставали говорить по-русски и несли тарабарщину. Да и нормально поджать акцент тоже не получилось. В итоге, остался на Chatterbox-Multilingual-TTS.
Она также вредничала на версию Python и прочее, поэтому установил в WSL2 с conda. Но в этот раз брал версию Python3.11. Иначе потом проблемы с установкой будут. Часть проблем поборол патчами, но тем не менее. Также имелась проблема с watermark, поэтому добавил параметры, чтобы отключить watermark для каждой части и добавить для целикового аудио. Но была ещё проблема. Ударения. Если вы послушаете иностранца, то ухо будет резать ударение, и здесь также. Для решения проблем добавил использование RUAccent.
import os
import re
import json
import shutil
import subprocess
import logging
from pathlib import Path
from typing import Any, List, Dict, Optional, Callable, Tuple
import pandas as pd
import soundfile as sf
from tqdm import tqdm
import torch
# ===================== CONFIG =====================
INPUT_DIR = r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)"
CSV_RU = "transcribe_RUS.csv"
OVERWRITE = True
OUT_MP3_BITRATE = "320k"
# Put ALL caches/tmp here (NO trash in INPUT_DIR)
CACHE_ROOT = r"/mnt/d/Experiments/TTS/_cache_chatterbox"
HF_DIRNAME = "hf"
TMP_DIRNAME = "tmp"
RUACCENT_DIRNAME = "ruaccent"
# NEW: two-phase chunk folders (under CACHE_ROOT)
CHUNKS_RAW_DIRNAME = "chunks_raw"
CHUNKS_DENOISED_DIRNAME = "chunks_denoised"
MANIFEST_NAME = "chunks_manifest.json"
FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"
# Reference audio rules
REF_SR = 24000
MIN_REF_SEC = 10.0
REF_MAX_SEC = 12.0
# ======= SETTINGS LIKE THE SPACE UI =======
EXAGGERATION = 0.50
TEMPERATURE = 0.80
CFG_WEIGHT = 0.05
SEED = 12345
# ==========================================
# Chunking: keep under 300 chars (Space truncates)
MAX_CHARS_PER_CHUNK = 280
MAX_WORDS_PER_CHUNK = 60
HARD_TEXT_LIMIT = 300
# Audio post
PEAK = 0.95
TAIL_PAD_SEC = 0.12
CROSSFADE_MS = 14
CHUNK_EDGE_FADE_MS = 4
# Phase B: trim silence at BOTH ends (after UVR)
TRIM_BOTH_EDGES = True
EDGE_SIL_THRESH = 0.010
EDGE_HOLD_SEC = 0.06
EDGE_KEEP_PAD_SEC = 0.03
# Watermark handling
DISABLE_WATERMARK_DURING_CHUNKS = True
APPLY_WATERMARK_ON_FINAL = False
# RUAccent
USE_RUACCENT = True
RUACCENT_MODEL = "turbo3.1"
RUACCENT_USE_DICTIONARY = True
RUACCENT_TINY_MODE_FALLBACK = True
# Print stressed text
LOG_STRESSED_TEXT = True
PRINT_STRESSED_TEXT = True
CUSTOM_WORD_OVERRIDES: Dict[str, str] = {
"регион": "регио+н",
"змея": "змея+",
"питон": "пито+н",
"Квази": "Куа+зи",
"Сахельская": "Сахе+льская",
}
# ==================================================
# ---- Force eager attention for transformers (voice ref path uses alignment analyzer) ----
os.environ.setdefault("TRANSFORMERS_ATTN_IMPLEMENTATION", "eager")
# Redirect caches
CACHE_ROOT_PATH = Path(CACHE_ROOT)
HF_CACHE = CACHE_ROOT_PATH / HF_DIRNAME
TMP_DIR = CACHE_ROOT_PATH / TMP_DIRNAME
RUACCENT_DIR = CACHE_ROOT_PATH / RUACCENT_DIRNAME
RAW_CHUNKS_DIR = CACHE_ROOT_PATH / CHUNKS_RAW_DIRNAME
DENOISED_CHUNKS_DIR = CACHE_ROOT_PATH / CHUNKS_DENOISED_DIRNAME
MANIFEST_PATH = CACHE_ROOT_PATH / MANIFEST_NAME
for p in (HF_CACHE, TMP_DIR, RUACCENT_DIR, RAW_CHUNKS_DIR, DENOISED_CHUNKS_DIR):
p.mkdir(parents=True, exist_ok=True)
os.environ.setdefault("HF_HOME", str(HF_CACHE))
os.environ.setdefault("HUGGINGFACE_HUB_CACHE", str(HF_CACHE))
os.environ.setdefault("TORCH_HOME", str(HF_CACHE))
os.environ.setdefault("XDG_CACHE_HOME", str(HF_CACHE))
def force_transformers_eager_attention() -> None:
"""Force attn_implementation='eager' in transformers from_pretrained()."""
try:
from transformers.modeling_utils import PreTrainedModel
orig_ptm = PreTrainedModel.from_pretrained
@classmethod
def patched_ptm_from_pretrained(cls, *args, **kwargs):
kwargs.setdefault("attn_implementation", "eager")
fn = orig_ptm.__func__ if hasattr(orig_ptm, "__func__") else orig_ptm
return fn(cls, *args, **kwargs)
PreTrainedModel.from_pretrained = patched_ptm_from_pretrained # type: ignore[assignment]
try:
from transformers.models.auto.auto_factory import _BaseAutoModelClass # type: ignore
orig_auto = _BaseAutoModelClass.from_pretrained
@classmethod
def patched_auto_from_pretrained(cls, pretrained_model_name_or_path, *args, **kwargs):
kwargs.setdefault("attn_implementation", "eager")
fn = orig_auto.__func__ if hasattr(orig_auto, "__func__") else orig_auto
return fn(cls, pretrained_model_name_or_path, *args, **kwargs)
_BaseAutoModelClass.from_pretrained = patched_auto_from_pretrained # type: ignore[assignment]
except Exception:
pass
logging.info("Transformers patched: forcing attn_implementation='eager' in from_pretrained()")
except Exception as e:
logging.warning("Could not patch transformers eager-attn (continuing): %s", e)
force_transformers_eager_attention()
from chatterbox.mtl_tts import ChatterboxMultilingualTTS
# ===================== INTERNAL RU STRESS DISABLE (Chatterbox) =====================
def disable_chatterbox_internal_ru_stress() -> None:
patched = 0
try:
import chatterbox.models.tokenizers.tokenizer as tokmod
if hasattr(tokmod, "_russian_stresser"):
try:
tokmod._russian_stresser = None
patched += 1
logging.info("Disabled internal RU stress: tokenizer._russian_stresser -> None")
except Exception as e:
logging.warning("Failed patch _russian_stresser: %s", e)
if hasattr(tokmod, "add_russian_stress"):
try:
tokmod.add_russian_stress = lambda text: text # type: ignore[assignment]
patched += 1
logging.info("Disabled internal RU stress: tokenizer.add_russian_stress -> identity")
except Exception as e:
logging.warning("Failed patch add_russian_stress: %s", e)
except Exception as e:
logging.warning("Could not import tokenizer to disable stress: %s", e)
if patched == 0:
logging.warning("Internal RU stress patch: nothing patched (version mismatch). Continue anyway.")
# ===================== WATERMARK PATCH =====================
def patch_disable_watermark(model: Any) -> Optional[Callable[[Any, int], Any]]:
wm = getattr(model, "watermarker", None)
if wm is None:
logging.warning("Watermarker not found on model (nothing to patch).")
return None
orig = getattr(wm, "apply_watermark", None)
if not callable(orig):
logging.warning("model.watermarker.apply_watermark not callable (nothing to patch).")
return None
def _identity(wav, sample_rate: int): # noqa: ANN001
return wav
try:
wm.apply_watermark = _identity # type: ignore[assignment]
logging.info("Patched watermark: apply_watermark -> identity (disabled during chunk generation).")
return orig
except Exception as e:
logging.warning("Failed to patch watermark: %s", e)
return None
# ===================== HELPERS =====================
def to_wsl_path(p: str) -> str:
s = p.replace("\\", "/")
m = re.match(r"^([A-Za-z]):/(.*)$", s)
if m:
return f"/mnt/{m.group(1).lower()}/{m.group(2)}"
return s
def ensure_tool(name: str) -> None:
if shutil.which(name) is None:
raise RuntimeError(f"Required tool not found in PATH: {name}")
def run(cmd: list[str]) -> subprocess.CompletedProcess:
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if p.returncode != 0:
raise RuntimeError(f"Command failed ({p.returncode}): {' '.join(cmd)}\nSTDERR:\n{p.stderr}")
return p
def get_duration_sec(media: Path) -> float:
p = run([
FFPROBE, "-hide_banner", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=nw=1:nk=1",
str(media),
])
try:
return float(p.stdout.strip())
except Exception:
return 0.0
def prep_text_ru(s: str) -> str:
s = re.sub(r"\s+", " ", (s or "").replace("\r", " ").replace("\n", " ")).strip()
if not s:
return s
if s[-1] not in ".!?…":
s += "."
return s
VOWELS_RU = "АЕЁИОУЫЭЮЯаеёиоуыэюя"
COMBINING_ACUTE = "\u0301"
VISIBLE_ALT = "ˊ"
def normalize_stress_marks(text: str) -> str:
if not text:
return text
text = re.sub(rf"{re.escape(VISIBLE_ALT)}([{VOWELS_RU}])", rf"\1{COMBINING_ACUTE}", text)
text = re.sub(rf"([{VOWELS_RU}]){re.escape(VISIBLE_ALT)}", rf"\1{COMBINING_ACUTE}", text)
text = re.sub(rf"\+([{VOWELS_RU}])", rf"\1{COMBINING_ACUTE}", text)
text = re.sub(rf"([{VOWELS_RU}])\+", rf"\1{COMBINING_ACUTE}", text)
text = text.replace("+", "").replace(VISIBLE_ALT, "")
text = re.sub(rf"{COMBINING_ACUTE}+", COMBINING_ACUTE, text)
return text
def sanitize_audio(wav_t: torch.Tensor, peak: float = 0.98) -> torch.Tensor:
wav_t = wav_t.detach().cpu().to(torch.float32)
if wav_t.ndim == 2:
if wav_t.shape[0] == 1:
wav_t = wav_t[0]
elif wav_t.shape[1] == 1:
wav_t = wav_t[:, 0]
else:
wav_t = wav_t.mean(dim=0)
wav_t = torch.nan_to_num(wav_t, nan=0.0, posinf=0.0, neginf=0.0)
m = float(wav_t.abs().max().item()) if wav_t.numel() else 0.0
if m > 0.0 and m > peak:
wav_t = wav_t * (peak / m)
return wav_t.clamp(-1.0, 1.0)
def fade_edges(wav: torch.Tensor, sr: int, fade_ms: float) -> torch.Tensor:
if wav.numel() == 0:
return wav
n = int(round((fade_ms / 1000.0) * sr))
if n <= 0:
return wav
if wav.numel() < n * 2:
return wav
ramp = torch.linspace(0.0, 1.0, steps=n, dtype=wav.dtype)
wav = wav.clone()
wav[:n] = wav[:n] * ramp
wav[-n:] = wav[-n:] * ramp.flip(0)
return wav
def pad_tail(wav: torch.Tensor, sr: int, tail_sec: float) -> torch.Tensor:
n = int(round(max(0.0, tail_sec) * sr))
if n <= 0:
return wav
return torch.cat([wav, torch.zeros(n, dtype=wav.dtype)], dim=0)
def crossfade_concat(chunks: List[torch.Tensor], sr: int, fade_ms: float) -> torch.Tensor:
if not chunks:
return torch.zeros(0, dtype=torch.float32)
if len(chunks) == 1:
return chunks[0]
fade = int(round((fade_ms / 1000.0) * sr))
fade = max(1, fade)
out = chunks[0]
for nxt in chunks[1:]:
if out.numel() < fade or nxt.numel() < fade:
out = torch.cat([out, nxt], dim=0)
continue
a = out[-fade:]
b = nxt[:fade]
ramp = torch.linspace(0.0, 1.0, steps=fade, dtype=out.dtype)
mix = a * (1.0 - ramp) + b * ramp
out = torch.cat([out[:-fade], mix, nxt[fade:]], dim=0)
return out
def audio_to_ref_wav(src_audio: Path, out_wav: Path, sr: int, clip_sec: float) -> Path:
out_wav.parent.mkdir(parents=True, exist_ok=True)
cmd = [
FFMPEG, "-hide_banner", "-y",
"-i", str(src_audio),
"-vn",
"-ac", "1",
"-ar", str(sr),
]
if clip_sec and clip_sec > 0:
cmd += ["-t", f"{clip_sec:.3f}"]
cmd += ["-c:a", "pcm_s16le", str(out_wav)]
run(cmd)
return out_wav
def write_wav_tensor_pcm16(wav_t: torch.Tensor, sr: int, dst_wav: Path) -> None:
dst_wav.parent.mkdir(parents=True, exist_ok=True)
wav_t = sanitize_audio(wav_t, peak=PEAK)
sf.write(str(dst_wav), wav_t.numpy(), sr, subtype="PCM_16")
def write_mp3_from_wav_tensor(wav_t: torch.Tensor, sr: int, dst_mp3: Path, tmp_wav: Path) -> None:
dst_mp3.parent.mkdir(parents=True, exist_ok=True)
tmp_wav.parent.mkdir(parents=True, exist_ok=True)
wav_t = sanitize_audio(wav_t, peak=PEAK)
sf.write(str(tmp_wav), wav_t.numpy(), sr, subtype="PCM_16")
run([
FFMPEG, "-hide_banner", "-y" if OVERWRITE else "-n",
"-i", str(tmp_wav),
"-vn",
"-ac", "1",
"-ar", str(sr),
"-c:a", "libmp3lame",
"-b:a", OUT_MP3_BITRATE,
str(dst_mp3),
])
def call_generate_space_style(model: Any, text: str, language_id: str, audio_prompt_path: Optional[str]) -> torch.Tensor:
text = (text or "")[:HARD_TEXT_LIMIT]
kwargs = {
"exaggeration": float(EXAGGERATION),
"temperature": float(TEMPERATURE),
"cfg_weight": float(CFG_WEIGHT),
}
if audio_prompt_path:
kwargs["audio_prompt_path"] = audio_prompt_path
return model.generate(text, language_id=language_id, **kwargs)
def split_ru_into_chunks(text: str, max_chars: int, max_words: int) -> List[str]:
t = prep_text_ru(text)
if not t:
return []
parts = re.split(r"([.!?…]+)\s*", t)
sents: List[str] = []
for i in range(0, len(parts), 2):
s = (parts[i] or "").strip()
p = (parts[i + 1] or "").strip() if i + 1 < len(parts) else ""
if s:
sents.append((s + p).strip())
if not sents:
sents = [t]
chunks: List[str] = []
cur = ""
cur_words = 0
def flush():
nonlocal cur, cur_words
if cur.strip():
chunks.append(cur.strip())
cur = ""
cur_words = 0
for s in sents:
w = len(s.split())
if (len(cur) + 1 + len(s) <= max_chars) and (cur_words + w <= max_words):
cur = (cur + " " + s).strip()
cur_words += w
else:
flush()
if len(s) > max_chars or w > max_words:
words = s.split()
buf: List[str] = []
for word in words:
buf.append(word)
if len(" ".join(buf)) >= max_chars or len(buf) >= max_words:
chunks.append(" ".join(buf).strip())
buf = []
if buf:
chunks.append(" ".join(buf).strip())
else:
cur = s
cur_words = w
flush()
out: List[str] = []
for c in chunks:
c = c.strip()
if c and c[-1] not in ".!?…":
c += "."
out.append(c)
return out
# ===================== MANUAL OVERRIDES =====================
def _case_like(src: str, repl: str) -> str:
if not src:
return repl
if src.isupper():
return repl.upper()
if src[0].isupper():
return repl[0].upper() + repl[1:]
return repl
def apply_manual_overrides(text: str, overrides: Dict[str, str]) -> str:
if not text or not overrides:
return text
norm_overrides: Dict[str, str] = {}
for k, v in overrides.items():
k = (k or "").strip()
v = (v or "").strip()
if not k or not v:
continue
norm_overrides[k] = normalize_stress_marks(v)
keys = sorted(norm_overrides.keys(), key=len, reverse=True)
out = text
for k in keys:
v = norm_overrides[k]
pattern = re.compile(
rf"(?<![A-Za-zА-Яа-яЁё0-9\-]){re.escape(k)}(?![A-Za-zА-Яа-яЁё0-9\-])",
re.IGNORECASE,
)
def _sub(m: re.Match) -> str:
return _case_like(m.group(0), v)
out = pattern.sub(_sub, out)
return out
# ===================== RUACCENT =====================
class RUAccentHelper:
def __init__(self) -> None:
self.impl = None
self.ready = False
def load(self) -> None:
if not USE_RUACCENT:
return
try:
from ruaccent import RUAccent
self.impl = RUAccent()
device = "CPU"
try:
import onnxruntime as ort
if "CUDAExecutionProvider" in ort.get_available_providers():
device = "CUDA"
except Exception:
pass
try:
self.impl.load(
omograph_model_size=RUACCENT_MODEL,
use_dictionary=RUACCENT_USE_DICTIONARY,
tiny_mode=False,
device=device,
workdir=str(RUACCENT_DIR),
)
self.ready = True
logging.info("RUAccent loaded (device=%s, model=%s, dict=%s)", device, RUACCENT_MODEL, RUACCENT_USE_DICTIONARY)
return
except Exception as e:
logging.warning("RUAccent load (dict mode) failed: %s", e)
if RUACCENT_TINY_MODE_FALLBACK:
self.impl.load(
omograph_model_size=RUACCENT_MODEL,
use_dictionary=False,
tiny_mode=True,
device=device,
workdir=str(RUACCENT_DIR),
)
self.ready = True
logging.info("RUAccent loaded in tiny_mode (device=%s, model=%s)", device, RUACCENT_MODEL)
return
except Exception as e:
logging.warning("RUAccent unavailable; continuing without stress: %s", e)
self.impl = None
self.ready = False
def apply(self, text_ru: str) -> str:
if not self.ready or self.impl is None:
return text_ru
try:
t = self.impl.process_all(text_ru)
return normalize_stress_marks(t)
except Exception as e:
logging.warning("RUAccent failed; fallback no-stress: %s", e)
return text_ru
# ===================== PHASE B: load denoised chunks + trim both edges =====================
def _to_mono_1d(x: torch.Tensor) -> torch.Tensor:
if x.ndim == 2:
# soundfile returns (frames, channels)
if x.shape[1] == 1:
x = x[:, 0]
else:
x = x.mean(dim=1)
return x
def load_wav_any(path: Path) -> Tuple[torch.Tensor, int]:
data, sr = sf.read(str(path), always_2d=False, dtype="float32")
t = torch.from_numpy(data).to(torch.float32)
t = _to_mono_1d(t)
return t, int(sr)
def ffmpeg_resample_to(path_in: Path, path_out: Path, sr: int) -> None:
path_out.parent.mkdir(parents=True, exist_ok=True)
run([
FFMPEG, "-hide_banner", "-y",
"-i", str(path_in),
"-vn",
"-ac", "1",
"-ar", str(sr),
"-c:a", "pcm_s16le",
str(path_out),
])
def load_wav_and_force_sr(path: Path, target_sr: int, work_tmp: Path) -> torch.Tensor:
wav, sr = load_wav_any(path)
if sr == target_sr:
return sanitize_audio(wav, peak=PEAK)
tmp = work_tmp / f"rs_{path.stem}.wav"
ffmpeg_resample_to(path, tmp, target_sr)
wav2, sr2 = load_wav_any(tmp)
try:
tmp.unlink(missing_ok=True)
except Exception:
pass
if sr2 != target_sr:
logging.warning("Resample mismatch: got %s (expected %s) for %s", sr2, target_sr, path.name)
return sanitize_audio(wav2, peak=PEAK)
def trim_silence_both_ends(wav: torch.Tensor, sr: int, thr: float, hold_sec: float, keep_pad_sec: float) -> torch.Tensor:
"""
Trim silence from both beginning and end using peak threshold.
We scan using a small window to be robust to tiny spikes.
"""
if wav.numel() == 0:
return wav
x = wav.abs()
hold = max(1, int(round(hold_sec * sr)))
pad = max(0, int(round(keep_pad_sec * sr)))
# start
start = 0
i = 0
while i + hold < x.numel() and float(x[i:i+hold].max()) < thr:
i += hold
start = max(0, i - pad)
# end
end = x.numel()
j = x.numel()
while j - hold > 0 and float(x[j-hold:j].max()) < thr:
j -= hold
end = min(x.numel(), j + pad)
if end <= start:
return wav # don't destroy audio if threshold too aggressive
return wav[start:end]
def find_best_match_by_token(root: Path, token: str) -> Optional[Path]:
"""
Match denoised files by token anywhere in filename: *{token}*.wav (recursive).
Pick newest by mtime.
"""
cand = list(root.rglob(f"*{token}*.wav"))
if not cand:
return None
cand.sort(key=lambda p: p.stat().st_mtime, reverse=True)
return cand[0]
# ===================== MAIN =====================
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
ensure_tool(FFMPEG)
ensure_tool(FFPROBE)
device = "cuda" if torch.cuda.is_available() else "cpu"
in_dir = Path(to_wsl_path(INPUT_DIR))
csv_ru = in_dir / CSV_RU
if not csv_ru.exists():
raise FileNotFoundError(f"Missing {CSV_RU}: {csv_ru}")
df = pd.read_csv(csv_ru, dtype=str, keep_default_na=False)
need_cols = {"Text", "File"}
if not need_cols.issubset(set(df.columns)):
raise RuntimeError(f"{CSV_RU} must contain columns {sorted(need_cols)}; got: {list(df.columns)}")
logging.info("Using CACHE_ROOT=%s", CACHE_ROOT_PATH)
logging.info("RAW_CHUNKS_DIR=%s", RAW_CHUNKS_DIR)
logging.info("DENOISED_CHUNKS_DIR=%s", DENOISED_CHUNKS_DIR)
logging.info("Loading Chatterbox Multilingual (device=%s)...", device)
model = ChatterboxMultilingualTTS.from_pretrained(device=device)
disable_chatterbox_internal_ru_stress()
orig_apply_watermark = None
if DISABLE_WATERMARK_DURING_CHUNKS:
orig_apply_watermark = patch_disable_watermark(model)
sr = int(getattr(model, "sr", REF_SR))
if SEED and int(SEED) != 0:
torch.manual_seed(int(SEED))
if device == "cuda":
torch.cuda.manual_seed_all(int(SEED))
ruaccent = RUAccentHelper()
ruaccent.load()
# ---------------- PHASE A: generate chunk WAVs ----------------
manifest: List[dict] = []
logging.info("PHASE A: generating WAV chunks into %s ...", RAW_CHUNKS_DIR)
for idx, r in tqdm(df.iterrows(), total=len(df), desc="PHASE A | gen chunks", unit="row"):
rel = str(r["File"]).strip().replace("\\", "/")
raw_text = str(r["Text"])
if not rel or not raw_text.strip():
continue
cut_mp3 = in_dir / rel
if not cut_mp3.exists():
logging.warning("Missing source cut (skip): %s", cut_mp3)
continue
dst_mp3 = cut_mp3.with_name(cut_mp3.stem + "_RUS.mp3")
# Reference selection: cut too short -> full.mp3
cut_dur = get_duration_sec(cut_mp3)
ref_src = cut_mp3
if cut_dur > 0 and cut_dur < MIN_REF_SEC:
full_mp3 = cut_mp3.parent / "full.mp3"
if full_mp3.exists():
ref_src = full_mp3
logging.info(
"Row %d: cut=%.2fs < %.2fs -> using full.mp3 as reference (clip to %.2fs)",
idx + 1, cut_dur, MIN_REF_SEC, REF_MAX_SEC
)
# Text pipeline (stress)
text_ru = prep_text_ru(raw_text)
has_manual = ("+" in text_ru) or (VISIBLE_ALT in text_ru) or (COMBINING_ACUTE in text_ru)
if has_manual:
text_ru = normalize_stress_marks(text_ru)
else:
text_ru = ruaccent.apply(text_ru)
text_ru = apply_manual_overrides(text_ru, CUSTOM_WORD_OVERRIDES)
text_ru = normalize_stress_marks(text_ru)
if LOG_STRESSED_TEXT:
logging.info("Row %d | %s | STRESSED_TEXT: %s", idx + 1, rel, text_ru)
if PRINT_STRESSED_TEXT:
print(f"Row {idx+1} | {rel} | {text_ru}", flush=True)
chunks = split_ru_into_chunks(text_ru, MAX_CHARS_PER_CHUNK, MAX_WORDS_PER_CHUNK)
if not chunks:
continue
ref_wav = TMP_DIR / f"ref_{idx:06d}.wav"
try:
audio_to_ref_wav(ref_src, ref_wav, sr=sr, clip_sec=REF_MAX_SEC)
chunk_tokens: List[str] = []
for ci, c in enumerate(chunks):
token = f"cbx_r{idx+1:06d}_c{ci:02d}"
out_wav = RAW_CHUNKS_DIR / f"{token}.wav"
chunk_tokens.append(token)
if out_wav.exists() and not OVERWRITE:
continue # keep existing
# generate (Space-style)
w = call_generate_space_style(
model,
text=c,
language_id="ru",
audio_prompt_path=str(ref_wav),
).squeeze(0)
# store RAW chunk for UVR (no trimming here, minimal touching)
w = sanitize_audio(w, peak=PEAK)
write_wav_tensor_pcm16(w, sr=sr, dst_wav=out_wav)
manifest.append({
"row_index_1based": int(idx + 1),
"rel_file": rel,
"dst_mp3": str(dst_mp3),
"sr": int(sr),
"chunk_tokens": chunk_tokens,
})
except Exception as e:
logging.exception("Chunk generation failed for row %d (%s): %s", idx + 1, cut_mp3.name, e)
finally:
try:
ref_wav.unlink(missing_ok=True)
except Exception:
pass
# write manifest
MANIFEST_PATH.parent.mkdir(parents=True, exist_ok=True)
MANIFEST_PATH.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
logging.info("PHASE A complete. Manifest saved: %s", MANIFEST_PATH)
# ---------------- STOP: user must denoise manually ----------------
msg = f"""
================================================================================
MANUAL DENOISE REQUIRED (UVR)
--------------------------------------------------------------------------------
1) Take WAV chunks from:
{RAW_CHUNKS_DIR}
2) Denoise them manually in UVR and PUT RESULTS into:
{DENOISED_CHUNKS_DIR}
IMPORTANT:
- Output filenames MAY have prefix/suffix, that's OK.
- But each file name MUST still contain its token, like:
cbx_r000123_c00
anywhere in the filename, for matching.
Example acceptable UVR output names:
cbx_r000001_c00.wav
DENOISED__cbx_r000001_c00__whatever.wav
When you're done, type Y and press Enter to continue assembly.
================================================================================
"""
print(msg, flush=True)
ans = input("Type 'Y' to continue (anything else = exit): ").strip().upper()
if ans != "Y":
logging.info("Exit requested (no assembly performed).")
return
# ---------------- PHASE B: assemble mp3 from denoised chunks ----------------
logging.info("PHASE B: loading denoised WAV chunks from %s and assembling mp3 ...", DENOISED_CHUNKS_DIR)
# reload manifest (so you can re-run Phase B later without Phase A if desired)
try:
manifest2 = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
except Exception as e:
raise RuntimeError(f"Failed to read manifest {MANIFEST_PATH}: {e}")
for item in tqdm(manifest2, total=len(manifest2), desc="PHASE B | assemble", unit="row"):
dst_mp3 = Path(item["dst_mp3"])
sr_item = int(item.get("sr", sr))
tokens: List[str] = list(item.get("chunk_tokens", []))
rel = str(item.get("rel_file", ""))
if dst_mp3.exists() and not OVERWRITE:
continue
wav_chunks: List[torch.Tensor] = []
for token in tokens:
den = find_best_match_by_token(DENOISED_CHUNKS_DIR, token)
if den is None:
rawp = RAW_CHUNKS_DIR / f"{token}.wav"
if rawp.exists():
logging.warning("DENOISED missing for token=%s (using RAW): %s", token, rawp.name)
den = rawp
else:
logging.warning("Missing BOTH denoised+raw chunk for token=%s (skip row %s)", token, rel)
wav_chunks = []
break
w = load_wav_and_force_sr(den, target_sr=sr_item, work_tmp=TMP_DIR)
if TRIM_BOTH_EDGES:
w = trim_silence_both_ends(
w, sr_item,
thr=EDGE_SIL_THRESH,
hold_sec=EDGE_HOLD_SEC,
keep_pad_sec=EDGE_KEEP_PAD_SEC,
)
w = fade_edges(w, sr_item, CHUNK_EDGE_FADE_MS)
wav_chunks.append(w)
if not wav_chunks:
continue
out = crossfade_concat(wav_chunks, sr=sr_item, fade_ms=CROSSFADE_MS)
out = pad_tail(out, sr_item, TAIL_PAD_SEC)
if APPLY_WATERMARK_ON_FINAL and callable(orig_apply_watermark):
try:
out_np = out.detach().cpu().numpy()
out_np = orig_apply_watermark(out_np, sr_item)
out = torch.from_numpy(out_np).to(torch.float32)
except Exception as e:
logging.warning("Final watermark apply failed (continuing without): %s", e)
tmp_wav = TMP_DIR / f"final_{Path(rel).stem}.wav"
try:
write_mp3_from_wav_tensor(out, sr=sr_item, dst_mp3=dst_mp3, tmp_wav=tmp_wav)
except Exception as e:
logging.exception("MP3 write failed for %s: %s", dst_mp3.name, e)
finally:
try:
tmp_wav.unlink(missing_ok=True)
except Exception:
pass
logging.info("Done. Generated *_RUS.mp3 next to each cut_*.mp3")
logging.info("Raw chunks kept in: %s", RAW_CHUNKS_DIR)
logging.info("Denoised chunks expected in: %s", DENOISED_CHUNKS_DIR)
logging.info("Manifest: %s", MANIFEST_PATH)
if __name__ == "__main__":
main()
Однако, оказалось, что при генерации, Chatterbox любит на конце вставлять металлический шум, как на электростанции. Чтобы побороть эту проблему, добавил два этапа, между которыми надо пройтись UVR по файлам из директории. Код вам все напишет.
В итоге, мы получим нормальную русскую речь. Останется только её объединить. Набросал простенький скрипт:
import csv
import logging
import shutil
import subprocess
import tempfile
import wave
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
from tqdm import tqdm
# ===================== CONFIG =====================
INPUT_DIR = Path(r"D:\Experiments\Audio\4_data")
CSV_NAME = "transcribe_RUS.csv"
FFMPEG = "ffmpeg"
# output encoding
OUT_MP3_BITRATE = "320k"
OVERWRITE = True
# timeline/render
TARGET_SR = 24000 # 24k хорошо подходит под TTS-кусочки, быстрее и меньше
SAMPLE_WIDTH_BYTES = 2 # s16le
CHANNELS = 1 # mono
# overlap handling
MIN_GAP_SEC = 0.02 # минимальная пауза между кусками (после overlap сдвигаем к time_cursor+MIN_GAP)
# gap handling (optional): можно чуть подтянуть следующий сегмент назад, чтобы сделать паузу короче
MAX_SHIFT_BACK_SEC = 2.00 # максимум на сколько можно сдвинуть сегмент РАНЬШЕ его StartSec (0 = выключить)
# optional: clamp unreasonable gaps (0 = off)
MAX_SILENCE_SEC = 0.0 # например 30.0 чтобы ограничить тишину между сегментами (при этом сегмент будет сдвинут раньше)
# ===================== DATA =====================
@dataclass
class Segment:
start: float
end: float
src_rus_mp3: Path
# ===================== UTILS =====================
def ensure_tool(name: str) -> None:
if shutil.which(name) is None:
raise RuntimeError(f"Required tool not found in PATH: {name}")
def safe_float(x: str, default: float = 0.0) -> float:
try:
return float(str(x).strip())
except Exception:
return default
def write_silence(wf: wave.Wave_write, silence_samples: int) -> None:
if silence_samples <= 0:
return
wf.writeframesraw(b"\x00" * (silence_samples * SAMPLE_WIDTH_BYTES * CHANNELS))
def normalize_file_to_rus(file_field: str) -> str:
"""
File в CSV обычно: Person_01/cut_....mp3
Нам нужен: Person_01/cut_...._RUS.mp3
"""
s = str(file_field).strip().replace("\\", "/")
if not s:
return s
low = s.lower()
if low.endswith("_rus.mp3"):
return s
if low.endswith(".mp3"):
return s[:-4] + "_RUS.mp3"
return s + "_RUS.mp3"
def person_key(person: str) -> str:
return str(person).strip()
def list_audio_dirs(root: Path) -> List[Path]:
# audio dir = папка где есть transcribe_RUS.csv
out: List[Path] = []
if not root.exists():
return out
for p in root.iterdir():
if p.is_dir() and (p / CSV_NAME).exists():
out.append(p)
return sorted(out)
def read_segments(audio_dir: Path) -> Dict[str, List[Segment]]:
"""
Возвращает { "Person_01": [Segment...], ... }
"""
csv_path = audio_dir / CSV_NAME
by_person: Dict[str, List[Segment]] = {}
with csv_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
person = person_key(row.get("Person", ""))
if not person:
continue
start = safe_float(row.get("StartSec", "0"), 0.0)
end = safe_float(row.get("EndSec", "0"), 0.0)
if end <= start:
# для сортировки/валидности нам нужен хотя бы порядок, но длительность НЕ используем для обрезки
continue
file_field = row.get("File", "")
rel_rus = normalize_file_to_rus(file_field)
if not rel_rus:
continue
src = audio_dir / rel_rus
if not src.exists():
# иногда файл лежит просто в папке Person_XX без префикса Person_XX/...
bn = Path(rel_rus).name
alt = audio_dir / person / bn
if alt.exists():
src = alt
else:
logging.warning("Missing segment (skip): %s", src)
continue
by_person.setdefault(person, []).append(Segment(start=max(0.0, start), end=end, src_rus_mp3=src))
for k in list(by_person.keys()):
by_person[k].sort(key=lambda s: (s.start, s.end))
return by_person
def encode_wav_to_mp3(wav_path: Path, mp3_path: Path) -> None:
mp3_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [
FFMPEG, "-hide_banner", "-loglevel", "error",
"-y" if OVERWRITE else "-n",
"-i", str(wav_path),
"-vn",
"-ac", str(CHANNELS),
"-ar", str(TARGET_SR),
"-c:a", "libmp3lame",
"-b:a", OUT_MP3_BITRATE,
str(mp3_path),
]
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.returncode != 0:
raise RuntimeError(f"ffmpeg encode failed: {mp3_path}\n{p.stderr.decode(errors='ignore')}")
def decode_and_stream_to_wav(mp3_path: Path, wf: wave.Wave_write, sr: int) -> int:
"""
Потоково декодирует mp3 -> raw PCM s16le (mono) и пишет в wav.
Возвращает количество записанных samples.
"""
cmd = [
FFMPEG,
"-hide_banner",
"-loglevel", "error",
"-i", str(mp3_path),
"-vn",
"-ac", str(CHANNELS),
"-ar", str(sr),
"-f", "s16le",
"pipe:1",
]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert p.stdout is not None
bytes_written = 0
rem = b""
chunk_size = 256 * 1024
while True:
chunk = p.stdout.read(chunk_size)
if not chunk:
break
if rem:
chunk = rem + chunk
rem = b""
# выровнять по sample boundary (2 bytes * channels)
frame_bytes = SAMPLE_WIDTH_BYTES * CHANNELS
cut = (len(chunk) // frame_bytes) * frame_bytes
if cut == 0:
rem = chunk
continue
wf.writeframesraw(chunk[:cut])
bytes_written += cut
rem = chunk[cut:]
# если остался хвост не кратный sample-границе — просто игнор (микрохвост, лучше чем краш)
_, err = p.communicate()
if p.returncode != 0:
raise RuntimeError(f"ffmpeg decode failed: {mp3_path}\n{(err or b'').decode(errors='ignore')}")
samples = bytes_written // (SAMPLE_WIDTH_BYTES * CHANNELS)
return int(samples)
def build_person_timeline(person: str, segs: List[Segment], out_mp3: Path, tmp_dir: Path) -> None:
"""
Сборка таймлайна БЕЗ ОБРЕЗКИ:
- ставим сегмент примерно около его StartSec
- при overlap сдвигаем вперёд до time_cursor + MIN_GAP_SEC
- при больших паузах можно слегка сдвинуть НАЗАД (MAX_SHIFT_BACK_SEC)
- сегмент пишем целиком (как декодировался), time_cursor двигаем по фактической длительности
"""
if not segs:
return
tmp_wav = tmp_dir / f"{person}_tmp.wav"
with wave.open(str(tmp_wav), "wb") as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH_BYTES)
wf.setframerate(TARGET_SR)
time_cursor = 0.0
for s in segs:
min_start = time_cursor + MIN_GAP_SEC
# Базовая цель — s.start, но:
# 1) если overlap — двигаем вперёд (min_start)
# 2) если есть пауза — можем чуть подтянуть назад (не больше MAX_SHIFT_BACK_SEC)
target = s.start
if target < min_start:
start = min_start
else:
if MAX_SHIFT_BACK_SEC > 0.0:
start = max(min_start, target - MAX_SHIFT_BACK_SEC)
else:
start = target
# optional clamp huge silences by shifting earlier a lot (off by default)
gap = start - time_cursor
if MAX_SILENCE_SEC > 0.0 and gap > MAX_SILENCE_SEC:
gap = MAX_SILENCE_SEC
start = time_cursor + gap
gap_samples = int(round(max(0.0, gap) * TARGET_SR))
write_silence(wf, gap_samples)
# пишем сегмент целиком (без trim/pad)
written_samples = decode_and_stream_to_wav(s.src_rus_mp3, wf, TARGET_SR)
dur_sec = written_samples / float(TARGET_SR) if written_samples > 0 else 0.0
time_cursor = start + dur_sec
wf.writeframes(b"") # finalize header sizes
encode_wav_to_mp3(tmp_wav, out_mp3)
try:
tmp_wav.unlink(missing_ok=True)
except Exception:
pass
# ===================== MAIN =====================
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
ensure_tool(FFMPEG)
root = INPUT_DIR
audio_dirs = list_audio_dirs(root)
if not audio_dirs:
logging.warning("No audio dirs found in: %s (expected subdirs with %s)", root, CSV_NAME)
return
for audio_dir in tqdm(audio_dirs, desc="Audio dirs", unit="dir"):
try:
by_person = read_segments(audio_dir)
if not by_person:
continue
with tempfile.TemporaryDirectory(prefix="_tmp_join_", dir=str(audio_dir)) as tdir:
tdir_p = Path(tdir)
for person, segs in tqdm(by_person.items(), desc=f"{audio_dir.name}", unit="person", leave=False):
out_mp3 = audio_dir / f"{person}.mp3"
build_person_timeline(person, segs, out_mp3, tdir_p)
except Exception as e:
logging.exception("Failed dir %s: %s", audio_dir, e)
logging.info("Done.")
if __name__ == "__main__":
main()
На выходе получим русскую речь по персонам, а не в виде cut_. Осталось объединить дорожки. И тут нам пригодится Instrumental дорожка, которую переименовываем в Person_99.mp3 и запускаем финальную сборку.
import csv
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional
# ===================== CONFIG =====================
INPUT_DIR_AUDIO = Path(r"D:/Experiments/Audio/4_data/1_1_videoplayback_(Vocals)_(No Echo)")
VIDEO_FILE = Path(r"D:/Experiments/Audio/1__video_source/videoplayback.mp4")
FFMPEG = "ffmpeg"
FFPROBE = "ffprobe"
AUDIO_SR = 48000
AUDIO_BITRATE = "320k" # финальный AAC битрейт в MP4
MIX_LIMIT = 0.95 # лимитер, чтобы уменьшить клиппинг при миксе
LANG_TAG = "rus" # тег языка аудио дорожки в контейнере
OUT_MP4_NAME = "output.mp4"
OUT_SRT_NAME = "output.srt"
# ==================================================
def run(cmd: List[str]) -> None:
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace")
if p.returncode != 0:
raise RuntimeError(f"Command failed ({p.returncode}):\n{' '.join(cmd)}\n\n{p.stdout}")
def ffprobe_duration_seconds(path: Path) -> float:
# Возвращает длительность в секундах (float)
cmd = [
FFPROBE,
"-v", "error",
"-show_entries", "format=duration",
"-of", "json",
str(path),
]
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace")
if p.returncode != 0:
raise RuntimeError(f"ffprobe failed:\n{p.stderr}")
data = json.loads(p.stdout)
dur = data.get("format", {}).get("duration", None)
if dur is None:
raise RuntimeError("ffprobe: couldn't read duration")
return float(dur)
def sec_to_srt_time(t: float) -> str:
# HH:MM:SS,mmm
if t < 0:
t = 0.0
ms_total = int(round(t * 1000.0))
hh = ms_total // 3_600_000
ms_total -= hh * 3_600_000
mm = ms_total // 60_000
ms_total -= mm * 60_000
ss = ms_total // 1000
ms = ms_total - ss * 1000
return f"{hh:02d}:{mm:02d}:{ss:02d},{ms:03d}"
def parse_time_hhmmss_mmm(s: str) -> Optional[float]:
# "00:00:05.168" -> seconds
s = (s or "").strip()
if not s:
return None
try:
parts = s.split(":")
if len(parts) != 3:
return None
hh = int(parts[0])
mm = int(parts[1])
ss_part = parts[2]
if "." in ss_part:
ss_str, ms_str = ss_part.split(".", 1)
ss = int(ss_str)
ms = int(ms_str.ljust(3, "0")[:3])
elif "," in ss_part:
ss_str, ms_str = ss_part.split(",", 1)
ss = int(ss_str)
ms = int(ms_str.ljust(3, "0")[:3])
else:
ss = int(ss_part)
ms = 0
return hh * 3600.0 + mm * 60.0 + ss + (ms / 1000.0)
except Exception:
return None
def read_transcribe_csv(csv_path: Path) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
for r in reader:
rows.append(r)
return rows
def write_srt_from_csv(rows: List[Dict[str, Any]], srt_path: Path) -> int:
"""
Поддерживает поля:
- Start / End (как HH:MM:SS.mmm)
- или StartSec / EndSec (float)
- Text
"""
out_lines: List[str] = []
idx = 1
for r in rows:
text = (r.get("Text") or "").strip()
if not text:
continue
# Вытаскиваем start/end
start = parse_time_hhmmss_mmm(r.get("Start", ""))
end = parse_time_hhmmss_mmm(r.get("End", ""))
if start is None:
try:
start = float((r.get("StartSec") or "").strip())
except Exception:
start = None
if end is None:
try:
end = float((r.get("EndSec") or "").strip())
except Exception:
end = None
if start is None or end is None or end <= start:
continue
# SRT не любит переносы внутри поля — делаем аккуратно
text = text.replace("\r", "").replace("\n", " ").strip()
out_lines.append(str(idx))
out_lines.append(f"{sec_to_srt_time(start)} --> {sec_to_srt_time(end)}")
out_lines.append(text)
out_lines.append("") # пустая строка
idx += 1
srt_path.write_text("\n".join(out_lines), encoding="utf-8")
return idx - 1
def build_mix_audio(person_files: List[Path], out_audio_m4a: Path, target_dur: float) -> None:
"""
Сводит N дорожек в одну:
- ресемпл to 48k
- amix
- лимитер
- async ресемпл
- pad/trim до длительности видео
"""
if not person_files:
raise RuntimeError("No Person_*.mp3 found to mix.")
cmd: List[str] = [FFMPEG, "-hide_banner", "-y"]
# inputs
for p in person_files:
cmd += ["-i", str(p)]
# filter_complex
# Подготовка каждой дорожки: stereo + 48k + сброс таймстампов
prep = []
labels = []
for i in range(len(person_files)):
a = f"a{i}"
labels.append(f"[{a}]")
prep.append(
f"[{i}:a]"
f"aresample={AUDIO_SR},aformat=sample_fmts=fltp:channel_layouts=stereo,"
f"asetpts=PTS-STARTPTS"
f"[{a}]"
)
# amix + limiter + async + pad/trim
# pad_dur считает от текущей точки; затем atrim гарантирует ровно target_dur
dur_str = f"{target_dur:.6f}"
mix = (
"".join(labels)
+ f"amix=inputs={len(person_files)}:dropout_transition=0:normalize=0,"
f"alimiter=limit={MIX_LIMIT},"
f"aresample=async=1:first_pts=0,"
f"apad,"
f"atrim=0:{dur_str}"
f"[aout]"
)
filter_complex = ";".join(prep + [mix])
cmd += [
"-filter_complex", filter_complex,
"-map", "[aout]",
"-c:a", "aac",
"-b:a", AUDIO_BITRATE,
"-ar", str(AUDIO_SR),
"-ac", "2",
str(out_audio_m4a),
]
run(cmd)
def mux_video_audio(video: Path, audio: Path, out_mp4: Path) -> None:
# Видео копируем как есть, аудио копируем (оно уже AAC в m4a)
cmd = [
FFMPEG, "-hide_banner", "-y",
"-i", str(video),
"-i", str(audio),
"-map", "0:v:0",
"-map", "1:a:0",
"-c:v", "copy",
"-c:a", "copy",
"-movflags", "+faststart",
"-metadata:s:a:0", f"language={LANG_TAG}",
str(out_mp4)
]
run(cmd)
def main() -> None:
if not INPUT_DIR_AUDIO.exists():
raise FileNotFoundError(f"INPUT_DIR_AUDIO not found: {INPUT_DIR_AUDIO}")
if not VIDEO_FILE.exists():
raise FileNotFoundError(f"VIDEO_FILE not found: {VIDEO_FILE}")
csv_path = INPUT_DIR_AUDIO / "transcribe_RUS.csv"
if not csv_path.exists():
raise FileNotFoundError(f"transcribe_RUS.csv not found: {csv_path}")
person_files = sorted(INPUT_DIR_AUDIO.glob("Person_*.mp3"))
if not person_files:
raise RuntimeError(f"No Person_*.mp3 in {INPUT_DIR_AUDIO}")
out_dir = VIDEO_FILE.parent
out_mp4 = out_dir / OUT_MP4_NAME
out_srt = out_dir / OUT_SRT_NAME
tmp_audio = out_dir / "_mixed_audio_tmp.m4a"
# 1) subtitles
rows = read_transcribe_csv(csv_path)
nsubs = write_srt_from_csv(rows, out_srt)
print(f"[OK] Wrote subtitles: {out_srt} ({nsubs} cues)")
# 2) duration from video
dur = ffprobe_duration_seconds(VIDEO_FILE)
print(f"[OK] Video duration: {dur:.3f} sec")
# 3) mix audio exactly to video duration
build_mix_audio(person_files, tmp_audio, target_dur=dur)
print(f"[OK] Mixed audio: {tmp_audio}")
# 4) mux
mux_video_audio(VIDEO_FILE, tmp_audio, out_mp4)
print(f"[OK] Output video: {out_mp4}")
# cleanup temp
try:
tmp_audio.unlink(missing_ok=True)
except Exception:
pass
if __name__ == "__main__":
main()
В итоге получаем видео. Да, есть огрехи на каждом этапе, но начало проложено. И для этого не требуется заплатить ни копейки. Конечно, это лишь эксперимент, но это был долгий и интересный путь. Всем спасибо за внимание!
Ну и на последок, с получившимся результатом можете ознакомиться на VK Video.
