Как стать автором
Обновить
1043.79
МТС
Про жизнь и развитие в IT

Помощь логистам: как сделать SMS-уведомления о разгрузке товара на складе на основе FastAPI

Время на прочтение8 мин
Количество просмотров1.5K

Привет, Хабр! Меня зовут Анастасия Иванова, я технический писатель МТС Exolve. Сегодня расскажу о системе SMS-уведомлений, созданной с помощью FastAPI. Она оповещает логистов о прибытии автомобилей на склад и автоматически назначает приёмщиков.  Система интегрирована с платформой МТС Exolve, на её примере покажем, как применять современные методы асинхронного взаимодействия. Подробности — под катом.

Установка и настройка проекта

Предположим, что у нас есть единый склад и секторы разгрузки, обозначенные латинскими буквами. Нужно оповещать грузчиков о прибытии автомобиля в сектор разгрузки. Эту задачу мы и будем решать.

Для начала убедимся, что у нас установлены все необходимые зависимости. Мы используем FastAPI для создания веб-приложения, Redis для хранения временных данных и FastAPI Cache для эффективного кэширования запросов.

Структура проекта:

smsfastapi/
    /venv
    /example_db
        __init__.py
        shemas.py
        infodb.py
    /router
        __init__.py
        stock.py
        worker.py
    config.py
    main.py
    dev.env

В файле config.py хранятся конфигурационные переменные, такие как API-ключ и номер телефона для отправки SMS. Следующий код инициализирует соединение с Redis в функции "startup" и разъединение в "shutdown" в файле main.py:

Код
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi.responses import HTMLResponse
from redis import asyncio as aioredis
from fastapi_cache.backends.redis import RedisBackend
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from router import stock, worker


origins = [
	"http://localhost:3000",
	"http://localhost:5173",
	"http://localhost:8000"
]


app = FastAPI(title='Rule of Stock',
          	description='API для управления разгрузкой автомобилей, прибывающих на склад. \n\n')


app.add_middleware(
	CORSMiddleware,
	allow_origins=origins,
	allow_credentials=True,
	allow_methods=["GET", "POST", "OPTIONS", "DELETE", "PATCH", "PUT"],
	allow_headers=["Content-Type", "Set-Cookie", "Access-Control-Allow-Headers", "Access-Control-Allow-Origin",
               	"Authorization"],
)


@app.get("/", response_class=HTMLResponse)
async def start():
	"""
	Назначение: \n
	Показать пользователю стартовую страницу со ссылкой на Swagger для использования API.\n
	"""
	return """
    	<html>
        	<head>
            	<title>Rule of Stock</title>
            	<style>
                	body {
                    	background-color: white;
                    	font-family: Arial, sans-serif;
                    	text-align: center;
                	}
                	h1 {margin-top: 80px;}
            	</style>
        	</head>
        	<body>
            	<h3>API для управления разгрузкой автомобилей, прибывающих на склад</h3>
<p>Для проверки работы API перейдите по ссылке <a href="http://0.0.0.0:8000/docs">здесь</a>.</p>	<p>-------------------------------------------------------------</p>
        	</body>
    	</html>
    	"""

@app.on_event("startup")
async def startup():
	redis = await aioredis.from_url("redis://localhost:6379", encoding="utf8", decode_responses=True)
	cache_redis = FastAPICache.init(RedisBackend(redis), prefix="stock_cache")
	return cache_redis


@app.on_event("shutdown")
async def shutdown():
	backend = FastAPICache.get_backend()
	if isinstance(backend, RedisBackend):
    	redis = backend.redis
    	await redis.close()

Модель данных

Схема данных для автомобилей и приёмщиков создана. Наши автомобили описаны схемой BaseAuto, а приёмщики — BaseWorker с использованием Pydantic:

from pydantic import BaseModel, EmailStr, Field, validator

class BaseAuto(BaseModel):
	number: str
	name: str

class BaseWorker(BaseModel):
	name: str
	is_work: bool
	is_free: bool

В качестве базы данных для простоты будем использовать список складов с элементами stocks_list = [{'name':'A', 'is_free': True},
{'name':'B', 'is_free': True},……]

и словарь приёмщиков:

workers_list = {
 'Ivan': {'is_work': True,
 'stocks': ['A', 'B', 'C', 'D'],
 'is_free': True,
 'phone': '79801331100'},
 'Roman': {'is_work': True,
 'stocks': ['E', 'F', 'G', 'H'],
 'is_free': True,
 'phone': '79801034411'},….}

Полную версию примера базы данных можно посмотреть здесь.

В файле config.py получим данные из переменных окружения:

from dotenv import dotenv_values

info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')	 

Работа с приёмщиками

В файле __init__.py. каталога /router импортируем основные библиотеки для работы с маршрутами:

from fastapi import APIRouter
import aiohttp
from fastapi_cache.decorator import cache
from fastapi import FastAPI, HTTPException

Маршруты для работы с приёмщиками определены в файле worker.py. Для улучшения производительности кэшируем данные через FastAPI Cache. Это решение ускорит доступ к постоянным данным о приёмщиках.

Код
from .__init__ import *
from config import API_KEY, PHONE_SEND
from example_db import (infodb, shemas)

routework = APIRouter(prefix="/worker", tags=['Worker'])

@routework.get('/read')
@cache(expire=30, namespace='personal')
async def read_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список приёмщиков.\n
	"""
	result =  [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items()][skip:limit]
	return result
@routework.get('/free')
@cache(expire=30, namespace='personal')
async def free_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список свободных приёмщиков.\n
	"""
	result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if worker_data['is_free']][skip:limit]
	return result

@routework.get('/busy')
@cache(expire=30, namespace='personal')
async def busy_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
Назначение: \n
	Получить список занятых приёмщиков.\n
	"""
	result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if not worker_data['is_free']][skip:limit]
	return result

Работа со складами

Маршруты для работы со складами находятся в файле stock.py. И мы снова кэшируем данные.

Код
from .__init__ import *

from example_db import (infodb, shemas)

routestock = APIRouter(prefix="/stock", tags=['Stock'])

@routestock.get('/read')
@cache(expire=30, namespace='stock')
async def read_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список постов.\n
	"""
	result = infodb.stocks_list[skip:limit]
	return result


@routestock.get('/free')
@cache(expire=30, namespace='stock')
async def free_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список свободных складов.\n
	"""
	result = [stock_busy for stock_busy in infodb.stocks_list if stock_busy["is_free"]][skip:limit]
	return result


@routestock.get('/busy')
@cache(expire=30, namespace='stock')
async def busy_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список занятых складов.\n
	"""
	result = [stock_busy for stock_busy in infodb.stocks_list if not stock_busy["is_free"]][skip:limit]
	return result

Назначение приёмщика и отправка SMS

Когда автомобиль приезжает на склад, водитель прикладывает электронный пропуск к воротам. В этот момент мы назначаем приёмщика, отправляя SMS-уведомление с использованием МТС Exolve.

Взаимодействие с aiohttp

Один из ключевых аспектов этого проекта — взаимодействие с внешним API для отправки SMS-уведомлений через платформу МТС Exolve. Для этого нам нужна библиотека aiohttp, через которую мы выполним асинхронные HTTP-запросы.

Взглянем на маршрут назначения приёмщика и отправки SMS-уведомления:

Код
@routework.post("/{stock}/message")
async def send_sms(stock: str, info_auto: shemas.BaseAuto):
    """
    Назначение: \n
    Назначить приёмщика для разгрузки автомобиля и оповестить его по SMS.\n
    """
    url = "https://api.exolve.ru/messaging/v1/SendSMS"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    
    # Находим свободного приёмщика в базе данных по условию привязки к складу и занятости
    worker_free = next(({worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if (stock in worker_data['stocks'] and worker_data['is_free'])), None)
    worker_name = list(worker_free.keys())[0]
   
    # Меняем статус сектора на «занятый»
    for elem in infodb.stocks_list:
        if elem["name"] == stock: elem["is_free"] = False
    
    # Меняем статус приёмщика на «занятый»
    infodb.workers_list[worker_name]["is_free"] = False
    
    sms_data = {
        "number": PHONE_SEND,
        "destination": worker_free[worker_name]["phone"],
        "text": f"Автомобиль {info_auto.number} прибыл на разгрузку в сектор {stock}"
    }
    
    # Создаём асинхронную сессию
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=sms_data) as response:
            response_data = await response.json()
            return response_data

Особенности асинхронных сессий

Ключевой момент в этом коде — асинхронная сессия aiohttp.ClientSession. Она позволяет отправлять HTTP-запросы, не блокируя основной поток выполнения. Это критически важно для высокой производительности и эффективного использования ресурсов.

Особенности асинхронного подхода

При таком подходе все операции в приложении должны быть асинхронными. Например, если используем реляционную базу данных PostgreSQL, то для взаимодействия с ней нужен асинхронный драйвер, такой как asyncpg. Так мы предотвратим блокировку сокета и обеспечим асинхронную обработку запросов.

Такому принципу нужно следовать на всех этапах разработки чтобы достичь максимальной производительности и эффективности веб-приложения.

Завершение разгрузки

Когда разгрузка завершена, приёмщик прикладывает пропуск в секторе, где он находится. Так он посылает сигнал на маршрут в обозначенный файл stock.py:

@routestock.post("/{stock}/change")
async def end_load(stock: str, info_worker: shemas.BaseWorker):
	"""
	Назначение: \n
	Изменить статус сектора и сотрудника на «свободный».\n
	"""
	# меняем статус сектора на «свободный»
	for elem in infodb.stocks_list:
    	if elem["name"] == stock: elem["is_free"] = True
	# меняем статус сотрудника на «свободный»
	infodb.workers_list[info_worker.name][“is_free”] = True
	return infodb.workers_list[info_worker.name]

Завершение и начало рабочего дня

Рабочий день приёмщика начинается и заканчивается с использованием электронных пропусков. Сигнал от пропуска приходит на API. Это показано в следующих маршрутах:

@routework.post("/finish")
async def end_work(info_worker: shemas.BaseWorker)-> dict:
	"""
	Назначение: \n
	Сигнал от электронного пропуска. Конец рабочего дня.\n
	"""
	infodb.workers_list[info_worker.name][”is_free”] = False
	infodb.workers_list[info_worker.name][”is_work”] = False
	return infodb.workers_list[info_worker.name]


@routework.post("/workday")
async def start_work(info_worker: shemas.BaseWorker)-> dict:
	"""
	Назначение: \n
	Сигнал от электронного пропуска. Начало рабочего дня.\n
	"""
	infodb.workers_list[info_worker.name][”is_free”] = True
	infodb.workers_list[info_worker.name][”is_work”] = True
	return infodb.workers_list[info_worker.name]

Дополнение к главному файлу приложения

Дополним main.py строками смонтированных маршрутов и создадим команду запуска:

app.include_router(stock.routestock)
app.include_router(worker.routework)

if __name__ == "__main__":
	uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True, workers=4)

Для тестирования API нужно запустить приложение из среды разработки и перейти по адресу http://0.0.0.0:8000/docs#/. FastAPI сразу формирует готовую спецификацию Swagger, с её помощью можно быстро проверить работоспособность проекта.

Заключение

Этот проект — пример того, как можно использовать FastAPI, Redis и aiohttp для создания системы SMS-уведомлений, интегрированной с внешними сервисами. Он показывает, как важна асинхронность при работе с внешними API.

В реальных проектах нужно обеспечить безопасность данных и обработку ошибок, а также использовать реляционные базы данных для более сложных операций. На базе этого проекта можно разработать более сложную систему с интеграцией внешних сервисов и асинхронным взаимодействием с базами данных.

Дополнительные ресурсы

Теги:
Хабы:
Всего голосов 5: ↑4 и ↓1+5
Комментарии4

Публикации

Информация

Сайт
www.mts.ru
Дата регистрации
Дата основания
Численность
свыше 10 000 человек
Местоположение
Россия