На днях мне прилетела задача, в которой нужно было вычерпывать данные по HTTP с такими вводными:

  • Есть ограничение по количеству запросов в минуту

  • Объём данных - миллионы записей

  • Один запрос выполняется долго (возвращает много данных)

  • Нужен асинхронный механизм выгрузки

Не включая мозг, я начал накидывать решение...

Грабли №1: async, который работает синхронно

async def fetch_all_pages():
    ...
    while True:
        response = await fetch(   # ← ошибка
            f"/resource?page={page}"
        )
        ...
        page += 1
        ...

Формально:

  • async

  • await

Фактически:

  • один запрос за раз

  • каждый следующий запрос ждёт предыдущий

Async-код не становится параллельным автоматически. Если каждый запрос ждёт завершения предыдущего - это синхронное исполнение в async-синтаксисе.

Грабли №2: «давайте просто gather всё»

Окей, давайте делать «настоящий async».

async def fetch_all():
    tasks = [fetch(page) for page in range(1, 1000)]
    results = await asyncio.gather(*tasks)  # ← ошибка
    ...

Работает. Быстро. Очень быстро. Но мы только что создали монстра.

Что пошло не так:

  • сoroutine storm - резкий рост потребления RAM

  • закономерные 429 Too Many Requests, а дальше - блокировки и баны

  • gather ждёт все задачи, одна зависла → зависло всё

  • невозможно стримить данные (а это критично в моём кейсе)

asyncio.gather без ограничений - это не параллелизм, а хаос.

Грабли №3: «давайте думать, подсказывайте»

Интегрируем семафор - классическое решение из тренажёров и собеседований.

from asyncio import Semaphore

semaphore = Semaphore(10)

async def fetch_page(page):
  async with semaphore:
    return await fetch(f"/resource?page={page}")

async def main():
    tasks = [fetch_page(page) for page in range(1, 1000)]
    results = await asyncio.gather(*tasks)
    ...

Интуитивное ожидание: «Ну значит 10 запросов в секунду»

Реальность:

latency API *плавает** (в моём случае от 8 до 30 секунд), из-за этого:

  • лимит используется лишь на ~30%

  • выгрузка растягивается во времени

  • если API внезапно ускорится (кэш, CDN, оптимизация) - получаем гарантированное кратное увеличение rps - привет, бан.

Для наглядности:

  1. Долгие ответы: 10 / 8 сек ≈ 75 req/min

  2. Деградация API: 10 / 30 сек ≈ 20 req/min

  3. Ускорение API: 10 / 0.5 сек = 1200 req/min

Семафор связывает пропускную способность системы с самым нестабильным параметром - latency внешнего API.

Semaphore регулирует ширину трубы. Но не регулирует поток воды.

Грабли №4: почти работает (но не совсем)

Чтобы RPS не зависел от latency, нужен механизм лимитирования по времени. Я остановился на алгоритме leaky bucket и его практической реализации - пакете aiolimiter.

Leaky bucket - алгоритм, который контролирует скорость обработки запросов, независимо от того, как быстро они обрабатываются.

Аналогия: кран с фиксированным напором - сколько ни лей, вытекает стабильно.

from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(100, 60)   # 100 запросов в минуту
semaphore = Semaphore(10)         # контроль параллелизма

async def fetch_page(page):
  async with semaphore:
    await limiter.acquire()
    return await fetch(f"/resource?page={page}")

Это действительно решило проблему зависимости от latency.

Но…

  • корутины всё ещё создаются пачкой

  • нет стриминга

  • сложно контролировать ретраи и ошибки

  • память по-прежнему под нагрузкой

Работает - да. Продакшен-реди - нет.

Антиграбли: золотая середина

Я вышел в интрернет с данным вопросом... и нашел несколько интересных подходов с producer/consumer/worker архитектурой, логика следующая:

Очередь задач → воркеры обрабатывают параллельно → AsyncLimiter контролирует скорость → результаты обрабатываются потоком.

from aiolimiter import AsyncLimiter

async def worker(client, queue, results):
    while True:
        offset = await queue.get()
        if offset is None:
            break

        await limiter.acquire()
        response = await client.get(API_URL, params={"offset": offset})
        await results.put(response.json()["results"])
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = asyncio.Queue()

    for offset in range(0, 1000, 50):
        await queue.put(offset)

    workers = [
        asyncio.create_task(worker(...))
        for _ in range(5)
    ]

    while ...:
        batch = await results.get()
        # обработка данных

Что мы получили:

  1. Экономия памяти - задачи не создаются одномоментно

  2. Стриминг результатов

  3. Контроль перегрузки через количество воркеров

  4. Гибкая работа с ретраями и ошибками

  5. Предсказуемая и стабильная пропускная способность системы

Подводим итоги

Решение не серебрянная пуля, но для моего кейса - устойчивая и продакшен-реди золотая середина.