Привет, Хабр! Меня зовут Анастасия Иванова, я технический писатель МТС Exolve. Сегодня расскажу о системе SMS-уведомлений, созданной с помощью FastAPI. Она оповещает логистов о прибытии автомобилей на склад и автоматически назначает приёмщиков. Система интегрирована с платформой МТС Exolve, на её примере покажем, как применять современные методы асинхронного взаимодействия. Подробности — под катом.
Установка и настройка проекта
Предположим, что у нас есть единый склад и секторы разгрузки, обозначенные латинскими буквами. Нужно оповещать грузчиков о прибытии автомобиля в сектор разгрузки. Эту задачу мы и будем решать.
Для начала убедимся, что у нас установлены все необходимые зависимости. Мы используем FastAPI для создания веб-приложения, Redis для хранения временных данных и FastAPI Cache для эффективного кэширования запросов.
Структура проекта:
smsfastapi/
/venv
/example_db
__init__.py
shemas.py
infodb.py
/router
__init__.py
stock.py
worker.py
config.py
main.py
dev.env
В файле config.py хранятся конфигурационные переменные, такие как API-ключ и номер телефона для отправки SMS. Следующий код инициализирует соединение с Redis в функции "startup" и разъединение в "shutdown" в файле main.py:
Код
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi.responses import HTMLResponse
from redis import asyncio as aioredis
from fastapi_cache.backends.redis import RedisBackend
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from router import stock, worker
origins = [
"http://localhost:3000",
"http://localhost:5173",
"http://localhost:8000"
]
app = FastAPI(title='Rule of Stock',
description='API для управления разгрузкой автомобилей, прибывающих на склад. \n\n')
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS", "DELETE", "PATCH", "PUT"],
allow_headers=["Content-Type", "Set-Cookie", "Access-Control-Allow-Headers", "Access-Control-Allow-Origin",
"Authorization"],
)
@app.get("/", response_class=HTMLResponse)
async def start():
"""
Назначение: \n
Показать пользователю стартовую страницу со ссылкой на Swagger для использования API.\n
"""
return """
<html>
<head>
<title>Rule of Stock</title>
<style>
body {
background-color: white;
font-family: Arial, sans-serif;
text-align: center;
}
h1 {margin-top: 80px;}
</style>
</head>
<body>
<h3>API для управления разгрузкой автомобилей, прибывающих на склад</h3>
<p>Для проверки работы API перейдите по ссылке <a href="http://0.0.0.0:8000/docs">здесь</a>.</p> <p>-------------------------------------------------------------</p>
</body>
</html>
"""
@app.on_event("startup")
async def startup():
redis = await aioredis.from_url("redis://localhost:6379", encoding="utf8", decode_responses=True)
cache_redis = FastAPICache.init(RedisBackend(redis), prefix="stock_cache")
return cache_redis
@app.on_event("shutdown")
async def shutdown():
backend = FastAPICache.get_backend()
if isinstance(backend, RedisBackend):
redis = backend.redis
await redis.close()
Модель данных
Схема данных для автомобилей и приёмщиков создана. Наши автомобили описаны схемой BaseAuto, а приёмщики — BaseWorker с использованием Pydantic:
from pydantic import BaseModel, EmailStr, Field, validator
class BaseAuto(BaseModel):
number: str
name: str
class BaseWorker(BaseModel):
name: str
is_work: bool
is_free: bool
В качестве базы данных для простоты будем использовать список складов с элементами stocks_list = [{'name':'A', 'is_free': True},
{'name':'B', 'is_free': True},……]
и словарь приёмщиков:
workers_list = {
'Ivan': {'is_work': True,
'stocks': ['A', 'B', 'C', 'D'],
'is_free': True,
'phone': '79801331100'},
'Roman': {'is_work': True,
'stocks': ['E', 'F', 'G', 'H'],
'is_free': True,
'phone': '79801034411'},….}
Полную версию примера базы данных можно посмотреть здесь.
В файле config.py получим данные из переменных окружения:
from dotenv import dotenv_values
info_env = dotenv_values('dev.env')
API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')
Работа с приёмщиками
В файле __init__.py. каталога /router импортируем основные библиотеки для работы с маршрутами:
from fastapi import APIRouter
import aiohttp
from fastapi_cache.decorator import cache
from fastapi import FastAPI, HTTPException
Маршруты для работы с приёмщиками определены в файле worker.py. Для улучшения производительности кэшируем данные через FastAPI Cache. Это решение ускорит доступ к постоянным данным о приёмщиках.
Код
from .__init__ import *
from config import API_KEY, PHONE_SEND
from example_db import (infodb, shemas)
routework = APIRouter(prefix="/worker", tags=['Worker'])
@routework.get('/read')
@cache(expire=30, namespace='personal')
async def read_workers(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список приёмщиков.\n
"""
result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items()][skip:limit]
return result
@routework.get('/free')
@cache(expire=30, namespace='personal')
async def free_workers(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список свободных приёмщиков.\n
"""
result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if worker_data['is_free']][skip:limit]
return result
@routework.get('/busy')
@cache(expire=30, namespace='personal')
async def busy_workers(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список занятых приёмщиков.\n
"""
result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if not worker_data['is_free']][skip:limit]
return result
Работа со складами
Маршруты для работы со складами находятся в файле stock.py. И мы снова кэшируем данные.
Код
from .__init__ import *
from example_db import (infodb, shemas)
routestock = APIRouter(prefix="/stock", tags=['Stock'])
@routestock.get('/read')
@cache(expire=30, namespace='stock')
async def read_stock(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список постов.\n
"""
result = infodb.stocks_list[skip:limit]
return result
@routestock.get('/free')
@cache(expire=30, namespace='stock')
async def free_stock(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список свободных складов.\n
"""
result = [stock_busy for stock_busy in infodb.stocks_list if stock_busy["is_free"]][skip:limit]
return result
@routestock.get('/busy')
@cache(expire=30, namespace='stock')
async def busy_stock(skip: int = 0, limit: int = 100,) -> list:
"""
Назначение: \n
Получить список занятых складов.\n
"""
result = [stock_busy for stock_busy in infodb.stocks_list if not stock_busy["is_free"]][skip:limit]
return result
Назначение приёмщика и отправка SMS
Когда автомобиль приезжает на склад, водитель прикладывает электронный пропуск к воротам. В этот момент мы назначаем приёмщика, отправляя SMS-уведомление с использованием МТС Exolve.
Взаимодействие с aiohttp
Один из ключевых аспектов этого проекта — взаимодействие с внешним API для отправки SMS-уведомлений через платформу МТС Exolve. Для этого нам нужна библиотека aiohttp, через которую мы выполним асинхронные HTTP-запросы.
Взглянем на маршрут назначения приёмщика и отправки SMS-уведомления:
Код
@routework.post("/{stock}/message")
async def send_sms(stock: str, info_auto: shemas.BaseAuto):
"""
Назначение: \n
Назначить приёмщика для разгрузки автомобиля и оповестить его по SMS.\n
"""
url = "https://api.exolve.ru/messaging/v1/SendSMS"
headers = {"Authorization": f"Bearer {API_KEY}"}
# Находим свободного приёмщика в базе данных по условию привязки к складу и занятости
worker_free = next(({worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if (stock in worker_data['stocks'] and worker_data['is_free'])), None)
worker_name = list(worker_free.keys())[0]
# Меняем статус сектора на «занятый»
for elem in infodb.stocks_list:
if elem["name"] == stock: elem["is_free"] = False
# Меняем статус приёмщика на «занятый»
infodb.workers_list[worker_name]["is_free"] = False
sms_data = {
"number": PHONE_SEND,
"destination": worker_free[worker_name]["phone"],
"text": f"Автомобиль {info_auto.number} прибыл на разгрузку в сектор {stock}"
}
# Создаём асинхронную сессию
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=sms_data) as response:
response_data = await response.json()
return response_data
Особенности асинхронных сессий
Ключевой момент в этом коде — асинхронная сессия aiohttp.ClientSession. Она позволяет отправлять HTTP-запросы, не блокируя основной поток выполнения. Это критически важно для высокой производительности и эффективного использования ресурсов.
Особенности асинхронного подхода
При таком подходе все операции в приложении должны быть асинхронными. Например, если используем реляционную базу данных PostgreSQL, то для взаимодействия с ней нужен асинхронный драйвер, такой как asyncpg. Так мы предотвратим блокировку сокета и обеспечим асинхронную обработку запросов.
Такому принципу нужно следовать на всех этапах разработки чтобы достичь максимальной производительности и эффективности веб-приложения.
Завершение разгрузки
Когда разгрузка завершена, приёмщик прикладывает пропуск в секторе, где он находится. Так он посылает сигнал на маршрут в обозначенный файл stock.py:
@routestock.post("/{stock}/change")
async def end_load(stock: str, info_worker: shemas.BaseWorker):
"""
Назначение: \n
Изменить статус сектора и сотрудника на «свободный».\n
"""
# меняем статус сектора на «свободный»
for elem in infodb.stocks_list:
if elem["name"] == stock: elem["is_free"] = True
# меняем статус сотрудника на «свободный»
infodb.workers_list[info_worker.name][“is_free”] = True
return infodb.workers_list[info_worker.name]
Завершение и начало рабочего дня
Рабочий день приёмщика начинается и заканчивается с использованием электронных пропусков. Сигнал от пропуска приходит на API. Это показано в следующих маршрутах:
@routework.post("/finish")
async def end_work(info_worker: shemas.BaseWorker)-> dict:
"""
Назначение: \n
Сигнал от электронного пропуска. Конец рабочего дня.\n
"""
infodb.workers_list[info_worker.name][”is_free”] = False
infodb.workers_list[info_worker.name][”is_work”] = False
return infodb.workers_list[info_worker.name]
@routework.post("/workday")
async def start_work(info_worker: shemas.BaseWorker)-> dict:
"""
Назначение: \n
Сигнал от электронного пропуска. Начало рабочего дня.\n
"""
infodb.workers_list[info_worker.name][”is_free”] = True
infodb.workers_list[info_worker.name][”is_work”] = True
return infodb.workers_list[info_worker.name]
Дополнение к главному файлу приложения
Дополним main.py строками смонтированных маршрутов и создадим команду запуска:
app.include_router(stock.routestock)
app.include_router(worker.routework)
if __name__ == "__main__":
uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True, workers=4)
Для тестирования API нужно запустить приложение из среды разработки и перейти по адресу http://0.0.0.0:8000/docs#/. FastAPI сразу формирует готовую спецификацию Swagger, с её помощью можно быстро проверить работоспособность проекта.
Заключение
Этот проект — пример того, как можно использовать FastAPI, Redis и aiohttp для создания системы SMS-уведомлений, интегрированной с внешними сервисами. Он показывает, как важна асинхронность при работе с внешними API.
В реальных проектах нужно обеспечить безопасность данных и обработку ошибок, а также использовать реляционные базы данных для более сложных операций. На базе этого проекта можно разработать более сложную систему с интеграцией внешних сервисов и асинхронным взаимодействием с базами данных.