Мессенджер MAX набирает обороты в корпоративном сегменте. У него есть Bot API, но документации и примеров интеграции в открытом доступе минимум. В этой статье покажу, как за полчаса поднять микросервис, который принимает и отправляет сообщения MAX, и подключить его к любой CRM или внутренней системе.
Что получим в итоге
FastAPI-микросервис на Python
Приём входящих сообщений через Long Polling
Отправка ответов из CRM обратно в MAX
Автоматическое переподключение при обрывах
Systemd-сервис для продакшена
Архитектура простая:
Пользователь MAX ←→ MAX Platform API ←→ Наш микросервис ←→ CRM / API
Шаг 1. Создаём бота
Откройте MAX и найдите бота @MasterBot
Напишите ему
/newbotЗадайте имя и username
Скопируйте полученный токен — он понадобится дальше
Токен выглядит примерно так:
f9LHodD0cOJZ0b30xpIJT...
Шаг 2. Структура проекта
max-bot/ ├── app/ │ ├── __init__.py │ ├── config.py # Конфигурация │ ├── client.py # Клиент MAX + обработчики │ └── main.py # FastAPI-приложение ├── .env # Токен и настройки ├── requirements.txt └── inteltek-max.service # Systemd (опционально)
Шаг 3. Зависимости
# requirements.txt maxapi fastapi==0.109.0 uvicorn[standard]==0.27.0 httpx==0.27.0 python-dotenv==1.0.0
Библиотека maxapi — это обёртка над MAX Bot API. Устанавливаем:
python3 -m venv venv source venv/bin/activate pip install -r requirements.txt
Шаг 4. Конфигурация
# app/config.py import os from dotenv import load_dotenv load_dotenv() MAX_BOT_TOKEN = os.getenv("MAX_BOT_TOKEN", "") MAX_API_BASE = "https://platform-api.max.ru" # URL вашей CRM/API, куда пересылать входящие сообщения MAIN_API_URL = os.getenv("MAIN_API_URL", "http://localhost:8001") SERVICE_PORT = int(os.getenv("SERVICE_PORT", "8015"))
# .env MAX_BOT_TOKEN=ваш_токен_от_MasterBot MAIN_API_URL=http://localhost:8001 SERVICE_PORT=8015
Шаг 5. Клиент MAX с обработчиками
Это ядро сервиса. Здесь происходит:
подключение к MAX API
Long Polling для получения сообщений
пересылка сообщений в CRM
отправка ответов
# app/client.py import asyncio import logging import sys from datetime import datetime, timezone import httpx from maxapi import Bot, Dispatcher from maxapi.types import MessageCreated, BotStarted from app.config import MAX_BOT_TOKEN, MAIN_API_URL logger = logging.getLogger(__name__) RECONNECT_DELAY = 5 MAX_RECONNECT_DELAY = 60 # Глобальная ссылка на бота — нужна для пересоздания при обрывах _this = sys.modules[__name__] _this.bot = Bot(MAX_BOT_TOKEN) dp = Dispatcher() class MaxBotClient: def __init__(self): self._main_api = MAIN_API_URL self._bot_info = None self._is_connected = False self._poll_task: asyncio.Task | None = None self._stopping = False @property def is_connected(self) -> bool: return self._is_connected async def start(self): """Подключение к MAX API и запуск polling.""" if not MAX_BOT_TOKEN: logger.error("MAX_BOT_TOKEN is not set") return self._stopping = False try: me = await _this.bot.get_me() self._bot_info = me self._is_connected = True logger.info(f"Connected to MAX as {me.first_name} (id={me.user_id})") except Exception as e: logger.error(f"Failed to connect to MAX: {e}") return self._poll_task = asyncio.create_task(self._resilient_polling()) async def _resilient_polling(self): """Polling с автоматическим переподключением.""" delay = RECONNECT_DELAY while not self._stopping: try: logger.info("Starting MAX polling...") dp.polling = True await dp.start_polling(_this.bot) if not self._stopping: logger.warning("Polling ended, reconnecting...") except asyncio.CancelledError: return except Exception as e: logger.error(f"Polling error: {e}") if self._stopping: return self._is_connected = False logger.info(f"Reconnecting in {delay}s...") await asyncio.sleep(delay) delay = min(delay * 2, MAX_RECONNECT_DELAY) # Пересоздаём сессию try: await _this.bot.close_session() except Exception: pass try: _this.bot = Bot(MAX_BOT_TOKEN) me = await _this.bot.get_me() self._bot_info = me self._is_connected = True delay = RECONNECT_DELAY logger.info(f"Reconnected to MAX as {me.first_name}") except Exception as e: logger.error(f"Reconnect failed: {e}") async def stop(self): self._stopping = True dp.polling = False if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() try: await self._poll_task except asyncio.CancelledError: pass await _this.bot.close_session() self._is_connected = False async def send_message(self, chat_id: str, text: str) -> dict: """Отправить сообщение в MAX.""" result = await _this.bot.send_message( chat_id=int(chat_id), text=text ) return { "message_id": str(getattr(result, "message_id", "")), "chat_id": chat_id, "text": text, "sent_at": datetime.now(timezone.utc).isoformat(), } async def get_status(self) -> dict: info = None if self._bot_info: info = { "id": self._bot_info.user_id, "name": self._bot_info.first_name, "username": self._bot_info.username, } return {"connected": self._is_connected, "bot": info} max_bot_client = MaxBotClient()
Теперь — обработчики событий. Они объявляются на уровне модуля через декораторы диспетчера:
# Продолжение app/client.py @dp.message_created() async def on_message(event: MessageCreated): """Входящее сообщение → пересылаем в CRM.""" msg = event.message sender = msg.sender body = msg.body text = body.text if body and hasattr(body, "text") else None if not text: return # Игнорируем сообщения от ботов if sender and sender.is_bot: return chat_id = str(msg.recipient.chat_id) sender_name = " ".join( filter(None, [ getattr(sender, "first_name", None), getattr(sender, "last_name", None), ]) ) or "Неизвестный" sent_at = datetime.fromtimestamp( msg.timestamp / 1000, tz=timezone.utc ).isoformat() if msg.timestamp else datetime.now(timezone.utc).isoformat() # Формируем payload для CRM payload = { "channel": "max", "chat_id": chat_id, "text": text, "sender_name": sender_name, "sender_username": getattr(sender, "username", None), "sent_at": sent_at, } try: async with httpx.AsyncClient(timeout=10) as http: resp = await http.post( f"{max_bot_client._main_api}/api/messages/incoming", json=payload, ) if resp.status_code in (200, 201): logger.info(f"Message forwarded to CRM") else: logger.error(f"CRM returned {resp.status_code}") except Exception as e: logger.error(f"Failed to forward message: {e}") @dp.bot_started() async def on_bot_started(event: BotStarted): """Пользователь нажал 'Начать'.""" name = event.user.first_name or event.user.username logger.info(f"Bot started by {name}")
Важные детали
Зачем _this = sys.modules[__name__]? При обрыве соединения нужно пересоздать объект Bot. Но обработчики (on_message) были зарегистрированы при импорте модуля и ссылаются на старый объект. Через _this.bot мы обращаемся к атрибуту модуля, который всегда указывает на актуальный экземпляр.
Exponential backoff. При обрыве задержка между попытками растёт: 5 → 10 → 20 → 40 → 60 секунд. После успешного переподключения сбрасывается обратно до 5. Это защищает от DDoS'а на API при массовых сбоях.
Шаг 6. FastAPI-приложение
# app/main.py import logging from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from pydantic import BaseModel from app.client import max_bot_client from app.config import SERVICE_PORT logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): await max_bot_client.start() logger.info("MAX Bot service started") yield await max_bot_client.stop() logger.info("MAX Bot service stopped") app = FastAPI(title="MAX Bot Service", lifespan=lifespan) @app.get("/health") async def health(): status = await max_bot_client.get_status() return {"service": "max-bot", **status} class SendMessageBody(BaseModel): chat_id: str text: str @app.post("/send-message") async def send_message(body: SendMessageBody): """Отправить сообщение из CRM → MAX.""" if not max_bot_client.is_connected: raise HTTPException(status_code=503, detail="Not connected") try: return await max_bot_client.send_message(body.chat_id, body.text) except Exception as e: raise HTTPException(status_code=500, detail=str(e))
Шаг 7. Запуск
Локально (для разработки)
cd max-bot source venv/bin/activate uvicorn app.main:app --host 0.0.0.0 --port 8015 --reload
В логах увидим:
Connected to MAX as MyBot (id=12345678) Starting MAX polling...
На сервере (systemd)
# inteltek-max.service [Unit] Description=MAX Bot Service After=network.target [Service] Type=simple User=www-data WorkingDirectory=/opt/max-bot ExecStart=/opt/max-bot/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8015 Restart=on-failure RestartSec=5 Environment=PYTHONUNBUFFERED=1 [Install] WantedBy=multi-user.target
sudo cp inteltek-max.service /etc/systemd/system/max-bot.service sudo systemctl daemon-reload sudo systemctl enable --now max-bot
Шаг 8. Подключаем к CRM
Теперь ваша CRM должна:
1. Принимать входящие сообщения на эндпоинте, который вы указали в MAIN_API_URL:
# Пример на FastAPI (ваша CRM) @app.post("/api/messages/incoming") async def incoming_message(data: dict): channel = data["channel"] # "max" chat_id = data["chat_id"] # ID чата для ответа text = data["text"] # Текст сообщения sender = data["sender_name"] # Имя отправителя # Сохраняем в БД, создаём тикет, уведомляем менеджера... await save_message(channel, chat_id, text, sender) return {"status": "ok"}
2. Отправлять ответы через HTTP-запрос к нашему сервису:
import httpx async def reply_to_max(chat_id: str, text: str): async with httpx.AsyncClient() as client: resp = await client.post( "http://localhost:8015/send-message", json={"chat_id": chat_id, "text": text}, ) return resp.json() # Пример использования await reply_to_max("12345678", "Спасибо за обращение! Менеджер скоро ответит.")
3. Проверять статус бота:
curl http://localhost:8015/health # {"service": "max-bot", "connected": true, "bot": {"id": 123, "name": "MyBot"}}
Мультиканальность
Прелесть микросервисной архитектуры в том, что по той же схеме можно подключить другие каналы. У нас, например, работают параллельно:
Канал | Порт | Метод получения |
|---|---|---|
Telegram | 8010 | UserBot (Telethon) |
VK | 8012 | Callback API + User Long Poll |
OK | 8013 | Long Polling (Graph API) |
MAX | 8015 | Long Polling (Bot API) |
8017 | Baileys (WebSocket) |
Каждый сервис — отдельный процесс со своим venv, systemd-юнитом и health-эндпоинтом. Все они пересылают сообщения в единый API, который распределяет их по менеджерам в CRM.
┌─────────┐ ┌────────┐ ┌────────┐ ┌─────────┐ ┌──────────┐ │Telegram │ │ VK │ │ OK │ │ MAX │ │ WhatsApp │ │ :8010 │ │ :8012 │ │ :8013 │ │ :8015 │ │ :8017 │ └────┬────┘ └───┬────┘ └───┬────┘ └────┬────┘ └────┬─────┘ │ │ │ │ │ └───────────┴───────────┴────────────┴────────────┘ │ ┌────────▼────────┐ │ Main API │ │ :8001 │ │ (FastAPI + │ │ PostgreSQL) │ └────────┬────────┘ │ ┌────────▼────────┐ │ CRM / Office │ │ (Angular SPA) │ └─────────────────┘
Подводные камни
1. Таймстампы в миллисекундах
MAX API возвращает timestamp в миллисекундах, а не в секундах. Не забудьте делить на 1000:
datetime.fromtimestamp(msg.timestamp / 1000, tz=timezone.utc)
2. Пересоздание сессии
При обрыве соединения недостаточно просто перезапустить polling �� нужно пересоздать объект Bot и закрыть старую aiohttp-сессию. Иначе получите ClientSession is closed.
3. Фильтрация ботов
Обязательно проверяйте sender.is_bot, иначе бот начнёт отвечать сам себе в бесконечном цикле.
4. Graceful shutdown
Используйте lifespan в FastAPI для корректного завершения polling при остановке сервиса. Без этого systemd будет ждать таймаут и убивать процесс через SIGKILL.
Итого
За 30 минут мы получили продакшн-ready микросервис для MAX:
~200 строк Python-кода
Автоматическое переподключение с exponential backoff
REST API для отправки сообщений из любой системы
Health check для мониторинга
Код открыт и работает в продакшене. Такой же подход применим к любому мессенджеру с Bot API.
Автор: Алан, CTO ИнтеллектТех — разрабатываем AI-агентов и мультиканальные CRM-решения.
