Привет!
Предыстория
Была у меня задача: проверить качество работы API для поиска похожих векторов. Штука довольно стандартная для ML-систем — на вход подаёшь 128-мерный вектор (например, эмбеддинг изображения или текста), API ищет в базе ближайшие вектора и возвращает их.
Тестовый датасет: 10 миллионов векторов. Задача: для каждого вектора проверить, что API правильно находит его же (similarity должна быть 100%).
Коллеги написали API, я написал клиент для тестирования. Что может пойти не так?
Попытка 1: Тупо в лоб:
async def check_vector(vector: list[float]) -> bool: """Проверить один вектор.""" response = await client.post( "http://api.internal/search", json={"vector": vector, "top_k": 1} ) result = response.json() return result["similarity"] >= 0.99 # Это условная величина async def main(): # Читаем векторы с диска vectors = read_vectors_from_disk("vectors.bin") results = [] for vector in vectors: result = await check_vector(vector) results.append(result) accuracy = sum(results) / len(results) * 100 print(f"Accuracy: {accuracy}%")
Запустил. Понятно стало, что это будет работать долго, очень долго. 10 миллионов запросов по очереди — не самая быстрая идея.
Попытка 2: Пробую параллелить (asyncio.gather):
async def main(): vectors = read_vectors_from_disk("vectors.bin") # 10M векторов tasks = [check_vector(v) for v in vectors] results = await asyncio.gather(*tasks) accuracy = sum(results) / len(results) * 100 print(f"Accuracy: {accuracy}%")
Первые минуты всё летало. Потом клиент начал тормозить. Через какое-то время (не помню сколько, но меньше 10 минут) — вообще ��авис. Смотрю в htop — память жрёт, CPU на 100%, файловые дескрипторы кончились.
Перезапускаю. Та же история.
Проблема была очевидна (это сейчас очевидно, на тот момент не очень очевидно было): я запустил 10 миллионов одновременных HTTP запросов. Мой клиент пытался:
Держать в памяти 10M корутин
Читать с диска и отправлять всё одновременно А их API просто не был рассчитан на такую нагрузку. Rate limiter срабатывал, запросы дропались, база не справлялась.
Что будет в статье
Сначала покажу, почему наивные подходы не работают (на примере моего реального факапа с 10М векторов). Потом — как asyncio.Semaphore решает проблему. И в конце — решение с обработкой ошибок, timeout-ами и retry.
В чём была проблема
Задача:
Прочитать 10 миллионов векторов с диска
Для каждого вектора отправить POST запрос на API
Проверить, что API вернул правильный результат
Посчитать accuracy
Попытка №1: Последовательно
import asyncio import aiohttp async def check_vector(session: aiohttp.ClientSession, vector: list[float]) -> bool: """Проверить один вектор через API.""" async with session.post( "https://api.internal/search", json={"vector": vector, "top_k": 1} ) as response: result = await response.json() return result["similarity"] >= 0.99 async def main(): vectors = load_vectors("vectors.bin") # 10M векторов async with aiohttp.ClientSession() as session: results = [] for vector in vectors: result = await check_vector(session, vector) # ← Ждём каждый results.append(result) accuracy = sum(results) / len(results) * 100 print(f"Accuracy: {accuracy}%") asyncio.run(main())
Проблема: Каждый запрос ждёт предыдущий. Если один запрос = 50ms, то:
10,000,000 запросов × 50ms = 500,000 секунд = 138 часов = 5.7 дней
Пять дней на тестирование? Не, спасибо
Попытка №2: Всё сразу (катастрофа)
async def main(): vectors = load_vectors("vectors.bin") # 10M векторов async with aiohttp.ClientSession() as session: # Создаём 10 миллионов задач СРАЗУ tasks = [check_vector(session, v) for v in vectors] results = await asyncio.gather(*tasks) accuracy = sum(results) / len(results) * 100 print(f"Accuracy: {accuracy}%")
Что произошло:
На стороне клиента:
Python создал 10M корутин в памяти → жрёт 20+ GB RAM
Чтение с диска не успевает → I/O bottleneck
Программа зависла, потом упала с
MemoryError
На стороне API:
10M запросов прилетели за 30 секунд
Connection pool исчерпан (обычно 100-1000 соединений)
Rate limiter включился:
429 Too Many RequestsБаза данных под нагрузкой начала тормозить
Итог: положил и свой клиент, и их API, и базу данных. Тройное комбо!
Решение: Semaphore
После разбора полётов я полез в документацию asyncio искать "как ограничить количество одновременных запросов". Наткнулся на Semaphore.
Семафор — это по сути счётчик с двумя операциями:
acquire() — уменьшить счётчик (или ждать, если он уже 0)
release() — увеличить счётчик обратно
Создаёшь семафор с лимитом:
semaphore = asyncio.Semaphore(100) # Максимум 100 одновременных операций
Теперь у тебя есть "100 слотов". Захватил слот — работаешь. Освободил — следующая задача может начать.
Исправленный код
import asyncio import aiohttp from typing import List async def check_vector( session: aiohttp.ClientSession, vector: list[float], semaphore: asyncio.Semaphore ) -> bool: """Проверить вектор с rate limiting.""" async with semaphore: async with session.post( "https://api.internal/search", json={"vector": vector, "top_k": 1} ) as response: result = await response.json() return result["similarity"] >= 0.99 async def main(): vectors = load_vectors("vectors.bin") semaphore = asyncio.Semaphore(100) async with aiohttp.ClientSession() as session: # Создаём все задачи (но они будут ждать слота) tasks = [ check_vector(session, vector, semaphore) for vector in vectors ] results = await asyncio.gather(*tasks) accuracy = sum(results) / len(results) * 100 print(f"Accuracy: {accuracy}%") asyncio.run(main())
Что изменилось?
Было:
Все 10M задач стартуют одновременно
Память взрывается, API падает
Стало:
Создаются все 10M задач (это дёшево, просто корутины)
Но работают только 100 одновременно
Как только одна завершилась → следующая хватает слот
Результат:
Время выполнения: ~14 часов (вместо 5 дней или краша)
Память клиента: стабильные 500 MB (вместо 20+ GB)
API живой и счастливый
Accuracy: 99.97%
Как это работает внутри
async with semaphore: # Попытка захватить слот # Если есть свободные слоты (счётчик > 0): # - Уменьшаем счётчик # - Входим в блок # Если слотов нет (счётчик = 0): # - Ждём в очереди # После выхода из блока: # - Увеличиваем счётчик # - Следующая задача из очереди может войти await do_work()
Примерная timeline:
t=0s: Задачи 1-100 захватили слоты, начали работу t=50ms: Задача 1 завершилась → Задача 101 начала работу t=51ms: Задача 2 завершилась → Задача 102 начала работу ... Всегда 100 активных задач
Подбор лимита
Как выбрать число для Semaphore(N)? Зависит от:
Возможности API — сколько он может обработать?
У нас API мог ~500 req/sec
При 50ms на запрос: 100 concurrent = ~2000 req/sec
Значит 100 — много, снизил до 50
Скорость ответа — сколько времени API отвечает?
Быстрые ответы (10-50ms) → можно больше concurrent
Медленные (500ms+) → меньше concurrent, иначе задачи накапливаются
В итоге я остановился на 50 одновременных запросов:
semaphore = asyncio.Semaphore(50)
API не перегружался, клиент работал стабильно, коллеги были довольны
Production паттерны
После всех приключений я понял: копипастить async with semaphore в каждую функцию — плохая идея. Код превращается в кашу, и когда нужно поменять логику (например, добавить retry), приходится лезть в 20 мест.
Поэтому я завёл себе класс-обёртку. Выглядит избыточно, но когда в проекте 50+ мест с HTTP запросами — это спасение.
Вариант 1: Простая обёртка
from typing import TypeVar, Callable, Awaitable T = TypeVar('T') class RateLimitedExecutor: """Делает rate limiting за вас.""" def __init__(self, max_concurrent: int = 10): self._semaphore = asyncio.Semaphore(max_concurrent) async def execute( self, func: Callable[[str], Awaitable[T]], items: list[str], ) -> list[T]: """Запустить функцию для всех items с rate limiting.""" tasks = [ self._run_with_limit(func, item) for item in items ] return await asyncio.gather(*tasks) async def _run_with_limit(self, func, item): async with self._semaphore: return await func(item) # Использование простое: executor = RateLimitedExecutor(max_concurrent=10) results = await executor.execute(fetch_data, urls)
Уже лучше! Но есть проблема: если одна задача упадёт, gather() по умолчанию завалит всё остальное.
Вариант 2: С обработкой ошиб��к
from dataclasses import dataclass @dataclass(frozen=True) class Result: """Либо value, либо error.""" url: str value: dict | None = None error: Exception | None = None @property def is_ok(self) -> bool: return self.error is None class RateLimitedExecutor: # ... __init__ тот же ... async def execute(self, func, items) -> list[Result]: tasks = [self._run_with_limit(func, item) for item in items] # return_exceptions=True — не валим всё если одна упала results = await asyncio.gather(*tasks, return_exceptions=True) return [ self._wrap_result(item, result) for item, result in zip(items, results) ] def _wrap_result(self, url: str, result) -> Result: if isinstance(result, Exception): return Result(url=url, error=result) return Result(url=url, value=result) # Теперь можно обрабатывать отдельно успехи и фейлы results = await executor.execute(fetch_data, urls) successful = [r.value for r in results if r.is_ok] failed = [r for r in results if not r.is_ok] print(f"OK: {len(successful)}, Failed: {len(failed)}") # Посмотреть первые 5 ошибок for failure in failed[:5]: print(f"{failure.url}: {failure.error}")
Это реально удобно! Особенно когда обрабатываешь тысячи запросов и 1-2% могут упасть по таймауту или 500-кой — не хочется из-за них терять остальные 98%.
Timeout'ы и retry (потому что всё ломается)
Семафор решает проблему с количеством запросов, но не защищает от другой беды — зависших соединений.
История из жизни: один из наших внешних API иногда просто... виснет. Запрос отправляется, но ответа нет. И нет. И нет. Минута проходит, две, пять... А код терпеливо ждёт. В итоге все 10 слотов семафора заняты зависшими запросами, и новые встают в очередь.
Добавляем timeout
Решение простое — timeout на каждый запрос:
class RateLimitedExecutor: def __init__( self, max_concurrent: int = 10, timeout_seconds: float = 5.0 ): self._semaphore = asyncio.Semaphore(max_concurrent) self._timeout = timeout_seconds async def _run_with_limit(self, func, item): async with self._semaphore: try: async with asyncio.timeout(self._timeout): return await func(item) except TimeoutError: print(f"Timeout for {item}") raise
Теперь если запрос не ответил за 5 секунд — мы его убиваем и освобождаем слот для следующего.
А что насчёт retry?
Timeout решает проблему зависших запросов, но иногда хочется дать API второй шанс. Например, он может вернуть 503 (service unavailable) просто потому что перегружен на секунду.
Вот как я добавил retry с exponential backoff:
from dataclasses import dataclass @dataclass class RetryConfig: max_retries: int = 3 timeout_seconds: float = 5.0 backoff_factor: float = 2.0 class RateLimitedExecutor: def __init__(self, max_concurrent: int = 10, retry_config: RetryConfig = None): self._semaphore = asyncio.Semaphore(max_concurrent) self._retry_config = retry_config or RetryConfig() async def _run_with_limit(self, func, item): async with self._semaphore: return await self._try_with_retry(func, item) async def _try_with_retry(self, func, item): last_error = None for attempt in range(self._retry_config.max_retries): try: async with asyncio.timeout(self._retry_config.timeout_seconds): result = await func(item) if attempt > 0: print(f"Успех со {attempt + 1}-й попытки: {item}") return result except (TimeoutError, ConnectionError) as e: last_error = e print(f"Попытка {attempt + 1} провалилась: {item}") # Если это не последняя попытка — ждём if attempt < self._retry_config.max_retries - 1: pause = self._retry_config.backoff_factor ** attempt await asyncio.sleep(pause) print(f"Все {self._retry_config.max_retries} попытки провалились: {item}") raise last_error
Exponential backoff — это когда паузы между попытками растут: 1 сек, 2 сек, 4 сек, 8 сек... Идея в том, что если сервис перегружен, мы даём ему время восстановиться, а не долбим с прежней скоростью.
Использование:
executor = RateLimitedExecutor( max_concurrent=10, retry_config=RetryConfig( max_retries=3, timeout_seconds=5.0, backoff_factor=2.0, ) ) results = await executor.execute(fetch_data, urls)
После добавления retry количество успешных запросов у меня выросло процентов на 5-7. Оказалось, многие фейлы были временными.
Bonus: Что ещё можно улучшить
После того как основная задача была решена, я добавил ещё пару улучшений:
1. Progress bar
from tqdm.asyncio import tqdm # Вместо обычного gather results = await tqdm.gather(*tasks, desc="Checking vectors")
Теперь я вижу прогресс выполнения в реальном времени. Психологически легче, когда знаешь, что осталось 2 часа, а не "хз сколько".
2. Батчирование чтения с диска
def load_vectors_lazy(filename: str, batch_size: int = 10000): """Читать вектора батчами вместо все сразу.""" with open(filename, 'rb') as f: while batch := read_batch(f, batch_size): yield batch # Обрабатываем по 10k векторов за раз for batch in load_vectors_lazy("vectors.bin", batch_size=10000): results = await checker.check_all(batch) save_results(results)
Так не нужно держать все 10M векторов в памяти одновременно.
3. Graceful shutdown
import signal shutdown_event = asyncio.Event() def signal_handler(signum, frame): print("Получен сигнал остановки, завершаю текущие задачи...") shutdown_event.set() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # В основном цикле for batch in batches: if shutdown_event.is_set(): print("Остановка по сигналу") break results = await process_batch(batch)
Теперь можно прервать по Ctrl+C без потери данных — все текущие запросы завершатся корректно.
Заключение: что я вынес из этой истории
В итоге, после всех переделок, финальный код выглядел так:
class VectorChecker: def __init__(self, api_url: str, max_concurrent: int = 50): self.api_url = api_url self.semaphore = asyncio.Semaphore(max_concurrent) self.session: aiohttp.ClientSession | None = None async def __aenter__(self): timeout = aiohttp.ClientTimeout(total=30, connect=5) self.session = aiohttp.ClientSession(timeout=timeout) return self async def __aexit__(self, *args): if self.session: await self.session.close() async def check_vector(self, vector: list[float]) -> Result: """Проверить один вектор с retry и timeout.""" async with self.semaphore: for attempt in range(3): try: async with self.session.post( f"{self.api_url}/search", json={"vector": vector, "top_k": 1} ) as response: if response.status == 200: data = await response.json() return Result( success=True, similarity=data["similarity"] ) elif response.status == 429: # Rate limit — подождём await asyncio.sleep(2 ** attempt) continue else: return Result(success=False, error=f"HTTP {response.status}") except asyncio.TimeoutError: if attempt == 2: return Result(success=False, error="Timeout") await asyncio.sleep(1) return Result(success=False, error="Max retries exceeded") async def check_all(self, vectors: list[list[float]]) -> dict: """Проверить все вектора и вернуть статистику.""" tasks = [self.check_vector(v) for v in vectors] results = await asyncio.gather(*tasks) successful = [r for r in results if r.success] failed = [r for r in results if not r.success] return { "total": len(results), "successful": len(successful), "failed": len(failed), "accuracy": len(successful) / len(results) * 100, "avg_similarity": sum(r.similarity for r in successful) / len(successful) if successful else 0 } # Использование async def main(): vectors = load_vectors("vectors.bin") # 10M векторов async with VectorChecker("https://api.internal", max_concurrent=50) as checker: stats = await checker.check_all(vectors) print(f""" Результаты тестирования: Всего проверено: {stats['total']:,} Успешно: {stats['successful']:,} Ошибок: {stats['failed']:,} Accuracy: {stats['accuracy']:.2f}% Средняя схожесть: {stats['avg_similarity']:.4f} """) asyncio.run(main())
Финальные метрики
Метрика | До оптимизации | После оптимизации |
|---|---|---|
Время выполнения | ∞ (краш) | ~12 часов |
Память клиента | 20+ GB → краш | 500 MB (стабильно) |
Нагрузка на API | 10M req/30s → краш | 50 req/sec (стабильно) |
Accuracy | N/A | 99.97% |
Успешных запросов | 0% (всё упало) | 99.95% |
Основные выводы
Semaphore — это must have для любых массовых I/O операций
Защищает и твой код, и внешние сервисы
Простой в использовании, эффективный
Добавляй timeout везде
Один зависший запрос блокирует слот навсегда
asyncio.timeout()илиasyncio.wait_for()
Retry решает 90% временных проблем
API иногда отдаёт 429 или 503 — это нормально
Exponential backoff даёт API время восстановиться
Result type вместо exceptions
Один упавший запрос не должен ронять всё
Собирай статистику: сколько успешных, сколько failed, какие ошибки
Подбирай concurrency лимит экспериментально
Начни с малого (10-50)
Смотри на метрики API и клиента
Увеличивай постепенно
Чек-лист перед production
✅ Используешь async with для Semaphore
✅ Добавлен timeout на каждый I/O вызов
✅ Реализован retry для временных ошибок
✅ Errors обрабатываются через Result type
✅ Есть логирование и метрики
✅ Протестировано на реальной нагрузке
✅ Подобран оптимальный concurrency лимит
P.S. Про память
Кстати, после всех оптимизаций с семафором я заметил, что клиент всё ещё жрёт ~5 GB RAM. Оказалось, это все вектора, загруженные разом через numpy.fromfile():
# Так делал я (плохо): vectors = np.fromfile("vectors.bin", dtype=np.float32) vectors = vectors.reshape(-1, 128) # 10M × 128 × 4 bytes = 5.12 GB tasks = [check_vector(session, v) for v in vectors]
Решение оказалось простым — читать батчами:
def load_vectors_in_batches(filename: str, batch_size: int = 10000): """Читать вектора батчами по 10k.""" with open(filename, 'rb') as f: while True: chunk = np.fromfile(f, dtype=np.float32, count=batch_size * 128) if chunk.size == 0: break yield chunk.reshape(-1, 128) async def main(): async with VectorChecker(api_url, max_concurrent=50) as checker: for batch in load_vectors_in_batches("vectors.bin", batch_size=10000): stats = await checker.check_all(batch) print(f"Batch: {stats['successful']}/{len(batch)} OK")
Результат:
Память клиента: с 5 GB → 50 MB
Можно прервать и продолжить с нужного батча
Можно сохранять промежуточные результаты
