На днях мне прилетела задача, в которой нужно было вычерпывать данные по HTTP с такими вводными:
Есть ограничение по количеству запросов в минуту
Объём данных - миллионы записей
Один запрос выполняется долго (возвращает много данных)
Нужен асинхронный механизм выгрузки
Не включая мозг, я начал накидывать решение...
Грабли №1: async, который работает синхронно
async def fetch_all_pages():
...
while True:
response = await fetch( # ← ошибка
f"/resource?page={page}"
)
...
page += 1
...Формально:
async
await
Фактически:
один запрос за раз
каждый следующий запрос ждёт предыдущий
Async-код не становится параллельным автоматически. Если каждый запрос ждёт завершения предыдущего - это синхронное исполнение в async-синтаксисе.
Грабли №2: «давайте просто gather всё»
Окей, давайте делать «настоящий async».
async def fetch_all():
tasks = [fetch(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks) # ← ошибка
...Работает. Быстро. Очень быстро. Но мы только что создали монстра.
Что пошло не так:
сoroutine storm - резкий рост потребления RAM
закономерные
429 Too Many Requests, а дальше - блокировки и баныgatherждёт все задачи, одна зависла → зависло всёневозможно стримить данные (а это критично в моём кейсе)
asyncio.gatherбез ограничений - это не параллелизм, а хаос.
Грабли №3: «давайте думать, подсказывайте»
Интегрируем семафор - классическое решение из тренажёров и собеседований.
from asyncio import Semaphore
semaphore = Semaphore(10)
async def fetch_page(page):
async with semaphore:
return await fetch(f"/resource?page={page}")
async def main():
tasks = [fetch_page(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks)
...Интуитивное ожидание: «Ну значит 10 запросов в секунду»
Реальность:
latency API *плавает** (в моём случае от 8 до 30 секунд), из-за этого:
лимит используется лишь на ~30%
выгрузка растягивается во времени
если API внезапно ускорится (кэш, CDN, оптимизация) - получаем гарантированное кратное увеличение rps - привет, бан.
Для наглядности:
Долгие ответы:
10 / 8 сек ≈ 75 req/minДеградация API:
10 / 30 сек ≈ 20 req/minУскорение API:
10 / 0.5 сек = 1200 req/min
Семафор связывает пропускную способность системы с самым нестабильным параметром - latency внешнего API.
Semaphore регулирует ширину трубы. Но не регулирует поток воды.
Грабли №4: почти работает (но не совсем)
Чтобы RPS не зависел от latency, нужен механизм лимитирования по времени. Я остановился на алгоритме leaky bucket и его практической реализации - пакете aiolimiter.
Leaky bucket - алгоритм, который контролирует скорость обработки запросов, независимо от того, как быстро они обрабатываются.
Аналогия: кран с фиксированным напором - сколько ни лей, вытекает стабильно.
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(100, 60) # 100 запросов в минуту
semaphore = Semaphore(10) # контроль параллелизма
async def fetch_page(page):
async with semaphore:
await limiter.acquire()
return await fetch(f"/resource?page={page}")Это действительно решило проблему зависимости от latency.
Но…
корутины всё ещё создаются пачкой
нет стриминга
сложно контролировать ретраи и ошибки
память по-прежнему под нагрузкой
Работает - да. Продакшен-реди - нет.
Антиграбли: золотая середина
Я вышел в интрернет с данным вопросом... и нашел несколько интересных подходов с producer/consumer/worker архитектурой, логика следующая:
Очередь задач → воркеры обрабатывают параллельно → AsyncLimiter контролирует скорость → результаты обрабатываются потоком.
from aiolimiter import AsyncLimiter
async def worker(client, queue, results):
while True:
offset = await queue.get()
if offset is None:
break
await limiter.acquire()
response = await client.get(API_URL, params={"offset": offset})
await results.put(response.json()["results"])
queue.task_done()
async def main():
queue = asyncio.Queue()
results = asyncio.Queue()
for offset in range(0, 1000, 50):
await queue.put(offset)
workers = [
asyncio.create_task(worker(...))
for _ in range(5)
]
while ...:
batch = await results.get()
# обработка данныхЧто мы получили:
Экономия памяти - задачи не создаются одномоментно
Стриминг результатов
Контроль перегрузки через количество воркеров
Гибкая работа с ретраями и ошибками
Предсказуемая и стабильная пропускная способность системы
Подводим итоги
Решение не серебрянная пуля, но для моего кейса - устойчивая и продакшен-реди золотая середина.