Мессенджер MAX набирает обороты в корпоративном сегменте. У него есть Bot API, но документации и примеров интеграции в открытом доступе минимум. В этой статье покажу, как за полчаса поднять микросервис, который принимает и отправляет сообщения MAX, и подключить его к любой CRM или внутренней системе.

Что получим в итоге

  • FastAPI-микросервис на Python

  • Приём входящих сообщений через Long Polling

  • Отправка ответов из CRM обратно в MAX

  • Автоматическое переподключение при обрывах

  • Systemd-сервис для продакшена

Архитектура простая:

Пользователь MAX ←→ MAX Platform API ←→ Наш микросервис ←→ CRM / API

Шаг 1. Создаём бота

  1. Откройте MAX и найдите бота @MasterBot

  2. Напишите ему /newbot

  3. Задайте имя и username

  4. Скопируйте полученный токен — он понадобится дальше

Токен выглядит примерно так: f9LHodD0cOJZ0b30xpIJT...

Шаг 2. Структура проекта

max-bot/
├── app/
│   ├── __init__.py
│   ├── config.py      # Конфигурация
│   ├── client.py       # Клиент MAX + обработчики
│   └── main.py         # FastAPI-приложение
├── .env                # Токен и настройки
├── requirements.txt
└── inteltek-max.service  # Systemd (опционально)

Шаг 3. Зависимости

# requirements.txt
maxapi
fastapi==0.109.0
uvicorn[standard]==0.27.0
httpx==0.27.0
python-dotenv==1.0.0

Библиотека maxapi — это обёртка над MAX Bot API. Устанавливаем:

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Шаг 4. Конфигурация

# app/config.py
import os
from dotenv import load_dotenv

load_dotenv()

MAX_BOT_TOKEN = os.getenv("MAX_BOT_TOKEN", "")
MAX_API_BASE = "https://platform-api.max.ru"

# URL вашей CRM/API, куда пересылать входящие сообщения
MAIN_API_URL = os.getenv("MAIN_API_URL", "http://localhost:8001")
SERVICE_PORT = int(os.getenv("SERVICE_PORT", "8015"))
# .env
MAX_BOT_TOKEN=ваш_токен_от_MasterBot
MAIN_API_URL=http://localhost:8001
SERVICE_PORT=8015

Шаг 5. Клиент MAX с обработчиками

Это ядро сервиса. Здесь происходит:

  • подключение к MAX API

  • Long Polling для получения сообщений

  • пересылка сообщений в CRM

  • отправка ответов

# app/client.py
import asyncio
import logging
import sys
from datetime import datetime, timezone

import httpx
from maxapi import Bot, Dispatcher
from maxapi.types import MessageCreated, BotStarted

from app.config import MAX_BOT_TOKEN, MAIN_API_URL

logger = logging.getLogger(__name__)

RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60

# Глобальная ссылка на бота — нужна для пересоздания при обрывах
_this = sys.modules[__name__]
_this.bot = Bot(MAX_BOT_TOKEN)
dp = Dispatcher()


class MaxBotClient:
    def __init__(self):
        self._main_api = MAIN_API_URL
        self._bot_info = None
        self._is_connected = False
        self._poll_task: asyncio.Task | None = None
        self._stopping = False

    @property
    def is_connected(self) -> bool:
        return self._is_connected

    async def start(self):
        """Подключение к MAX API и запуск polling."""
        if not MAX_BOT_TOKEN:
            logger.error("MAX_BOT_TOKEN is not set")
            return

        self._stopping = False

        try:
            me = await _this.bot.get_me()
            self._bot_info = me
            self._is_connected = True
            logger.info(f"Connected to MAX as {me.first_name} (id={me.user_id})")
        except Exception as e:
            logger.error(f"Failed to connect to MAX: {e}")
            return

        self._poll_task = asyncio.create_task(self._resilient_polling())

    async def _resilient_polling(self):
        """Polling с автоматическим переподключением."""
        delay = RECONNECT_DELAY
        while not self._stopping:
            try:
                logger.info("Starting MAX polling...")
                dp.polling = True
                await dp.start_polling(_this.bot)
                if not self._stopping:
                    logger.warning("Polling ended, reconnecting...")
            except asyncio.CancelledError:
                return
            except Exception as e:
                logger.error(f"Polling error: {e}")

            if self._stopping:
                return

            self._is_connected = False
            logger.info(f"Reconnecting in {delay}s...")
            await asyncio.sleep(delay)
            delay = min(delay * 2, MAX_RECONNECT_DELAY)

            # Пересоздаём сессию
            try:
                await _this.bot.close_session()
            except Exception:
                pass

            try:
                _this.bot = Bot(MAX_BOT_TOKEN)
                me = await _this.bot.get_me()
                self._bot_info = me
                self._is_connected = True
                delay = RECONNECT_DELAY
                logger.info(f"Reconnected to MAX as {me.first_name}")
            except Exception as e:
                logger.error(f"Reconnect failed: {e}")

    async def stop(self):
        self._stopping = True
        dp.polling = False
        if self._poll_task and not self._poll_task.done():
            self._poll_task.cancel()
            try:
                await self._poll_task
            except asyncio.CancelledError:
                pass
        await _this.bot.close_session()
        self._is_connected = False

    async def send_message(self, chat_id: str, text: str) -> dict:
        """Отправить сообщение в MAX."""
        result = await _this.bot.send_message(
            chat_id=int(chat_id), text=text
        )
        return {
            "message_id": str(getattr(result, "message_id", "")),
            "chat_id": chat_id,
            "text": text,
            "sent_at": datetime.now(timezone.utc).isoformat(),
        }

    async def get_status(self) -> dict:
        info = None
        if self._bot_info:
            info = {
                "id": self._bot_info.user_id,
                "name": self._bot_info.first_name,
                "username": self._bot_info.username,
            }
        return {"connected": self._is_connected, "bot": info}


max_bot_client = MaxBotClient()

Теперь — обработчики событий. Они объявляются на уровне модуля через декораторы диспетчера:

# Продолжение app/client.py

@dp.message_created()
async def on_message(event: MessageCreated):
    """Входящее сообщение → пересылаем в CRM."""
    msg = event.message
    sender = msg.sender
    body = msg.body

    text = body.text if body and hasattr(body, "text") else None
    if not text:
        return

    # Игнорируем сообщения от ботов
    if sender and sender.is_bot:
        return

    chat_id = str(msg.recipient.chat_id)
    sender_name = " ".join(
        filter(None, [
            getattr(sender, "first_name", None),
            getattr(sender, "last_name", None),
        ])
    ) or "Неизвестный"

    sent_at = datetime.fromtimestamp(
        msg.timestamp / 1000, tz=timezone.utc
    ).isoformat() if msg.timestamp else datetime.now(timezone.utc).isoformat()

    # Формируем payload для CRM
    payload = {
        "channel": "max",
        "chat_id": chat_id,
        "text": text,
        "sender_name": sender_name,
        "sender_username": getattr(sender, "username", None),
        "sent_at": sent_at,
    }

    try:
        async with httpx.AsyncClient(timeout=10) as http:
            resp = await http.post(
                f"{max_bot_client._main_api}/api/messages/incoming",
                json=payload,
            )
            if resp.status_code in (200, 201):
                logger.info(f"Message forwarded to CRM")
            else:
                logger.error(f"CRM returned {resp.status_code}")
    except Exception as e:
        logger.error(f"Failed to forward message: {e}")


@dp.bot_started()
async def on_bot_started(event: BotStarted):
    """Пользователь нажал 'Начать'."""
    name = event.user.first_name or event.user.username
    logger.info(f"Bot started by {name}")

Важные детали

Зачем _this = sys.modules[__name__]? При обрыве соединения нужно пересоздать объект Bot. Но обработчики (on_message) были зарегистрированы при импорте модуля и ссылаются на старый объект. Через _this.bot мы обращаемся к атрибуту модуля, который всегда указывает на актуальный экземпляр.

Exponential backoff. При обрыве задержка между попытками растёт: 5 → 10 → 20 → 40 → 60 секунд. После успешного переподключения сбрасывается обратно до 5. Это защищает от DDoS'а на API при массовых сбоях.

Шаг 6. FastAPI-приложение

# app/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from app.client import max_bot_client
from app.config import SERVICE_PORT

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await max_bot_client.start()
    logger.info("MAX Bot service started")
    yield
    await max_bot_client.stop()
    logger.info("MAX Bot service stopped")


app = FastAPI(title="MAX Bot Service", lifespan=lifespan)


@app.get("/health")
async def health():
    status = await max_bot_client.get_status()
    return {"service": "max-bot", **status}


class SendMessageBody(BaseModel):
    chat_id: str
    text: str


@app.post("/send-message")
async def send_message(body: SendMessageBody):
    """Отправить сообщение из CRM → MAX."""
    if not max_bot_client.is_connected:
        raise HTTPException(status_code=503, detail="Not connected")
    try:
        return await max_bot_client.send_message(body.chat_id, body.text)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Шаг 7. Запуск

Локально (для разработки)

cd max-bot
source venv/bin/activate
uvicorn app.main:app --host 0.0.0.0 --port 8015 --reload

В логах увидим:

Connected to MAX as MyBot (id=12345678)
Starting MAX polling...

На сервере (systemd)

# inteltek-max.service
[Unit]
Description=MAX Bot Service
After=network.target

[Service]
Type=simple
User=www-data
WorkingDirectory=/opt/max-bot
ExecStart=/opt/max-bot/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8015
Restart=on-failure
RestartSec=5
Environment=PYTHONUNBUFFERED=1

[Install]
WantedBy=multi-user.target
sudo cp inteltek-max.service /etc/systemd/system/max-bot.service
sudo systemctl daemon-reload
sudo systemctl enable --now max-bot

Шаг 8. Подключаем к CRM

Теперь ваша CRM должна:

1. Принимать входящие сообщения на эндпоинте, который вы указали в MAIN_API_URL:

# Пример на FastAPI (ваша CRM)
@app.post("/api/messages/incoming")
async def incoming_message(data: dict):
    channel = data["channel"]      # "max"
    chat_id = data["chat_id"]      # ID чата для ответа
    text = data["text"]            # Текст сообщения
    sender = data["sender_name"]   # Имя отправителя

    # Сохраняем в БД, создаём тикет, уведомляем менеджера...
    await save_message(channel, chat_id, text, sender)
    return {"status": "ok"}

2. Отправлять ответы через HTTP-запрос к нашему сервису:

import httpx

async def reply_to_max(chat_id: str, text: str):
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8015/send-message",
            json={"chat_id": chat_id, "text": text},
        )
        return resp.json()

# Пример использования
await reply_to_max("12345678", "Спасибо за обращение! Менеджер скоро ответит.")

3. Проверять статус бота:

curl http://localhost:8015/health
# {"service": "max-bot", "connected": true, "bot": {"id": 123, "name": "MyBot"}}

Мультиканальность

Прелесть микросервисной архитектуры в том, что по той же схеме можно подключить другие каналы. У нас, например, работают параллельно:

Канал

Порт

Метод получения

Telegram

8010

UserBot (Telethon)

VK

8012

Callback API + User Long Poll

OK

8013

Long Polling (Graph API)

MAX

8015

Long Polling (Bot API)

WhatsApp

8017

Baileys (WebSocket)

Каждый сервис — отдельный процесс со своим venv, systemd-юнитом и health-эндпоинтом. Все они пересылают сообщения в единый API, который распределяет их по менеджерам в CRM.

┌─────────┐  ┌────────┐  ┌────────┐  ┌─────────┐  ┌──────────┐
│Telegram │  │  VK    │  │  OK    │  │  MAX    │  │ WhatsApp │
│ :8010   │  │ :8012  │  │ :8013  │  │ :8015   │  │  :8017   │
└────┬────┘  └───┬────┘  └───┬────┘  └────┬────┘  └────┬─────┘
     │           │           │            │            │
     └───────────┴───────────┴────────────┴────────────┘
                             │
                    ┌────────▼────────┐
                    │   Main API      │
                    │   :8001         │
                    │  (FastAPI +     │
                    │   PostgreSQL)   │
                    └────────┬────────┘
                             │
                    ┌────────▼────────┐
                    │   CRM / Office  │
                    │  (Angular SPA)  │
                    └─────────────────┘

Подводные камни

1. Таймстампы в миллисекундах

MAX API возвращает timestamp в миллисекундах, а не в секундах. Не забудьте делить на 1000:

datetime.fromtimestamp(msg.timestamp / 1000, tz=timezone.utc)

2. Пересоздание сессии

При обрыве соединения недостаточно просто перезапустить polling �� нужно пересоздать объект Bot и закрыть старую aiohttp-сессию. Иначе получите ClientSession is closed.

3. Фильтрация ботов

Обязательно проверяйте sender.is_bot, иначе бот начнёт отвечать сам себе в бесконечном цикле.

4. Graceful shutdown

Используйте lifespan в FastAPI для корректного завершения polling при остановке сервиса. Без этого systemd будет ждать таймаут и убивать процесс через SIGKILL.

Итого

За 30 минут мы получили продакшн-ready микросервис для MAX:

  • ~200 строк Python-кода

  • Автоматическое переподключение с exponential backoff

  • REST API для отправки сообщений из любой системы

  • Health check для мониторинга

Код открыт и работает в продакшене. Такой же подход применим к любому мессенджеру с Bot API.


Автор: Алан, CTO ИнтеллектТех — разрабатываем AI-агентов и мультиканальные CRM-решения.