Я чувстовал себя клоуном подключая 5ю библиотеку для написания устойчивого к ошибкам API клиента. После этого я написал библиотеку объединяющую все воедино. Мотивация и история архитектурных решений.
Проблема: когда простой API-клиент превращается в кошмар
Представьте типичную задачу: вам нужно написать клиент для внешнего API. Кажется, что может быть проще?
import httpx async def fetch_user(user_id: str): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") return response.json()
Но затем начинаются "маленькие" проблемы:
Проблема #1: API имеет rate limit 100 запросов в минуту
Окей, добавляем ratelimit:
from ratelimit import limits, sleep_and_retry @sleep_and_retry @limits(calls=100, period=60) async def fetch_user(user_id: str): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") return response.json()
Проблема #2: API иногда возвращает 503 или timeout
Добавляем tenacity для retry:
from ratelimit import limits, sleep_and_retry from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) @sleep_and_retry @limits(calls=100, period=60) async def fetch_user(user_id: str): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") response.raise_for_status() return response.json()
Проблема #3: API может лечь полностью, и мы не хотим убить свой сервис retry-ями
Добавляем pybreaker для circuit breaker:
from ratelimit import limits, sleep_and_retry from tenacity import retry, stop_after_attempt, wait_exponential from pybreaker import CircuitBreaker api_breaker = CircuitBreaker(fail_max=5, reset_timeout=60) @api_breaker @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) @sleep_and_retry @limits(calls=100, period=60) async def fetch_user(user_id: str): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") response.raise_for_status() return response.json()
Что мы получили?
3 библиотеки в зависимостях (
ratelimit,tenacity,pybreaker)4 декораторов на одной функции (и порядок важен!)
Разные API для каждой библиотеки
Проблемы с async —
ratelimitне работает корректно с async/awaitСложное тестирование — нужны моки времени для каждой библиотеки по отдельности
Конфликты абстракций — каждая библиотека живёт в своём мире
И это для одной функции. Теперь представьте, что у вас:
10 разных внешних API
Разные политики rate limit-еров
Разные retry стратегии
Per-user и global limiting
Получается кошмар для поддержки всего этого зоопарка.
Решение: одна библиотека для всего
А что если можно было бы написать так?
from limitpal import ( AsyncResilientExecutor, AsyncTokenBucket, RetryPolicy, CircuitBreaker ) import httpx # Конфигурация один раз limiter = AsyncTokenBucket(capacity=10, refill_rate=100/60) # 100 req/min retry = RetryPolicy(max_attempts=3, base_delay=0.5, backoff=2.0) breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0) executor = AsyncResilientExecutor( limiter=limiter, retry_policy=retry, circuit_breaker=breaker ) # Использование async def fetch_user(user_id: str): async def _call(): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") response.raise_for_status() return response.json() return await executor.run(f"user:{user_id}", _call)
Что мы получили?
Одна библиотека —
limitpalЧистый код — никаких новостроек из декораторов
Единый API для всех компонентов
Полная поддержка async/await и sync кода
Простое тестирование с
MockClock(библиотека поддерживает проброс кастомных классов времени)Кастомизация — каждый компонент опционален
Именно для решения этой проблемы я создал свою либу LimitPal.
И тогда Саурон выковал единое кольцо...
Limitpal — это модульный инструментарий для построения устойчивых сервисов в Python.
Он объединяет:
Executors комбинация стратегий устойчивости
Rate limiting (Token Bucket, Leaky Bucket)
Retry logic с exponential backoff и jitter
Circuit Breaker для защиты от каскадных сбоев
Composite limiters для многоуровневого контроля
Всё это с полной поддержкой sync и async API, zero dependencies, и thread-safe по умолчанию.
Ключевые особенности
# Sync и Async с одинаковым API from limitpal import TokenBucket, AsyncTokenBucket sync_limiter = TokenBucket(capacity=10, refill_rate=5) async_limiter = AsyncTokenBucket(capacity=10, refill_rate=5) # Key-based limiting (per-user, per-IP, per-tenant) limiter.allow("user:123") limiter.allow("ip:192.168.1.1") # Композиция стратегий from limitpal import AsyncCompositeLimiter, AsyncLeakyBucket per_user = AsyncTokenBucket(capacity=10, refill_rate=5) global_smooth = AsyncLeakyBucket(capacity=50, leak_rate=20) limiter = AsyncCompositeLimiter([per_user, global_smooth]) # MockClock для детерминированных тестов from limitpal import MockClock clock = MockClock(start_time=0.0) limiter = TokenBucket(capacity=1, refill_rate=2.0, clock=clock) assert limiter.allow("test") is True clock.advance(0.5) assert limiter.allow("test") is True
Почему "просто использовать несколько библиотек" не работает
Давайте честно: в Python экосистеме уже есть масса библиотек для каждой задачи:
limits,ratelimitдля rate limitingtenacity,backoffдля retrypybreaker,circuitbreakerдля circuit breaker...миллионы их...
Так зачем ещё одна?
1. Конфликт абстракций
Каждая библиотека использует свой подход:
# tenacity использует декораторы @retry(stop=stop_after_attempt(3)) def my_func(): pass # ratelimit тоже декораторы, но другие @limits(calls=10, period=60) def my_func(): pass # pybreaker — объекты breaker = CircuitBreaker() @breaker def my_func(): pass
Попробуйте скомбинировать всё это для функции — получите боль.
2. Разрозненное управление временем для тестов
# Как вы будете тестировать ЭТО? from unittest.mock import patch from freezegun import freeze_time import time # Каждая библиотека по-своему работает со временем @freeze_time("2024-01-01") # для одной библиотеки @patch('time.monotonic') # для другой @patch('asyncio.sleep') # для третьей async def test_my_function(): # удачи с синхронизацией моков pass
За счет встраиваемого класса для управления временем (pluggable) LimitPal всё просто:
from limitpal import AsyncResilientExecutor, MockClock clock = MockClock(start_time=0.0) executor = AsyncResilientExecutor( limiter=AsyncTokenBucket(capacity=1, refill_rate=1, clock=clock), retry_policy=RetryPolicy(max_attempts=3), clock=clock ) # Полный контроль над временем clock.advance(1.0)
3. Отсутствие интеграции
# Сколько запросов отклонил rate limiter? # Сколько раз сработал circuit breaker? # Какой процент успешных retry? # С разными библиотеками — собирайте метрики вручную из каждой
LimitPal может предоставлять единую точку для observability (встроенная интеграция в планах на следующие версии).
4. Overhead зависимостей
$ pip install tenacity ratelimit pybreaker # + их транзитивные зависимости # + potential security vulnerabilities # + конфликты версий $ pip install limitpal # Всё включено, zero dependencies
Примеры с полей
Пример 1: Устойчивый HTTP Client
Типичная задача — интеграция с внешним API, который:
Имеет rate limit 100 req/min
Может отдавать 429/503
Иногда совсем ложится
from limitpal import ( AsyncResilientExecutor, AsyncTokenBucket, RetryPolicy, CircuitBreaker ) import httpx # Настройка устойчивости limiter = AsyncTokenBucket( capacity=10, # burst(всплеск нагрузки) до 10 запросов refill_rate=100/60 # 100 req/min ) retry = RetryPolicy( max_attempts=3, base_delay=0.5, backoff=2.0, # экспоненциальный backoff jitter=0.1, # добавляем jitter retry_on=(httpx.TimeoutException, httpx.HTTPStatusError) ) breaker = CircuitBreaker( failure_threshold=5, # открываем после 5 ошибок подряд recovery_timeout=30.0, # пытаемся восстановиться через 30 сек half_open_success_threshold=2 # нужно 2 успешных запроса для закрытия ) executor = AsyncResilientExecutor( limiter=limiter, retry_policy=retry, circuit_breaker=breaker ) # Использование async def fetch_user_orders(user_id: str): async def _fetch(): async with httpx.AsyncClient() as client: response = await client.get( f"https://api.example.com/users/{user_id}/orders" ) response.raise_for_status() return response.json() return await executor.run(f"user:{user_id}", _fetch) # Всё работает автоматически: # - Rate limiting с контролем всплесков нагрузки # - Retry при ошибках с exponential backoff # - Circuit breaker защищает от каскадных сбоев orders = await fetch_user_orders("12345")
Пример 2: Многоуровневая защита лимитами
Часто нужна многоуровневая защита:
Per-user лимит (защита от злоупотреблений)
Глобальный лимит (защита инфраструктуры)
from limitpal import AsyncCompositeLimiter, AsyncTokenBucket, AsyncLeakyBucket # Per-user: burst до 10, потом 5 req/sec per_user = AsyncTokenBucket(capacity=10, refill_rate=5) # Глобальный: гладкий лимит 100 req/sec на весь сервис global_limiter = AsyncLeakyBucket(capacity=100, leak_rate=100) # Композитный лимитер — оба должны разрешить limiter = AsyncCompositeLimiter([per_user, global_limiter]) # Использование async def handle_request(user_id: str): if await limiter.allow(f"user:{user_id}"): return await process_request() else: return {"error": "Rate limit exceeded"}, 429
Пример 3: Background Job Processing(Celery и т.д.)
Обработка фоновых задач с контролем нагрузки:
from limitpal import LeakyBucket import asyncio # Обрабатываем макс 10 задач в секунду (гладко) job_limiter = LeakyBucket(capacity=20, leak_rate=10) async def process_jobs(jobs): for job in jobs: # acquire блокирует, пока не появится место await job_limiter.acquire(f"job:{job.id}", timeout=60.0) await process_job(job) # Задачи обрабатываются ровно 10/sec, без burst'ов
Пример 4: FastAPI интеграция
from fastapi import FastAPI, HTTPException, Request from limitpal import AsyncTokenBucket, RateLimitExceeded app = FastAPI() limiter = AsyncTokenBucket(capacity=100, refill_rate=10) @app.middleware("http") async def rate_limit_middleware(request: Request, call_next): key = f"ip:{request.client.host}" if not await limiter.allow(key): raise HTTPException( status_code=429, detail="Too many requests" ) return await call_next(request) # Или для конкретного endpoint @app.post("/api/expensive-operation") async def expensive_operation(request: Request): try: await limiter.acquire( f"user:{request.user.id}", timeout=5.0 ) except RateLimitExceeded as e: raise HTTPException( status_code=429, detail=f"Retry after {e.retry_after:.1f}s" ) return await do_expensive_work()
Под капотом: как это работает
Circuit Breaker Pattern
Защита от каскадных сбоев в распределённых системах:
Состояния:
CLOSED — всё работает, запросы проходят
OPEN — слишком много ошибок, запросы блокируются
HALF_OPEN — пробуем восстановиться, пропускаем ограниченное количество запросов
from limitpal import CircuitBreaker, CircuitBreakerOpen breaker = CircuitBreaker( failure_threshold=3, # открываем после 3 ошибок recovery_timeout=10.0, # пытаемся восстановиться через 10 сек half_open_success_threshold=2 # нужно 2 успеха для закрытия ) # Нормальная работа (CLOSED) breaker.record_success() # OK breaker.record_success() # OK # Начинаются ошибки breaker.record_failure() # 1/3 breaker.record_failure() # 2/3 breaker.record_failure() # 3/3 -> переходим в OPEN # Теперь все запросы блокируются try: breaker.call(lambda: external_api()) except CircuitBreakerOpen: return "Service temporarily unavailable" # Через 10 сек переходим в HALF_OPEN time.sleep(10) # Пробуем восстановиться breaker.record_success() # 1/2 breaker.record_success() # 2/2 -> переходим в CLOSED # Снова работаем нормально
Retry Policy
Умные retry с exponential backoff и jitter:
from limitpal import RetryPolicy retry = RetryPolicy( max_attempts=3, # максимум 3 попытки base_delay=0.5, # начинаем с 0.5 сек max_delay=10.0, # не больше 10 сек backoff=2.0, # каждый раз удваиваем задержку jitter=0.1, # добавляем ±10% случайности retry_on=(TimeoutError, ConnectionError) # только эти ошибки ) # Попытка 1: немедленно # Ошибка -> ждём 0.5 * (1 ± 0.1) сек # Попытка 2: через ~0.5 сек # Ошибка -> ждём 1.0 * (1 ± 0.1) сек # Попытка 3: через ~1.0 сек # Ошибка -> RetryExhausted exception
Зачем jitter? Если 1000 клиентов одновременно получили ошибку и начали retry с одинаковым backoff — они создадут новый скачок нагрузки. Jitter размазывает retry по времени.
ResilientExecutor - оркестратор устойчивости
ResilientExecutor — это сердце LimitPal. Он объединяет все паттерны устойчивости в единый execution pipeline.
# Схема работы оркестратора Request ↓ ┌──────────────────────────────┐ │ ResilientExecutor │ │ │ │ 1. Circuit Breaker Check │ ← Быстрая проверка: можно ли вообще выполнять? │ ↓ (if OPEN → fail fast) │ │ │ │ 2. Rate Limiter │ ← Ждём квоту (acquire) или отклоняем │ ↓ (wait for token) │ │ │ │ 3. Execute + Retry Loop │ ← Выполняем с автоматическими retry │ ├─ attempt 1 │ │ ├─ (fail) → backoff │ │ ├─ attempt 2 │ │ └─ (success) │ │ │ │ 4. Record Result │ ← Обновляем Circuit Breaker │ ↓ │ └──────────────────────────────┘ ↓ Response / Exception # Пример кода from limitpal import ( AsyncResilientExecutor, AsyncTokenBucket, RetryPolicy, CircuitBreaker ) limiter = AsyncTokenBucket(capacity=5, refill_rate=2) retry = RetryPolicy(max_attempts=3, base_delay=1.0, backoff=2.0) breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0) executor = AsyncResilientExecutor( limiter=limiter, #каждая политика опциональна retry_policy=retry, circuit_breaker=breaker ) async def problematic_api(): # Симулируем нестабильный API if random.random() < 0.7: raise TimeoutError("API timeout") return "success" result = await executor.run("api_key", problematic_api)
Тестирование: MockClock спасает жизнь
Одна из самых больших проблем при тестировании rate limiting / retry логики — это работа со временем.
Плохой подход:
import time def test_rate_limiter(): limiter = TokenBucket(capacity=1, refill_rate=1) assert limiter.allow("test") is True assert limiter.allow("test") is False time.sleep(1) # тест длится 1 секунду assert limiter.allow("test") is True
Хороший подход с MockClock:
from limitpal import TokenBucket, MockClock def test_rate_limiter(): clock = MockClock(start_time=0.0) limiter = TokenBucket(capacity=1, refill_rate=1, clock=clock) assert limiter.allow("test") is True assert limiter.allow("test") is False clock.advance(1.0) # мгновенно assert limiter.allow("test") is True
Тест выполняется мгновенно, но поведение идентично реальному.
Тестирование ResilientExecutor:
from limitpal import ( AsyncResilientExecutor, AsyncTokenBucket, RetryPolicy, MockClock ) import pytest @pytest.mark.asyncio async def test_resilient_executor_with_retry(): clock = MockClock(start_time=0.0) retry = RetryPolicy(max_attempts=3, base_delay=1.0, backoff=2.0) executor = AsyncResilientExecutor( retry_policy=retry, clock=clock ) call_count = 0 async def flaky_function(): nonlocal call_count call_count += 1 if call_count < 3: raise TimeoutError("Temporary failure") return "success" # Запускаем task = asyncio.create_task(executor.run("test", flaky_function)) # Первая попытка — немедленно await asyncio.sleep(0) assert call_count == 1 # Retry #1 через 1 сек clock.advance(1.0) await asyncio.sleep(0) assert call_count == 2 # Retry #2 через 2 сек (backoff) clock.advance(2.0) await asyncio.sleep(0) assert call_count == 3 result = await task assert result == "success"
Детерминированный, быстрый, понятный тест.
Сравнение с существующими решениями
Feature | LimitPal | limits | slowapi | tenacity | pybreaker |
|---|---|---|---|---|---|
Rate Limiting | ✅ | ✅ | ✅ | ❌ | ❌ |
Retry Logic | ✅ | ❌ | ❌ | ✅ | ❌ |
Circuit Breaker | ✅ | ❌ | ❌ | ❌ | ✅ |
Async Support | ✅ | ✅ | ✅ | ✅ | ⚠️ partial |
Композиция всех паттернов | ✅ | ❌ | ❌ | ❌ | ❌ |
MockClock для тестов | ✅ | ❌ | ❌ | ❌ | ❌ |
Zero dependencies | ✅ | ❌ | ❌ | ❌ | ❌ |
Распределённый (Redis) | ❌* | ✅ | ❌ | ❌ | ⚠️ partial |
* В планах для будущих версий
Когда использовать что?
Используйте LimitPal, если:
Вам нужна комбинация rate limiting + retry + circuit breaker
Вы строите resilient API clients
Вам важна чистота кода и тестируемость
Вы хотите полноценный async support
Используйте limits, если:
Вам нужен ТОЛЬКО rate limiting
Вам нужна distributed система с Redis
Используйте tenacity, если:
Вам нужен ТОЛЬКО retry без rate limiting
Используйте pybreaker, если:
Вам нужен ТОЛЬКО circuit breaker
Для остальных случаев можно рассмотреть LimitPal
Архитектурные решения
1. Единая абстракция времени
Все компоненты принимают Clock интерфейс:
class Clock(Protocol): def now(self) -> float: ... def sleep(self, seconds: float) -> None: ... async def sleep_async(self, seconds: float) -> None: ...
Это позволяет:
Использовать
MonotonicClockв проде (защита от системных часов)Использовать
MockClockв тестахЛегко добавлять кастомные реализации
2. Опциональная комбинация стратегий с ResilientExecutor
# Каждый компонент опционален executor = AsyncResilientExecutor( limiter=None, # можно без rate limiting retry_policy=retry, # только retry circuit_breaker=None, # без circuit breaker ) # Или все вместе executor = AsyncResilientExecutor( limiter=limiter, retry_policy=retry, circuit_breaker=breaker, ) # Или комбинируем несколько limiters composite = AsyncCompositeLimiter([ AsyncTokenBucket(capacity=10, refill_rate=5), AsyncLeakyBucket(capacity=20, leak_rate=10), ])
3. Sync/Async паритет
Одинаковый API для sync и async:
# Sync from limitpal import TokenBucket, ResilientExecutor limiter = TokenBucket(capacity=10, refill_rate=5) executor = ResilientExecutor(limiter=limiter) result = executor.run("key", sync_callable) # Async — только добавьте Async в название from limitpal import AsyncTokenBucket, AsyncResilientExecutor limiter = AsyncTokenBucket(capacity=10, refill_rate=5) executor = AsyncResilientExecutor(limiter=limiter) result = await executor.run("key", async_callable)
4. Key-based излляция
Все limiters поддерживают изоляцию по ключам:
limiter = TokenBucket(capacity=10, refill_rate=5) # Разные пользователи — разные buckets limiter.allow("user:123") limiter.allow("user:456") limiter.allow("user:789") # Разные измерения limiter.allow(f"ip:{request.ip}") limiter.allow(f"tenant:{org_id}") limiter.allow(f"endpoint:/api/users")
С автоматическим управлением памятью:
limiter = TokenBucket( capacity=10, refill_rate=5, ttl=300.0, # удаляем неактивные buckets через 5 минут max_buckets=10000, # макс 10k buckets (LRU вытеснение) )
Roadmap
Уже работает (v0.1.1)
Token Bucket и Leaky Bucket
RetryPolicy с exponential backoff и jitter
CircuitBreaker с CLOSED/OPEN/HALF_OPEN
ResilientExecutor для композиции
CompositeLimiter
Полный sync/async support
MockClock для тестов
Zero dependencies
В планах
Observability: встроенные метрики (Prometheus, StatsD)
Sliding Window и другие алгоритмы лимитирования алгоритм
Adaptive Rate Limiting (автоматическая подстройка)
Bulkhead Pattern для изоляции ресурсов
Интеграции с фреймворками: FastAPI middleware, Flask decorator
Distributed backends: Redis, Memcached
Продвинутые Circuit Breaker: callbacks, state transitions
Документация
Полная документация: limitpal.readthedocs.io
Примеры
Больше примеров в репозитории: github.com/Guli-vali/limitpal
Заключение
Построение устойчивых сервисов — это не про "просто добавить retry". Это про правильную композицию паттернов:
Rate Limiting защищает от перегрузки (вашей и чужой)
Retry справляется с временными сбоями
Circuit Breaker предотвращает каскадные отказы
Backpressure (через
acquire) контролирует нагрузку
LimitPal даёт вам всё это в одной библиотеке с чистым API и отличной тестируемостью.
Что дальше?
Попробуйте сами
pip install limitpalИзучите примеры: github.com/Guli-vali/limitpal
Поделитесь опытом: создайте issue или PR
Расскажите другим: если заинтересовало, буду рад звездам на Github
