Друзья, приветствую.
На Хабре я уже описал достаточно много материала, но гештальт с Aiogram dialog и взаимодействие с брокером сообщений RabbitMQ так и не закрыл. Сегодня хочу исправить это.
В рамках статьи, которую вы сейчас читаете, мы реализуем функционального телеграм-бота. Как вы поняли из названия, для реализации этой задачи мы будем использовать брокер сообщений RabbitMQ и ряд мощных Python-инструментов.
Постараюсь вас сильно не грузить теорией, но, к сожалению, без неё никуда. Поэтому буду придерживаться баланса между теорией и написанием кода по ходу повествования далее.
В рамках теоретического блока мы коротко пройдемся по таким темам, как брокеры сообщений и зачем они нужны, поговорим про вебхуки в мире разработки телеграм-ботов и разберемся, зачем в этот проект я решил подтянуть Aiogram dialog, FastStream, SQLAlchemy и прочие технологии.
Функционал, который будут видеть пользователи
Теперь давайте отдельно поговорим о том, что за бота мы будем писать или, другими словами, давайте разберемся, что наш бот будет уметь делать.
Напишем мы телеграм-бота для бронирования столиков в выдуманном ресторане «Binary Bites». Под бронированием я подразумеваю следующие этапы:
Выбор количества мест
Выбор конкретного стола
Выбор даты, на когда бронь
Выбор времени, на когда бронь
Подтверждение
Кроме того, пользователь сможет просматривать свои брони и, при необходимости, отменять их. Также он получит доступ к информации о ресторане.
Добавим простенькую админку, чтобы пользователь с уровнем доступа администратора мог просматривать все существующие брони и мог получать информацию по всем пользователям.
Функционал «под капотом»
Как вы понимаете, вся основная «магия» у нас будет происходить «под капотом». Сейчас я не буду говорить про логику взаимодействия с базой данных, думаю, очевидно, что нам нужно будет как-то сохранить пользователя, добавить столы с параметрами, проверить, свободен ли стол, и прочее — это очевидно. Сейчас же я хочу сосредоточиться на более интересной логике.
Во-первых, мы сделаем так, чтоб наш бот работал не через лонг поллинг, который идет «из коробки» при разработке ботов, а через технологию вебхуков. О том, зачем я выбрал такой подход и какой профит он нам даст, подробнее поговорим далее.
В видео «Вебхуки для начинающих: от основ до Telegram-бота за 60 минут» я подробно рассказал о том, что такое вебхуки и как они работают в контексте Telegram-ботов. Если вам интересен аудио-видео формат, приглашаю вас к просмотру. Я постарался максимально раскрыть тему (видео доступно и на Rutube).
Во-вторых, наш бот будет взаимодействовать с брокером сообщений RabbitMQ. Это говорит о том, что наша система, кроме данного брокера, будет подразумевать наличие продюсера (того, кто ставит задачу) и слушателя (того, кто эту задачу будет исполнять). О том, что это значит, поговорим в отдельном блоке. Сейчас скажу, что эту задачу мы будем закрывать через FastStream.
В-третьих, наш бот будет уметь работать с отложенными и задачами по расписанию. К сожалению, в FastStream нет метода «из коробки» для работы с отложенными и задачами по расписанию, поэтому мы дополнительно поставим APScheduler. Но в статье я дополнительно покажу, как использовать связку FastStream + APScheduler.
В общих чертах определились, теперь давайте более детально рассмотрим стек используемых технологий в данном проекте.
Стек технологий
Как вы поняли, писать код мы будем на языке Python, а в качестве брокера будем использовать RabbitMQ.
Технологии Python, которые сегодня будут использованы:
Aiogram 3: лучший фреймворк для разработки телеграм-ботов на Python
Aiogram dialog 2.3: фреймворк, который очень сильно упрощает работу с FSM в Aiogram 3
SQLAlchemy 2: фреймворк для работы с табличными базами данных (демонстрацию покажу на примере SQLite, но можно будет легко переключить на любую другую табличную базу данных)
FastStream 0.5.3: python-фреймворк для работы с брокерами сообщений (многие нарекают его «убийцей Celery»)
APScheduler 3.1: фреймворк, который позволит удобно работать с отложенными задачами и задачами по расписанию
FastAPI: фреймворк, который позволит всех этих ребят: телеграм-бота, APScheduler, FastStream и так далее объединить в рамках одного приложения (одной экосистемы)
Alembic: для миграций базы данных
Uvicorn: для запуска веб-сервера
Дополнительно используем aiosqlite, pydantic, loguru и прочие библиотеки.
Как видите, стек достаточно большой.
План на статью
Начнем мы с того, что просто разберемся теоретически с брокерами, продюсерами, слушателями, фоновыми задачами и прочими сущностями, которых будем сегодня касаться. Условно, начнем мы с теории.
Далее приступим к разработке. Тут можно будет выделить следующие этапы:
Подготовка. В рамках этапа мы поднимем RabbitMQ на локальной машине, подготовим структуру проекта, установим зависимости и прочее.
База данных. На этом этапе мы опишем модели таблиц, превратим их в реальные таблицы базы данных (выполним миграции) и напишем все необходимые методы взаимодействия с базой данных. Говорю сразу, блок этот сегодня досканально, как в прошлых статьях своих, разбирать не буду, так как процесс тогда снова может занять пол статьи.
Разработаем базовую пользовательскую часть бота и админку.
Подробно разберем блок с диалогом бронирования (Aiogram dialog).
Разберемся с блоком взаимодействия с APScheduler и FastStream.
Выполним деплой проекта (удаленный запуск).
Что касается блока с удаленным запуском, хочу остановиться подробнее. Дело тут в том, что, несмотря на то, что мы будем запускать сразу несколько разных сервисов через FastAPI, у нас в любом случае будет необходимость где-то поднимать отдельно RabbitMQ. Считаю, что удобно, когда все в одном месте, поэтому для деплоя проекта я выбрал сервис Amvera Cloud.
Почему Amvera?
Первая и основная причина — это простота деплоя. Нам буквально достаточно будет закинуть файлы проекта (наш код) на сервис Amvera. Затем, прямо на сервисе, можно будет собрать конфигурационный файл, и проект запустится.
Доставить файлы можно или через интерфейс, или через стандартные GIT-команды. Я пример покажу через загрузку через интерфейс на сайте.
Бонусом Amvera дает HTTPS-доменное имя, которого будет более чем достаточно для полноценной работы бота в формате вебхуков.
Кроме того, новые пользователи получают 111 рублей на основной баланс.
Вторая причина — это возможность прямо на Амвере поднять RabbitMQ. В этом случае вам даже не нужно будет никакие файлы добавлять. Буквально пару кликов мышкой, и бот взаимодействует с RabbitMQ на серверах Амверы. После останется боту подключаться уже к новому RabbitMQ. На деле все ещё проще, чем кажется.
Зачем выбран FastApi
В архитектуре нашего проекта важное место занимают микросервисы, каждый из которых может функционировать как самостоятельная единица. FastStream и APScheduler — яркие примеры таких компонентов, обладающих собственным жизненным циклом и требующих отдельной инициализации и запуска.
Когда мы говорим о запуске этих сервисов, речь идёт не просто о вызове функций, а о полноценном запуске долгоживущих процессов, которые после активации начинают выполнять свою логику (хотя с FastStream ситуация несколько сложнее, об этом поговорим позже).
Ключевой особенностью нашего бота является использование технологии вебхуков вместо традиционного лонг-поллинга. Эта архитектурная концепция полностью меняет подход к получению обновлений: вместо того, чтобы наш бот периодически опрашивал серверы Telegram, серверы сами уведомляют нас о новых событиях.
Для тех, кто хочет глубже понять принципы работы вебхуков, особенно в контексте разработки Telegram-ботов, рекомендую посмотреть моё видео «Вебхуки для начинающих: от основ до Telegram-бота за 60 минут» (https://www.youtube.com/watch?v=g4yZDnoVD3g&t=2146s).
Если объяснить кратко: при любом действии пользователя в Telegram (отправка сообщения, нажатие кнопки) генерируется обновление. При использовании лонг-поллинга бот регулярно подключается к серверам Telegram и проверяет наличие новых обновлений. В случае с вебхуками серверы Telegram сами инициируют отправку обновлений на указанный нами эндпоинт.
Для реализации такого подхода требуется веб-сервер со специальным маршрутом, способным обрабатывать поступающие обновления. И здесь на сцену выходит FastAPI — высокопроизводительный фреймворк, который идеально справляется с задачей быстрой обработки запросов от серверов Telegram. Это позволяет нам создать масштабируемого и отзывчивого бота, способного эффективно обрабатывать значительные нагрузки.
Помимо превосходной производительности, FastAPI предлагает гибкую систему управления жизненным циклом приложения. Это даёт нам возможность элегантно организовать запуск всех компонентов — Telegram-бота, FastStream, APScheduler и других микросервисов — при старте основного приложения.
Таким образом, FastAPI выступает не просто как веб-сервер, а как объединяющая платформа, позволяющая гармонично интегрировать все элементы нашей системы в рамках единого, хорошо структурированного приложения.
Архитектура брокеров сообщений: основы и применение в проекте
В сердце нашего проекта лежит архитектура, основанная на брокерах сообщений — элегантное решение для организации асинхронного взаимодействия между компонентами системы. Эта архитектура строится на трех фундаментальных компонентах, каждый из которых выполняет свою уникальную роль.
Ключевые компоненты системы
Продюсеры (Producers) — источники сообщений, стоящие в начале информационного потока. Они формируют задачи, упаковывают их в структурированные сообщения и направляют их в специализированные каналы (очереди) брокера. Продюсеры инициируют процесс коммуникации, не заботясь о том, кто и когда обработает их сообщения.
Слушатели (Consumers) — исполнители, постоянно прослушивающие определенные каналы в ожидании новых сообщений. Как только сообщение появляется в очереди, слушатель мгновенно реагирует, извлекает его и приступает к выполнению соответствующей бизнес-логики. Эти компоненты отвечают за фактическую реализацию задач, порожденных продюсерами.
Брокер сообщений (Message Broker) — центральное звено системы, представленное в нашем случае RabbitMQ. Брокер выполняет роль высоконадежного посредника, который:
Принимает и сохраняет сообщения от продюсеров
Организует маршрутизацию сообщений к соответствующим слушателям
Гарантирует доставку, сохраняя сообщения до подтверждения их успешной обработки
Обеспечивает балансировку нагрузки при наличии нескольких слушателей
Информационный поток в системе
Взаимодействие компонентов образует элегантный цикл: продюсер генерирует сообщение и отправляет его брокеру, который временно хранит это сообщение и затем маршрутизирует его к подходящему слушателю. Слушатель обрабатывает сообщение и, при необходимости, отправляет подтверждение об успешном выполнении. Такая архитектура обеспечивает:
Асинхронность выполнения задач
Высокую масштабируемость системы
Отказоустойчивость при сбоях отдельных компонентов
Гарантированную доставку сообщений
Технологический стек реализации
Для воплощения этой архитектуры я выбрал FastStream — современный и высокопроизводительный фреймворк, предлагающий не только удобный API для работы с брокерами сообщений, но и специализированные интеграции с FastAPI, которые мы будем активно использовать в этом проекте.
Настройка компонентов
Реализация системы требует конфигурации трех основных элементов:
Брокер: Настройка RabbitMQ выполняется по стандартной процедуре, применимой для любой системы обмена сообщениями. Необходимо развернуть экземпляр брокера и обеспечить к нему необходимый уровень доступа.
Продюсер и слушатель: Их конфигурация требует более тонкой настройки, которую мы детально рассмотрим в ходе разработки. Важно отметить, что FastStream существенно упрощает этот процесс благодаря своему декларативному API.
Типы задач в нашей системе
В рамках проекта мы будем работать с двумя категориями задач:
Немедленные (фоновые) задачи — требующие моментального выполнения. Их обработку берет на себя FastStream, обеспечивая быстрое асинхронное выполнение.
Отложенные и периодические задачи — требующие выполнения в определенное время или с заданной периодичностью. Для них мы задействуем APScheduler — мощный планировщик задач, о котором я подробно рассказывал в предыдущей серии публикаций.
Комбинированный подход
Особый интерес представляют сценарии, где мы комбинируем возможности обеих систем. Например, для циклических задач (таких как автоматическое закрытие просроченных бронирований) мы настроим отдельное расписание, а для более сложных кейсов реализуем многоступенчатую схему:
Формирование задачи продюсером
Передача через брокер к слушателю
Регистрация в планировщике APScheduler для отложенного или периодического выполнения
Такой комбинированный подход позволяет нам максимально гибко управлять жизненным циклом каждой задачи и оптимально использовать возможности обеих технологий.
База данных: архитектура и взаимодействие с данными
Концепция бота для бронирования столиков в ресторане "Binary Bites" естественным образом определяет ключевые требования к нашей системе хранения и обработки данных:
Необходимость систематизированного хранения информации о пользователях, доступных столиках и текущих бронированиях
Потребность в эффективных механизмах манипулирования этими данными: создание новых записей, обновление существующих, удаление устаревших, а также проведение различных проверок и валидаций
Требование к надежному и масштабируемому хранилищу всей этой информации
Для решения этих задач мы обратимся к мощному инструментарию, предоставляемому SQLAlchemy 2 – современному и гибкому ORM-фреймворку для Python. Этот инструмент, особенно в сочетании с системой миграций Alembic, обеспечит не только скорость разработки, но и высокий уровень безопасности при работе с данными.
SQLAlchemy 2 позволяет нам элегантно абстрагироваться от низкоуровневых деталей работы с базой данных, предоставляя интуитивно понятный интерфейс для:
Декларативного описания структуры таблиц и связей между ними
Построения сложных запросов с использованием выразительного API
Управления транзакциями и обеспечения целостности данных
Оптимизации производительности при работе с большими объемами информации
Для тех, кто хочет углубиться в возможности этого фреймворка, я подготовил целую серию публикаций на Хабре, посвященную именно SQLAlchemy 2, где детально рассматриваются как базовые концепции, так и продвинутые техники работы с данным инструментом.
Важно отметить, что SQLAlchemy сам по себе предоставляет только средства для описания моделей и логики взаимодействия с базой данных, но не трансформирует эти абстрактные описания в реальные структуры таблиц. Именно здесь на сцену выходит Alembic – специализированный инструмент для управления миграциями, который позволяет:
Преобразовывать декларативные модели в физические таблицы
Отслеживать изменения в структуре базы данных
Безопасно применять эти изменения на рабочей системе
Откатывать изменения при необходимости
Такой тандем технологий обеспечивает нам необходимую гибкость в развитии схемы данных вместе с эволюцией функциональности нашего бота, сохраняя при этом высокий уровень надежности и производительности системы в целом.
Aiogram и Aiogram Dialog: удобство работы с машиной состояний
Aiogram заслуженно считается одним из лучших инструментов для создания Telegram-ботов на Python, а возможно, и в мире программирования в целом. Однако у него есть один значительный недостаток — сложность работы с машиной состояний.
Машина состояний в Telegram-ботах
Машина состояний — это модель поведения системы, позволяющая управлять её переходами между различными состояниями в зависимости от условий. В контексте Telegram-ботов это означает возможность вести пользователя по заранее определённому сценарию: бот задаёт вопросы, а пользователь отвечает с помощью текста, кнопок или отправки файлов. Основная цель — корректный сбор и сохранение данных в базе.
Проблемы работы с машиной состояний в Aiogram
Несмотря на преимущества Aiogram, управление состояниями в нём оставляет желать лучшего:
Отсутствие поддержки одновременного нахождения в нескольких сценариях. Пользователь может участвовать только в одном диалоге. Чтобы вернуться к предыдущему, необходимо завершить текущий.
Обилие однотипного кода. Для реализации базовых элементов интерфейса (например, пагинации в клавиатуре, календаря, перемещения между состояниями) приходится писать много кода вручную.
Решение: Aiogram Dialog
Здесь на помощь приходит Aiogram Dialog — расширение Aiogram, которое со временем эволюционировало в самостоятельный фреймворк. Он решает главные проблемы работы с состояниями, предлагая:
Разделение получения данных и отображения сообщений.
Объединение кнопок и их обработчиков в единую структуру.
Упрощённую маршрутизацию состояний.
Гибкие виджеты для удобного построения интерфейса.
Основные концепции Aiogram Dialog
В центре архитектуры Aiogram Dialog — окна (Window). Каждое окно представляет собой сообщение, отправляемое пользователю, и содержит:
Виджеты (текстовые сообщения, кнопки, клавиатуры);
Функции обратного вызова, отвечающие за обработку пользовательских действий и передачу данных.
Окна объединяются в диалоги (Dialog), что позволяет легко переключаться между этапами взаимодействия с пользователем. Более того, Aiogram Dialog поддерживает одновременную работу с несколькими диалогами, передачу данных между ними и автоматическое возвращение после завершения вложенного сценария.
На этом, думаю, теоретический блок можно завершать. Приступаем к практике!
Поднимаем RabbitMQ на локальном компьютере
Начнём с развертывания брокера сообщений RabbitMQ на вашем локальном компьютере. Один из самых простых и быстрых способов сделать это — использовать Docker.
Установка Docker Desktop
Если вы новичок в работе с Docker, первым шагом будет установка Docker Desktop. Этот инструмент доступен для Windows, macOS и Linux, обеспечивая удобную среду для работы с контейнерами. Скачайте и установите Docker Desktop с официального сайта, затем запустите его.
После успешной установки вы сможете использовать командную строку для управления контейнерами Docker.
Запуск RabbitMQ через Docker
Чтобы поднять RabbitMQ, выполните следующую команду:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
-e RABBITMQ_DEFAULT_VHOST=myapp_vhost \
rabbitmq:3-management
Эта команда создаст контейнер с RabbitMQ, настроит учетные данные администратора и развернет веб-интерфейс для управления очередями сообщений.
Доступ к веб-интерфейсу RabbitMQ
После успешного запуска контейнера веб-интерфейс RabbitMQ будет доступен по адресу:
http://localhost:15672/
Для входа используйте логин admin и пароль password (рекомендуется заменить их на собственные данные для безопасности).

Получаем токен бота
Для взаимодействия с Telegram-ботом потребуется токен, который можно получить в BotFather — официальном Telegram-боте для создания и управления другими ботами.
Откройте Telegram и найдите BotFather.
Отправьте команду /newbot.
Следуйте инструкциям, укажите имя и юзернейм бота.
В конце процесса вы получите токен — сохраните его, так как он понадобится для настройки бота.

Поднимаем туннель для локального доступа
Перед началом разработки важно обеспечить доступ к локальному серверу из внешней сети. Это особенно необходимо, если ваш бот работает на вебхуках. Для этого можно использовать ngrok:
Скачайте и установите ngrok с официального сайта.
Выполните команду для создания туннеля:
ngrok http 8000
Ngrok создаст внешний URL, который можно использовать для настройки вебхуков.
Теперь ваш локальный сервер будет доступен извне, и вы сможете тестировать взаимодействие бота с Telegram API.
Создание проекта
Настроим среду для разработки, чтобы подготовить основу для будущего проекта. Откройте свою любимую среду разработки, например PyCharm, и создайте новый проект.
Установка зависимостей
В корневой директории проекта создайте файл requirements.txt и добавьте в него следующие зависимости:
aiogram==3.17.0
aiogram_dialog==2.3.1
sqlalchemy==2.0.38
pydantic==2.10.6
pydantic_settings==2.7.1
aio-pika==9.5.4
faststream==0.5.34
loguru==0.7.3
aiosqlite==0.21.0
alembic==1.14.1
pytz==2025.1
apscheduler==3.11.0
fastapi==0.115.8
uvicorn==0.34.0
Ранее мы уже разобрали основные технологии, которые будем использовать, поэтому не будем углубляться в их описание. Однако обратите внимание на две ключевые библиотеки:
aio-pika – это основной движок для FastStream, обеспечивающий корректное взаимодействие с брокером сообщений RabbitMQ.
aiosqlite – асинхронный драйвер для работы с базой данных, используемый в связке с SQLAlchemy.
Для установки зависимостей выполните команду:
pip install -r requirements.txt
Создание файла конфигурации
Следующим шагом создайте в корневой директории проекта файл .env и заполните его следующим содержимым:
BOT_TOKEN=bot_token
ADMIN_IDS=[admin_id1, admin_id2]
INIT_DB=0
BASE_URL=https://ngrok_url
RABBITMQ_USERNAME=admin
RABBITMQ_PASSWORD=password
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
VHOST=myapp_vhost
Разбор переменных окружения
BOT_TOKEN – токен вашего Telegram-бота.
ADMIN_IDS – список ID администраторов, имеющих доступ к управлению ботом.
INIT_DB – флаг для инициализации базы данных (далее подробнее разберем зачем он нужен).
BASE_URL – URL, предоставленный ngrok, для работы вебхуков (может быть любой другой туннель, предварительно не забудьте запустить).
RABBITMQ_USERNAME / RABBITMQ_PASSWORD – учетные данные для подключения к RabbitMQ.
RABBITMQ_HOST / RABBITMQ_PORT – параметры подключения к брокеру сообщений.
VHOST – виртуальный хост, используемый в RabbitMQ для изоляции задач.
После этих шагов среда разработки полностью подготовлена, и можно переходить к следующему этапу — написанию кода. Начнем мы с разработки логики для взаимодействия с базой данных.
Настройка файла конфигурации
Чтобы упростить работу с основными переменными проекта, создадим централизованный файл конфигурации.
Создание структуры проекта
В корневой директории проекта создайте папку app.
Внутри нее создайте файл config.py и заполните следующим содержимым:
import os
from typing import List
from urllib.parse import quote
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from faststream.rabbit import RabbitBroker
from loguru import logger
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
BOT_TOKEN: str
ADMIN_IDS: List[int]
INIT_DB: bool
FORMAT_LOG: str = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"
LOG_ROTATION: str = "10 MB"
DB_URL: str = 'sqlite+aiosqlite:///data/db.sqlite3'
STORE_URL: str = 'sqlite:///data/jobs.sqlite'
TABLES_JSON: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "dao", "tables.json")
SLOTS_JSON: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "dao", "slots.json")
BASE_URL: str
RABBITMQ_USERNAME: str
RABBITMQ_PASSWORD: str
RABBITMQ_HOST: str
RABBITMQ_PORT: int
VHOST: str
@property
def rabbitmq_url(self) -> str:
return (
f"amqp://{self.RABBITMQ_USERNAME}:{quote(self.RABBITMQ_PASSWORD)}@"
f"{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/{self.VHOST}"
)
@property
def hook_url(self) -> str:
"""Возвращает URL вебхука"""
return f"{self.BASE_URL}/webhook"
model_config = SettingsConfigDict(
env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")
)
# Инициализация конфигурации
settings = Settings()
# Настройка логирования
log_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log.txt")
logger.add(log_file_path, format=settings.FORMAT_LOG, level="INFO", rotation=settings.LOG_ROTATION)
# Создание брокера сообщений RabbitMQ
broker = RabbitBroker(url=settings.rabbitmq_url)
# Создание планировщика задач
scheduler = AsyncIOScheduler(jobstores={'default': SQLAlchemyJobStore(url=settings.STORE_URL)})
Разбор конфигурации
Использование Pydantic Settings
В этом коде основная логика завязана на Pydantic Settings, что позволяет удобно управлять переменными окружения.
Класс Settings отвечает за хранение всех конфигурационных параметров проекта, обеспечивая:
Гибкость – возможность менять настройки через .env файл.
Безопасность – пароли и ключи API не хранятся в коде.
Читаемость – все параметры собраны в одном месте.
Дополнительно определены два вспомогательных свойства:
rabbitmq_url – формирует строку подключения к RabbitMQ.
hook_url – возвращает URL вебхука.
Инициализация логирования
Файл логов log.txt создается автоматически, а Loguru позволяет удобно управлять логами, включая их ротацию (удаление старых записей, когда размер превышает 10MB).
Подключение к RabbitMQ
Для взаимодействия с брокером сообщений создается объект RabbitBroker, использующий данные из rabbitmq_url. В нашем приложении данный объект будет отвечать за отправку отправку сообщений в очередь в асинхронном режиме.
Планировщик задач AsyncIOScheduler
Используем AsyncIOScheduler из APScheduler для управления фоновыми задачами.
К сожалению, APScheduler не умеет взаимодействовать с брокером сообщений напрямую, поэтому под него будет использоваться свое хранилище. Я для удобства выбрал в качестве хранилища заданий SQLite, но его можно заменить на PostgreSQL или другой источник.
База данных под APSCheduler будет создана автоматически.
Вас могли заинтересовать эти строки:
TABLES_JSON: str = os.path.join(os.path.dirname(os.path.abspath(**file**)), "dao", "tables.json")
SLOTS_JSON: str = os.path.join(os.path.dirname(os.path.abspath(**file**)), "dao", "slots.json")
Я решил показать вам, как можно сделать миграцию базы данных через JSON файлы.
Нельзя сказать, что это единственный подход, но в некоторых случаях, особенно когда речь заходит за быструю установку в базе данных статических файлов выручает.
Как вы поняли, это пути к файлам с JSON с определенными данными. Вместе с ними, кстати, работает переменная INIT_DB. Если вы установите в файле .env в значении для этого параметра 1, то при запуске FastApi приложения произойдет загрузка данных из этих JSON в базу данных. Сама база данных должна будет предварительно существовать
Логика работы с базой данных
Друзья, практика показывает, что в каждой статье я подробно разбираю взаимодействие с базой данных, и из раза в раз этот блок повторяется. Это связано с тем, что я использую базовый класс с универсальными методами для работы с SQLAlchemy, что значительно упрощает процесс взаимодействия с данными.
Чтобы не дублировать одну и ту же информацию, в этот раз я не буду останавливаться на деталях миграций, особенностях связей между таблицами и универсальных методах работы с данными. Вместо этого сосредоточимся только на тех элементах, которые имеют непосредственное отношение к текущему проекту.
Если вы хотите глубже разобраться в подходах, которые я применяю, рекомендую ознакомиться с моими предыдущими статьями или видео, где я подробно объясняю, как описывать модели таблиц и работать с методами SQLAlchemy.
Кроме того, важно отметить, что в этой статье мы активно будем использовать Pydantic — он понадобится как для работы с базой данных, так и в целом для корректного функционирования FastAPI. Если вы еще не знакомы с Pydantic, рекомендую изучить мою "Pydantic 2: Полное руководство для Python-разработчиков — от основ до продвинутых техник" статью, где я подробно разобрал его возможности.
Подготовка конфигурации SQLAlchemy
Начнем с создания файла конфигурации для SQLAlchemy. Он будет находиться в папке app/dao, которую необходимо предварительно создать. Сам файл назовем database.py и заполним следующим содержимым:
import uuid
from datetime import datetime
from decimal import Decimal
from sqlalchemy import inspect, TIMESTAMP, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession
from app.config import settings
engine = create_async_engine(url=settings.DB_URL)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession)
class Base(AsyncAttrs, DeclarativeBase):
__abstract__ = True
created_at: Mapped[datetime] = mapped_column(
TIMESTAMP,
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
TIMESTAMP,
server_default=func.now(),
onupdate=func.now()
)
def to_dict(self, exclude_none: bool = False):
"""
Преобразует объект модели в словарь.
Args:
exclude_none (bool): Исключать ли None значения из результата
Returns:
dict: Словарь с данными объекта
"""
result = {}
for column in inspect(self.__class__).columns:
value = getattr(self, column.key)
# Преобразование специальных типов данных
if isinstance(value, datetime):
value = value.isoformat()
elif isinstance(value, Decimal):
value = float(value)
elif isinstance(value, uuid.UUID):
value = str(value)
# Добавляем значение в результат
if not exclude_none or value is not None:
result[column.key] = value
return result
Этот файл выполняет несколько важных задач:
Создает движок для взаимодействия с базой данных.
Настраивает асинхронную фабрику сессий (async_session_maker).
Определяет базовый класс моделей, от которого будут наследоваться все таблицы.
Добавляет две универсальные колонки: created_at и updated_at, которые автоматически фиксируют время создания и обновления записей.
Реализует метод to_dict(), который позволяет конвертировать объекты SQLAlchemy в обычные Python-словари.
Если вы уже знакомы с моими предыдущими статьями, эта структура не должна вызывать вопросов.
Описание моделей базы данных
Теперь мы готовы описать основные модели таблиц. Для этого создадим файл app/dao/models.py и добавим в него следующий код:
from datetime import datetime
from sqlalchemy import BigInteger, String
from app.dao.database import Base
from sqlalchemy import Integer, Date, ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
username: Mapped[str | None]
first_name: Mapped[str | None]
last_name: Mapped[str | None]
bookings: Mapped[list["Booking"]] = relationship("Booking", back_populates="user")
class Table(Base):
__tablename__ = "tables"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
capacity: Mapped[int]
description: Mapped[str | None]
bookings: Mapped[list["Booking"]] = relationship("Booking", back_populates="table")
class TimeSlot(Base):
__tablename__ = "time_slots"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
# Используем String для времени, так как SQLite не имеет встроенного типа Time
start_time: Mapped[str] = mapped_column(String(5), nullable=False) # формат: HH:MM
end_time: Mapped[str] = mapped_column(String(5), nullable=False) # формат: HH:MM
bookings: Mapped[list["Booking"]] = relationship(
"Booking",
back_populates="time_slot",
cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"TimeSlot(id={self.id}, {self.start_time}-{self.end_time})"
class Booking(Base):
__tablename__ = "bookings"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("users.id"))
table_id: Mapped[int] = mapped_column(Integer, ForeignKey("tables.id"))
time_slot_id: Mapped[int] = mapped_column(Integer, ForeignKey("time_slots.id"))
date: Mapped[datetime] = mapped_column(Date)
status: Mapped[str]
user: Mapped["User"] = relationship("User", back_populates="bookings")
table: Mapped["Table"] = relationship("Table", back_populates="bookings")
time_slot: Mapped["TimeSlot"] = relationship("TimeSlot", back_populates="bookings")
Разбираем структуру данных
В проекте у нас есть несколько ключевых сущностей:
Пользователи (User) – информация о клиентах, которые совершают бронирования.
Столы (Table) – описание столиков, доступных для бронирования.
Временные слоты (TimeSlot) – промежутки времени, в которые можно забронировать стол.
Бронирования (Booking) – связывают пользователей, столы и временные слоты.
Некоторые важные моменты:
Столы и временные слоты – статические данные.
Например, описание стола может выглядеть так:
{ "id": 1, "capacity": 2, "description": "Уютный столик для двоих у окна" }
Айди будет соответствовать номеру стола в заведении.
Временные слоты фиксированы.
{ "id": 1, "start_time": "06:00", "end_time": "08:00" }
В системе будет всего 9 таких записей, так как ресторан работает до полуночи, а каждый слот рассчитан на 2 часа.
Связывание данных
В таблице Booking мы объединяем пользователей, столы и временные интервалы, что позволяет эффективно управлять процессом бронирования.
Теперь, когда у нас есть описанные модели, мы можем переходить к следующему этапу — настройке работы с данными через SQLAlchemy.
Настройка миграций с Alembic
На данный момент мы только описали модели таблиц, но сами таблицы, как и база данных, еще не созданы. Давайте исправим это с помощью Alembic — инструмента для управления миграциями.
Инициализация Alembic и создание первой миграции
Сначала переходим в директорию app. В терминале выполняем команду:
cd app
Теперь инициализируем Alembic с поддержкой асинхронной работы с базой данных:
alembic init -t async migration
После выполнения команды в проекте появится папка migration и файл alembic.ini. Для удобства работы переместите alembic.ini в корневую директорию проекта.
Настройка файла alembic.ini
Открываем alembic.ini и изменяем путь к папке миграций:
Было:
script_location = migration
Стало:
script_location = app/migration
Это позволит запускать миграции из корневой директории проекта без лишних путей.
Подключение к базе данных в env.py
Теперь необходимо обновить файл app/migration/env.py, чтобы Alembic корректно взаимодействовал с нашей базой данных.
Открываем env.py и заменяем его содержимое следующим образом:
Исходный код (было):
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = None
Обновленный код (стало):
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.config import settings
from app.dao.database import Base
from app.dao.models import User, Table, TimeSlot, Booking
config = context.config
config.set_main_option("sqlalchemy.url", settings.DB_URL)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
Мы добавили:
Подключение к настройкам проекта (settings.DB_URL), чтобы Alembic использовал актуальный URL базы данных.
Импорт моделей (User, Table, TimeSlot, Booking) для корректного отслеживания изменений.
Передачу Base.metadata в target_metadata, что позволит Alembic автоматически определять структуру таблиц.
Остальная часть файла остается без изменений.
Создание первой миграции
Теперь можно создать и применить первую миграцию.
Возвращаемся в корневую директорию проекта:
cd ../
Генерируем файл миграции:
alembic revision --autogenerate -m "Initial revision"
Создаем папку для хранения базы данных:
В корне проекта (на уровень выше app) создаем папку data:
mkdir data
Применяем миграции и создаем таблицы в базе данных:
alembic upgrade head
После успешного выполнения этой команды в проекте появится файл базы данных с созданными таблицами.
Теперь наша база данных полностью готова к работе, а мы готовы к описанию методов взаимодействия с базой данных.
Универсальный класс BaseDAO для работы с базой данных
Как я уже упоминал, в проекте используется универсальный базовый класс для работы с базой данных через SQLAlchemy. Этот класс позволяет удобно выполнять основные операции с таблицами.
Полный код находится в файле app/dao/base.py, и его общий вид следующий:
from typing import List, TypeVar, Generic, Type
from pydantic import BaseModel
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete, func
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession
from app.dao.database import Base
T = TypeVar("T", bound=Base)
class BaseDAO(Generic[T]):
model: Type[T] = None
def __init__(self, session: AsyncSession):
self._session = session
if self.model is None:
raise ValueError("Модель должна быть указана в дочернем классе")
async def find_one_or_none_by_id(self, data_id: int):
try:
query = select(self.model).filter_by(id=data_id)
result = await self._session.execute(query)
record = result.scalar_one_or_none()
log_message = f"Запись {self.model.__name__} с ID {data_id} {'найдена' if record else 'не найдена'}."
logger.info(log_message)
return record
except SQLAlchemyError as e:
logger.error(f"Ошибка при поиске записи с ID {data_id}: {e}")
raise
Полный код этого файла и всего проекта можно найти в моём телеграм-канале «Лёгкий путь в Python». В настоящее время сообщество насчитывает более 2600 участников и продолжает активно расти. Присоединяйтесь — всё совершенно бесплатно!
Ключевые особенности
Вынос сессии на глобальный уровень
Теперь сессию не нужно передавать в каждый класс отдельно, что упрощает работу с базой данных.Гибкость и масштабируемость
Базовый класс BaseDAO является обобщенным (Generic), что позволяет использовать его с любыми моделями.
Для каждой таблицы создается отдельный дочерний DAO-класс, который наследуется от BaseDAO.
Все базовые методы для работы с базой уже реализованы, а если нужно добавить специфичную логику, это можно сделать в дочернем классе.
Отсутствие commit в методах
Коммиты вынесены в middleware AIOGram и зависимости FastAPI, что позволяет централизованно управлять транзакциями.
Как использовать BaseDAO в проекте
Для каждой модели создается отдельный DAO-класс, который наследуется от BaseDAO. Например, DAO для работы с пользователями может выглядеть так:
from app.dao.base import BaseDAO
from app.dao.models import User
class UserDAO(BaseDAO[User]):
model = User
Теперь UserDAO автоматически получает все базовые методы, такие как find_one_or_none_by_id, и может дополняться уникальными методами, если это необходимо.
Дочерние DAO классы с новыми методами
Теперь подготовим дочерние классы. Опишем их в файле app/dao/dao.py.
Специфические методы нужны будут только под таблицу с бронями столиков, а остальным будет достаточно унаследовать от BaseDao:
from datetime import date, datetime
from typing import Dict
from loguru import logger
from sqlalchemy import select, update, delete, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import joinedload
from app.dao.base import BaseDAO
from app.dao.models import User, TimeSlot, Table, Booking
class UserDAO(BaseDAO[User]):
model = User
class TimeSlotUserDAO(BaseDAO[TimeSlot]):
model = TimeSlot
class TableDAO(BaseDAO[Table]):
model = Table
Теперь отдельно рассмотрим дочерний класс модели Booking (таблица с бронями столиков). Там кода будет чуть больше. Сначала покажу полный код, а после некоторые его аспекты разберем более детально.
class BookingDAO(BaseDAO[Booking]):
model = Booking
async def check_available_bookings(self,
table_id: int,
booking_date: date,
time_slot_id: int):
"""Проверяет наличие существующих броней для стола на указанную дату и временной слот."""
try:
query = select(self.model).filter_by(
table_id=table_id,
date=booking_date,
time_slot_id=time_slot_id
)
result = await self._session.execute(query)
# Если результатов нет, стол свободен
if not result.scalars().all():
return True
# Проверяем статус существующих бронирований
for booking in result.scalars().all():
if booking.status == "booked":
return False # Стол занят
# Для других статусов считаем стол свободным
continue
# Если все брони имеют неактивные статусы
return True
except SQLAlchemyError as e:
logger.error(f"Ошибка при проверке доступности брони: {e}")
async def get_available_time_slots(self, table_id: int, booking_date: date):
"""Получает список доступных временных слотов для стола на указанную дату."""
try:
# Получаем все брони для данного стола и даты
bookings_query = select(self.model).filter_by(
table_id=table_id,
date=booking_date
)
bookings_result = await self._session.execute(bookings_query)
# Составляем набор занятых слотов (только с активными бронями)
booked_slots = {booking.time_slot_id for booking in bookings_result.scalars().all() if
booking.status == "booked"}
# Получаем все доступные слоты, исключая занятые
available_slots_query = select(TimeSlot).filter(
~TimeSlot.id.in_(booked_slots)
)
available_slots_result = await self._session.execute(available_slots_query)
return available_slots_result.scalars().all()
except SQLAlchemyError as e:
logger.error(f"Ошибка при получении доступных временных слотов: {e}")
async def get_bookings_with_details(self, user_id: int):
"""
Получает список всех бронирований пользователя с полной информацией о столике и временном слоте.
:param user_id: ID пользователя, брони которого нужно получить.
:return: Список объектов Booking с загруженными данными о столе и времени.
"""
try:
query = select(self.model).options(
joinedload(self.model.table),
joinedload(self.model.time_slot)
).filter_by(user_id=user_id)
result = await self._session.execute(query)
return result.scalars().all()
except SQLAlchemyError as e:
logger.error(f"Ошибка при получении бронирований с деталями: {e}")
return []
async def complete_past_bookings(self):
"""
Обновляет статус бронирований на 'completed', если дата и время бронирования уже прошли.
"""
try:
# Получаем текущее время
now = datetime.now()
subquery = select(TimeSlot.start_time).where(TimeSlot.id == self.model.time_slot_id).scalar_subquery()
query = select(Booking.id).where(
Booking.date < now.date(),
self.model.status == "booked"
).union_all(
select(Booking.id).where(
self.model.date == now.date(),
subquery < now.time(),
self.model.status == "booked"
)
)
# Выполняем запрос и получаем id бронирований, которые нужно обновить
result = await self._session.execute(query)
booking_ids_to_update = result.scalars().all()
if booking_ids_to_update:
# Формируем запрос на обновление статуса бронирований
update_query = update(Booking).where(
Booking.id.in_(booking_ids_to_update)
).values(status="completed")
# Выполняем запрос на обновление
await self._session.execute(update_query)
# Подтверждаем изменения
await self._session.commit()
logger.info(f"Обновлен статус для {len(booking_ids_to_update)} бронирований на 'completed'")
else:
logger.info("Нет бронирований для обновления статуса.")
except SQLAlchemyError as e:
logger.error(f"Ошибка при обновлении статуса бронирований: {e}")
await self._session.rollback()
async def cancel_book(self, book_id: int):
try:
query = (
update(self.model)
.filter_by(id=book_id)
.values(status="canceled")
.execution_options(synchronize_session="fetch")
)
result = await self._session.execute(query)
await self._session.flush()
return result.rowcount
except SQLAlchemyError as e:
logger.error(f"Ошибка при отмене книги с ID {book_id}: {e}")
await self._session.rollback()
raise
async def delete_book(self, book_id: int):
try:
query = delete(self.model).filter_by(id=book_id)
result = await self._session.execute(query)
logger.info(f"Удалено {result.rowcount} записей.")
await self._session.flush()
return result.rowcount
except SQLAlchemyError as e:
logger.error(f"Ошибка при удалении записей: {e}")
raise
async def book_count(self) -> Dict[str, int]:
"""
Подсчитывает количество заявок по каждому статусу (booked, completed, canceled).
"""
try:
status_counts = {}
statuses = ["booked", "completed", "canceled"]
for status in statuses:
query = select(func.count(self.model.id)).where(self.model.status == status)
result = await self._session.execute(query)
count = result.scalar()
status_counts[status] = count
logger.info(f"Найдено {count} заявок со статусом '{status}'.")
total_query = select(func.count(self.model.id))
total_result = await self._session.execute(total_query)
total_count = total_result.scalar()
status_counts['total'] = total_count
logger.info(f"Всего найдено {total_count} заявок.")
return status_counts
except SQLAlchemyError as e:
logger.error(f"Ошибка при подсчете заявок по статусам: {e}")
raise
Описание методов BookingDAO
check_available_bookings(table_id, booking_date, time_slot_id)
Проверяет, свободен ли столик на указанную дату и временной слот.
get_available_time_slots(table_id, booking_date)
Возвращает список доступных временных слотов для бронирования конкретного столика на выбранную дату.
get_bookings_with_details(user_id)
Получает список всех бронирований пользователя с информацией о столике и времени бронирования.
complete_past_bookings()
Обновляет статус всех прошедших бронирований на "completed", если их время уже истекло.
cancel_book(book_id)
Меняет статус бронирования на "canceled".
delete_book(book_id)
Удаляет запись о бронировании из базы данных.
book_count()
Подсчитывает количество бронирований по статусам ("booked", "completed", "canceled") и общее число записей.
Так-же в полном исходнике кода вы можете найти логику по автоматическому заполнению таблиц базы данных информацией из JSON файлов. Сейчас на это время тратить не будем.
Теперь мы уверенно можем сказать, что логика взаимодействия с базой данных у нас полностью закрыта и мы можем переходить к тому из-за чего большинство из вас тут собралось – описание логики бронирования через Aiogram dialog.
Микросервис бронирования на Aiogram Dialog
Сейчас мы готовы к тому, чтобы описать логику бронирования столиков с использованием Aiogram Dialog.
Для начала давайте создадим пакет app/bot. В этом пакете (папка с файлом init.py) мы будем описывать логику, которая будет иметь прямое отношение к нашему телеграм-боту.
Напоминаю, что Aiogram Dialog имеет следующие основные сущности:
Функции обратного вызова
Функции-геттеры
Виджеты
Окна
Сам диалог
И работать это всё может только в рамках состояний (state).
Далее выстраивается следующая логика: мы собираем из виджетов и функций окно, используя принцип вложенного конструктора. После этого мы объединяем их в окно, а окно уже является кирпичиком для создания конкретного диалога.
Создадим в папке app/bot/ папку с именем booking и там уже сосредоточимся на структуре. Для удобства предлагаю использовать следующую структуру файлов:
booking/
- dialog.py: тут мы объединим все окна в один диалог
- state.py: тут мы опишем класс с нашими состояниями
- schemas.py: Pydantic-схемы
- windows.py: функции для создания окон (каждая функция будет включать в себя разный набор виджетов, геттеров и т.д.)
- getters.py: функции-геттеры
- handlers.py: функции обратного вызова (обработчики)
Такая структура позволит не только удобно организовать код, но и даст общее понимание философии Aiogram Dialog на практике.
Сразу опишем класс с состояниями в booking/state.py:
from aiogram.fsm.state import StatesGroup, State
class BookingState(StatesGroup):
count = State()
table = State()
booking_date = State()
booking_time = State()
confirmation = State()
success = State()
Тут используется стандартный синтаксис Aiogram 3.
Затем опишем схемы Pydantic в файле booking/schemas.py:
from datetime import date
from pydantic import BaseModel
class SCapacity(BaseModel):
capacity: int
class SNewBooking(BaseModel):
user_id: int
table_id: int
time_slot_id: int
date: date
status: str
Они нам понадобятся для взаимодействия с базовыми методами из BaseDAO.
Теперь опишем геттеры. Это специальные функции, которые нужны для отображения информации в окнах нашего диалога. Опишем их в файле booking/getters.py:
from aiogram_dialog import DialogManager
async def get_all_tables(dialog_manager: DialogManager, **kwargs):
"""Получение списка столов с учетом выбранной вместимости."""
tables = dialog_manager.dialog_data['tables']
capacity = dialog_manager.dialog_data['capacity']
return {"tables": [table.to_dict() for table in tables],
"text_table": f'Всего для {capacity} человек найдено {len(tables)} столов. Выберите нужный по описанию'}
async def get_all_available_slots(dialog_manager: DialogManager, **kwargs):
"""Получение списка доступных временных слотов для выбранного стола и даты."""
selected_table = dialog_manager.dialog_data["selected_table"]
slots = dialog_manager.dialog_data["slots"]
text_slots = (
f'Для стола №{selected_table.id} найдено {len(slots)} '
f'{"свободных слотов" if len(slots) != 1 else "свободный слот"}. '
'Выберите удобное время'
)
return {"slots": [slot.to_dict() for slot in slots], "text_slots": text_slots}
async def get_confirmed_data(dialog_manager: DialogManager, **kwargs):
"""Получение данных для подтверждения бронирования."""
selected_table = dialog_manager.dialog_data['selected_table']
booking_date = dialog_manager.dialog_data['booking_date']
selected_slot = dialog_manager.dialog_data['selected_slot']
confirmed_text = (
"<b>📅 Подтверждение бронирования</b>\n\n"
f"<b>📆 Дата:</b> {booking_date}\n\n"
f"<b>🍴 Информация о столике:</b>\n"
f" - 📝 Описание: {selected_table.description}\n"
f" - 👥 Кол-во мест: {selected_table.capacity}\n"
f" - 📍 Номер столика: {selected_table.id}\n\n"
f"<b>⏰ Время бронирования:</b>\n"
f" - С <i>{selected_slot.start_time}</i> до <i>{selected_slot.end_time}</i>\n\n"
"✅ Все ли верно?"
)
return {"confirmed_text": confirmed_text}
Обязательное условие: геттеры должны возвращать питоновский словарь. Также обратите внимание на формат передачи аргументов в функции-геттеры.
Обязательное условие тут — это передача **kwargs. Кроме того, есть некоторые аргументы, которые можно автоматически извлечь из контекста. Например, таким аргументом является dialog_manager.
Dialog manager – это сущность, в которой хранятся ответы пользователя в рамках диалога, и на примере выше вы видите, как работать с этими переменными.
Когда мы передаем геттер в окно Aiogram Dialog – он будет вызываться автоматически. Чуть дальше посмотрим на это на практике, и всё станет понятно.
Теперь мы переходим к двум самым сложным и, возможно, неочевидным логикам, а именно, описанию функций обратного вызова (хендлеров) и формированию окон. Об этом мы поговорим далее детально.
Функции обратного вызова (хендлеры) в Aiogram Dialog
Функции обратного вызова вызываются во время перехода от одного окна к другому. Вот простой пример:
Бот спрашивает у пользователя имя. Пользователь ему отвечает. И вот в момент ответа запускается функция обратного вызова. Её смысл в том, чтобы обработать ответ и после либо остановить сценарий, если это был последний вопрос в диалоге, либо переместить пользователя в следующее состояние, например, задав ему вопрос о возрасте.
Логику будем описывать в файле booking/handlers.py. Начнем с импортов:
from datetime import date
from aiogram.types import CallbackQuery
from aiogram_dialog import DialogManager
from aiogram_dialog.widgets.kbd import Button
from app.bot.booking.schemas import SCapacity, SNevBooking
from app.bot.user.kbs import main_user_kb
from app.dao.dao import BookingDAO, TimeSlotUserDAO, TableDAO
У нас не готова только пользовательская клавиатура. Её путь вы видите. Напишем её чуть позже. Остальное мы всё подготовили.
Начнем с описания универсальной функции для отмены сценария:
async def cancel_logic(callback: CallbackQuery, button: Button, dialog_manager: DialogManager):
await callback.answer("Сценарий бронирования отменен!")
await callback.message.answer("Вы отменили сценарий бронирования.",
reply_markup=main_user_kb(callback.from_user.id))
Первое, на что стоит обратить внимание — мы тут явно не вызывали метод для остановки диалога: await dialog_manager.done(), и это не случайно.
Дело в том, что в Aiogram Dialog существует специальный виджет, который выступает в роли синтаксического сахара — виджет Cancel(). При клике на такой виджет сценарий автоматически останавливается. Примечательно то, что функцию обратного вызова под него можно было бы и не писать, но я решил добавить некую кастомизацию, чтобы выводить сообщения о том, что сценарий остановлен, с соответствующей клавиатурой.
Далее вы видите специфический синтаксис в объявлении аргументов функции, несвойственный для классического Aiogram. В частности, в обработчиках мы всегда должны передавать виджет в качестве аргумента, даже если он не используется явно в самом обработчике.
Кроме того, обязательно нужно передавать и dialog_manager. Такая особенность типизации для функций обратного вызова в Aiogram Dialog.
Теперь напишем первый обработчик для выбора количества гостей:
async def process_add_count_capacity(callback: CallbackQuery, button: Button, dialog_manager: DialogManager):
"""Обработчик выбора количества гостей."""
session = dialog_manager.middleware_data.get("session_without_commit")
selected_capacity = int(button.widget_id)
dialog_manager.dialog_data["capacity"] = selected_capacity
dialog_manager.dialog_data['tables'] = await TableDAO(session).find_all(SCapacity(capacity=selected_capacity))
await callback.answer(f"Выбрано {selected_capacity} гостей")
await dialog_manager.next()
Тут вы можете заметить интересную особенность по работе с мидлварями. Мы пока их не подключили (сделаем это немного позже), но на этом примере можно наглядно увидеть, что в отличие от классического Aiogram мы не можем напрямую пробросить мидлвари в качестве аргумента функции, а должны извлекать их из dialog_manager.middleware_data. К этому просто нужно привыкнуть.
Кроме того, тут нужно понять, в какой момент должна вызваться эта функция. А вызывается она в момент ответа на первый вопрос бота. Сам вопрос мы будем задавать через стартовое окно диалога, которое опишем чуть позже.
То есть, мы сейчас как бы сразу начали с ответа на первый вопрос, не задав его.
Для перемещения к следующему состоянию я использовал:
await dialog_manager.next()
Для более явного перемещения в новое состояние можно было бы использовать:
await dialog_manager.switch_to(SOME_STATE)
Я выбрал такой вариант просто из-за его лаконичности.
По логике взаимодействия с базой данных, думаю, всё понятно.
Мы генерируем нужную сессию. После используем её для взаимодействия с базой данных. На конкретном примере мы воспользовались универсальным методом из BaseDAO для извлечения всех строк из таблицы со столами.
В итоге мы вызвали функцию, обработали ответ от пользователя и поместили его в словарь dialog_data. Затем на основании введенных данных от пользователя (кол-во мест) мы выполнили запрос к базе данных. Извлекли все столы с подходящим параметром и тоже сохранили их в словаре диалога.
После ответили на колбэк и переместили пользователя в новое состояние (новое окно). Примечательно то, что явно мы никаких новых вопросов не задали. Понимаю, что для разработчиков, которые много писали ботов на Aiogram 3, может быть этот подход не очевиден. К этому просто нужно привыкнуть, оно того стоит.
Пишем обработчик выбора конкретного стола:
async def on_table_selected(callback: CallbackQuery, widget, dialog_manager: DialogManager, item_id: str):
"""Обработчик выбора стола."""
session = dialog_manager.middleware_data.get("session_without_commit")
table_id = int(item_id)
selected_table = await TableDAO(session).find_one_or_none_by_id(table_id)
dialog_manager.dialog_data["selected_table"] = selected_table
await callback.answer(f"Выбран стол №{table_id} на {selected_table.capacity} мест")
await dialog_manager.next()
Тут логика похожа на предыдущую. Единственное, что в качестве виджета я просто пробросил переменную widget. Это вариант, когда не хочется подбирать конкретный виджет.
Следующий обработчик, который мы напишем — это обработчик выбранной даты (дня брони):
async def process_date_selected(callback: CallbackQuery, widget, dialog_manager: DialogManager, selected_date: date):
"""Обработчик выбора даты."""
dialog_manager.dialog_data["booking_date"] = selected_date
session = dialog_manager.middleware_data.get("session_without_commit")
selected_table = dialog_manager.dialog_data["selected_table"]
slots = await BookingDAO(session).get_available_time_slots(table_id=selected_table.id, booking_date=selected_date)
if slots:
await callback.answer(f"Выбрана дата: {selected_date}")
dialog_manager.dialog_data["slots"] = slots
await dialog_manager.next()
else:
await callback.answer(f"Нет мест на {selected_date} для стола №{selected_table.id}!")
await dialog_manager.back()
Тут мы будем использовать виджет календарь, и у него есть специфический параметр selected_date, который необходимо обязательно указать.
В остальном логика должна быть понятна. Обрабатываем дату брони, проверяем, свободна ли она, и в зависимости от этого перемещаем пользователя либо к следующему окну, либо к предыдущему (окно выбора другой даты).
Далее обработчик выбора слота (временного отрезка брони):
async def process_slots_selected(callback: CallbackQuery, widget, dialog_manager: DialogManager, item_id: str):
"""Обработчик выбора слота."""
session = dialog_manager.middleware_data.get("session_without_commit")
slot_id = int(item_id)
selected_slot = await TimeSlotUserDAO(session).find_one_or_none_by_id(slot_id)
await callback.answer(f"Выбрано время с {selected_slot.start_time} до {selected_slot.end_time}")
dialog_manager.dialog_data['selected_slot'] = selected_slot
await dialog_manager.next()
И после реализуем последний обработчик, в рамках которого осуществим проверку и сохранение данных в базу данных:
async def on_confirmation(callback: CallbackQuery, widget, dialog_manager: DialogManager, **kwargs):
"""Обработчик подтверждения бронирования."""
session = dialog_manager.middleware_data.get("session_with_commit")
# Получаем выбранные данные
selected_table = dialog_manager.dialog_data['selected_table']
selected_slot = dialog_manager.dialog_data['selected_slot']
booking_date = dialog_manager.dialog_data['booking_date']
user_id = callback.from_user.id
check = await BookingDAO(session).check_available_bookings(table_id=selected_table.id,
time_slot_id=selected_slot.id,
booking_date=booking_date)
if check:
await callback.answer("Приступаю к сохранению")
add_model = SNevBooking(user_id=user_id, table_id=selected_table.id,
time_slot_id=selected_slot.id, date=booking_date, status="booked")
await BookingDAO(session).add(add_model)
await callback.answer(f"Бронирование успешно создано!")
text = "Бронь успешно сохранена🔢🍴 Со списком своих броней можно ознакомиться в меню 'МОИ БРОНИ'"
await callback.message.answer(text, reply_markup=main_user_kb(user_id))
admin_text = (f"Внимание! Пользователь с ID {callback.from_user.id} забронировал столик №{selected_table.id} "
f"на {booking_date}. Время брони с {selected_slot.start_time} до {selected_slot.end_time}")
await dialog_manager.done()
else:
await callback.answer("Места на этот слот уже заняты!")
await dialog_manager.back()
По логике должно быть всё понятно. Единственное, что для завершения диалога тут мы используем не привычный state.clear(), а его аналог dialog_manager.done().
Мы ещё вернемся к обработчикам, когда будем внедрять связку с брокером RabbitMQ.
Реализуем окна диалога
Теперь мы приступаем к самому интересному, к тому что может сломать мозг Python-разработчику и сейчас я сразу объясню почему.
Во фреймворках JavaScript, таких как React или VueJS, принято использовать компонентный подход. Это когда мы, например, сначала описываем конкретную кнопку, после описываем определенную группу в рамках которой будет существовать эта кнопка, затем мы описываем определенные формы, которые будут состоять из таких вот групп кнопок. Затем, наши формы тоже могут быть объденены в группу форм и прочее.
Такая себя матрешка из мира IT. Так вот, философия Aiogram dialog основана именно на таком подходе, что, в целом, не особо свойственно для Python.
На данный момент мы пока этого не ощутили, так как опсиание класса состояний, описание геттеров и описание хендлеров имеют классический для Python вид, а вот с окнами дела обстоят иначе, в чем вы сейчас убедитесь.
Окна мы опишем в функции booking/windows.py и начнем мы с импортов.
from datetime import date, timedelta, timezone
from aiogram_dialog import Window
from aiogram_dialog.widgets.kbd import Button, Group, ScrollingGroup, Select, Calendar, CalendarConfig, Back, Cancel
from aiogram_dialog.widgets.text import Const, Format
from app.bot.booking.getters import get_all_tables, get_all_available_slots, get_confirmed_data
from app.bot.booking.handlers import (process_add_count_capacity, on_table_selected,
process_date_selected, process_slots_selected, on_confirmation, cancel_logic)
from app.bot.booking.state import BookingState
Тут вы сразу можете увидеть, что для реализации окон мы импортируем сами окна и затем ряд текстовых и кнопочных виджетов. По названию виджетов можно догадаться за что будет отвечать каждый.
Текстовый виджеты Const и Format нужны для отображения текста. Тексты это сообщения, которые будет получать пользователь в рамках конкретного диалога. Отличие Format и Const в том, что Const принимает статический вопрос по типу:
«Как тебя зовут, дружище?»
А Format позволяет в такую строку пробросить переменную:
«Василий Пупкин, сколько тебе лет?»
И Format и Const могут использовать и для формирования сообщений пользователю и для формировании надписей на кнопках.
Теперь пройдемся по кнопочным виджетам:
Button: базовый виджет для создания обычных кнопок с текстом и обработчиком
Group: позволяет объединять кнопки в группы и формировать структуру клавиатуры
ScrollingGroup: группа с возможностью пагинации для больших списков
Select: виджет для выбора из списка элементов
Calendar: интерактивный календарь для выбора даты
CalendarConfig: настройки для календаря (временная зона, первый день недели и т.д.)
Back: специальная кнопка для возврата на предыдущий шаг диалога
Cancel: кнопка для отмены и выхода из диалога
Окно (Window) принимает следующие основные параметры:
Текстовый виджет - содержимое сообщения (Const или Format)
Виджеты клавиатуры - кнопки, группы, календари и т.д.
state - состояние, в котором будет находиться диалог при показе окна
getter (опционально) - функция для получения данных, которые будут использоваться в окне
Наш диалог бронирования будет состоять из пяти последовательных окон:
Выбор количества гостей
Выбор стола
Выбор даты бронирования
Выбор временного слота
Подтверждение бронирования
Давайте опишем каждое из них по порядку.
Теперь приступим к описанию функций по генерации окон. Начнем с функции, которая сгенерирует окно с первым вопросом по поводу необходимого количества мест:
def get_capacity_window() -> Window:
"""Окно выбора количества гостей."""
return Window(
Const("Выберите кол-во гостей:"),
Group(
*[Button(
text=Const(str(i)),
id=str(i),
on_click=process_add_count_capacity
) for i in range(1, 7)],
Cancel(Const("Отмена"), on_click=cancel_logic),
width=2
),
state=BookingState.count
)
Вот тут и начинается веселье. Обратите внимание на то каким образом мы выстраиваем наше окно.
Тут мы видим вопрос «Выберите кол-во гостей» и после него мы создаем группу из кнопок.
Группа нужна для объединения кнопок в одну группу. Технически данный виджет позволяет удобно формировать инлайн клавиатуру. Кнопок внутри группы может быть сколько угодно и для удобства я тут прогнал часть кнопок в цикле, чтоб не описывать каждую отдельно. Кроме того, дополнительно, я передал кнопку отмена.
То есть, мы создали группу и в нее поместили 7 кнопок.
У обычных кнопок (виджет Button) есть ряд обязательных параметров:
текст: надпись на кнопке
айди кнопки
функция, которая должна вызываться при клике на кнопку.
Кнопка Cancel() может вообще не принимать никаких дополнительных параметров.
Если вы их не передадите, но в надпись подставится «Сancel" и при клике на нее просто остановится сценарий без доп сообщений. Напоминаю, что мы описали отдельный обработчик, который будет срабатывать после клика на эту кнопку.
У группы, кстати, есть обязательный параметр ширины ряда. Width = 2 означает что кнопки выстроятся в 2 колонки.
Окно в качестве обязательного парметра должен принимать текст, который получит пользователь и состояние.
В целом, если вы поймете структуру и описание этого окна, то трудностей в освоении следующих виджетов у вас не будет. Логика похожая везде будет.
Само окно нам ещё предстоит попасть. То есть далее мы опишем логику запуска диалога, но пока суть сводится к следующему.
Пользователь увидит текст «Выберите кол-во гостей» и инлайн клавиутуру с кнопками мест от 1 до 6 и кнопку «Отмена», которая будет завершать сценарий диалога при клике.
Обязательным параметром в окне Aiogram dialog выступает state. В него мы передаем необходимое состояние в котором окажется пользователь при вызове окна. Как вы видите, геттеры мы тут не испольовали.

Опишем следующее окно:
def get_table_window() -> Window:
"""Окно выбора стола."""
return Window(
Format("{text_table}"),
ScrollingGroup(
Select(
Format("Стол №{item[id]} - {item[description]}"),
id="table_select",
item_id_getter=lambda item: str(item["id"]),
items="tables",
on_click=on_table_selected,
),
id="tables_scrolling",
width=1,
height=1,
),
Group(
Back(Const("Назад")),
Cancel(Const("Отмена"), on_click=cancel_logic),
width=2
),
getter=get_all_tables,
state=BookingState.table,
)
Как вы поняли по комментарию, данное окно отвечает за выбор стола.
В это окно я включил ряд дополнительных элементов и виджетов. В частности добавлен:
Геттер для получения списка столов
Format для динамической передачи текста в сообщение пользователю
ScrolingGroup – виджет для автоматической пагинации
Кнопка Back, которая автоматически перебрасывает на страницу назад.
Как вы видите, принцып строения окна не особо отличается от первого. Отличия только в том, что мы начали использовать дополнительные виджеты, но все в той-же парадигме вложенностей.
Select - это специальный виджет для выбора из списка элементов. Он принимает:
Format или Const для отображения текста каждого элемента
id - уникальный идентификатор виджета
item_id_getter - функция для получения ID каждого элемента
items - имя ключа в данных, полученных из getter
on_click - обработчик выбора элемента
Геттер get_all_tables позволил нам получить данные на основании которых мы выстроили и кнопки в инайн клавиатуре сообщения и подготоили динамический текст для сообщения.

Следующее окно – это окно выбора дня брони:
def get_date_window() -> Window:
"""Окно выбора даты."""
return Window(
Const("На какой день бронируем столик?"),
Calendar(
id="cal",
on_click=process_date_selected,
config=CalendarConfig(
firstweekday=0,
timezone=timezone(timedelta(hours=3)),
min_date=date.today()
)
),
Back(Const("Назад")),
Cancel(Const("Отмена"), on_click=cancel_logic),
state=BookingState.booking_date,
)
Из нового – использование виджета календарь.
К виджету календаря, так-же, можно подключить настройки, используя CalendarConfig. Благодаря настройкам я установил московскую временную зону, указал что неделя у нас начинается с понедельника и установил минимальную дату брони. То есть, на вчера стол явно нельзя забронировать.
Есть один момент. По умолчанию дни недели и названия месяцев в календаре описываются на английском языке. Для того, чтоб это исправить на уровне проекта необходимо поставить локализацию русскую. Это мы сделаем далее в main файле прилоежения.

Следующее окно – это окно выбора временного слота. Вот как его можно оформить:
def get_slots_window() -> Window:
"""Окно выбора слота."""
return Window(
Format("{text_slots}"),
ScrollingGroup(
Select(
Format("{item[start_time]} до {item[end_time]}"),
id="slotes_select",
item_id_getter=lambda item: str(item["id"]),
items="slots",
on_click=process_slots_selected,
),
id="slotes_scrolling",
width=2,
height=3,
),
Back(Const("Назад")),
Cancel(Const("Отмена"), on_click=cancel_logic),
getter=get_all_available_slots,
state=BookingState.booking_time,
)
Тут мы снова использовали виджет Select на основании данных с геттера.

И последнее окно будет окном подтверждения. Реализовать можно так:
def get_confirmed_windows():
return Window(
Format("{confirmed_text}"),
Group(
Button(Const("Все верно"), id="confirm", on_click=on_confirmation),
Back(Const("Назад")),
Cancel(Const("Отмена"), on_click=cancel_logic),
),
state=BookingState.confirmation,
getter=get_confirmed_data
)

Новых элементов тут я не использовал.
В целом по окнам все и в следующих главах статья я покажу вам как окна объеденить в диалог, а затем расскажу как диалог включить в бота.
Объединяем окна в диалог
Напоминаю, что окна – это только кирпичики, из которых строится основной диалог. Теперь нам предстоит эти кирпичики превратить в полноценную законченную сущность – диалог.
Сделаем мы это в файле /app/booking/dialog.py. Код получился следующим:
from aiogram_dialog import Dialog
from app.bot.booking.windows import (get_capacity_window, get_table_window, get_date_window,
get_slots_window, get_confirmed_windows)
booking_dialog = Dialog(
get_capacity_window(),
get_table_window(),
get_date_window(),
get_slots_window(),
get_confirmed_windows()
)
Все достаточно просто. Мы импортируем Dialog из aiogram_dialog и импортируем все наши окна. Затем создаем объект класса Dialog, передавая в него все наши окна в том порядке, в котором они должны следовать в процессе бронирования.
Важно отметить, что порядок окон в списке имеет значение – он определяет последовательность переходов по кнопкам "Назад" и "Вперед" (если они используются). Первым указывается окно, которое будет показано пользователю при запуске диалога.
Объект Dialog автоматически управляет переходами между состояниями, основываясь на действиях пользователя и состояниях, указанных в каждом окне. Например, когда пользователь нажимает кнопку "Назад", диалог автоматически показывает предыдущее окно в последовательности.
Далее, для того чтобы все заработало, нам достаточно будет зарегистрировать этот объект как обычный роутер в Aiogram. Это мы сделаем в главном файле приложения (файл main.py).
Настраиваем бота
Напоминаю, что Aiogram dialog – это только надстройка над Aiogram. Следовательно, для того чтобы наш бот вообще запустился – нам необходимо будет написать основу бота. Далее, когда она будет готова – нам достаточно будет включить созданный диалог как отдельный роутер.
Основную логику бота мы продолжим описывать в папке app/bot. Напоминаю, что там, в папке booking, мы описывали наш диалог.
Давайте создадим файл app/bot/create_bot.py. В него мы опишем функции и логику, которая позволит инициализировать нашего бота.
Начнем с импортов:
import locale
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import BotCommand, BotCommandScopeDefault
from aiogram_dialog import setup_dialogs
from loguru import logger
from app.bot.booking.dialog import booking_dialog
from app.bot.user.router import router as user_router
from app.bot.admin.router import router as admin_router
from app.config import settings
from app.dao.database_middleware import DatabaseMiddlewareWithoutCommit, DatabaseMiddlewareWithCommit
from app.dao.init_logic import init_db
Здесь вы сразу можете заметить импорт некоторых мидлварей. В частности, DatabaseMiddlewareWithoutCommit и DatabaseMiddlewareWithCommit.
Эти мидлвари нужны будут для удобного управления сессией для взаимодействия с базой данных. Импорт идет из файла app/dao/database_midlware.py и выглядит он следующим образом:
from typing import Callable, Dict, Any, Awaitable
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
from app.dao.database import async_session_maker
class BaseDatabaseMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[Message | CallbackQuery, Dict[str, Any]], Awaitable[Any]],
event: Message | CallbackQuery,
data: Dict[str, Any]
) -> Any:
async with async_session_maker() as session:
self.set_session(data, session)
try:
result = await handler(event, data)
await self.after_handler(session)
return result
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
def set_session(self, data: Dict[str, Any], session) -> None:
"""Метод для установки сессии в словарь данных."""
raise NotImplementedError("Этот метод должен быть реализован в подклассах.")
async def after_handler(self, session) -> None:
"""Метод для выполнения действий после вызова хендлера (например, коммит)."""
pass
class DatabaseMiddlewareWithoutCommit(BaseDatabaseMiddleware):
def set_session(self, data: Dict[str, Any], session) -> None:
data['session_without_commit'] = session
class DatabaseMiddlewareWithCommit(BaseDatabaseMiddleware):
def set_session(self, data: Dict[str, Any], session) -> None:
data['session_with_commit'] = session
async def after_handler(self, session) -> None:
await session.commit()
Если вы читали мои прошлые статьи, то пояснений этот код не требует. Иначе, посмотрите, например, статью "Как сделать оплату в Telegram боте-магазине на вебхуках". В ней я подробно рассмотрел эти мидлвари.
Вернемся к файлу create_bot.py.
Выполним инициализацию бота:
bot = Bot(token=settings.BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
Настроим диспетчера:
dp = Dispatcher(storage=MemoryStorage())
Обратите внимание: диалог с пользователем должен где-то храниться (его ответы на вопросы). Для этого в Aiogram предусмотрены разные хранилища. Я, для учебных целей, использовал MemoryStorage(). Это самое быстрое хранилище, но в то же время, самое ненадежное. Дело в том, что любая перезагрузка бота приведет к тому, что бот «забудет» все диалоги с пользователями.
Для боевых проектов я обычно использую хранилище Redis, и в своем телеграм-канале «Легкий путь в Python» отдельным постом я покажу, как подключить хранилище Redis к данному боту.
В результате использования хранилища Redis, в случае сбоев, перезагрузок бота, перезагрузок сервера и прочего – бот не забудет диалогов с пользователями и сможет продолжить их с места, где они завершились.
Функция для генерации командного меню:
async def set_commands():
commands = [BotCommand(command='start', description='Старт')]
await bot.set_my_commands(commands, BotCommandScopeDefault())
Далее отдельно опишем универсальную функцию для смены локализации бота:
def set_russian_locale():
try:
# Пробуем установить локаль для Windows
locale.setlocale(locale.LC_TIME, 'Russian_Russia.1251')
except locale.Error:
try:
# Пробуем установить локаль для Linux/macOS
locale.setlocale(locale.LC_TIME, 'ru_RU.utf8')
except locale.Error:
# Игнорируем ошибку, если локаль не поддерживается
pass
Дело в том, что на разных операционных системах может отличаться локализация. Кроме того, у вас может быть необходимость использовать другой язык.
Локализацию мы будем менять для корректного отображения данных из виджета календаря в Aiogram Dialog.
Теперь напишем очень важную функцию, которая будет выполняться при запуске бота:
async def start_bot():
set_russian_locale()
if settings.INIT_DB:
await init_db()
setup_dialogs(dp)
dp.update.middleware.register(DatabaseMiddlewareWithoutCommit())
dp.update.middleware.register(DatabaseMiddlewareWithCommit())
await set_commands()
dp.include_router(booking_dialog)
dp.include_router(user_router)
dp.include_router(admin_router)
for admin_id in settings.ADMIN_IDS:
try:
await bot.send_message(admin_id, f'Я запущен🥳.')
except:
pass
logger.info("Бот успешно запущен.")
Первое, что делает функция – это устанавливает русскую локализацию (в моем случае).
Далее, если переменная INIT_DB равняется True, то происходит первичное заполнение базы данных данными, и затем начинается блок установки мидлварей.
Классическая запись:
dp.update.middleware.register(DatabaseMiddlewareWithoutCommit())
dp.update.middleware.register(DatabaseMiddlewareWithCommit())
Тут должно быть все понятно. А вот запись для настройки нашего Aiogram Dialog:
setup_dialogs(dp)
Это очень важная строка! Если вы ее не укажете, то возможности использовать aiogram dialog в проекте у вас не будет. Функция setup_dialogs регистрирует все необходимые мидлвари и обработчики для работы диалогов.
Далее мы устанавливаем команды бота, регистрируем роутеры бота (наш диалог, а также роутеры для пользователя и администратора, которые будут описаны позже) и уведомляем админов о том, что бот запущен.
И опишем функцию, которая будет вызываться при завершении работы бота:
async def stop_bot():
try:
for admin_id in settings.ADMIN_IDS:
await bot.send_message(admin_id, 'Бот остановлен. За что?😔')
except:
pass
logger.error("Бот остановлен!")
Тут реализована простая отправка уведомлений администраторам о том, что бот остановлен.
Пользовательский блок в боте
В нашем боте, кроме диалогового взаимодействия, предусмотрено два дополнительных блока: пользовательская и админская части. Начнем с реализации пользовательской логики. Для ее описания создайте папку app/bot/user.
Там необходимо подготовить следующие файлы:
kbs.py: пользовательские клавиатуры бота
schemas.py: Pydantic-модель пользователя
router.py: основной пользовательский роутер
Клавиатуры (kbs.py)
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from app.config import settings
def main_user_kb(user_id: int) -> InlineKeyboardMarkup:
kb = InlineKeyboardBuilder()
kb.add(InlineKeyboardButton(text="🍽️ Забронировать столик", callback_data="book_table"))
kb.add(InlineKeyboardButton(text="📅 Мои брони", callback_data="my_bookings"))
kb.add(InlineKeyboardButton(text="ℹ️ О нас", callback_data="about_us"))
if user_id in settings.ADMIN_IDS:
kb.add(InlineKeyboardButton(text="🔐 Админ-панель", callback_data="admin_panel"))
kb.adjust(1)
return kb.as_markup()
def user_booking_kb(user_id: int, book: bool = False) -> InlineKeyboardMarkup:
kb = InlineKeyboardBuilder()
if book:
kb.add(InlineKeyboardButton(text="🎫 Мои брони", callback_data="my_booking_all"))
kb.add(InlineKeyboardButton(text="🍽️ Забронировать столик", callback_data="book_table"))
kb.add(InlineKeyboardButton(text="🏠 На главную", callback_data="back_home"))
if user_id in settings.ADMIN_IDS:
kb.add(InlineKeyboardButton(text="🔐 Админ-панель", callback_data="admin_panel"))
kb.adjust(1)
return kb.as_markup()
def cancel_book_kb(book_id: int, cancel: bool = False, home_page: bool = False) -> InlineKeyboardMarkup:
kb = InlineKeyboardBuilder()
if cancel:
kb.add(InlineKeyboardButton(text="Отменить бронь", callback_data=f"cancel_book_{book_id}"))
kb.add(InlineKeyboardButton(text="Удалить запись", callback_data=f"dell_book_{book_id}"))
if home_page:
kb.add(InlineKeyboardButton(text="🏠 На главную", callback_data="back_home"))
kb.adjust(1)
return kb.as_markup()
Тут использован классический синтаксис Aiogrram 3. Подробнее о построении инайн клавиатур в AIogram 3 я писал в этой статье.
Схемы (schemas.py)
from pydantic import BaseModel
class SUser(BaseModel):
id: int
username: str | None
first_name: str | None
last_name: str | None
Эта модель понадобится для добавления пользователей в базу данных.
Приступаем к файлу router.py
Импорты:
from aiogram import F
from aiogram.filters import CommandStart
from aiogram.types import Message, CallbackQuery
from aiogram.dispatcher.router import Router
from aiogram_dialog import DialogManager, StartMode
from pydantic import create_model
from sqlalchemy.ext.asyncio import AsyncSession
from app.bot.booking.state import BookingState
from app.bot.user.kbs import main_user_kb, user_booking_kb, cancel_book_kb
from app.bot.user.schemas import SUser
from app.dao.dao import UserDAO, BookingDAO
У нас все готово для импортов. Единственное, обратите внимание на create_model из pydantic, который позволяет динамически создавать модели.
Инициализируем роуетер:
router = Router()
Обработчик команды /start
@router.message(CommandStart())
async def cmd_start(message: Message, session_with_commit: AsyncSession, state: FSMContext):
await state.clear()
user_data = message.from_user
user_id = user_data.id
user_info = await UserDAO(session_with_commit).find_one_or_none_by_id(user_id)
if user_info is None:
user_schema = SUser(id=user_id, first_name=user_data.first_name,
last_name=user_data.last_name, username=user_data.username)
await UserDAO(session_with_commit).add(user_schema)
text = ("👋 Добро пожаловать в Binary Bites! 🍽️\n\nЗдесь каждый байт вкуса закодирован в удовольствие. 😋💻\n"
"Используйте клавиатуру ниже, чтобы зарезервировать свой столик и избежать переполнения буфера! 🔢🍴")
await message.answer(text, reply_markup=main_user_kb(user_id))
Тут идет проверка на наличие пользователя в базе данных. Если его там нет, то мы регистрируем его. Затем отправляем приветственное сообщение.
Тут как раз нам пригодилась заранее подготовленная модель Pydanctic и пользовательская инлайн клавиатура.
Обратите внимание на то, как мы установили, а затем вызвали мидлварь.
Хендлер «О нас»:
@router.callback_query(F.data == "about_us")
async def cmd_about(call: CallbackQuery):
await call.answer("О нас")
about_text = (
"🖥️ О Binary Bites 🍔\n\n"
"Мы - первый ресторан, где кулинария встречается с кодом! 👨💻👩💻\n\n"
"🍽️ Наше меню - это настоящий алгоритм вкуса:\n\n"
"• Закуски начинаются с 'Hello World' салата 🥗\n"
"• Основные блюда включают 'Full Stack' бургер 🍔\n"
"• Не забудьте про наш фирменный 'Python' кофе ☕\n\n"
"🏆 Наша миссия - оптимизировать ваше гастрономическое удовольствие!\n\n"
"📍 Мы находимся по адресу: ул. Программная, д. 404\n"
"🕒 Работаем 24/7, потому что настоящие разработчики не спят😉\n\n"
"Приходите к нам, чтобы отладить свой аппетит! 🍽️💻"
)
await call.message.edit_text(about_text, reply_markup=main_user_kb(call.from_user.id))
Далее начинается простая логика описания классического взаимодействия со своими бронями. Сейчас не хочу много времени тратить на этом, а просто приведу код с минимальными комментариями:
@router.callback_query(F.data == "my_bookings")
async def show_my_bookings(call: CallbackQuery, session_without_commit: AsyncSession):
await call.answer("Мои брони")
user_filter = create_model('UserIDModel', user_id=(int, ...))(user_id=call.from_user.id)
my_bookings = await BookingDAO(session_without_commit).find_all(user_filter)
count_booking = len(my_bookings)
if count_booking:
book = True
text = (f"🎉 Отлично! У вас {count_booking} забронированных столика(ов). \n\n"
f"Чтобы просмотреть детали брони и, при необходимости, отменить бронь, воспользуйтесь кнопками ниже. 👇")
else:
book = False
text = ("🤔 Кажется, у вас пока нет активных бронирований. \n\n"
f"Не проблема! Вы можете забронировать столик прямо сейчас, нажав на кнопку ниже. 😉👇")
await call.message.edit_text(text, reply_markup=user_booking_kb(call.from_user.id, book))
@router.callback_query(F.data == "my_booking_all")
async def show_all_my_bookings(call: CallbackQuery, session_without_commit: AsyncSession):
await call.answer("Все мои брони")
user_bookings = await BookingDAO(session_without_commit).get_bookings_with_details(call.from_user.id)
if not user_bookings:
await call.message.edit_text("😔 У вас пока нет активных бронирований.", reply_markup=None)
return
for i, book in enumerate(user_bookings):
# Форматируем дату и время для удобства чтения
booking_date = book.date.strftime("%d.%m.%Y") # День.Месяц.Год
start_time = book.time_slot.start_time
end_time = book.time_slot.end_time
booking_number = i + 1
status = book.status
cancel = False
home_page = False
if status == "booked":
cancel = True
status_text = "Забронирован"
elif status == "canceled":
status_text = "Отменен"
else:
status_text = "Завершен"
message_text = (f"<b>Бронь №{booking_number}:</b>\n\n"
f"📅 <b>Дата:</b> {booking_date}\n"
f"🕒 <b>Время:</b> {start_time} - {end_time}\n"
f"🪑 <b>Столик:</b> №{book.table.id}, Вместимость: {book.table.capacity}\n"
f"ℹ️ <b>Описание:</b> {book.table.description}\n"
f"📌 <b>Статус:</b> {status_text}\n\n")
if booking_number == len(user_bookings):
home_page = True
await call.message.answer(message_text, reply_markup=cancel_book_kb(book.id, cancel, home_page))
@router.callback_query(F.data.startswith("cancel_book_"))
async def cancel_booking(call: CallbackQuery, session_with_commit: AsyncSession):
book_id = int(call.data.split("_")[-1])
booking_dao = BookingDAO(session_with_commit)
await booking_dao.cancel_book(book_id)
await call.answer("Бронь отменена!", show_alert=True)
await call.message.edit_reply_markup(reply_markup=cancel_book_kb(book_id))
@router.callback_query(F.data.startswith("dell_book_"))
async def delete_booking(call: CallbackQuery, session_with_commit: AsyncSession):
book_id = int(call.data.split("_")[-1])
await BookingDAO(session_with_commit).delete_book(book_id)
await call.answer("Запись о брони удалена!", show_alert=True)
await call.message.delete()
К этому коду мы ещё вернемся, так как некоторые хендлеры будут требовать взаимодействия с брокером сообщений.
Роутер, который будет вызываться при клике на кнопку "Главное меню":
@router.callback_query(F.data == "back_home")
async def start_dialog(call: CallbackQuery):
await call.answer("Главное меню")
await call.message.edit_text("Выберите необходимое действие", reply_markup=main_user_kb(call.from_user.id))
Хочу подробнее остановиться на функции, запуска сценария диалога:
@router.callback_query(F.data == "book_table")
async def start_dialog(call: CallbackQuery, dialog_manager: DialogManager):
await call.answer("Бронирование столика")
await dialog_manager.start(state=BookingState.count, mode=StartMode.RESET_STACK)
Эти пару строчек кода чрезвычайно важны, так как они связывают Aiogram с Aiogram Dialog.
Так как мы зарегистрировали наш диалог, у нас появился неявный мидлавь - dialog_manager. Для запуска диалога нам достаточно только вызвать через него команду start, тем самым, мы запустим наше первое окно в диалоге, а далее, думаю, вы уже понимаете, что произойдет.
Админская часть
Админская часть в этом проекте будет достаточно простой, но она необходима для удобного управления и расширения функционала.
Структура админки
Создадим папку app/bot/admin и добавим в нее следующие файлы:
kbs.py — содержит клавиатуры админ-панели.
router.py — основной роутер для обработки административных команд.
Создание клавиатур для админ-панели
Файл: admin/kbs.py
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
def main_admin_kb() -> InlineKeyboardMarkup:
"""
Функция создаёт клавиатуру для главного меню админ-панели.
"""
kb = InlineKeyboardBuilder()
kb.add(InlineKeyboardButton(text="📊 Статистика по пользователям", callback_data="admin_users_stats"))
kb.add(InlineKeyboardButton(text="📈 Статистика по броням", callback_data="admin_bookings_stats"))
kb.add(InlineKeyboardButton(text="🏠 На главную", callback_data="back_home"))
kb.adjust(1) # Располагаем кнопки в один столбец
return kb.as_markup()
def admin_back_kb() -> InlineKeyboardMarkup:
"""
Функция создаёт клавиатуру для возврата в админ-панель.
"""
kb = InlineKeyboardBuilder()
kb.add(InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_panel"))
kb.add(InlineKeyboardButton(text="🏠 На главную", callback_data="back_home"))
kb.adjust(1)
return kb.as_markup()
Настройка маршрутизации
Файл: admin/router.py
from aiogram import Router, F
from aiogram.types import CallbackQuery
from sqlalchemy.ext.asyncio import AsyncSession
from app.bot.admin.kbs import main_admin_kb, admin_back_kb
from app.config import settings
from app.dao.dao import UserDAO, BookingDAO
router = Router()
@router.callback_query(F.data == "admin_panel", F.from_user.id.in_(settings.ADMIN_IDS))
async def admin_start(call: CallbackQuery):
"""
Обработчик входа в админ-панель. Доступ разрешен только администраторам.
"""
await call.answer("Доступ в админ-панель разрешен!")
await call.message.edit_text("Выберите действие:", reply_markup=main_admin_kb())
@router.callback_query(F.data == "admin_users_stats", F.from_user.id.in_(settings.ADMIN_IDS))
async def admin_users_stats(call: CallbackQuery, session_without_commit: AsyncSession):
"""
Обработчик запроса статистики пользователей.
Получает общее количество пользователей в базе данных и отправляет информацию админу.
"""
await call.answer("Загружаю статистику пользователей...")
users_stats = await UserDAO(session_without_commit).count()
await call.message.edit_text(f'Всего в базе данных {users_stats} пользователей.', reply_markup=admin_back_kb())
@router.callback_query(F.data == "admin_bookings_stats", F.from_user.id.in_(settings.ADMIN_IDS))
async def admin_bookings_stats(call: CallbackQuery, session_without_commit: AsyncSession):
"""
Обработчик запроса статистики бронирований.
Получает данные о бронированиях, включая общее количество, завершенные, активные и отмененные.
"""
await call.answer("Загружаю статистику...")
bookings_stats = await BookingDAO(session_without_commit).book_count()
booked_count = bookings_stats.get("booked", 0) # Количество активных бронирований
completed_count = bookings_stats.get("completed", 0) # Количество завершенных бронирований
canceled_count = bookings_stats.get("canceled", 0) # Количество отмененных бронирований
total_count = bookings_stats.get("total", 0) # Общее количество бронирований
message = (
"<b>📊 Статистика бронирований:</b>\n\n"
f"<i>Всего бронирований:</i> <b>{total_count}</b>\n"
f"✅ <i>Забронировано:</i> <b>{booked_count}</b>\n"
f"☑️ <i>Завершено:</i> <b>{completed_count}</b>\n"
f"🚫 <i>Отменено:</i> <b>{canceled_count}</b>"
)
await call.message.edit_text(message, reply_markup=admin_back_kb())
Для удобства описал комментарии в каждом хендлере. Вопросов у вас возникнуть не должно.
Интеграция FastStream и RabbitMQ в проект
В рамках нашего проекта мы планируем реализовать фоновые задачи и запустить выполнение скриптов по расписанию. Для этого мы используем FastStream и брокер сообщений RabbitMQ.
Зачем нам нужен брокер сообщений?
Одним из примеров немедленных задач в нашем проекте является мгновенное уведомление администраторов о важных событиях. Например, мы хотим, чтобы они получали оповещения о создании, отмене или удалении брони.
На первый взгляд может показаться, что в этом нет необходимости — ведь можно просто отправить уведомления напрямую. Однако давайте разберемся, почему использование брокера сообщений и FastStream даёт нам значительные преимущества.
Разгрузка системы и асинхронность
Отправка уведомления — не всегда простая задача. В реальных проектах фоновые процессы могут включать сложные вычисления, занимающие от нескольких минут до нескольких часов. Если запускать их напрямую в основном процессе, это приведет к задержкам и ухудшению производительности.Гарантия доставки критически важных сообщений
Представим, что мы разрабатываем банковское приложение. Пользователь A хочет перевести 100 рублей пользователю B. Эта операция включает несколько этапов: проверку баланса, проверку пользователя на мошенничество, подтверждение существования получателя и т. д. Если на каком-то этапе система, отвечающая за перевод денег, не получит команду из-за ошибки в передаче, перевод просто не состоится.Именно поэтому важно использовать брокеры сообщений, которые обеспечивают надёжную передачу данных между различными частями системы.
Как мы будем использовать брокер?
В рамках нашего проекта RabbitMQ будет выполнять две основные задачи:
Немедленную передачу сообщений администраторам.
Запуск отложенных задач по расписанию с использованием FastStream и APScheduler.
На простом примере мы разберем, как работают продюсеры, брокеры и слушатели сообщений, а также увидим преимущества использования этой связки в реальных проектах.
Подключаем слушателя
Начнем с подключения слушателя. В этом нам поможет FastStream.
Напоминаю, что слушатель – это тот, кто подключается к определенной очереди сообщений и затем, если там появляется новое сообщение, немедленно на него реагирует.
К примеру, таким сообщением может быть: «Отправь пользователю с ID 12345 вот это сообщение» (такой пример мы сегодня и реализуем).
Слушатель, как только получил сообщение в своей очереди, немедленно приступит к выполнению поставленной задачи. Например, запустит скрипт отправки уведомления пользователю.
Напоминаю, что сами очереди хранятся в брокере сообщений. В нашем случае – это RabbitMQ.
Настройка слушателя
Для настройки слушателя в папке app создадим пакет api и добавим в него всего один файл: router.py.
FastStream из коробки имеет плагин интеграции в FastAPI. Его мы сейчас и используем.
Выполним в файле api/router.py необходимые импорты:
from datetime import datetime, timedelta
from faststream.rabbit.fastapi import RabbitRouter
from loguru import logger
from app.bot.create_bot import bot
from app.config import settings, scheduler
from app.dao.dao import BookingDAO
from app.dao.database import async_session_maker
Обратите внимание: я сразу импортировал scheduler, который мы уже ранее настроили, так как у нас APScheduler и FastStream в проекте будут тесно связаны.
Инициализация роутера FastStream
router = RabbitRouter(url=settings.rabbitmq_url)
Благодаря такому подходу у нас появляется возможность использовать router FastStream как обычный роутер для FastAPI. То есть, для активации слушателя достаточно просто подключить этот роутер в FastAPI-приложение.
Реализация первого слушателя
@router.subscriber("admin_msg")
async def send_booking_msg(msg: str):
for admin in settings.ADMIN_IDS:
await bot.send_message(admin, text=msg)
Разберем этот код подробнее.
Декоратор @router.subscriber("admin_msg") подписывает функцию на очередь сообщений admin_msg. Это означает, что при появлении нового сообщения в этой очереди автоматически начнет выполняться функция send_booking_msg.
Функция принимает текст сообщения и отправляет его всем администраторам.
На практике в этой функции может быть более сложная логика, и она может вообще не быть связанной с Telegram-ботом.
Интеграция FastStream и APScheduler
Теперь обеспечим обещанную связку FastStream и APScheduler:
async def send_user_msg(user_id: int, text: str):
await bot.send_message(user_id, text=text)
@router.subscriber("noti_user")
async def schedule_user_notifications(user_id: int):
"""Планирует отправку серии сообщений пользователю с разными интервалами."""
now = datetime.now()
notifications = [
{
"time": now + timedelta(hours=1),
"text": "Спасибо за выбор нашего ресторана! Мы надеемся, вам понравится. "
"Оставьте отзыв, чтобы мы стали лучше! 😊",
},
{
"time": now + timedelta(hours=3),
"text": "Не хотите забронировать столик снова? Попробуйте наше новое меню! 🍽️",
},
{
"time": now + timedelta(hours=12),
"text": "Специально для вас! Скидка 10% на следующее посещение по промокоду WELCOMEBACK. 🎉",
},
{
"time": now + timedelta(hours=24),
"text": "Мы ценим ваше мнение! Расскажите о своем опыте и получите приятный бонус! 🎁",
},
]
for i, notification in enumerate(notifications):
job_id = f"user_notification_{user_id}_{i}"
scheduler.add_job(
send_user_msg,
"date",
run_date=notification["time"],
args=[user_id, notification["text"]],
id=job_id,
replace_existing=True,
)
logger.info(
f"Запланировано уведомление для пользователя {user_id} на {notification['time']}"
)
Разбор кода
У нас есть новая очередь сообщений – noti_user.
Соответствующий слушатель принимает на вход ID пользователя.
При получении нового сообщения слушатель создает несколько запланированных задач в APScheduler.
Эти задачи будут отправлять пользователю заранее подготовленные сообщения через определенные интервалы времени.
В боевом проекте можно было бы вычитывать точное время брони и отправлять более точечные уведомления. Например:
попросить пользователя поставить 5 звезд после посещения ресторана,
предложить скидку на следующее посещение,
напомнить о повторном бронировании.
Таким образом, связка FastStream и APScheduler работает следующим образом:
Подписываемся на очередь сообщений.
При появлении нового сообщения активируем отложенные задачи через APScheduler.
И добавим функцию, которая будет выполнятся по времени. Вот код:
async def disable_booking():
async with async_session_maker() as session:
await BookingDAO(session).complete_past_bookings()
Задача функции в том, чтоб после истечения времени брони переводить статус брони в «Завершено».
Функцию мы зарегистрируем таким образом, чтоб APSCheduler выполнял ее 1 раз в 30 минут.
То есть, тут у нас будет чистый APSCheduler без FastStream.
Теперь остается решить другой важный момент — постановку задач (publish) в очередь RabbitMQ. Этим мы займемся далее.
Публикация задач в очередь сообщений через FastStream
На данный момент мы настроили RabbitMQ и слушателей сообщений, которые находятся в очередях RabbitMQ. Теперь нам остается ещё один важный момент – настройка того, кто будет ставить задачи в очередь.
В контексте брокеров автора задач принято называть продюсером, и в файле app/config.py мы его уже частично настроили строкой:
broker = RabbitBroker(url=settings.rabbitmq_url)
В целом, мы готовы к тому, чтобы начать устанавливать задачи. Единственное, что нужно не забыть – это активировать брокера, но этим мы займемся в следующем разделе, когда начнем собирать файл app/main.py – главный файл проекта, с которого будет происходить запуск.
Сейчас же мы просто настроим публикацию сообщений в очереди там, где это действительно необходимо.
Публикация сообщений
Когда слушатель уже настроен, публикация сообщений становится достаточно простой.
Давайте настроим публикацию задач (сообщений) в наши подготовленные очереди. Сделаем это в файле app/bot/dialog/handlers.py в блоке, который запускается после успешного бронирования.
Первое, что нужно сделать – это выполнить импорт объекта broker в этот файл:
from app.config import broker
Теперь у нас есть возможность публиковать сообщения в очереди.
Приведем полный код обработчика регистрации бронирования:
async def on_confirmation(callback: CallbackQuery, widget, dialog_manager: DialogManager, **kwargs):
"""Обработчик подтверждения бронирования."""
session = dialog_manager.middleware_data.get("session_with_commit")
# Получаем выбранные данные
selected_table = dialog_manager.dialog_data['selected_table']
selected_slot = dialog_manager.dialog_data['selected_slot']
booking_date = dialog_manager.dialog_data['booking_date']
user_id = callback.from_user.id
check = await BookingDAO(session).check_available_bookings(
table_id=selected_table.id,
time_slot_id=selected_slot.id,
booking_date=booking_date
)
if check:
await callback.answer("Приступаю к сохранению")
add_model = SNevBooking(
user_id=user_id, table_id=selected_table.id,
time_slot_id=selected_slot.id, date=booking_date, status="booked"
)
await BookingDAO(session).add(add_model)
await callback.answer("Бронирование успешно создано!")
text = "Бронь успешно сохранена🔢🍴 Со списком своих броней можно ознакомиться в меню 'МОИ БРОНИ'"
await callback.message.answer(text, reply_markup=main_user_kb(user_id))
admin_text = (
f"Внимание! Пользователь с ID {callback.from_user.id} забронировал столик №{selected_table.id} "
f"на {booking_date}. Время брони с {selected_slot.start_time} до {selected_slot.end_time}"
)
await broker.publish(admin_text, "admin_msg")
await broker.publish(callback.from_user.id, "noti_user")
await dialog_manager.done()
else:
await callback.answer("Места на этот слот уже заняты!")
await dialog_manager.back()
Здесь появились строки:
admin_text = (
f"Внимание! Пользователь с ID {callback.from_user.id} забронировал столик №{selected_table.id} "
f"на {booking_date}. Время брони с {selected_slot.start_time} до {selected_slot.end_time}"
)
await broker.publish(admin_text, "admin_msg")
await broker.publish(callback.from_user.id, "noti_user")
Таким образом, мы подготовили сообщение для администратора и отправили его в очередь admin_msg, а затем отправили уведомление пользователю в очередь noti_user.
Функция не заботится о дальнейшей обработке сообщений – она просто ставит их в очередь, а затем уже может запуститься любая сложная логика обработки.
Этим примером я хочу продемонстрировать, что работа с брокерами сообщений не так сложна, как может показаться. Разобраться с этим может каждый.
Публикация сообщений при отмене и удалении брони
Теперь настроим публикацию сообщений в файле app/bot/user/router.py, когда пользователь отменяет или удаляет бронь:
@router.callback_query(F.data.startswith("cancel_book_"))
async def cancel_booking(call: CallbackQuery, session_with_commit: AsyncSession):
book_id = int(call.data.split("_")[-1])
booking_dao = BookingDAO(session_with_commit)
await booking_dao.cancel_book(book_id)
await call.answer("Бронь отменена!", show_alert=True)
await broker.publish(f"Пользователь отменил запись о брони с ID {book_id}", "admin_msg")
await call.message.edit_reply_markup(reply_markup=cancel_book_kb(book_id))
@router.callback_query(F.data.startswith("dell_book_"))
async def delete_booking(call: CallbackQuery, session_with_commit: AsyncSession):
book_id = int(call.data.split("_")[-1])
await BookingDAO(session_with_commit).delete_book(book_id)
await call.answer("Запись о брони удалена!", show_alert=True)
await broker.publish(f"Пользователь удалил запись о брони с ID {book_id}", "admin_msg")
await call.message.delete()
Здесь используется та же логика – добавление сообщений в очередь admin_msg при отмене или удалении брони.
Мы практически закончили, и теперь перед запуском готового проекта остается последний шаг – настройка файла main.py. Об этом мы поговорим в следующей главе.
Настройка главного файла проекта
Главный файл проекта у нас должен находиться в файле app/main.py. Суть этого файла в том, чтобы собрать множество микросервисов нашего проекта — FastStream, Aiogram, Aiogram Dialog и прочее — под крышей одного приложения FastAPI.
Импорты
Начнем с импортов:
from contextlib import asynccontextmanager
from app.bot.create_bot import dp, start_bot, bot, stop_bot
from app.config import settings, broker, scheduler
from aiogram.types import Update
from fastapi import FastAPI, Request
from loguru import logger
from app.api.router import router as router_fast_stream, disable_booking
Жизненный цикл приложения
FastAPI предоставляет удобный механизм управления жизненным циклом приложения. Опишем его и разберемся подробнее с тем, что происходит:
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Бот запущен...")
await start_bot()
await broker.start()
scheduler.start()
scheduler.add_job(
disable_booking,
trigger='interval',
minutes=30,
id='disable_booking_task',
replace_existing=True
)
webhook_url = settings.hook_url
await bot.set_webhook(
url=webhook_url,
allowed_updates=dp.resolve_used_update_types(),
drop_pending_updates=True
)
logger.success(f"Вебхук установлен: {webhook_url}")
yield
logger.info("Бот остановлен...")
await stop_bot()
await broker.close()
scheduler.shutdown()
Жизненный цикл состоит из трех основных частей:
Действия, выполняемые при запуске приложения (до yield).
Работа приложения (yield).
Действия при завершении работы приложения (после yield).
Разбор запуска приложения
Запускаем функцию start_bot(), которая отвечает за инициализацию бота.
Запускаем broker, который публикует сообщения в соответствующие очереди.
Активируем scheduler (основной объект APScheduler) и добавляем задачу, выполняемую каждые 30 минут.
Устанавливаем вебхук, чтобы Telegram знал, куда отправлять уведомления о событиях в боте.
После этих действий приложение переходит в рабочее состояние.
При завершении работы:
Останавливаем брокер сообщений,
Завершаем работу бота,
Останавливаем APScheduler.
Инициализация FastAPI
Теперь создаем экземпляр FastAPI с указанным жизненным циклом:
app = FastAPI(lifespan=lifespan)
Эндпоинт вебхука
Описываем эндпоинт, который будет обрабатывать входящие запросы от Telegram:
@app.post("/webhook")
async def webhook(request: Request) -> None:
logger.info("Получен запрос с вебхука.")
try:
update_data = await request.json()
update = Update.model_validate(update_data, context={"bot": bot})
await dp.feed_update(bot, update)
logger.info("Обновление успешно обработано.")
except Exception as e:
logger.error(f"Ошибка при обработке обновления с вебхука: {e}")
Подключение роутера FastStream
Добавляем в приложение маршрутизатор FastStream:
app.include_router(router_fast_stream)
Запуск проекта
Для запуска проекта из корневой директории выполняем команду:
uvicorn app.main:app --port 8000
Не забудьте проверить, чтобы ваш туннель соответствовал порту, на котором запущено FastAPI-приложение. После запуска проект должен работать следующим образом:
Работающего бота можно поклацать тут: https://t.me/tableHanterBot
Напоминаю, что полный код проекта, как и прочий эксклюзивный материал, который я не публикую на Хабре, можно найти в моем телеграмм канале "Легкий путь в Python".
После успешного тестирования можно переходить к завершающему этапу — деплою. В данном примере для развертывания FastAPI-приложения и запуска брокера RabbitMQ используется сервис Amvera Cloud.
Разворачиваем RabbitMQ на Amvera Cloud
Первое что необходимо сделать – это пройти простую регистрацию в сервисе Amvera Cloud. За регистрацию вы получите 111 рублей на основной баланс. Суммы этой будет вполне достаточно для тестирования деплоя бота и работы с RabbitMQ, чтоб понять подходит ли вам сервис.
После регистрации и входа на сайт можно приступать к запуску RabbitMQ. На это мы потратим не больше пары минут. Начнем.
Переходим в раздел "Преднастроенные сервисы" и кликаем на "Создать преднастроенный сервис"

Выбираем необходимый тип сервиса

Выбираем тариф и даем название тарифу. Для учебных целей подойдет минимальный тариф. Для боевых целей советую брать тариф выше.

Задаем логин, пароль и имя виртуального хоста

После нажимаем на "Завершить"
Подъем RabbitMQ обычно занимает не больше 2-3 минут.
Для подключения к RabbitMQ вам, кроме логина, пароля и имени виртуального окружения достаточно ещё будет получить ссылку на подключение. Для этого необходимо перейти в созданный RabbitMQ проект и просто скопировать ссылку, которую вы увидите на первом экране.

Теперь необходимо заменить данные для подключения к брокеру RabbitMQ в локальном проекте, в файле .env (переменная RABBITMQ_HOST).
Обратите внимание. Доступ будет открыт только для приложений развернутых на Amvera. То есть, со своей локальной машины вы не сможете подключиться к этому экземпляру RabbitMQ.
Разворачиваем FastApi приложение на Amvera Cloud
Теперь мы можем развернуть и наше FastApi приложение (телеграмм бота). Для этих целей необходимо создать на Amvera отдельный проект. Быстро пройдемся по основным шагам:
Кликаем на "Приложения", затем на "Создать приложение". В открывшемся окне даем имя проекту и выбираем тариф. Для текущего проекта выбирать необходимо минимум "Начальный", так на пробном бот, скорее всего, не запустится.

Доставляем файлы приложения в проект. Можно через команды GIT, но для удобства я выбрал "Через интерфейс"

Заполняем конфигурацию, как на скрине ниже и кликаем на "Завершить"

Далее необходимо к проекту привязать доменное имя. Для подключения бесплатного доменного имени от Amvera Cloud достаточно зайти в проект, там выбрать вкладку "Домены" и активировать домен, как на скрине ниже.

Теперь остается только заменить ссылку в .env файле на локальной машине, убедиться в том, чтоб все переменные там были корректны и после перезаписать файл .env в проекте Amvera и затем пересоберите проект, чтоб все переменные корректно подтянулись.

Если вы всё сделали правильно, то через пару минут ваш бот сообщит вам, что он запущен. Проект официально завершён!
Заключение
Вот мы и завершили это увлекательный и не то чтоб простой путь. На выходе мы получили весьма себе функционально бота, который, к тому же, имеет большой потенциал для дальнейшего расширения.
Сегодня мы, на большом практическом примере, рассмотрели связку Aiogram, Aiogram Dialog, FastApi, FastStream, SQLAlchemy и ряда других мощных Python технологий в рамках одной большой экосистемы.
Надеюсь, что мне удалось достаточно подробно и детально раскрыть все аспекты этой связки и, если это так, не забудьте об этом сообщить комментарием или, как минимум, лайком к этой публикации.
Без вашей обратной связи смысла создавать такой контент и тратить на него такую гору времени просто не будет.
Для тех кто хочет бесплатно получить полный код сегодняшнего проекта, либо кому интересен контент, который я даю – приглашаю вас в свой телеграмм канал «Легкий путь в Python».
До скорого!