Pull to refresh

Парсим YouTube на Python как для взрослых: отказоустойчивый скрипт с ротацией ключей

Level of difficultyEasy
Reading time16 min
Views6.3K

Путь разработчика парсеров тернист и сложен, сперва ты пытаешься обойти официальные ограничения, потому что так проще, так нету квот и разных требований. Параллельно мучаясь с Selenium, в попытка угнаться за меняющейся версткой YouTube. Кто-то пишет простые скрипты на requests, которые падают при первой же ошибке. И куда вас все эти действия приводят?

Снова ко мне - к официальному YouTube Data API v3.

Вероятно можно сделать что то более изящное, но мой вариант работает и будет работать и у вас.

Но с этим АПИ все совсем не так просто, как кажется на первый взгляд. Хорошо, вы разобрались с Cloud Console (Браво!), даже получили API-ключ и активировали АПИ, написали пару запросов... и через 15 минут (а может даже раньше) уперлись в quotaExceeded. Знакомо? Мне да! Я вообще когда ворвался с ноги в этот мир АПИ, не думал про квоту - просто у меня был какой то план и я его придерживался. И то, о чем я вам хочу поведать - история, выстраданная несколькими днями. Падения на середине сбора данных, потеря спаршеных данных. Короче - этот скрипт, (по аналогии с УГиКС - кто поймет, тот поймет) - написан хоть и не кровью, но немного соли то там есть, так как слезы тоже соленые.

В общем - каждый ход и решение в этом скрипте принималось и добавлялось по мере вылезания той или иной проблемы - упирание в квоты, обрыв соединений и тп. Для меня скрипт свою задачу сделал, может поможет и вам.

Большинство гайдов в сети (из тех, что встретились на моем пути) показывают, как сделать один-два базовых запроса. Но мы пойдем дальше. Я покажу вам скрипт на Python, который:

  • Работает с пулом API-ключей, автоматически переключаясь на следующий, когда у текущего заканчивается квота.

  • Умеет продолжать работу с того места, где остановился, даже после полного выключения.

  • Кэширует запросы, чтобы не тратить драгоценные единицы квоты на повторные вызовы.

  • Устойчив к ошибкам сети благодаря механизму повторных запросов (retry).

  • Интегрирован с Google Sheets: читает из одной таблицы, записывает результаты в другую.

  • Сохраняет данные порциями (батчами), чтобы не потерять всё при сбое.

  • Обрабатывает и обогащает данные на лету: определяет язык, длительность, возраст видео и многое другое.

Эта статья — не просто теория и описание скрипта. Это пошаговый разбор готового, рабочего инструмента, который вы сможете адаптировать под свои задачи. Полный рабочий код я залью на свой (не Бог весь какой, естественно) GitHub, и также сделаю чуть более подробный разбор в своем блоге - ну это уже скорее для тех, кто хочет прям погрузиться в него с головой, как в том анекдоте - “курить люблю, капец…”. Поехали!

Часть 1. Подготовка

Приквел

Передо мной встала задача - собрать видео для 40+ тысяч страниц сайта, каждая страница имеет уникальный главный ключ. Собственно по этому ключу и нужно собирать данные из Ютуба. Задача - конечно же релевантные видео, но на этапе сбора релевантность проверить сложно (тем более что это не комбайн, работающий с несколькими задачами, который можно смело запустить в космос) - у меня просто парсер.

Собственно я протестировал два варианта парсеров - один чуть более замороченый, где каждый АПИ ключ привязан к своему сервисному аккаунту, и второй (он как раз ниже) - где есть один сервисный аккаунт и несколько ключей АПИ.

В обоих случаях время жизни ключей составляло примерно 3 дня, потом ютуб начинал понимать что происходит какая то непонятная чепуха и снижал квоту для каждого ключа до 1 запроса (с 10000 в сутки) и  ограничивал сервисный аккаунт - короче просто вырубал парсер своими силами.

Соответсвенно - если нет разницы, зачем платить больше делать больше работы?

Так и родилось это решение

Прежде чем погружаться в парсер, давайте определимся с инструментами и подготовим всё что необходимо для его работы. Так как это не просто однофайловый скриптик, а своего рода мини система.

1.1. Конфигурация через .env

Несмотря на то, что я любитель все упрощать, так как парсер идет в паблик я не буду хардкодить ключи и ID таблиц в коде скрипта. Мы будем использовать .env файл для хранения всех нужных настроек. Это в целом безопасно и удобно. 

Создаем в той же папке где лежит парсер файл .env такого содержания:

     # ID вашей Google Таблицы
SHEET_ID="ID ТАБЛИЦЫ ВАШЕЙ ТАБЛИЦЫ"
# Имя листа с ключевыми словами
KEYWORDS_SHEET="Keywords"
# Имя листа для результатов
RESULTS_SHEET="Results"
# Путь к JSON-файлу сервисного аккаунта Google
SERVICE_ACCOUNT_JSON="credentials.json"
# Ваши API ключи от YouTube, через запятую
YT_API_KEYS="ВАШ_КЛЮЧ_1,ВАШ_КЛЮЧ_2,ВАШ_КЛЮЧ_3"

# Настройки парсера
NUM_RESULTS=10      # Сколько результатов на один ключевик
REGION="US"         # Регион поиска
MAX_PAGES=1         # Сколько страниц результатов обрабатывать
BATCH_SIZE=50       # Через сколько ключевиков сохранять батч
OUTPUT_CSV="yt_results.csv" # Имя локального CSV файла

Этот файл вы уже настраиваете под себя самостоятельно, важно понимать, чем больше результатов на один ключ вы хотите получить, тем больше будет расходоваться квота.   

1.2. Получение ключей АПИ и сервисного аккаунта

Нам понадобится два типа учетных данных:

  1. YouTube Data API Keys (несколько штук!): Зайдите в Google Cloud Console, создайте новый проект, включите YouTube Data API v3 и создайте API-ключ. Повторить несколько раз… Зачем?  Дневная квота на один ключ — 10 000 "единиц". Этого критически мало для серьезных задач. Пул из 3-5 ключей даст нам 30-50 тысяч единиц в день, что уже гораздо интереснее (я делал 8 аккаунтов по 12 проектов в каждом, что дало мне в комплексе 960к запросов в день).

  1. Google Sheets API Service Account: Чтобы скрипт мог сам читать и писать в вашу таблицу, нужен сервисный аккаунт. В той же Cloud Console перейдите в IAM & Admin -> Service Accounts, создайте новый, дайте ему роль "Editor", и скачайте JSON-ключ. Переименуйте его в credentials.json и положите рядом со скриптом. Важно: не забудьте "расшарить" вашу Google Таблицу для email'а этого сервисного аккаунта (он выглядит как ...gserviceaccount.com).

Одного сервисного аккаунта по моему опыту жесткого использования данного парсера хватает на 3 дня, но я рекомендую на всякий случай использовать не больше 40-50 ключей на один сервисный аккаунт, чисто перестраховка, мало ли Гугл начнет что то подозревать раньше.

Собственно это и есть самая сложная часть парсинга - подготовка АПИ ключей. Теперь давайте заглянем под капот парсера.

Часть 2. KeyManager для ротации API-ключей

Вот как парсер управляет пулом ключей и успешно обрабатывает превышение квоты - через специальный класс-менеджер.

     class APIKey:
    """Обертка для одного API ключа"""
    def __init__(self, key: str):
        self.key = key
        self.service = build("youtube", "v3", developerKey=key, cache_discovery=False)
        self.used_units = 0
        self.active = True

class KeyManager:
    """Управляет пулом ключей, ротирует их и отключает при исчерпании квоты"""
    def __init__(self, keys: list[str]):
        self.keys = [APIKey(k) for k in keys]
        self.index = 0

    def get_key(self) -> APIKey:
        # Ищем следующий активный ключ по кругу
        for _ in range(len(self.keys)):
            api = self.keys[self.index]
            self.index = (self.index + 1) % len(self.keys) # Циклический сдвиг
            if api.active:
                return api
        # Если активных ключей не осталось
        raise QuotaExceededAllKeys()

    def deactivate(self, api: APIKey):
        api.active = False
        logger.warning(f"Деактивирован ключ: {api.key}")

    def execute(self, fn, units=0, backoff_max=3):
        # Этот метод - "обертка" для всех вызовов API
        attempt = 0
        while True:
            api = self.get_key() # Получаем рабочий ключ
            try:
                resp = fn(api.service).execute()
                # ... логика подсчета расхода ...
                return resp
            except HttpError as e:
                err_str = str(e)
                if 'quotaExceeded' in err_str:
                    self.deactivate(api) # Ключ "сгорел", деактивируем и пробуем следующий
                    continue
                if 'rateLimitExceeded' in err_str and attempt < backoff_max:
                    # Слишком частые запросы, ждем и пробуем снова (Exponential Backoff)
                    delay = 2 ** attempt
                    logger.warning(f"Rate limit, жду {delay}s...")
                    time.sleep(delay)
                    attempt += 1
                    continue
                # Другая ошибка, пробрасываем наверх
                raise
   
IGNORE_WHEN_COPYING_START
content_copy download
Use code with caution. Python
IGNORE_WHEN_COPYING_END

Что здесь происходит?

  • APIKey — простой класс-хранилище для одного ключа, его статуса и созданного для него service объекта.

  • KeyManager — мозг операции. Метод get_key() не просто отдает ключ, а ищет следующий активный ключ в списке. Если ключ исчерпал квоту, он будет помечен как active = False и пропущен.

  • execute() -  Вместо того чтобы в коде писать youtube.search().list(...).execute(), пишем key_manager.execute(lambda svc: svc.search().list(...)). Эта обертка сама:

    1. Возьмет рабочий ключ из пула.

    2. Выполнит запрос.

    3. Если поймает ошибку quotaExceeded, деактивирует ключ и автоматически повторит запрос со следующим ключом.

    4. Если поймает rateLimitExceeded (слишком много запросов в секунду), подождет немного и попробует снова.

  • Это делает основной код чуть более простым и избавляет нас от десятков try-except блоков.

    Часть 3. Сохранение состояния и кэширование

    Скрипт, обрабатывающий тысячи ключевых слов, может работать часами. Но если оборвется интернет или выключится свет? У меня такое произошло, через 2 часа работы скрипта и мне пришлось начать заново, но это помогло дополнить скрипт новой логикой… Персистентность состояния с помощью встроенной библиотеки shelve.

    shelve работает как словарь (dict), но хранит свои данные в файле на диске.

    Сохранение прогресса (progress.db):
    Мы создаем shelve-файл для отслеживания прогресса. Перед началом цикла мы читаем из него last_index, а после обработки каждого ключевого слова — сохраняем новый индекс.

          # В начале скрипта
    prog = shelve.open('progress.db')
    start_index = prog.get('last_index', 0) # Если файла нет, начнем с 0
    
    # Внутри цикла
    for idx in range(start_index, len(keywords)):
        # ... обработка ...
        prog['last_index'] = idx + 1 # Сохраняем индекс СЛЕДУЮЩЕГО элемента
    
    # В конце
    prog.close()
       
     IGNORE_WHEN_COPYING_START
     content_copy download
     Use code with caution. Python
    IGNORE_WHEN_COPYING_END

    При такой логике, даже если скрипт упадет на 5437-м ключевике, при следующем запуске он начнет работу именно с него! А не с самого начала, как это было в первоначальном варианте скрипта.

    Кэширование запросов (yt_cache.db):

    Вызов search.list стоит целых 100 единиц квоты. Если нам понадобится запустить скрипт для того же ключевика еще раз, было бы глупо тратить квоту снова. Решение — кэширование ответов API.

          CACHE = shelve.open("yt_cache.db")
    
    def search_once(keyword: str, ...):
        # ...
        # Создаем уникальный ключ для запроса
        cache_key = f"S:{keyword}:{REGION}:{pageToken or ''}"
    
        if cache_key in CACHE:
            # Если ответ есть в кэше - берем его оттуда
            search_response = CACHE[cache_key]
        else:
            # Если нет - делаем реальный запрос к API
            search_response = key_manager.execute(...)
            # И сохраняем ответ в кэш для будущего использования
            CACHE[cache_key] = search_response
        # ...
       
     IGNORE_WHEN_COPYING_START
     content_copy download
     Use code with caution. Python
    IGNORE_WHEN_COPYING_END

    Мы делаем то же самое для запросов videos.list и channels.list. Это не только экономит квоту при повторных запусках, но и значительно ускоряет их.

    Часть 4. Собираем всё вместе: Основной воркфлоу

    Теперь посмотрим, как все эти компоненты работают вместе в главной функции search_once.

    Алгоритм для одного ключевого слова:

    1. Пагинация: YouTube отдает результаты страницами (обычно по 50 штук). Наш скрипт использует цикл while, который делает запросы до тех пор, пока не наберет нужное количество видео (NUM_RESULTS) или пока не закончатся страницы (nextPageToken станет None).

    2. Эффективный сбор данных: Это классическая и самая правильная стратегия:

      • Сначала делаем один search.list запрос (стоимость 100 единиц), чтобы получить N идентификаторов видео (videoId).

      • Затем делаем один videos.list запрос, передав ему все N идентификаторов через запятую (стоимость N * 1 = N единиц).
        Это гораздо дешевле, чем запрашивать полную информацию о каждом видео в search.list.

    3. Обогащение данных: Получив сырые данные из API, мы не просто их сохраняем, а приводим в удобный вид:

      • iso2hms: Превращает PT1M35S в читаемое 00:01:35 и секунды.

      • fmt_age: Высчитывает возраст видео в годах и месяцах.

      • safe_detect: Определяет язык названия и описания с помощью langdetect.

      • Сбор данных о канале (аватарка) делается одним батчевым запросом channels.list для всех уникальных каналов в выдаче.

    Часть 5. Отказоустойчивая запись результатов

    Собрать данные — это полдела. Их нужно надежно сохранить. Скрипт делает это двумя способами, и оба с защитой от сбоев. 

    Кстати, эта часть скрипта родилась закономерно из третьей (кэширование результатов), так как первоначально я предусмотрел запись результата после полного завершения парсинга, но на тестовом прогоне все было хорошо, а на боевом - когда он упал через 2 часа работы было вообще не смешно.

    Запись в Google Sheets: Мы используем метод append, который дописывает строки в конец таблицы. Но если API Google Sheets временно недоступен я обернул вызов в функцию с retry-логикой:

          def append_to_sheets_with_retry(...):
        for attempt in range(1, max_retries+1):
            try:
                # Пытаемся записать данные
                service.spreadsheets().values().append(...).execute()
                return True # Успех!
            except (HttpError, ConnectionResetError, socket.error) as e:
                # Если ошибка сети, ждем и пробуем снова с увеличенной задержкой
                delay = base_delay * 2**(attempt-1)
                time.sleep(delay)
        return False # Не удалось после всех попыток
       
     IGNORE_WHEN_COPYING_START
     content_copy download
     Use code with caution. Python
    IGNORE_WHEN_COPYING_END

    Пакетное сохранение (Batching): Делать запрос к Google Sheets после каждого ключевого слова — неэффективно и медленно. Вместо этого мы накапливаем результаты в списке batch_res и сохраняем их все вместе, когда их количество достигнет BATCH_SIZE. Это значительно снижает количество API-вызовов и нагрузку. Точно такая же логика применяется и для дозаписи в локальный CSV-файл.

         # В главном цикле
    batch_res.extend(res)
    
    if (idx + 1 - start) % BATCH_SIZE == 0:
        # Сохраняем накопленный батч
        append_csv_batch(df_batch, OUTPUT_CSV)
        append_to_sheets_with_retry(...)
        batch_res = [] # Очищаем батч для следующей порции
    
    # ... после завершения цикла не забываем сохранить остаток
    if batch_res:
        # ... логика сохранения финального батча
       
    IGNORE_WHEN_COPYING_START
    content_copy download
    Use code with caution. Python
    IGNORE_WHEN_COPYING_END

    Заключение

    Я бы наверное назвал этот скрипт - фреймворком для надежной и масштабируемой работы с YouTube API, если бы был чуть более амбициозным. И все же - скрипт умеет ротировать ключи, сохранять состояния, кэшировать данные, обрабатывать ошибки и осуществлять пакетную запись.

    Такой подход позволит собрать большой объем данных, без потери времени.

    Что можно улучшить, но делать я этого не планирую.

    • Асинхронность: Для максимальной производительности можно переписать сетевые запросы с использованием asyncio и aiohttp, чтобы выполнять несколько API-вызовов параллельно.

    • Прокси: Для очень больших объемов можно добавить ротацию прокси в KeyManager.

    • Более гранулярный part: В запросах videos.list можно запрашивать не все части (snippet,contentDetails,...), а только те, что реально нужны, еще сильнее экономя квоту.

    Полный код проекта доступен на GitHub.

    Но, чтобы не раздражать общественность вот полный код ниже:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    YouTube-парсер: Shorts + обычные ролики, без live/upcoming.
    Считывает ключевые слова (col A) и исходные URL (col J) из Google Sheets и сохраняет результаты обратно в лист "Results".
    Продолжает с места остановки через shelve.
    При исчерпании квоты всех ключей останавливается и записывает уже собранные данные.
    Сохраняет прогресс каждые BATCH_SIZE ключей.
    """
    import os
    import sys
    import shelve
    import time
    import logging
    import socket
    from pathlib import Path
    from datetime import datetime, timezone
    
    import pandas as pd
    import isodate
    from langdetect import detect, LangDetectException
    from dotenv import load_dotenv, dotenv_values
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
    from google.oauth2 import service_account
    
    # ─── Logging ─────────────────────────────────────────────────
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logger = logging.getLogger(__name__)
    
    # ─── Конфигурация (из .env) ─────────────────────────────────────
    # Явно указываем путь к .env рядом со скриптом и перезаписываем системные переменные
    env_path = Path(__file__).parent / ".env"
    if not env_path.exists():
        logger.error(f"❌ .env файл не найден по пути {env_path}")
        sys.exit(1)
    
    load_dotenv(dotenv_path=env_path, override=True)
    SHEET_ID = os.getenv("SHEET_ID")
    logger.info(f"🚩 Loaded SHEET_ID from {env_path}: {SHEET_ID}")
    
    KEYWORDS_SHEET       = os.getenv("KEYWORDS_SHEET", "Keywords")
    RESULTS_SHEET        = os.getenv("RESULTS_SHEET", "Results")
    SERVICE_ACCOUNT_JSON = os.getenv("SERVICE_ACCOUNT_JSON")
    NUM_RESULTS          = int(os.getenv("NUM_RESULTS", 10))
    REGION               = os.getenv("REGION", "US")
    MAX_PAGES            = int(os.getenv("MAX_PAGES", 1))
    OUTPUT_CSV           = os.getenv("OUTPUT_CSV", "yt_results.csv")
    BATCH_SIZE           = int(os.getenv("BATCH_SIZE", 50))
    
    if MAX_PAGES < 1:
        sys.exit("❌ MAX_PAGES должен быть ≥ 1")
    
    # ─── Google Sheets API ─────────────────────────────────────────
    SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
    creds_sheets = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_JSON, scopes=SCOPES
    )
    sheets_service = build(
        'sheets', 'v4',
        credentials=creds_sheets,
        cache_discovery=False
    )
    
    # ─── Хранилище кеша ─────────────────────────────────────────────
    CACHE = shelve.open("yt_cache.db")
    
    # ─── Ошибка исчерпания квоты всех ключей ────────────────────────
    class QuotaExceededAllKeys(Exception):
        """Все ключи YouTube API исчерпали квоту"""
        pass
    
    # ─── Классы для управления API-ключами ────────────────────────────
    class APIKey:
        def __init__(self, key: str):
            self.key = key
            self.service = build(
                "youtube", "v3",
                developerKey=key,
                cache_discovery=False
            )
            self.used_units = 0
            self.active = True
    
    class KeyManager:
        def __init__(self, keys: list[str]):
            self.keys = [APIKey(k) for k in keys]
            self.index = 0
    
        def get_key(self) -> APIKey:
            n = len(self.keys)
            for _ in range(n):
                api = self.keys[self.index]
                self.index = (self.index + 1) % n
                if api.active:
                    return api
            raise QuotaExceededAllKeys()
    
        def deactivate(self, api: APIKey):
            api.active = False
            logger.warning(f"Деактивирован ключ: {api.key}")
    
        def record(self, api: APIKey, units: int):
            api.used_units += units
            logger.info(f"Ключ {api.key}: расход {units} units (итого {api.used_units})")
    
        def execute(self, fn, units=0, backoff_max=3):
            attempt = 0
            while True:
                api = self.get_key()
                try:
                    resp = fn(api.service).execute()
                    used = units(resp) if callable(units) else units
                    self.record(api, used)
                    return resp
                except HttpError as e:
                    err_str = str(e)
                    if 'quotaExceeded' in err_str or 'dailyLimitExceeded' in err_str:
                        self.deactivate(api)
                        continue
                    if 'rateLimitExceeded' in err_str:
                        if attempt < backoff_max:
                            delay = 2 ** attempt
                            logger.warning(
                                f"Rate limitExceeded на ключе {api.key}, жду {delay}s (попытка {attempt+1})"
                            )
                            time.sleep(delay)
                            attempt += 1
                            continue
                        else:
                            self.deactivate(api)
                            continue
                    logger.error(f"Unexpected HttpError на ключе {api.key}: {e}")
                    raise
                except (ConnectionResetError, socket.error) as e:
                    if attempt < backoff_max:
                        delay = 2 ** attempt
                        logger.warning(
                            f"Connection error на ключе {api.key}: {e}, жду {delay}s (попытка {attempt+1})"
                        )
                        time.sleep(delay)
                        attempt += 1
                        continue
                    else:
                        self.deactivate(api)
                        continue
    
    # ─── Настройка ключей YouTube API ────────────────────────────────
    raw_keys = os.getenv("YT_API_KEYS") or dotenv_values(env_path).get("YT_API_KEYS", "")
    KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()]
    if not KEYS:
        sys.exit("❌ Нет API-ключей (YT_API_KEYS).")
    logger.info(f"🔑 Найдено {len(KEYS)} ключей")
    key_manager = KeyManager(KEYS)
    
    VIDEO_PARTS = "snippet,contentDetails,status,player,statistics"
    
    # ─── Утилиты ─────────────────────────────────────────────────────
    def iso2hms(iso: str):
        if not iso:
            return "", 0
        td = isodate.parse_duration(iso)
        s = int(td.total_seconds())
        h, r = divmod(s, 3600)
        m, sec = divmod(r, 60)
        return f"{h:02d}:{m:02d}:{sec:02d}", s
    
    def fmt_age(pub: str):
        if not pub:
            return ""
        dt = datetime.fromisoformat(pub.replace('Z', '+00:00'))
        d = (datetime.now(timezone.utc) - dt).days
        y, rem = divmod(d, 365)
        mo = rem // 30
        return f"{y} years {mo} months" if y else f"{mo} months"
    
    def safe_detect(txt: str):
        try:
            return detect(txt) if txt.strip() else ""
        except LangDetectException:
            return ""
    
    # ─── Функции для записи в Google Sheets и CSV ───────────────────
    def append_to_sheets_with_retry(service, spreadsheet_id, sheet_range, values,
                                    max_retries=5, base_delay=1):
        for attempt in range(1, max_retries+1):
            try:
                service.spreadsheets().values().append(
                    spreadsheetId=spreadsheet_id,
                    range=sheet_range,
                    valueInputOption='RAW',
                    body={'values': values}
                ).execute()
                logger.info("✅ Успешно записали результаты в Google Sheets")
                return True
            except (HttpError, ConnectionResetError, socket.error) as e:
                delay = base_delay * 2**(attempt-1)
                logger.warning(
                    f"⚠️ Попытка {attempt}/{max_retries} записи в Sheets упала: {e}. Ждём {delay}s."
                )
                time.sleep(delay)
        logger.error("❌ Не удалось записать в Google Sheets после всех попыток")
        return False
    
    def save_csv_with_retry(df: pd.DataFrame, path: str, max_retries=3, base_delay=1):
        for attempt in range(1, max_retries+1):
            try:
                df.to_csv(path, index=False)
                logger.info(f"✅ CSV сохранён: {path}")
                return True
            except Exception as e:
                delay = base_delay * 2**(attempt-1)
                logger.warning(
                    f"⚠️ Попытка {attempt}/{max_retries} сохранения CSV упала: {e}. Ждём {delay}s."
                )
                time.sleep(delay)
        logger.error("❌ Не удалось сохранить CSV после всех попыток")
        return False
    
    def append_csv_batch(df: pd.DataFrame, path: str, max_retries=3, base_delay=1) -> bool:
        """Дозаписывает DataFrame в CSV, создавая файл с заголовком, если не существует."""
        p = Path(path)
        header = not p.exists()
        for attempt in range(1, max_retries+1):
            try:
                df.to_csv(path, index=False, header=header, mode='a')
                logger.info(f"✅ CSV батч сохранён ({len(df)} строк): {path}")
                return True
            except Exception as e:
                delay = base_delay * 2**(attempt-1)
                logger.warning(f"⚠️ Попытка {attempt}/{max_retries} дозаписи CSV упала: {e}. Ждём {delay}s.")
                time.sleep(delay)
        logger.error("❌ Не удалось дозаписать CSV после всех попыток")
        return False
    
    # ─── Поиск YouTube видео по ключу ────────────────────────────────
    def search_once(keyword: str, input_url: str) -> list:
        rows, token, page = [], None, 0
        while len(rows) < NUM_RESULTS and page < MAX_PAGES:
            page += 1
            sk = f"S:{keyword}:{REGION}:{token or ''}"
            if sk in CACHE:
                sr = CACHE[sk]
            else:
                sr = key_manager.execute(
                    lambda svc: svc.search().list(
                        q=keyword,
                        part="id",
                        type="video",
                        order="relevance",
                        regionCode=REGION,
                        maxResults=min(50, NUM_RESULTS - len(rows)),
                        pageToken=token or "",
                        safeSearch="none"
                    ),
                    units=100
                )
                CACHE[sk] = sr
            vids = [it.get('id', {}).get('videoId') for it in sr.get('items', []) if it.get('id', {}).get('videoId')]
            if not vids:
                break
            vk = f"V:{','.join(vids)}"
            if vk in CACHE:
                vr = CACHE[vk]
            else:
                vr = key_manager.execute(
                    lambda svc: svc.videos().list(
                        id=",".join(vids),
                        part=VIDEO_PARTS
                    ),
                    units=lambda resp: len(resp.get('items', []))
                )
                CACHE[vk] = vr
            cids = list({it['snippet']['channelId'] for it in vr.get('items', [])})
            amap = {}
            if cids:
                ck = f"C:{','.join(cids)}"
                if ck in CACHE:
                    ch = CACHE[ck]
                else:
                    ch = key_manager.execute(
                        lambda svc: svc.channels().list(
                            part="snippet",
                            id=",".join(cids)
                        ),
                        units=1
                    )
                    CACHE[ck] = ch
                amap = {c['id']: c['snippet']['thumbnails']['default']['url'] for c in ch.get('items', [])}
            for it in vr.get('items', []):
                sn = it.get('snippet', {})
                if sn.get('liveBroadcastContent') in {'live', 'upcoming'}:
                    continue
                det = it.get('contentDetails', {})
                dur_str, dur_s = iso2hms(det.get('duration', ''))
                st = it.get('status', {})
                stats = it.get('statistics', {})
                author = sn.get('channelTitle', '')
                avatar = amap.get(sn.get('channelId', ''), '')
                if sn.get('defaultAudioLanguage'):
                    ls, lang = 'audio', sn['defaultAudioLanguage']
                elif sn.get('defaultLanguage'):
                    ls, lang = 'default', sn['defaultLanguage']
                else:
                    ls, lang = 'detect', safe_detect(sn.get('title', '') + ' ' + sn.get('description', '')) or 'unknown'
                lic = st.get('license', '')
                emb = st.get('embeddable', False)
                allowed = emb and lic == 'creativeCommon'
                rows.append([
                    keyword, input_url, it['id'], sn.get('title', ''), sn.get('description', ''),
                    lang, ls, 'short' if dur_s < 60 else 'video',
                    dur_str, dur_s, fmt_age(sn.get('publishedAt', '')), author, avatar,
                    stats.get('viewCount', ''), stats.get('likeCount', ''), stats.get('dislikeCount', ''),
                    emb, lic, allowed,
                    it.get('player', {}).get('embedHtml', f"<iframe src=\"https://www.youtube.com/embed/{it['id']}\" allowfullscreen></iframe>")
                ])
            token = sr.get('nextPageToken') or None
        return rows
    
    # ─── Главная функция ───────────────────────────────────────────
    r1 = sheets_service.spreadsheets().values().get(
        spreadsheetId=SHEET_ID,
        range=f"{KEYWORDS_SHEET}!A2:A"
    ).execute()
    r2 = sheets_service.spreadsheets().values().get(
        spreadsheetId=SHEET_ID,
        range=f"{KEYWORDS_SHEET}!J2:J"
    ).execute()
    keywords = [r[0] for r in r1.get('values', [])]
    input_urls = [r[0] for r in r2.get('values', [])]
    
    prog = shelve.open('progress.db')
    start = prog.get('last_index', 0)
    all_res = []
    batch_res = []
    
    try:
        for idx in range(start, len(keywords)):
            kw = keywords[idx]
            input_url = input_urls[idx] if idx < len(input_urls) else ''
            logger.info(f"🔍 [{idx}] {kw}")
            res = search_once(kw, input_url)
    
            all_res.extend(res)
            batch_res.extend(res)
            prog['last_index'] = idx + 1
    
            # Сохранение батча каждые BATCH_SIZE ключей
            if (idx + 1 - start) % BATCH_SIZE == 0:
                df_batch = pd.DataFrame(batch_res, columns=[
                    'keyword','input_url','videoId','title','description','language','language_source',
                    'video_type','duration','duration_seconds','age','author','author_avatar',
                    'view_count','like_count','dislike_count','embeddable','license',
                    'allowed_on_third_party','iframe'
                ])
                append_csv_batch(df_batch, OUTPUT_CSV)
                append_to_sheets_with_retry(
                    sheets_service,
                    SHEET_ID,
                    f"{RESULTS_SHEET}!A2",
                    batch_res
                )
                logger.info(f"🔄 Батч из {len(batch_res)} результатов сохранён после {(idx+1-start)} ключей")
                batch_res = []
    
    except QuotaExceededAllKeys:
        logger.warning("⚠️ Все ключи исчерпали квоту, завершаем работу")
    finally:
        prog.close()
        CACHE.close()
    
    # Сохраняем остаток батча после завершения
    if batch_res:
        df_batch = pd.DataFrame(batch_res, columns=[
            'keyword','input_url','videoId','title','description','language','language_source',
            'video_type','duration','duration_seconds','age','author','author_avatar',
            'view_count','like_count','dislike_count','embeddable','license',
            'allowed_on_third_party','iframe'
        ])
        append_csv_batch(df_batch, OUTPUT_CSV)
        append_to_sheets_with_retry(
            sheets_service,
            SHEET_ID,
            f"{RESULTS_SHEET}!A2",
            batch_res
        )
        logger.info(f"🔄 Финальный батч из {len(batch_res)} строк сохранён")
    
    # Итоговый расход квоты
    total_units = sum(api.used_units for api in key_manager.keys)
    logger.info(f"ℹ️ Всего расход: {total_units} units")
    

    Скрипт выкладывается как есть, свою работу для меня он выполнил. Следующий этап - очистка результата от мусора, ее я уже провел и скоро расскажу как и главное чем. 

Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+6
Comments4

Articles