Приветствую!
Сегодня будем писать бота для хранения истории личных сообщений
Безусловно, идея не уникальная: часть людей уже использует неофициальные клиенты, другая – юзерботов (например, на pyrogram)
Тогда что здесь?
Официальный бизнес-режим бота, не нарушающий TOS, работающий 24/7 и за который не сносят аккаунты (!)
Ближе к делу
Создание бота (Для новичков)
Зайдите в бота @BotFather
Напишите /newbot и как-нибудь обзовите бота, например "Уведомления"
Придумайте ему @юзернейм, подходят только латинские буквы, цифры и нижнее подчеркивание, обязательно напишите "bot" в конце
Бот готов – скопируйте и сохраните токен
Настройте бота:
Напишите /mybots и выберите своего бота
Edit Bot -> Edit Botpic -> Отправьте аватарку для бота
Back to Bot -> Bot Settings -> Business Mode -> Turn On
Написание бота
Стек: python 3.12, aiogram, redis, docker
(Ссылка на GitHub репозиторий внизу статьи)
Для начала придумываем файловую структуру, что-то типа:
.
├── main.py # Запуск бота
├── pyproject.toml # Управление зависимостями Poetry
├── Dockerfile # Инструкции для сборки Docker образа
├── docker-compose.yml # Конфигурация Docker контейнеров (бот + redis)
├── .env # Переменные окружения (токен, настройки)
└── src/
├── bot.py # Основная логика бота, хендлеры
├── keyboards.py # Клавиатуры и коллбэки
└── settings.py # Pydantic парсер .env файла
Теперь нужно установить библиотеки:
poetry add aiogram pydantic pydantic_settings loguru redis
pyproject.toml
должен получится таким:
[tool.poetry]
name = "business-bot"
version = "0.1.0"
description = ""
authors = []
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.12"
aiogram = "^3.13.1"
loguru = "^0.7.2"
redis = "^5.2.0"
pydantic = "^2.9.2"
pydantic-settings = "^2.6.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Далее прописываем конфиг в settings.py
, используя pydantic_settings, который парсит .env
:
REDIS_HOST
,REDIS_PORT
,REDIS_PASSWORD
по умолчанию устанавливаются вdocker-compose.yml
, но они нужны, если Вы будете запускать бота вне контейнера, используя локальный/облачный редис
from typing import Optional
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Environment(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
class Redis(Environment):
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_PASSWORD: Optional[SecretStr] = None
class Bot(Environment):
TOKEN: SecretStr = SecretStr("7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc")
class Settings(Bot, Redis):
USER_ID: int = 0
settings = Settings()
И соответственно настраиваем .env
файл:
Токен бота ранее доставали из Bot Father
Свой ID можно получить с помощью @getmyid_bot
TOKEN="7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc"
USER_ID=123456789
Прописываем переменные проекта в bot.py
:
from aiogram import Bot, Dispatcher
from redis.asyncio import Redis
from .settings import settings
bot = Bot(token=settings.TOKEN.get_secret_value())
dp = Dispatcher()
redis = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=(
settings.REDIS_PASSWORD.get_secret_value()
if settings.REDIS_PASSWORD else None
),
)
и используем их в main.py
для запуска:
from loguru import logger
from src.bot import dp, bot
if __name__ == "__main__":
logger.info("Starting...")
dp.run_polling(
bot,
allowed_updates=[
"callback_query",
"business_message",
"edited_business_message",
"deleted_business_messages",
],
)
Важно! aiogram по умолчанию слушает только следующие хэндлеры:
message
,callback_query
,errors
. Остальные он автоматически не парсит и их нужно добавлять вручную черезallowed_updates
.
В файле bot.py
сохраняем все новые сообщения в Redis с авто-удалением через 3 недели:
EX_TIME = 60 * 60 * 24 * 21
async def set_message(message: types.Message):
await redis.set(
f"{message.chat.id}:{message.message_id}",
message.model_dump_json(),
ex=EX_TIME,
)
@dp.business_message()
async def message(message: types.Message):
await set_message(message)
Теперь нужно как-то красиво сообщать об изменении статуса сообщений. Лучшим вариантом я считаю добавление inline клавиатуры, так не будет редактироваться исходное сообщение.
Первой кнопкой будет статус сообщения – ✏️ или 🗑️
Второй – ссылка на профиль отправителя вида tg://user?id=0
И третьей – кнопка для закрытия уведомления
Для этого прописываем markup и callbacks в файле keyboards.py
:
from enum import StrEnum
from aiogram.utils.keyboard import InlineKeyboardBuilder
class Callbacks(StrEnum):
EMPTY = "empty"
CLOSE = "close"
def link_markup(title: str, user_id: int):
builder = InlineKeyboardBuilder()
builder.button(text=title, callback_data=Callbacks.EMPTY)
builder.button(text="👤", url=f"tg://user?id={user_id}")
builder.button(text="❌", callback_data=Callbacks.CLOSE)
return builder.adjust(3).as_markup()
В примере выше я использую
StrEnum
для константныхcallback_data
. На моей практике, импортировать класс с енамами и в следствии использовать их сF.data
гораздо удобнее, чем в остальных вариантах, например, с глобальными переменными.
Отправляем уведомление при изменении сообщения (здесь создается копия сообщения на основе json строки из Redis):
@dp.edited_business_message()
async def edited_message(message: types.Message):
model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")
await set_message(message)
if not model_dump:
return
original_message = types.Message.model_validate_json(model_dump)
if not original_message.from_user:
return
await original_message.send_copy(
chat_id=settings.USER_ID,
reply_markup=link_markup("✏️", original_message.from_user.id),
).as_(bot)
И аналогично отправляем уведомления при удалении сообщений:
Одним конвейером (pipeline) достаем все удаленные сообщения по их ID и отправляем сохраненные копии
Если сообщений слишком много, мы можем столкнуться с flood wait от телеграма, но все удаленные сообщения должны гарантировано доставиться, поэтому дожидаемся его окончания
И в конце очищаем хранилище от удаленных сообщений
async def copy_message(message: types.Message):
await message.send_copy(
chat_id=settings.USER_ID,
).as_(bot)
@dp.deleted_business_messages()
async def deleted_message(business_messages: types.BusinessMessagesDeleted):
pipe = redis.pipeline()
for message_id in business_messages.message_ids:
pipe.get(f"{business_messages.chat.id}:{message_id}")
messages_data = await pipe.execute()
keys_to_delete = []
for message_id, model_dump in zip(business_messages.message_ids, messages_data):
if not model_dump:
continue
original_message = types.Message.model_validate_json(model_dump)
if not original_message.from_user:
continue
send_copy = original_message.send_copy(
chat_id=settings.USER_ID,
reply_markup=link_markup("🗑️", original_message.from_user.id),
).as_(bot)
try:
await send_copy
except exceptions.TelegramRetryAfter as exp:
logger.warning(f"Retry after {exp.retry_after} seconds")
await asyncio.sleep(exp.retry_after + 0.1)
await send_copy
finally:
await asyncio.sleep(0.1)
keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")
if keys_to_delete:
await redis.delete(*keys_to_delete)
Еще нужно добавить примитивную логику для inline кнопок:
@dp.callback_query(F.data == Callbacks.EMPTY)
async def empty(query: types.CallbackQuery):
await query.answer()
@dp.callback_query(F.data == Callbacks.CLOSE)
async def close(query: types.CallbackQuery):
await query.answer()
if isinstance(query.message, types.Message):
await query.message.delete()
Полный файл bot.py
:
import asyncio
from aiogram import F, Bot, Dispatcher, types, exceptions
from loguru import logger
from redis.asyncio import Redis
from .settings import settings
from .keyboards import link_markup, Callbacks
bot = Bot(token=settings.TOKEN.get_secret_value())
dp = Dispatcher()
redis = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=(
settings.REDIS_PASSWORD.get_secret_value() if settings.REDIS_PASSWORD else None
),
)
EX_TIME = 60 * 60 * 24 * 21
async def set_message(message: types.Message):
await redis.set(
f"{message.chat.id}:{message.message_id}",
message.model_dump_json(),
ex=EX_TIME,
)
@dp.business_message()
async def message(message: types.Message):
await set_message(message)
@dp.edited_business_message()
async def edited_message(message: types.Message):
model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")
await set_message(message)
if not model_dump:
return
original_message = types.Message.model_validate_json(model_dump)
if not original_message.from_user:
return
await original_message.send_copy(
chat_id=settings.USER_ID,
reply_markup=link_markup("✏️", original_message.from_user.id),
).as_(bot)
async def copy_message(message: types.Message):
await message.send_copy(
chat_id=settings.USER_ID,
).as_(bot)
@dp.deleted_business_messages()
async def deleted_message(business_messages: types.BusinessMessagesDeleted):
pipe = redis.pipeline()
for message_id in business_messages.message_ids:
pipe.get(f"{business_messages.chat.id}:{message_id}")
messages_data = await pipe.execute()
keys_to_delete = []
for message_id, model_dump in zip(business_messages.message_ids, messages_data):
if not model_dump:
continue
original_message = types.Message.model_validate_json(model_dump)
if not original_message.from_user:
continue
send_copy = original_message.send_copy(
chat_id=settings.USER_ID,
reply_markup=link_markup("🗑️", original_message.from_user.id),
).as_(bot)
try:
await send_copy
except exceptions.TelegramRetryAfter as exp:
logger.warning(f"Retry after {exp.retry_after} seconds")
await asyncio.sleep(exp.retry_after + 0.1)
await send_copy
finally:
await asyncio.sleep(0.1)
keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")
if keys_to_delete:
await redis.delete(*keys_to_delete)
@dp.callback_query(F.data == Callbacks.EMPTY)
async def empty(query: types.CallbackQuery):
await query.answer()
@dp.callback_query(F.data == Callbacks.CLOSE)
async def close(query: types.CallbackQuery):
await query.answer()
if isinstance(query.message, types.Message):
await query.message.delete()
Почти закончили!
Осталось обернуть все в контейнер:
Dockerfile
FROM python:3.12-slim
RUN pip install poetry
WORKDIR /app
# Копируем и устанавливаем зависимости
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root
# Копируем файлы проекта и переменные окружения
COPY . .
COPY .env .env
EXPOSE 80
CMD ["python3", "main.py"]
docker-compose.yml
services:
redis:
image: redis:alpine
command: redis-server --requirepass ${REDIS_PASSWORD:-password} --save 60 1
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 3
ports:
- "6380:6379"
volumes:
- redis_data:/data
web:
build: .
restart: unless-stopped
ports:
- "8081:80"
depends_on:
redis:
condition: service_healthy
environment:
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: ${REDIS_PASSWORD:-password}
volumes:
redis_data:
Вот теперь готово!
Билдим бота и проверяем, что все работает:
docker-compose up --build
Исходный код: https://github.com/Aidenable/BusinessLogger
Это первая статья на Хабре, буду рад любой конструктивной критике!