Как я построил автоматический сервис защиты данных: FastAPI + Marzban + Docker
Задача: человек регистрируется, оплачивает, получает ключ на почту и сразу подключается. Без ручной работы с моей стороны. Под капотом — FastAPI, Marzban, PostgreSQL, React, ЮKassa, Resend. Расскажу про архитектуру, грабли и неочевидные решения.
Стек
Backend: Python 3.11, FastAPI, SQLAlchemy (async), Alembic
Database: PostgreSQL
Frontend: React 18, Vite, Tailwind CSS
VPN: XRay-core, Marzban (панель управления)
Email: Resend API
Payments: ЮKassa + Plisio (крипто)
Infra: Docker Compose, Nginx, CloudflareПочему FastAPI а не Django/Flask: нативный async нужен везде — платёжные вебхуки, запросы к Marzban API, отправка писем должны работать параллельно без блокировок. SQLAlchemy async + asyncpg дают чистую картину без thread pool workaround'ов.
Архитектура
Пользователь
↓ HTTPS
Cloudflare (проксирование, скрытие IP сервера)
↓
Nginx
/ → frontend:3000 (React SPA)
/api/ → backend:8080 (FastAPI)
↓
┌─────────────────────────────────────────┐
│ FastAPI │
│ /auth /configs /payment /support │
└──────────────┬──────────────────────────┘
│
┌──────────┼──────────┬──────────────┐
↓ ↓ ↓ ↓
PostgreSQL Marzban Resend ЮKassa
REST API (email) PlisioMarzban запущен на том же хосте, но вне Docker. Backend стучится к нему через прокси на bridge-интерфейсе Docker — стандартное решение для связи контейнера с сервисами на хосте.
Модели данных
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True, index=True)
hashed_password: Mapped[str]
is_verified: Mapped[bool] = mapped_column(default=False)
marzban_username: Mapped[str | None] = mapped_column(unique=True, nullable=True)
referral_code: Mapped[str] = mapped_column(
unique=True, default=lambda: secrets.token_urlsafe(8)
)
trial_used: Mapped[bool] = mapped_column(default=False)
email_verify_token: Mapped[str | None] = mapped_column(nullable=True, index=True)
class Subscription(Base):
__tablename__ = "subscriptions"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
plan: Mapped[str]
status: Mapped[str] # active | expired | pending
devices: Mapped[int] = mapped_column(default=1)
started_at: Mapped[datetime] = mapped_column(server_default=func.now())
expires_at: Mapped[datetime]
marzban_expire_ts: Mapped[int]
class Payment(Base):
__tablename__ = "payments"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
provider: Mapped[str] # yookassa | plisio
external_id: Mapped[str]
amount: Mapped[float]
status: Mapped[str] # pending | paid | failed
plan: Mapped[str]
months: Mapped[int]Marzban интеграция
Кеширование токена
Marzban токен живёт 1 час. Кешируем на 55 минут с локом чтобы избежать race condition при параллельных запросах:
class MarzbanService:
def __init__(self):
self._token: str | None = None
self._token_expires_at: float = 0
self._lock = asyncio.Lock()
async def get_token(self) -> str:
async with self._lock:
if self._token and time.time() < self._token_expires_at:
return self._token
self._token = await self._fetch_token()
self._token_expires_at = time.time() + 55 * 60
return self._tokenСоздание пользователя
async def create_user(self, username: str, months: int, days: int = 0) -> dict:
expire_ts = _months_to_timestamp(months)
if days:
expire_ts = int(datetime.now(timezone.utc).timestamp()) + days * 86400
payload = {
"username": username,
"proxies": {"vless": {"flow": "xtls-rprx-vision"}},
"inbounds": {"vless": ["VLESS Reality"]},
"expire": expire_ts,
"data_limit": 0,
"data_limit_reset_strategy": "no_reset",
}
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
f"{settings.MARZBAN_URL}/api/user",
json=payload,
headers=await self._headers(),
)
resp.raise_for_status()
return resp.json()Marzban возвращает объект с полем links — там готовый vless://... ключ. Параметр days добавлен для триала.
Продление подписки
Если подписка уже истекла — продлеваем от текущего момента:
async def extend_subscription(self, username: str, months: int) -> dict:
user = await self.get_user(username)
current_expire = user.get("expire") or 0
base = max(current_expire, int(time.time())) # от now если уже истекло
base_dt = datetime.fromtimestamp(base, tz=timezone.utc)
# вычисляем новую дату и PUT /api/user/{username}Идемпотентность вебхуков
Самое важное место. ЮKassa повторяет вебхук если не получила ответ за 10 секунд. Без защиты от дублей получаем двойные подписки и несколько писем — столкнулся на практике.
async def activate_subscription(payment: Payment) -> None:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Payment).where(Payment.id == payment.id)
)
fresh_payment = result.scalar_one()
# Idempotency check
if fresh_payment.status == "paid":
logger.info(f"Payment {payment.id} already activated, skipping")
return
try:
await _do_activate(fresh_payment, session)
except Exception as e:
await session.rollback()
logger.error(f"Activation failed: {e}", exc_info=True)
raiseВажно: используем отдельную сессию (AsyncSessionLocal), а не ту что пришла из вебхука — к моменту выполнения background task оригинальная сессия уже закрыта.
Описание платежа для ЮKassa — без стоп-слов:
description = "Подписка на сервис защиты данных"Cloudflare + IP whitelist ЮKassa
После подключения Cloudflare вебхуки от ЮKassa перестали проходить — запросы приходят с IP Cloudflare. Cloudflare передаёт реальный IP клиента в заголовке CF-Connecting-IP:
@router.post("/yookassa/webhook")
async def yookassa_webhook(request: Request, ...):
client_ip = (
request.headers.get("CF-Connecting-IP")
or request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
or request.client.host
)
if settings.ENVIRONMENT == "production":
allowed_prefixes = ("185.71.76.", "185.71.77.", "77.75.153.",
"77.75.154.", "77.75.156.")
allowed_exact = {"77.75.156.11", "77.75.156.35"}
if client_ip not in allowed_exact and not any(
client_ip.startswith(p) for p in allowed_prefixes
):
raise HTTPException(status_code=403)Триал: 3 дня через верификацию email
@router.get("/verify-email")
async def verify_email(token: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(User).where(User.email_verify_token == token)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=400, detail="Неверная ссылка")
if user.is_verified:
return {"status": "already_verified"}
user.is_verified = True
user.email_verify_token = None
await db.commit()
asyncio.create_task(activate_trial(user.id))
return {"status": "verified"}
async def activate_trial(user_id: int) -> None:
async with AsyncSessionLocal() as session:
user = ...
if user.trial_used:
return # Один email — один триал
existing_sub = await session.execute(
select(Subscription).where(Subscription.user_id == user_id)
)
if existing_sub.scalar_one_or_none():
return # Уже есть платная подписка
marzban_user = await marzban.create_user(username, months=0, days=3)
user.trial_used = True
# создаём Subscription, отправляем письмо с ключомДинамическое продление с доп. устройствами
Пользователь может докупить дополнительные устройства (+99₽). При продлении сохраняем их:
if existing_sub and existing_sub.status == "active":
extra_devices = max(0, existing_sub.devices - plan_cfg["devices"])
existing_sub.expires_at = expires_at
existing_sub.plan = plan_cfg["name"]
existing_sub.devices = plan_cfg["devices"] + extra_devicesЦена продления считается динамически:
async def _calc_total(plan_key: str, user: User, db: AsyncSession) -> int:
plan = PLANS[plan_key]
sub = await _get_active_sub(user.id, db)
extra_devices = max(0, (sub.devices if sub else 0) - plan["devices"])
return plan["price_rub"] + extra_devices * DEVICE_ADD_PRICEФоновые задачи: APScheduler
scheduler = AsyncIOScheduler(timezone="Europe/Moscow")
scheduler.add_job(
send_daily_report,
CronTrigger(hour=8, minute=0),
id="daily_report",
)
scheduler.add_job(
send_expiry_reminders,
CronTrigger(hour=9, minute=0),
id="expiry_reminders",
)
scheduler.add_job(
check_pending_payments,
IntervalTrigger(minutes=5),
id="check_pending_payments",
)Docker Compose v1: баг при пересборке
На Ubuntu 22.04 стоит docker-compose v1 (1.29). При пересборке образов падает:
KeyError: 'ContainerConfig'Баг старой версии при обновлении образа с существующим контейнером. Решение:
docker-compose stop service
docker-compose rm -f service
docker-compose up -d serviceИтог
Полный путь от регистрации до работающего ключа — несколько секунд после оплаты. Ручной работы ноль.
Самые нетривиальные места оказались не там где ожидал: не VPN-протоколы и не платёжки, а идемпотентность вебхуков, сетевая связность Docker↔хост и IP-проксирование через Cloudflare.
Живой пример: data-shield.ru