Расскажу, как собрал бота для AI-суммаризации Telegram-каналов: архитектура, выбор LLM-провайдера, оптимизация скорости и неочевидные проблемы при деплое на российский VPS.
Проблема
Подписан на 50+ Telegram-каналов. Каждое утро - 200+ непрочитанных сообщений. Читаю 10%, остальное скроллю. Классический information overload.
Идея: пусть LLM читает посты и присылает выжимку. Но не просто "саммари", а двухступенчатый подход:
Real Topic - о чём пост на самом деле (1-2 предложения)
TL;DR - полная выжимка с ключевыми тезисами
Логика: даже саммари 50 постов - много. А пробежать глазами "о чём это" - секунды.
Архитектура

Почему Telethon, а не Bot API
Bot API не умеет читать сообщения из каналов, на которые бот не добавлен администратором. Telethon работает через MTProto как обычный клиент - может читать любые публичные каналы.
python
from telethon import TelegramClient client = TelegramClient('session', api_id, api_hash) async def get_posts(channel_username: str, limit: int = 20): entity = await client.get_entity(channel_username) posts = [] async for message in client.iter_messages(entity, limit=limit): if message.text and len(message.text) > 50: posts.append({ 'id': message.id, 'text': message.text, 'date': message.date, 'url': f'https://t.me/{channel_username}/{message.id}' }) return posts
Стек
Компонент | Технология | Почему |
|---|---|---|
Bot Framework | aiogram 3.x | Async, типизация, активная разработка |
Channel Parser | Telethon | MTProto, читает любые публичные каналы |
LLM | DeepSeek API | Работает из РФ, дёшево ($0.14/1M input) |
Database | SQLite + aiosqlite | Достаточно для MVP, zero config |
Scheduler | APScheduler | Простой, async-совместимый |
Config | Pydantic Settings | Валидация, типизация, .env из коробки |
Промпт для анализа постов
python
ANALYZE_POST_PROMPT = """Проанализируй пост из Telegram-канала и верни JSON. Канал: {channel_name} Текст поста: --- {post_content} --- Верни JSON строго в таком формате: {{ "real_topic": "1-2 предложения о чём НА САМОМ ДЕЛЕ этот пост", "tldr": "Краткое изложение в 2-4 предложениях с главными фактами", "key_insights": ["ключевой инсайт 1", "ключевой инсайт 2"], "relevance_score": 7, "content_type": "news" }} Где content_type: "news", "opinion", "tutorial", "announcement", "other" Отвечай ТОЛЬКО валидным JSON без markdown-разметки."""
Ключевые моменты:
Явное указание формата вывода (JSON)
Примеры полей прямо в промпте
Ограничение на markdown - иначе LLM оборачивает JSON в
json
Грабля #1: Groq не работает из России
Изначально выбрал Groq - бесплатный, быстрый (Llama 3.3 70B за ~1 сек). Локально всё работало. После деплоя на Timeweb (российский VPS) - 403 Forbidden.
Сравнение провайдеров
Провайдер | Скорость | Доступ из РФ | Цена (1M input) | Цена (1M output) |
|---|---|---|---|---|
Groq | ~1 сек | ❌ Blocked | Бесплатно (лимиты) | Бесплатно |
OpenAI | ~2-3 сек | ❌ Blocked | $2.50 (GPT-4o-mini) | $10.00 |
DeepSeek | ~5-10 сек | ✅ Работает | $0.14 | $0.28 |
Mistral | ~3-5 сек | ✅ Работает | $0.25 | $0.25 |
Выбрал DeepSeek - работает из России, адекватная цена, качество на уровне GPT-4o-mini.
Универсальный клиент
Сделал клиент с поддержкой разных провайдеров через OpenAI-совместимый API:
python
from openai import AsyncOpenAI class LLMClient: PROVIDERS = { "deepseek": { "base_url": "https://api.deepseek.com", "default_model": "deepseek-chat" }, "groq": { "base_url": "https://api.groq.com/openai/v1", "default_model": "llama-3.3-70b-versatile" }, } def __init__(self, provider: str, api_key: str): config = self.PROVIDERS[provider] self.client = AsyncOpenAI( base_url=config["base_url"], api_key=api_key ) self.model = config["default_model"] async def complete(self, prompt: str, system: str = None) -> str: messages = [] if system: messages.append({"role": "system", "content": system}) messages.append({"role": "user", "content": prompt}) response = await self.client.chat.completions.create( model=self.model, messages=messages, temperature=0.3 ) return response.choices[0].message.content
Грабля #2: Дайджест генерируется 2 минуты
После перехода на DeepSeek время ответа выросло с ~1 сек (Groq) до ~8-10 сек. При 9 постах последовательная обработка занимала 80+ секунд.
Диагностика
Добавил логирование времени:
python
import time async def analyze_post(self, content: str, channel: str): start = time.time() result = await self.client.complete(...) elapsed = time.time() - start logger.info(f"LLM call took {elapsed:.2f}s for {channel}") return result
Вывод:
LLM call took 5.39s for channel: channel_1 LLM call took 8.57s for channel: channel_1 LLM call took 9.12s for channel: channel_2 ... Total: 86.3s for 9 posts
Решение: параллельная обработка
python
import asyncio async def analyze_batch( self, posts: list[dict], concurrency: int = 5 ) -> list[dict]: semaphore = asyncio.Semaphore(concurrency) async def process_one(post: dict): async with semaphore: return await self.analyze_post( content=post["content"], channel=post["channel"] ) tasks = [process_one(post) for post in posts] return await asyncio.gather(*tasks)
Результаты
Режим | Время (9 постов) | Ускорение |
|---|---|---|
Последовательно | 86 сек | — |
concurrency=3 | 28 сек | 3x |
concurrency=5 | 19 сек | 4.5x |
Семафор нужен, чтобы не упереться в rate limit API.
Грабля #3: Один канал забивает весь дайджест
Пользователь добавил 3 канала:
Канал A: 20 постов/день
Канал B: 5 постов/день
Канал C: 3 поста/день
При лимите 15 постов на дайджест канал C не попадал вообще - посты собирались в порядке добавления каналов.
Алгоритм равномерного распределения
python
def distribute_posts_evenly( posts_by_channel: dict[str, list], max_total: int = 30 ) -> list: """ Round-robin распределение постов из разных каналов. Args: posts_by_channel: {"channel_1": [post1, post2], ...} max_total: максимум постов в итоговом списке Returns: Список постов, равномерно распределённых по каналам """ channels = list(posts_by_channel.keys()) if not channels: return [] fair_share = max_total // len(channels) result = [] # Фаза 1: берём по fair_share из каждого канала for channel in channels: posts = posts_by_channel[channel][:fair_share] for post in posts: post['_channel'] = channel result.extend(posts) # Фаза 2: добираем оставшиеся слоты remaining = max_total - len(result) if remaining > 0: for channel in channels: leftover = posts_by_channel[channel][fair_share:] for post in leftover[:remaining]: post['_channel'] = channel result.append(post) remaining -= 1 if remaining <= 0: break if remaining <= 0: break return result
Теперь при 3 каналах и лимите 30: каждый получает минимум 10 слотов.
Модель данных
python
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, BigInteger from sqlalchemy.orm import relationship, declarative_base Base = declarative_base() class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) telegram_id = Column(BigInteger, unique=True, index=True) digest_time = Column(String, default="07:00") # HH:MM timezone = Column(String, default="Europe/Moscow") is_active = Column(Boolean, default=True) subscriptions = relationship("UserChannel", back_populates="user") class Channel(Base): __tablename__ = "channels" id = Column(Integer, primary_key=True) username = Column(String, unique=True, index=True) # без @ title = Column(String, nullable=True) class UserChannel(Base): """Many-to-many: пользователь <-> канал""" __tablename__ = "user_channels" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id")) channel_id = Column(Integer, ForeignKey("channels.id")) added_at = Column(DateTime, default=datetime.utcnow) user = relationship("User", back_populates="subscriptions") channel = relationship("Channel")
SQLite с WAL-режимом справляется с конкурентными запросами:
python
async with engine.begin() as conn: await conn.execute(text("PRAGMA journal_mode=WAL"))
Планировщик ежедневных дайджестов
python
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger scheduler = AsyncIOScheduler() async def schedule_user_digests(): """Планирует дайджест для каждого пользователя на его время""" users = await get_active_users() for user in users: hour, minute = map(int, user.digest_time.split(':')) scheduler.add_job( send_digest, CronTrigger(hour=hour, minute=minute, timezone=user.timezone), args=[user.telegram_id], id=f"digest_{user.telegram_id}", replace_existing=True ) async def send_digest(telegram_id: int): """Генерирует и отправляет дайджест пользователю""" user = await get_user(telegram_id) channels = await get_user_channels(user) # Собираем посты all_posts = {} for channel in channels: posts = await parser.get_posts(channel.username, hours_back=24) all_posts[channel.username] = posts # Равномерно распределяем distributed = distribute_posts_evenly(all_posts, max_total=30) # Анализируем через LLM (параллельно) analyzed = await processor.analyze_batch(distributed, concurrency=5) # Формируем и отправляем digest_text = format_digest(analyzed) await bot.send_message(telegram_id, digest_text)
Деплой
Systemd unit
ini
# /etc/systemd/system/briefka.service [Unit] Description=Briefka Telegram Bot After=network.target [Service] Type=simple User=briefka WorkingDirectory=/opt/briefka ExecStart=/opt/briefka/venv/bin/python scripts/run_bot.py Restart=always RestartSec=10 Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target
Бэкап базы (cron)
bash
#!/bin/bash # /opt/briefka/scripts/backup.sh BACKUP_DIR="/opt/briefka/backups" DB_PATH="/opt/briefka/briefka.db" DATE=$(date +%Y%m%d_%H%M%S) mkdir -p $BACKUP_DIR cp $DB_PATH "$BACKUP_DIR/briefka_$DATE.db" # Удаляем бэкапы старше 7 дней find $BACKUP_DIR -name "*.db" -mtime +7 -delete
cron
0 3 * * * /opt/briefka/scripts/backup.sh >> /opt/briefka/logs/backup.log 2>&1
Метрики
Параметр | Значение |
|---|---|
Время генерации дайджеста (10 постов) | 15-20 сек |
Потребление RAM | ~95 MB |
Стоимость VPS (Timeweb) | 530 ₽/мес |
Стоимость LLM (DeepSeek, ~100 дайджестов) | ~$0.10/мес |
Что дальше
Персонализация - сохранять фидбек (лайк/дизлайк), обучать ранжирование под интересы пользователя
Кэширование - не парсить каналы при каждом запросе, фоновое обновление по cron
Rate limiting - защита от абуза (сейчас лимит: 10 каналов на пользователя)
Выводы
Telethon > Bot API для чтения каналов. Bot API не умеет читать сообщения из чужих каналов.
Региональные ограничения реальны. Groq, OpenAI заблокированы в РФ. DeepSeek работает.
Параллельность решает. Простой
asyncio.gather()с семафором дал ускорение в 4.5 раза.SQLite достаточно для MVP. С WAL-режимом справляется с конкурентными запросами.
MVP можно запустить за 500 ₽/мес. Не нужен дорогой сервер для Telegram-бота.
Бот работает в бета-режиме: t.me/briefka_bot
Буду рад вопросам и фидбеку в комментариях.
