Я чувстовал себя клоуном подключая 5ю библиотеку для написания устойчивого к ошибкам API клиента. После этого я написал библиотеку объединяющую все воедино. Мотивация и история архитектурных решений.

Проблема: когда простой API-клиент превращается в кошмар

Представьте типичную задачу: вам нужно написать клиент для внешнего API. Кажется, что может быть проще?

import httpx

async def fetch_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()

Но затем начинаются "маленькие" проблемы:

Проблема #1: API имеет rate limit 100 запросов в минуту

Окей, добавляем ratelimit:

from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=100, period=60)
async def fetch_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()

Проблема #2: API иногда возвращает 503 или timeout

Добавляем tenacity для retry:

from ratelimit import limits, sleep_and_retry
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
@sleep_and_retry
@limits(calls=100, period=60)
async def fetch_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()

Проблема #3: API может лечь полностью, и мы не хотим убить свой сервис retry-ями

Добавляем pybreaker для circuit breaker:

from ratelimit import limits, sleep_and_retry
from tenacity import retry, stop_after_attempt, wait_exponential
from pybreaker import CircuitBreaker

api_breaker = CircuitBreaker(fail_max=5, reset_timeout=60)

@api_breaker
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
@sleep_and_retry
@limits(calls=100, period=60)
async def fetch_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()

Что мы получили?

  • 3 библиотеки в зависимостях (ratelimit, tenacity, pybreaker)

  • 4 декораторов на одной функции (и порядок важен!)

  • Разные API для каждой библиотеки

  • Проблемы с asyncratelimit не работает корректно с async/await

  • Сложное тестирование — нужны моки времени для каждой библиотеки по отдельности

  • Конфликты абстракций — каждая библиотека живёт в своём мире

И это для одной функции. Теперь представьте, что у вас:

  • 10 разных внешних API

  • Разные политики rate limit-еров

  • Разные retry стратегии

  • Per-user и global limiting

Получается кошмар для поддержки всего этого зоопарка.

Решение: одна библиотека для всего

А что если можно было бы написать так?

from limitpal import (
    AsyncResilientExecutor,
    AsyncTokenBucket,
    RetryPolicy,
    CircuitBreaker
)
import httpx

# Конфигурация один раз
limiter = AsyncTokenBucket(capacity=10, refill_rate=100/60)  # 100 req/min
retry = RetryPolicy(max_attempts=3, base_delay=0.5, backoff=2.0)
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)

executor = AsyncResilientExecutor(
    limiter=limiter,
    retry_policy=retry,
    circuit_breaker=breaker
)

# Использование
async def fetch_user(user_id: str):
    async def _call():
        async with httpx.AsyncClient() as client:
            response = await client.get(f"https://api.example.com/users/{user_id}")
            response.raise_for_status()
            return response.json()
    
    return await executor.run(f"user:{user_id}", _call)

Что мы получили?

  • Одна библиотекаlimitpal

  • Чистый код — никаких новостроек из декораторов

  • Единый API для всех компонентов

  • Полная поддержка async/await и sync кода

  • Простое тестирование с MockClock (библиотека поддерживает проброс кастомных классов времени)

  • Кастомизация — каждый компонент опционален

Именно для решения этой проблемы я создал свою либу LimitPal.

И тогда Саурон выковал единое кольцо...

Limitpal — это модульный инструментарий для построения устойчивых сервисов в Python.

Он объединяет:

  • Executors комбинация стратегий устойчивости

  • Rate limiting (Token Bucket, Leaky Bucket)

  • Retry logic с exponential backoff и jitter

  • Circuit Breaker для защиты от каскадных сбоев

  • Composite limiters для многоуровневого контроля

Всё это с полной поддержкой sync и async API, zero dependencies, и thread-safe по умолчанию.

Ключевые особенности

# Sync и Async с одинаковым API
from limitpal import TokenBucket, AsyncTokenBucket

sync_limiter = TokenBucket(capacity=10, refill_rate=5)
async_limiter = AsyncTokenBucket(capacity=10, refill_rate=5)

# Key-based limiting (per-user, per-IP, per-tenant)
limiter.allow("user:123")
limiter.allow("ip:192.168.1.1")

# Композиция стратегий
from limitpal import AsyncCompositeLimiter, AsyncLeakyBucket

per_user = AsyncTokenBucket(capacity=10, refill_rate=5)
global_smooth = AsyncLeakyBucket(capacity=50, leak_rate=20)

limiter = AsyncCompositeLimiter([per_user, global_smooth])

# MockClock для детерминированных тестов
from limitpal import MockClock

clock = MockClock(start_time=0.0)
limiter = TokenBucket(capacity=1, refill_rate=2.0, clock=clock)
assert limiter.allow("test") is True
clock.advance(0.5)
assert limiter.allow("test") is True

Почему "просто использовать несколько библиотек" не работает

Давайте честно: в Python экосистеме уже есть масса библиотек для каждой задачи:

  • limits, ratelimit для rate limiting

  • tenacity, backoff для retry

  • pybreaker, circuitbreaker для circuit breaker

  • ...миллионы их...

Так зачем ещё одна?

1. Конфликт абстракций

Каждая библиотека использует свой подход:

# tenacity использует декораторы
@retry(stop=stop_after_attempt(3))
def my_func():
    pass

# ratelimit тоже декораторы, но другие
@limits(calls=10, period=60)
def my_func():
    pass

# pybreaker — объекты
breaker = CircuitBreaker()
@breaker
def my_func():
    pass

Попробуйте скомбинировать всё это для функции — получите боль.

2. Разрозненное управление временем для тестов

# Как вы будете тестировать ЭТО?
from unittest.mock import patch
from freezegun import freeze_time
import time

# Каждая библиотека по-своему работает со временем
@freeze_time("2024-01-01")  # для одной библиотеки
@patch('time.monotonic')     # для другой
@patch('asyncio.sleep')      # для третьей
async def test_my_function():
    # удачи с синхронизацией моков
    pass

За счет встраиваемого класса для управления временем (pluggable) LimitPal всё просто:

from limitpal import AsyncResilientExecutor, MockClock

clock = MockClock(start_time=0.0)
executor = AsyncResilientExecutor(
    limiter=AsyncTokenBucket(capacity=1, refill_rate=1, clock=clock),
    retry_policy=RetryPolicy(max_attempts=3),
    clock=clock
)

# Полный контроль над временем
clock.advance(1.0)

3. Отсутствие интеграции

# Сколько запросов отклонил rate limiter?
# Сколько раз сработал circuit breaker?
# Какой процент успешных retry?

# С разными библиотеками — собирайте метрики вручную из каждой

LimitPal может предоставлять единую точку для observability (встроенная интеграция в планах на следующие версии).

4. Overhead зависимостей

$ pip install tenacity ratelimit pybreaker
# + их транзитивные зависимости
# + potential security vulnerabilities
# + конфликты версий

$ pip install limitpal
# Всё включено, zero dependencies

Примеры с полей

Пример 1: Устойчивый HTTP Client

Типичная задача — интеграция с внешним API, который:

  • Имеет rate limit 100 req/min

  • Может отдавать 429/503

  • Иногда совсем ложится

from limitpal import (
    AsyncResilientExecutor,
    AsyncTokenBucket,
    RetryPolicy,
    CircuitBreaker
)
import httpx

# Настройка устойчивости
limiter = AsyncTokenBucket(
    capacity=10,        # burst(всплеск нагрузки) до 10 запросов
    refill_rate=100/60  # 100 req/min
)

retry = RetryPolicy(
    max_attempts=3,
    base_delay=0.5,
    backoff=2.0,                                    # экспоненциальный backoff
    jitter=0.1,                                     # добавляем jitter
    retry_on=(httpx.TimeoutException, httpx.HTTPStatusError)
)

breaker = CircuitBreaker(
    failure_threshold=5,      # открываем после 5 ошибок подряд
    recovery_timeout=30.0,    # пытаемся восстановиться через 30 сек
    half_open_success_threshold=2  # нужно 2 успешных запроса для закрытия
)

executor = AsyncResilientExecutor(
    limiter=limiter,
    retry_policy=retry,
    circuit_breaker=breaker
)

# Использование
async def fetch_user_orders(user_id: str):
    async def _fetch():
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://api.example.com/users/{user_id}/orders"
            )
            response.raise_for_status()
            return response.json()
    
    return await executor.run(f"user:{user_id}", _fetch)

# Всё работает автоматически:
# - Rate limiting с контролем всплесков нагрузки
# - Retry при ошибках с exponential backoff
# - Circuit breaker защищает от каскадных сбоев
orders = await fetch_user_orders("12345")

Пример 2: Многоуровневая защита лимитами

Часто нужна многоуровневая защита:

  • Per-user лимит (защита от злоупотреблений)

  • Глобальный лимит (защита инфраструктуры)

from limitpal import AsyncCompositeLimiter, AsyncTokenBucket, AsyncLeakyBucket

# Per-user: burst до 10, потом 5 req/sec
per_user = AsyncTokenBucket(capacity=10, refill_rate=5)

# Глобальный: гладкий лимит 100 req/sec на весь сервис
global_limiter = AsyncLeakyBucket(capacity=100, leak_rate=100)

# Композитный лимитер — оба должны разрешить
limiter = AsyncCompositeLimiter([per_user, global_limiter])

# Использование
async def handle_request(user_id: str):
    if await limiter.allow(f"user:{user_id}"):
        return await process_request()
    else:
        return {"error": "Rate limit exceeded"}, 429

Пример 3: Background Job Processing(Celery и т.д.)

Обработка фоновых задач с контролем нагрузки:

from limitpal import LeakyBucket
import asyncio

# Обрабатываем макс 10 задач в секунду (гладко)
job_limiter = LeakyBucket(capacity=20, leak_rate=10)

async def process_jobs(jobs):
    for job in jobs:
        # acquire блокирует, пока не появится место
        await job_limiter.acquire(f"job:{job.id}", timeout=60.0)
        await process_job(job)

# Задачи обрабатываются ровно 10/sec, без burst'ов

Пример 4: FastAPI интеграция

from fastapi import FastAPI, HTTPException, Request
from limitpal import AsyncTokenBucket, RateLimitExceeded

app = FastAPI()
limiter = AsyncTokenBucket(capacity=100, refill_rate=10)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    key = f"ip:{request.client.host}"
    
    if not await limiter.allow(key):
        raise HTTPException(
            status_code=429,
            detail="Too many requests"
        )
    
    return await call_next(request)

# Или для конкретного endpoint
@app.post("/api/expensive-operation")
async def expensive_operation(request: Request):
    try:
        await limiter.acquire(
            f"user:{request.user.id}",
            timeout=5.0
        )
    except RateLimitExceeded as e:
        raise HTTPException(
            status_code=429,
            detail=f"Retry after {e.retry_after:.1f}s"
        )
    
    return await do_expensive_work()

Под капотом: как это работает

Circuit Breaker Pattern

Защита от каскадных сбоев в распределённых системах:

Состояния:

  • CLOSED — всё работает, запросы проходят

  • OPEN — слишком много ошибок, запросы блокируются

  • HALF_OPEN — пробуем восстановиться, пропускаем ограниченное количество запросов

from limitpal import CircuitBreaker, CircuitBreakerOpen

breaker = CircuitBreaker(
    failure_threshold=3,         # открываем после 3 ошибок
    recovery_timeout=10.0,       # пытаемся восстановиться через 10 сек
    half_open_success_threshold=2  # нужно 2 успеха для закрытия
)

# Нормальная работа (CLOSED)
breaker.record_success()  # OK
breaker.record_success()  # OK

# Начинаются ошибки
breaker.record_failure()  # 1/3
breaker.record_failure()  # 2/3
breaker.record_failure()  # 3/3 -> переходим в OPEN

# Теперь все запросы блокируются
try:
    breaker.call(lambda: external_api())
except CircuitBreakerOpen:
    return "Service temporarily unavailable"

# Через 10 сек переходим в HALF_OPEN
time.sleep(10)

# Пробуем восстановиться
breaker.record_success()  # 1/2
breaker.record_success()  # 2/2 -> переходим в CLOSED

# Снова работаем нормально

Retry Policy

Умные retry с exponential backoff и jitter:

from limitpal import RetryPolicy

retry = RetryPolicy(
    max_attempts=3,     # максимум 3 попытки
    base_delay=0.5,     # начинаем с 0.5 сек
    max_delay=10.0,     # не больше 10 сек
    backoff=2.0,        # каждый раз удваиваем задержку
    jitter=0.1,         # добавляем ±10% случайности
    retry_on=(TimeoutError, ConnectionError)  # только эти ошибки
)

# Попытка 1: немедленно
# Ошибка -> ждём 0.5 * (1 ± 0.1) сек
# Попытка 2: через ~0.5 сек
# Ошибка -> ждём 1.0 * (1 ± 0.1) сек
# Попытка 3: через ~1.0 сек
# Ошибка -> RetryExhausted exception

Зачем jitter? Если 1000 клиентов одновременно получили ошибку и начали retry с одинаковым backoff — они создадут новый скачок нагрузки. Jitter размазывает retry по времени.

ResilientExecutor - оркестратор устойчивости

ResilientExecutor — это сердце LimitPal. Он объединяет все паттерны устойчивости в единый execution pipeline.


# Схема работы оркестратора

Request
   ↓
┌──────────────────────────────┐
│   ResilientExecutor          │
│                              │
│  1. Circuit Breaker Check    │ ← Быстрая проверка: можно ли вообще выполнять?
│     ↓ (if OPEN → fail fast)  │
│                              │
│  2. Rate Limiter             │ ← Ждём квоту (acquire) или отклоняем
│     ↓ (wait for token)       │
│                              │
│  3. Execute + Retry Loop     │ ← Выполняем с автоматическими retry
│     ├─ attempt 1             │
│     ├─ (fail) → backoff      │
│     ├─ attempt 2             │
│     └─ (success)             │
│                              │
│  4. Record Result            │ ← Обновляем Circuit Breaker
│     ↓                        │
└──────────────────────────────┘
   ↓
Response / Exception

# Пример кода
from limitpal import (
    AsyncResilientExecutor,
    AsyncTokenBucket,
    RetryPolicy,
    CircuitBreaker
)

limiter = AsyncTokenBucket(capacity=5, refill_rate=2)
retry = RetryPolicy(max_attempts=3, base_delay=1.0, backoff=2.0)
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0)

executor = AsyncResilientExecutor(
    limiter=limiter, #каждая политика опциональна
    retry_policy=retry,
    circuit_breaker=breaker
)

async def problematic_api():
    # Симулируем нестабильный API
    if random.random() < 0.7:
        raise TimeoutError("API timeout")
    return "success"

result = await executor.run("api_key", problematic_api)

Тестирование: MockClock спасает жизнь

Одна из самых больших проблем при тестировании rate limiting / retry логики — это работа со временем.

Плохой подход:

import time

def test_rate_limiter():
    limiter = TokenBucket(capacity=1, refill_rate=1)
    
    assert limiter.allow("test") is True
    assert limiter.allow("test") is False
    
    time.sleep(1)  # тест длится 1 секунду
    assert limiter.allow("test") is True

Хороший подход с MockClock:

from limitpal import TokenBucket, MockClock

def test_rate_limiter():
    clock = MockClock(start_time=0.0)
    limiter = TokenBucket(capacity=1, refill_rate=1, clock=clock)
    
    assert limiter.allow("test") is True
    assert limiter.allow("test") is False
    
    clock.advance(1.0)  # мгновенно
    assert limiter.allow("test") is True

Тест выполняется мгновенно, но поведение идентично реальному.

Тестирование ResilientExecutor:

from limitpal import (
    AsyncResilientExecutor,
    AsyncTokenBucket,
    RetryPolicy,
    MockClock
)
import pytest

@pytest.mark.asyncio
async def test_resilient_executor_with_retry():
    clock = MockClock(start_time=0.0)
    
    retry = RetryPolicy(max_attempts=3, base_delay=1.0, backoff=2.0)
    executor = AsyncResilientExecutor(
        retry_policy=retry,
        clock=clock
    )
    
    call_count = 0
    
    async def flaky_function():
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise TimeoutError("Temporary failure")
        return "success"
    
    # Запускаем
    task = asyncio.create_task(executor.run("test", flaky_function))
    
    # Первая попытка — немедленно
    await asyncio.sleep(0)
    assert call_count == 1
    
    # Retry #1 через 1 сек
    clock.advance(1.0)
    await asyncio.sleep(0)
    assert call_count == 2
    
    # Retry #2 через 2 сек (backoff)
    clock.advance(2.0)
    await asyncio.sleep(0)
    assert call_count == 3
    
    result = await task
    assert result == "success"

Детерминированный, быстрый, понятный тест.

Сравнение с существующими решениями

Feature

LimitPal

limits

slowapi

tenacity

pybreaker

Rate Limiting

Retry Logic

Circuit Breaker

Async Support

⚠️ partial

Композиция всех паттернов

MockClock для тестов

Zero dependencies

Распределённый (Redis)

❌*

⚠️ partial

* В планах для будущих версий

Когда использовать что?

Используйте LimitPal, если:

  • Вам нужна комбинация rate limiting + retry + circuit breaker

  • Вы строите resilient API clients

  • Вам важна чистота кода и тестируемость

  • Вы хотите полноценный async support

Используйте limits, если:

  • Вам нужен ТОЛЬКО rate limiting

  • Вам нужна distributed система с Redis

Используйте tenacity, если:

  • Вам нужен ТОЛЬКО retry без rate limiting

Используйте pybreaker, если:

  • Вам нужен ТОЛЬКО circuit breaker

Для остальных случаев можно рассмотреть LimitPal

Архитектурные решения

1. Единая абстракция времени

Все компоненты принимают Clock интерфейс:

class Clock(Protocol):
    def now(self) -> float: ...
    def sleep(self, seconds: float) -> None: ...
    async def sleep_async(self, seconds: float) -> None: ...

Это позволяет:

  • Использовать MonotonicClock в проде (защита от системных часов)

  • Использовать MockClock в тестах

  • Легко добавлять кастомные реализации

2. Опциональная комбинация стратегий с ResilientExecutor

# Каждый компонент опционален
executor = AsyncResilientExecutor(
    limiter=None,              # можно без rate limiting
    retry_policy=retry,        # только retry
    circuit_breaker=None,      # без circuit breaker
)

# Или все вместе
executor = AsyncResilientExecutor(
    limiter=limiter,
    retry_policy=retry,
    circuit_breaker=breaker,
)

# Или комбинируем несколько limiters
composite = AsyncCompositeLimiter([
    AsyncTokenBucket(capacity=10, refill_rate=5),
    AsyncLeakyBucket(capacity=20, leak_rate=10),
])

3. Sync/Async паритет

Одинаковый API для sync и async:

# Sync
from limitpal import TokenBucket, ResilientExecutor

limiter = TokenBucket(capacity=10, refill_rate=5)
executor = ResilientExecutor(limiter=limiter)

result = executor.run("key", sync_callable)

# Async — только добавьте Async в название
from limitpal import AsyncTokenBucket, AsyncResilientExecutor

limiter = AsyncTokenBucket(capacity=10, refill_rate=5)
executor = AsyncResilientExecutor(limiter=limiter)

result = await executor.run("key", async_callable)

4. Key-based излляция

Все limiters поддерживают изоляцию по ключам:

limiter = TokenBucket(capacity=10, refill_rate=5)

# Разные пользователи — разные buckets
limiter.allow("user:123")
limiter.allow("user:456")
limiter.allow("user:789")

# Разные измерения
limiter.allow(f"ip:{request.ip}")
limiter.allow(f"tenant:{org_id}")
limiter.allow(f"endpoint:/api/users")

С автоматическим управлением памятью:

limiter = TokenBucket(
    capacity=10,
    refill_rate=5,
    ttl=300.0,           # удаляем неактивные buckets через 5 минут
    max_buckets=10000,   # макс 10k buckets (LRU вытеснение)
)

Roadmap

Уже работает (v0.1.1)

  • Token Bucket и Leaky Bucket

  • RetryPolicy с exponential backoff и jitter

  • CircuitBreaker с CLOSED/OPEN/HALF_OPEN

  • ResilientExecutor для композиции

  • CompositeLimiter

  • Полный sync/async support

  • MockClock для тестов

  • Zero dependencies

В планах

  • Observability: встроенные метрики (Prometheus, StatsD)

  • Sliding Window и другие алгоритмы лимитирования алгоритм

  • Adaptive Rate Limiting (автоматическая подстройка)

  • Bulkhead Pattern для изоляции ресурсов

  • Интеграции с фреймворками: FastAPI middleware, Flask decorator

  • Distributed backends: Redis, Memcached

  • Продвинутые Circuit Breaker: callbacks, state transitions

Документация

Полная документация: limitpal.readthedocs.io

Примеры

Больше примеров в репозитории: github.com/Guli-vali/limitpal


Заключение

Построение устойчивых сервисов — это не про "просто добавить retry". Это про правильную композицию паттернов:

  • Rate Limiting защищает от перегрузки (вашей и чужой)

  • Retry справляется с временными сбоями

  • Circuit Breaker предотвращает каскадные отказы

  • Backpressure (через acquire) контролирует нагрузку

LimitPal даёт вам всё это в одной библиотеке с чистым API и отличной тестируемостью.

Что дальше?

  1. Попробуйте сами pip install limitpal

  2. Изучите примеры: github.com/Guli-vali/limitpal

  3. Поделитесь опытом: создайте issue или PR

  4. Расскажите другим: если заинтересовало, буду рад звездам на Github