Путь разработчика парсеров тернист и сложен, сперва ты пытаешься обойти официальные ограничения, потому что так проще, так нету квот и разных требований. Параллельно мучаясь с Selenium, в попытка угнаться за меняющейся версткой YouTube. Кто-то пишет простые скрипты на requests, которые падают при первой же ошибке. И куда вас все эти действия приводят?
Снова ко мне - к официальному YouTube Data API v3.
Вероятно можно сделать что то более изящное, но мой вариант работает и будет работать и у вас.
Но с этим АПИ все совсем не так просто, как кажется на первый взгляд. Хорошо, вы разобрались с Cloud Console (Браво!), даже получили API-ключ и активировали АПИ, написали пару запросов... и через 15 минут (а может даже раньше) уперлись в quotaExceeded. Знакомо? Мне да! Я вообще когда ворвался с ноги в этот мир АПИ, не думал про квоту - просто у меня был какой то план и я его придерживался. И то, о чем я вам хочу поведать - история, выстраданная несколькими днями. Падения на середине сбора данных, потеря спаршеных данных. Короче - этот скрипт, (по аналогии с УГиКС - кто поймет, тот поймет) - написан хоть и не кровью, но немного соли то там есть, так как слезы тоже соленые.
В общем - каждый ход и решение в этом скрипте принималось и добавлялось по мере вылезания той или иной проблемы - упирание в квоты, обрыв соединений и тп. Для меня скрипт свою задачу сделал, может поможет и вам.
Большинство гайдов в сети (из тех, что встретились на моем пути) показывают, как сделать один-два базовых запроса. Но мы пойдем дальше. Я покажу вам скрипт на Python, который:
Работает с пулом API-ключей, автоматически переключаясь на следующий, когда у текущего заканчивается квота.
Умеет продолжать работу с того места, где остановился, даже после полного выключения.
Кэширует запросы, чтобы не тратить драгоценные единицы квоты на повторные вызовы.
Устойчив к ошибкам сети благодаря механизму повторных запросов (retry).
Интегрирован с Google Sheets: читает из одной таблицы, записывает результаты в другую.
Сохраняет данные порциями (батчами), чтобы не потерять всё при сбое.
Обрабатывает и обогащает данные на лету: определяет язык, длительность, возраст видео и многое другое.
Эта статья — не просто теория и описание скрипта. Это пошаговый разбор готового, рабочего инструмента, который вы сможете адаптировать под свои задачи. Полный рабочий код я залью на свой (не Бог весь какой, естественно) GitHub, и также сделаю чуть более подробный разбор в своем блоге - ну это уже скорее для тех, кто хочет прям погрузиться в него с головой, как в том анекдоте - “курить люблю, капец…”. Поехали!
Часть 1. Подготовка
Приквел
Передо мной встала задача - собрать видео для 40+ тысяч страниц сайта, каждая страница имеет уникальный главный ключ. Собственно по этому ключу и нужно собирать данные из Ютуба. Задача - конечно же релевантные видео, но на этапе сбора релевантность проверить сложно (тем более что это не комбайн, работающий с несколькими задачами, который можно смело запустить в космос) - у меня просто парсер.
Собственно я протестировал два варианта парсеров - один чуть более замороченый, где каждый АПИ ключ привязан к своему сервисному аккаунту, и второй (он как раз ниже) - где есть один сервисный аккаунт и несколько ключей АПИ.
В обоих случаях время жизни ключей составляло примерно 3 дня, потом ютуб начинал понимать что происходит какая то непонятная чепуха и снижал квоту для каждого ключа до 1 запроса (с 10000 в сутки) и ограничивал сервисный аккаунт - короче просто вырубал парсер своими силами.
Соответсвенно - если нет разницы, зачем платить больше делать больше работы?

Так и родилось это решение
Прежде чем погружаться в парсер, давайте определимся с инструментами и подготовим всё что необходимо для его работы. Так как это не просто однофайловый скриптик, а своего рода мини система.
1.1. Конфигурация через .env
Несмотря на то, что я любитель все упрощать, так как парсер идет в паблик я не буду хардкодить ключи и ID таблиц в коде скрипта. Мы будем использовать .env файл для хранения всех нужных настроек. Это в целом безопасно и удобно.
Создаем в той же папке где лежит парсер файл .env такого содержания:
# ID вашей Google Таблицы
SHEET_ID="ID ТАБЛИЦЫ ВАШЕЙ ТАБЛИЦЫ"
# Имя листа с ключевыми словами
KEYWORDS_SHEET="Keywords"
# Имя листа для результатов
RESULTS_SHEET="Results"
# Путь к JSON-файлу сервисного аккаунта Google
SERVICE_ACCOUNT_JSON="credentials.json"
# Ваши API ключи от YouTube, через запятую
YT_API_KEYS="ВАШ_КЛЮЧ_1,ВАШ_КЛЮЧ_2,ВАШ_КЛЮЧ_3"
# Настройки парсера
NUM_RESULTS=10 # Сколько результатов на один ключевик
REGION="US" # Регион поиска
MAX_PAGES=1 # Сколько страниц результатов обрабатывать
BATCH_SIZE=50 # Через сколько ключевиков сохранять батч
OUTPUT_CSV="yt_results.csv" # Имя локального CSV файла
Этот файл вы уже настраиваете под себя самостоятельно, важно понимать, чем больше результатов на один ключ вы хотите получить, тем больше будет расходоваться квота.
1.2. Получение ключей АПИ и сервисного аккаунта
Нам понадобится два типа учетных данных:
YouTube Data API Keys (несколько штук!): Зайдите в Google Cloud Console, создайте новый проект, включите YouTube Data API v3 и создайте API-ключ. Повторить несколько раз… Зачем? Дневная квота на один ключ — 10 000 "единиц". Этого критически мало для серьезных задач. Пул из 3-5 ключей даст нам 30-50 тысяч единиц в день, что уже гораздо интереснее (я делал 8 аккаунтов по 12 проектов в каждом, что дало мне в комплексе 960к запросов в день).

Google Sheets API Service Account: Чтобы скрипт мог сам читать и писать в вашу таблицу, нужен сервисный аккаунт. В той же Cloud Console перейдите в IAM & Admin -> Service Accounts, создайте новый, дайте ему роль "Editor", и скачайте JSON-ключ. Переименуйте его в credentials.json и положите рядом со скриптом. Важно: не забудьте "расшарить" вашу Google Таблицу для email'а этого сервисного аккаунта (он выглядит как ...gserviceaccount.com).

Одного сервисного аккаунта по моему опыту жесткого использования данного парсера хватает на 3 дня, но я рекомендую на всякий случай использовать не больше 40-50 ключей на один сервисный аккаунт, чисто перестраховка, мало ли Гугл начнет что то подозревать раньше.

Собственно это и есть самая сложная часть парсинга - подготовка АПИ ключей. Теперь давайте заглянем под капот парсера.
Часть 2. KeyManager для ротации API-ключей
Вот как парсер управляет пулом ключей и успешно обрабатывает превышение квоты - через специальный класс-менеджер.
class APIKey:
"""Обертка для одного API ключа"""
def __init__(self, key: str):
self.key = key
self.service = build("youtube", "v3", developerKey=key, cache_discovery=False)
self.used_units = 0
self.active = True
class KeyManager:
"""Управляет пулом ключей, ротирует их и отключает при исчерпании квоты"""
def __init__(self, keys: list[str]):
self.keys = [APIKey(k) for k in keys]
self.index = 0
def get_key(self) -> APIKey:
# Ищем следующий активный ключ по кругу
for _ in range(len(self.keys)):
api = self.keys[self.index]
self.index = (self.index + 1) % len(self.keys) # Циклический сдвиг
if api.active:
return api
# Если активных ключей не осталось
raise QuotaExceededAllKeys()
def deactivate(self, api: APIKey):
api.active = False
logger.warning(f"Деактивирован ключ: {api.key}")
def execute(self, fn, units=0, backoff_max=3):
# Этот метод - "обертка" для всех вызовов API
attempt = 0
while True:
api = self.get_key() # Получаем рабочий ключ
try:
resp = fn(api.service).execute()
# ... логика подсчета расхода ...
return resp
except HttpError as e:
err_str = str(e)
if 'quotaExceeded' in err_str:
self.deactivate(api) # Ключ "сгорел", деактивируем и пробуем следующий
continue
if 'rateLimitExceeded' in err_str and attempt < backoff_max:
# Слишком частые запросы, ждем и пробуем снова (Exponential Backoff)
delay = 2 ** attempt
logger.warning(f"Rate limit, жду {delay}s...")
time.sleep(delay)
attempt += 1
continue
# Другая ошибка, пробрасываем наверх
raise
IGNORE_WHEN_COPYING_START
content_copy download
Use code with caution. Python
IGNORE_WHEN_COPYING_END
Что здесь происходит?
APIKey — простой класс-хранилище для одного ключа, его статуса и созданного для него service объекта.
KeyManager — мозг операции. Метод
get_key()
не просто отдает ключ, а ищет следующий активный ключ в списке. Если ключ исчерпал квоту, он будет помечен какactive = False
и пропущен.execute() - Вместо того чтобы в коде писать
youtube.search
().list(...).execute()
, пишемkey_manager.execute(lambda svc:
svc.search
().list(...)
). Эта обертка сама:Возьмет рабочий ключ из пула.
Выполнит запрос.
Если поймает ошибку
quotaExceeded
, деактивирует ключ и автоматически повторит запрос со следующим ключом.Если поймает
rateLimitExceeded
(слишком много запросов в секунду), подождет немного и попробует снова.
Это делает основной код чуть более простым и избавляет нас от десятков
try-except
блоков.Часть 3. Сохранение состояния и кэширование
Скрипт, обрабатывающий тысячи ключевых слов, может работать часами. Но если оборвется интернет или выключится свет? У меня такое произошло, через 2 часа работы скрипта и мне пришлось начать заново, но это помогло дополнить скрипт новой логикой… Персистентность состояния с помощью встроенной библиотеки
shelve
.shelve работает как словарь (
dict
), но хранит свои данные в файле на диске.Сохранение прогресса (progress.db):
Мы создаем shelve-файл для отслеживания прогресса. Перед началом цикла мы читаем из негоlast_index
, а после обработки каждого ключевого слова — сохраняем новый индекс.# В начале скрипта prog = shelve.open('progress.db') start_index = prog.get('last_index', 0) # Если файла нет, начнем с 0 # Внутри цикла for idx in range(start_index, len(keywords)): # ... обработка ... prog['last_index'] = idx + 1 # Сохраняем индекс СЛЕДУЮЩЕГО элемента # В конце prog.close() IGNORE_WHEN_COPYING_START content_copy download Use code with caution. Python IGNORE_WHEN_COPYING_END
При такой логике, даже если скрипт упадет на 5437-м ключевике, при следующем запуске он начнет работу именно с него! А не с самого начала, как это было в первоначальном варианте скрипта.
Кэширование запросов (yt_cache.db):
Вызов search.list стоит целых 100 единиц квоты. Если нам понадобится запустить скрипт для того же ключевика еще раз, было бы глупо тратить квоту снова. Решение — кэширование ответов API.
CACHE = shelve.open("yt_cache.db") def search_once(keyword: str, ...): # ... # Создаем уникальный ключ для запроса cache_key = f"S:{keyword}:{REGION}:{pageToken or ''}" if cache_key in CACHE: # Если ответ есть в кэше - берем его оттуда search_response = CACHE[cache_key] else: # Если нет - делаем реальный запрос к API search_response = key_manager.execute(...) # И сохраняем ответ в кэш для будущего использования CACHE[cache_key] = search_response # ... IGNORE_WHEN_COPYING_START content_copy download Use code with caution. Python IGNORE_WHEN_COPYING_END
Мы делаем то же самое для запросов
videos.list
иchannels.list
. Это не только экономит квоту при повторных запусках, но и значительно ускоряет их.Часть 4. Собираем всё вместе: Основной воркфлоу
Теперь посмотрим, как все эти компоненты работают вместе в главной функции search_once.
Алгоритм для одного ключевого слова:
Пагинация: YouTube отдает результаты страницами (обычно по 50 штук). Наш скрипт использует цикл while, который делает запросы до тех пор, пока не наберет нужное количество видео (
NUM_RESULTS
) или пока не закончатся страницы (nextPageToken
станетNone
).Эффективный сбор данных: Это классическая и самая правильная стратегия:
Сначала делаем один
search.list
запрос (стоимость 100 единиц), чтобы получить N идентификаторов видео (videoId
).Затем делаем один
videos.list
запрос, передав ему все N идентификаторов через запятую (стоимость N * 1 = N единиц).
Это гораздо дешевле, чем запрашивать полную информацию о каждом видео в search.list.
Обогащение данных: Получив сырые данные из API, мы не просто их сохраняем, а приводим в удобный вид:
iso2hms
: Превращает PT1M35S в читаемое 00:01:35 и секунды.fmt_age
: Высчитывает возраст видео в годах и месяцах.safe_detect
: Определяет язык названия и описания с помощью langdetect.Сбор данных о канале (аватарка) делается одним батчевым запросом
channels.list
для всех уникальных каналов в выдаче.
Часть 5. Отказоустойчивая запись результатов
Собрать данные — это полдела. Их нужно надежно сохранить. Скрипт делает это двумя способами, и оба с защитой от сбоев.
Кстати, эта часть скрипта родилась закономерно из третьей (кэширование результатов), так как первоначально я предусмотрел запись результата после полного завершения парсинга, но на тестовом прогоне все было хорошо, а на боевом - когда он упал через 2 часа работы было вообще не смешно.
Запись в Google Sheets: Мы используем метод
append
, который дописывает строки в конец таблицы. Но если API Google Sheets временно недоступен я обернул вызов в функцию с retry-логикой:def append_to_sheets_with_retry(...): for attempt in range(1, max_retries+1): try: # Пытаемся записать данные service.spreadsheets().values().append(...).execute() return True # Успех! except (HttpError, ConnectionResetError, socket.error) as e: # Если ошибка сети, ждем и пробуем снова с увеличенной задержкой delay = base_delay * 2**(attempt-1) time.sleep(delay) return False # Не удалось после всех попыток IGNORE_WHEN_COPYING_START content_copy download Use code with caution. Python IGNORE_WHEN_COPYING_END
Пакетное сохранение (Batching): Делать запрос к Google Sheets после каждого ключевого слова — неэффективно и медленно. Вместо этого мы накапливаем результаты в списке
batch_res
и сохраняем их все вместе, когда их количество достигнетBATCH_SIZE
. Это значительно снижает количество API-вызовов и нагрузку. Точно такая же логика применяется и для дозаписи в локальный CSV-файл.# В главном цикле batch_res.extend(res) if (idx + 1 - start) % BATCH_SIZE == 0: # Сохраняем накопленный батч append_csv_batch(df_batch, OUTPUT_CSV) append_to_sheets_with_retry(...) batch_res = [] # Очищаем батч для следующей порции # ... после завершения цикла не забываем сохранить остаток if batch_res: # ... логика сохранения финального батча IGNORE_WHEN_COPYING_START content_copy download Use code with caution. Python IGNORE_WHEN_COPYING_END
Заключение
Я бы наверное назвал этот скрипт - фреймворком для надежной и масштабируемой работы с YouTube API, если бы был чуть более амбициозным. И все же - скрипт умеет ротировать ключи, сохранять состояния, кэшировать данные, обрабатывать ошибки и осуществлять пакетную запись.
Такой подход позволит собрать большой объем данных, без потери времени.
Что можно улучшить, но делать я этого не планирую.
Асинхронность: Для максимальной производительности можно переписать сетевые запросы с использованием asyncio и aiohttp, чтобы выполнять несколько API-вызовов параллельно.
Прокси: Для очень больших объемов можно добавить ротацию прокси в KeyManager.
Более гранулярный part: В запросах videos.list можно запрашивать не все части (snippet,contentDetails,...), а только те, что реально нужны, еще сильнее экономя квоту.
Полный код проекта доступен на GitHub.
Но, чтобы не раздражать общественность вот полный код ниже:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ YouTube-парсер: Shorts + обычные ролики, без live/upcoming. Считывает ключевые слова (col A) и исходные URL (col J) из Google Sheets и сохраняет результаты обратно в лист "Results". Продолжает с места остановки через shelve. При исчерпании квоты всех ключей останавливается и записывает уже собранные данные. Сохраняет прогресс каждые BATCH_SIZE ключей. """ import os import sys import shelve import time import logging import socket from pathlib import Path from datetime import datetime, timezone import pandas as pd import isodate from langdetect import detect, LangDetectException from dotenv import load_dotenv, dotenv_values from googleapiclient.discovery import build from googleapiclient.errors import HttpError from google.oauth2 import service_account # ─── Logging ───────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # ─── Конфигурация (из .env) ───────────────────────────────────── # Явно указываем путь к .env рядом со скриптом и перезаписываем системные переменные env_path = Path(__file__).parent / ".env" if not env_path.exists(): logger.error(f"❌ .env файл не найден по пути {env_path}") sys.exit(1) load_dotenv(dotenv_path=env_path, override=True) SHEET_ID = os.getenv("SHEET_ID") logger.info(f"🚩 Loaded SHEET_ID from {env_path}: {SHEET_ID}") KEYWORDS_SHEET = os.getenv("KEYWORDS_SHEET", "Keywords") RESULTS_SHEET = os.getenv("RESULTS_SHEET", "Results") SERVICE_ACCOUNT_JSON = os.getenv("SERVICE_ACCOUNT_JSON") NUM_RESULTS = int(os.getenv("NUM_RESULTS", 10)) REGION = os.getenv("REGION", "US") MAX_PAGES = int(os.getenv("MAX_PAGES", 1)) OUTPUT_CSV = os.getenv("OUTPUT_CSV", "yt_results.csv") BATCH_SIZE = int(os.getenv("BATCH_SIZE", 50)) if MAX_PAGES < 1: sys.exit("❌ MAX_PAGES должен быть ≥ 1") # ─── Google Sheets API ───────────────────────────────────────── SCOPES = ['https://www.googleapis.com/auth/spreadsheets'] creds_sheets = service_account.Credentials.from_service_account_file( SERVICE_ACCOUNT_JSON, scopes=SCOPES ) sheets_service = build( 'sheets', 'v4', credentials=creds_sheets, cache_discovery=False ) # ─── Хранилище кеша ───────────────────────────────────────────── CACHE = shelve.open("yt_cache.db") # ─── Ошибка исчерпания квоты всех ключей ──────────────────────── class QuotaExceededAllKeys(Exception): """Все ключи YouTube API исчерпали квоту""" pass # ─── Классы для управления API-ключами ──────────────────────────── class APIKey: def __init__(self, key: str): self.key = key self.service = build( "youtube", "v3", developerKey=key, cache_discovery=False ) self.used_units = 0 self.active = True class KeyManager: def __init__(self, keys: list[str]): self.keys = [APIKey(k) for k in keys] self.index = 0 def get_key(self) -> APIKey: n = len(self.keys) for _ in range(n): api = self.keys[self.index] self.index = (self.index + 1) % n if api.active: return api raise QuotaExceededAllKeys() def deactivate(self, api: APIKey): api.active = False logger.warning(f"Деактивирован ключ: {api.key}") def record(self, api: APIKey, units: int): api.used_units += units logger.info(f"Ключ {api.key}: расход {units} units (итого {api.used_units})") def execute(self, fn, units=0, backoff_max=3): attempt = 0 while True: api = self.get_key() try: resp = fn(api.service).execute() used = units(resp) if callable(units) else units self.record(api, used) return resp except HttpError as e: err_str = str(e) if 'quotaExceeded' in err_str or 'dailyLimitExceeded' in err_str: self.deactivate(api) continue if 'rateLimitExceeded' in err_str: if attempt < backoff_max: delay = 2 ** attempt logger.warning( f"Rate limitExceeded на ключе {api.key}, жду {delay}s (попытка {attempt+1})" ) time.sleep(delay) attempt += 1 continue else: self.deactivate(api) continue logger.error(f"Unexpected HttpError на ключе {api.key}: {e}") raise except (ConnectionResetError, socket.error) as e: if attempt < backoff_max: delay = 2 ** attempt logger.warning( f"Connection error на ключе {api.key}: {e}, жду {delay}s (попытка {attempt+1})" ) time.sleep(delay) attempt += 1 continue else: self.deactivate(api) continue # ─── Настройка ключей YouTube API ──────────────────────────────── raw_keys = os.getenv("YT_API_KEYS") or dotenv_values(env_path).get("YT_API_KEYS", "") KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()] if not KEYS: sys.exit("❌ Нет API-ключей (YT_API_KEYS).") logger.info(f"🔑 Найдено {len(KEYS)} ключей") key_manager = KeyManager(KEYS) VIDEO_PARTS = "snippet,contentDetails,status,player,statistics" # ─── Утилиты ───────────────────────────────────────────────────── def iso2hms(iso: str): if not iso: return "", 0 td = isodate.parse_duration(iso) s = int(td.total_seconds()) h, r = divmod(s, 3600) m, sec = divmod(r, 60) return f"{h:02d}:{m:02d}:{sec:02d}", s def fmt_age(pub: str): if not pub: return "" dt = datetime.fromisoformat(pub.replace('Z', '+00:00')) d = (datetime.now(timezone.utc) - dt).days y, rem = divmod(d, 365) mo = rem // 30 return f"{y} years {mo} months" if y else f"{mo} months" def safe_detect(txt: str): try: return detect(txt) if txt.strip() else "" except LangDetectException: return "" # ─── Функции для записи в Google Sheets и CSV ─────────────────── def append_to_sheets_with_retry(service, spreadsheet_id, sheet_range, values, max_retries=5, base_delay=1): for attempt in range(1, max_retries+1): try: service.spreadsheets().values().append( spreadsheetId=spreadsheet_id, range=sheet_range, valueInputOption='RAW', body={'values': values} ).execute() logger.info("✅ Успешно записали результаты в Google Sheets") return True except (HttpError, ConnectionResetError, socket.error) as e: delay = base_delay * 2**(attempt-1) logger.warning( f"⚠️ Попытка {attempt}/{max_retries} записи в Sheets упала: {e}. Ждём {delay}s." ) time.sleep(delay) logger.error("❌ Не удалось записать в Google Sheets после всех попыток") return False def save_csv_with_retry(df: pd.DataFrame, path: str, max_retries=3, base_delay=1): for attempt in range(1, max_retries+1): try: df.to_csv(path, index=False) logger.info(f"✅ CSV сохранён: {path}") return True except Exception as e: delay = base_delay * 2**(attempt-1) logger.warning( f"⚠️ Попытка {attempt}/{max_retries} сохранения CSV упала: {e}. Ждём {delay}s." ) time.sleep(delay) logger.error("❌ Не удалось сохранить CSV после всех попыток") return False def append_csv_batch(df: pd.DataFrame, path: str, max_retries=3, base_delay=1) -> bool: """Дозаписывает DataFrame в CSV, создавая файл с заголовком, если не существует.""" p = Path(path) header = not p.exists() for attempt in range(1, max_retries+1): try: df.to_csv(path, index=False, header=header, mode='a') logger.info(f"✅ CSV батч сохранён ({len(df)} строк): {path}") return True except Exception as e: delay = base_delay * 2**(attempt-1) logger.warning(f"⚠️ Попытка {attempt}/{max_retries} дозаписи CSV упала: {e}. Ждём {delay}s.") time.sleep(delay) logger.error("❌ Не удалось дозаписать CSV после всех попыток") return False # ─── Поиск YouTube видео по ключу ──────────────────────────────── def search_once(keyword: str, input_url: str) -> list: rows, token, page = [], None, 0 while len(rows) < NUM_RESULTS and page < MAX_PAGES: page += 1 sk = f"S:{keyword}:{REGION}:{token or ''}" if sk in CACHE: sr = CACHE[sk] else: sr = key_manager.execute( lambda svc: svc.search().list( q=keyword, part="id", type="video", order="relevance", regionCode=REGION, maxResults=min(50, NUM_RESULTS - len(rows)), pageToken=token or "", safeSearch="none" ), units=100 ) CACHE[sk] = sr vids = [it.get('id', {}).get('videoId') for it in sr.get('items', []) if it.get('id', {}).get('videoId')] if not vids: break vk = f"V:{','.join(vids)}" if vk in CACHE: vr = CACHE[vk] else: vr = key_manager.execute( lambda svc: svc.videos().list( id=",".join(vids), part=VIDEO_PARTS ), units=lambda resp: len(resp.get('items', [])) ) CACHE[vk] = vr cids = list({it['snippet']['channelId'] for it in vr.get('items', [])}) amap = {} if cids: ck = f"C:{','.join(cids)}" if ck in CACHE: ch = CACHE[ck] else: ch = key_manager.execute( lambda svc: svc.channels().list( part="snippet", id=",".join(cids) ), units=1 ) CACHE[ck] = ch amap = {c['id']: c['snippet']['thumbnails']['default']['url'] for c in ch.get('items', [])} for it in vr.get('items', []): sn = it.get('snippet', {}) if sn.get('liveBroadcastContent') in {'live', 'upcoming'}: continue det = it.get('contentDetails', {}) dur_str, dur_s = iso2hms(det.get('duration', '')) st = it.get('status', {}) stats = it.get('statistics', {}) author = sn.get('channelTitle', '') avatar = amap.get(sn.get('channelId', ''), '') if sn.get('defaultAudioLanguage'): ls, lang = 'audio', sn['defaultAudioLanguage'] elif sn.get('defaultLanguage'): ls, lang = 'default', sn['defaultLanguage'] else: ls, lang = 'detect', safe_detect(sn.get('title', '') + ' ' + sn.get('description', '')) or 'unknown' lic = st.get('license', '') emb = st.get('embeddable', False) allowed = emb and lic == 'creativeCommon' rows.append([ keyword, input_url, it['id'], sn.get('title', ''), sn.get('description', ''), lang, ls, 'short' if dur_s < 60 else 'video', dur_str, dur_s, fmt_age(sn.get('publishedAt', '')), author, avatar, stats.get('viewCount', ''), stats.get('likeCount', ''), stats.get('dislikeCount', ''), emb, lic, allowed, it.get('player', {}).get('embedHtml', f"<iframe src=\"https://www.youtube.com/embed/{it['id']}\" allowfullscreen></iframe>") ]) token = sr.get('nextPageToken') or None return rows # ─── Главная функция ─────────────────────────────────────────── r1 = sheets_service.spreadsheets().values().get( spreadsheetId=SHEET_ID, range=f"{KEYWORDS_SHEET}!A2:A" ).execute() r2 = sheets_service.spreadsheets().values().get( spreadsheetId=SHEET_ID, range=f"{KEYWORDS_SHEET}!J2:J" ).execute() keywords = [r[0] for r in r1.get('values', [])] input_urls = [r[0] for r in r2.get('values', [])] prog = shelve.open('progress.db') start = prog.get('last_index', 0) all_res = [] batch_res = [] try: for idx in range(start, len(keywords)): kw = keywords[idx] input_url = input_urls[idx] if idx < len(input_urls) else '' logger.info(f"🔍 [{idx}] {kw}") res = search_once(kw, input_url) all_res.extend(res) batch_res.extend(res) prog['last_index'] = idx + 1 # Сохранение батча каждые BATCH_SIZE ключей if (idx + 1 - start) % BATCH_SIZE == 0: df_batch = pd.DataFrame(batch_res, columns=[ 'keyword','input_url','videoId','title','description','language','language_source', 'video_type','duration','duration_seconds','age','author','author_avatar', 'view_count','like_count','dislike_count','embeddable','license', 'allowed_on_third_party','iframe' ]) append_csv_batch(df_batch, OUTPUT_CSV) append_to_sheets_with_retry( sheets_service, SHEET_ID, f"{RESULTS_SHEET}!A2", batch_res ) logger.info(f"🔄 Батч из {len(batch_res)} результатов сохранён после {(idx+1-start)} ключей") batch_res = [] except QuotaExceededAllKeys: logger.warning("⚠️ Все ключи исчерпали квоту, завершаем работу") finally: prog.close() CACHE.close() # Сохраняем остаток батча после завершения if batch_res: df_batch = pd.DataFrame(batch_res, columns=[ 'keyword','input_url','videoId','title','description','language','language_source', 'video_type','duration','duration_seconds','age','author','author_avatar', 'view_count','like_count','dislike_count','embeddable','license', 'allowed_on_third_party','iframe' ]) append_csv_batch(df_batch, OUTPUT_CSV) append_to_sheets_with_retry( sheets_service, SHEET_ID, f"{RESULTS_SHEET}!A2", batch_res ) logger.info(f"🔄 Финальный батч из {len(batch_res)} строк сохранён") # Итоговый расход квоты total_units = sum(api.used_units for api in key_manager.keys) logger.info(f"ℹ️ Всего расход: {total_units} units")
Скрипт выкладывается как есть, свою работу для меня он выполнил. Следующий этап - очистка результата от мусора, ее я уже провел и скоро расскажу как и главное чем.