Задача: человек регистрируется, оплачивает, получает ключ на почту и сразу подключается. Без ручной работы с моей стороны. Под капотом — FastAPI, Marzban, PostgreSQL, React, ЮKassa, Resend. Расскажу про архитектуру, грабли и неочевидные решения.

Стек

Backend:    Python 3.11, FastAPI, SQLAlchemy (async), Alembic
Database:   PostgreSQL
Frontend:   React 18, Vite, Tailwind CSS
VPN:        XRay-core, Marzban (панель управления)
Email:      Resend API
Payments:   ЮKassa + Plisio (крипто)
Infra:      Docker Compose, Nginx, Cloudflare

Почему FastAPI а не Django/Flask: нативный async нужен везде — платёжные вебхуки, запросы к Marzban API, отправка писем должны работать параллельно без блокировок. SQLAlchemy async + asyncpg дают чистую картину без thread pool workaround'ов.

Архитектура

Пользователь
    ↓ HTTPS
Cloudflare (проксирование, скрытие IP сервера)
    ↓
Nginx
  /          → frontend:3000  (React SPA)
  /api/      → backend:8080   (FastAPI)
    ↓
┌─────────────────────────────────────────┐
│              FastAPI                     │
│  /auth  /configs  /payment  /support    │
└──────────────┬──────────────────────────┘
               │
    ┌──────────┼──────────┬──────────────┐
    ↓          ↓          ↓              ↓
PostgreSQL  Marzban    Resend         ЮKassa
            REST API   (email)        Plisio

Marzban запущен на том же хосте, но вне Docker. Backend стучится к нему через прокси на bridge-интерфейсе Docker — стандартное решение для связи контейнера с сервисами на хосте.

Модели данных

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True, index=True)
    hashed_password: Mapped[str]
    is_verified: Mapped[bool] = mapped_column(default=False)
    marzban_username: Mapped[str | None] = mapped_column(unique=True, nullable=True)
    referral_code: Mapped[str] = mapped_column(
        unique=True, default=lambda: secrets.token_urlsafe(8)
    )
    trial_used: Mapped[bool] = mapped_column(default=False)
    email_verify_token: Mapped[str | None] = mapped_column(nullable=True, index=True)

class Subscription(Base):
    __tablename__ = "subscriptions"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    plan: Mapped[str]
    status: Mapped[str]   # active | expired | pending
    devices: Mapped[int] = mapped_column(default=1)
    started_at: Mapped[datetime] = mapped_column(server_default=func.now())
    expires_at: Mapped[datetime]
    marzban_expire_ts: Mapped[int]

class Payment(Base):
    __tablename__ = "payments"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    provider: Mapped[str]      # yookassa | plisio
    external_id: Mapped[str]
    amount: Mapped[float]
    status: Mapped[str]        # pending | paid | failed
    plan: Mapped[str]
    months: Mapped[int]

Marzban интеграция

Кеширование токена

Marzban токен живёт 1 час. Кешируем на 55 минут с локом чтобы избежать race condition при параллельных запросах:

class MarzbanService:
    def __init__(self):
        self._token: str | None = None
        self._token_expires_at: float = 0
        self._lock = asyncio.Lock()

    async def get_token(self) -> str:
        async with self._lock:
            if self._token and time.time() < self._token_expires_at:
                return self._token
            self._token = await self._fetch_token()
            self._token_expires_at = time.time() + 55 * 60
            return self._token

Создание пользователя

async def create_user(self, username: str, months: int, days: int = 0) -> dict:
    expire_ts = _months_to_timestamp(months)
    if days:
        expire_ts = int(datetime.now(timezone.utc).timestamp()) + days * 86400

    payload = {
        "username": username,
        "proxies": {"vless": {"flow": "xtls-rprx-vision"}},
        "inbounds": {"vless": ["VLESS Reality"]},
        "expire": expire_ts,
        "data_limit": 0,
        "data_limit_reset_strategy": "no_reset",
    }
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{settings.MARZBAN_URL}/api/user",
            json=payload,
            headers=await self._headers(),
        )
        resp.raise_for_status()
        return resp.json()

Marzban возвращает объект с полем links — там готовый vless://... ключ. Параметр days добавлен для триала.

Продление подписки

Если подписка уже истекла — продлеваем от текущего момента:

async def extend_subscription(self, username: str, months: int) -> dict:
    user = await self.get_user(username)
    current_expire = user.get("expire") or 0
    base = max(current_expire, int(time.time()))  # от now если уже истекло
    base_dt = datetime.fromtimestamp(base, tz=timezone.utc)
    # вычисляем новую дату и PUT /api/user/{username}

Идемпотентность вебхуков

Самое важное место. ЮKassa повторяет вебхук если не получила ответ за 10 секунд. Без защиты от дублей получаем двойные подписки и несколько писем — столкнулся на практике.

async def activate_subscription(payment: Payment) -> None:
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Payment).where(Payment.id == payment.id)
        )
        fresh_payment = result.scalar_one()

        # Idempotency check
        if fresh_payment.status == "paid":
            logger.info(f"Payment {payment.id} already activated, skipping")
            return

        try:
            await _do_activate(fresh_payment, session)
        except Exception as e:
            await session.rollback()
            logger.error(f"Activation failed: {e}", exc_info=True)
            raise

Важно: используем отдельную сессию (AsyncSessionLocal), а не ту что пришла из вебхука — к моменту выполнения background task оригинальная сессия уже закрыта.

Описание платежа для ЮKassa — без стоп-слов:

description = "Подписка на сервис защиты данных"

Cloudflare + IP whitelist ЮKassa

После подключения Cloudflare вебхуки от ЮKassa перестали проходить — запросы приходят с IP Cloudflare. Cloudflare передаёт реальный IP клиента в заголовке CF-Connecting-IP:

@router.post("/yookassa/webhook")
async def yookassa_webhook(request: Request, ...):
    client_ip = (
        request.headers.get("CF-Connecting-IP")
        or request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
        or request.client.host
    )
    if settings.ENVIRONMENT == "production":
        allowed_prefixes = ("185.71.76.", "185.71.77.", "77.75.153.",
                           "77.75.154.", "77.75.156.")
        allowed_exact = {"77.75.156.11", "77.75.156.35"}
        if client_ip not in allowed_exact and not any(
            client_ip.startswith(p) for p in allowed_prefixes
        ):
            raise HTTPException(status_code=403)

Триал: 3 дня через верификацию email

@router.get("/verify-email")
async def verify_email(token: str, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(User).where(User.email_verify_token == token)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=400, detail="Неверная ссылка")
    if user.is_verified:
        return {"status": "already_verified"}

    user.is_verified = True
    user.email_verify_token = None
    await db.commit()

    asyncio.create_task(activate_trial(user.id))
    return {"status": "verified"}

async def activate_trial(user_id: int) -> None:
    async with AsyncSessionLocal() as session:
        user = ...
        if user.trial_used:
            return  # Один email — один триал

        existing_sub = await session.execute(
            select(Subscription).where(Subscription.user_id == user_id)
        )
        if existing_sub.scalar_one_or_none():
            return  # Уже есть платная подписка

        marzban_user = await marzban.create_user(username, months=0, days=3)
        user.trial_used = True
        # создаём Subscription, отправляем письмо с ключом

Динамическое продление с доп. устройствами

Пользователь может докупить дополнительные устройства (+99₽). При продлении сохраняем их:

if existing_sub and existing_sub.status == "active":
    extra_devices = max(0, existing_sub.devices - plan_cfg["devices"])
    existing_sub.expires_at = expires_at
    existing_sub.plan = plan_cfg["name"]
    existing_sub.devices = plan_cfg["devices"] + extra_devices

Цена продления считается динамически:

async def _calc_total(plan_key: str, user: User, db: AsyncSession) -> int:
    plan = PLANS[plan_key]
    sub = await _get_active_sub(user.id, db)
    extra_devices = max(0, (sub.devices if sub else 0) - plan["devices"])
    return plan["price_rub"] + extra_devices * DEVICE_ADD_PRICE

Фоновые задачи: APScheduler

scheduler = AsyncIOScheduler(timezone="Europe/Moscow")

scheduler.add_job(
    send_daily_report,
    CronTrigger(hour=8, minute=0),
    id="daily_report",
)
scheduler.add_job(
    send_expiry_reminders,
    CronTrigger(hour=9, minute=0),
    id="expiry_reminders",
)
scheduler.add_job(
    check_pending_payments,
    IntervalTrigger(minutes=5),
    id="check_pending_payments",
)

Docker Compose v1: баг при пересборке

На Ubuntu 22.04 стоит docker-compose v1 (1.29). При пересборке образов падает:

KeyError: 'ContainerConfig'

Баг старой версии при обновлении образа с существующим контейнером. Решение:

docker-compose stop service
docker-compose rm -f service
docker-compose up -d service

Итог

Полный путь от регистрации до работающего ключа — несколько секунд после оплаты. Ручной работы ноль.

Самые нетривиальные места оказались не там где ожидал: не VPN-протоколы и не платёжки, а идемпотентность вебхуковсетевая связность Docker↔хост и IP-проксирование через Cloudflare.

Живой пример: data-shield.ru