Несмотря на то, что я системный аналитик, проклятие, которое начинается с фразы «тыж программист…», иногда преследует и меня. Недавно у меня спросили совета, как сделать так, чтобы для нужных людей шлагбаум удаленно открывался без транспондера и сложных систем.
К сожалению, сходу я не придумал ответа на вопрос, но задачка показалась мне любопытной, и я немного изучил вопрос.
Решение нашёл быстро — в качестве аппаратной части можно использовать GSM-реле, а контроль доступа реализовать с помощью микро сервиса для переадресации вызова.
И поскольку у меня наличествовал тестовый доступ к МТС Exolve, грех было им не воспользоваться. Тем более, что соответствующее API для управления входящим вызовом есть.
Оглавление
Если честно, я не знаток автоматических шлагбаумов, поэтому в статье я не буду привязываться к аппаратной части, мне кажется подойдёт практически любое устройство, которое предложит поисковик по запросу «GSM модуль для шлагбаума». Как правило у многих моделей шлагбаумов есть контакт, который при замыкании (размыкании реле) будет поднимать и опускать шлагбаум.
Как видите решение очень простое, в принципе можно и не городить никаких сервисов, а просто раздать знакомым номер телефона SIM-карты в GSM-модуле. Но что делать, если этот номер станет известен третьим лицам? Нам явно нужен белый список разрешенных номеров. По хорошему такие проблемы решают комплексные системы безопасности. Но мне очень хотелось покрутить в руках FastAPI. Поэтому мы сделаем свой сервис по контролю звонящих с «картошными играми и прочими увеселениями».
Однако, прежде чем приступать к делу – традиционный дисклеймер. Я не программист. Задачу я решал скорее концептуально и корнер-кейсы особо не продумывал. Поэтому, настоятельно не рекомендую относиться к материалам статьи, как к истине в последней инстанции и использовать код без изменений в продуктовой среде.
Что планируем сделать
Формальности улажены и можно переходить к делу.
Рассмотрим следующий сценарий:
Предусловия.
Есть несколько (минимум две) зоны, защищенные шлагбаумами, в каждом стоит свой GSM-модуль.
Пользователям может быть разрешен доступ к одной или сразу нескольким зонам.
Мы будем прорабатывать следующий:
Пользователь подъезжает к шлагбауму.
Набирает номер.
MTС Exolve делает запрос к нам в сервис
Сервис проверяет в какие зоны разрешен проезд пользователю и делает обзвон соответствующих шлагбаумов.
Шлагбаумы открываются.

База данных
Начнем с базы данных. в которой будем хранить доступы пользователей из белого списка.
Для прототипа я взял SQlite ибо её функций нам хватит с головой, а ее поддержка уже давно реализована «из коробки» в Python 3.
Фактически нам нужно две таблицы:
Таблица для данных о пользователях, которые будут звонить на контрольный номер (users).
Таблица для данных о номерах телефонах GSM-модулей в шлагбаумах и защищаемых ими зонах (barriers).
Однако, для реализации связи многие-ко-многим, мы сделаем третью табличку в которой непосредственно соединим пользователей и доступные им зоны для проезда (users_access).
В результате получим такую структуру.


Поскольку все исходники я разместил на GitHub, воздержусь от подробного описания структуры таблиц..
Отмечу только, что в user_access составной первичный ключ из обоих полей, при этом оба поля это внешние ключи в соответствующих таблицах.
В качестве тестового примера мы создадим два шлагбаума.
Один для обычных гостей (#1), а второй (№2) для сотрудников.

Все имена и номера телефонов выдуманы, любые совпадения случайны.
Создадим двух пользователей.
Одного гостя (Habr) и одного сотрудника (Habra).

Осталось задать доступы. Пользователь Habr сможет проехать только через гостевой шлагбаум, а Habra может проехать через оба.

Настройка УВВ API МТС Exolve
Прежде чем приступить к разработке сервиса, нам необходимо настроить управление входящим вызовов в МТС Exolve. Я надеюсь, что у вас есть к нему доступ.
Согласно документации, необходимо с помощью метода setSipCallControlU
установить URL, к которому Exolve будет обращаться за дальнейшими инструкциями по обработке вызова.
Для этого необходимо отправить POST запрос на адрес:
https://api.mtt.ru/ipcr/ (указав в базовой авторизации логин и пароль).
Тело запроса:
{
"id":"1",
"jsonrpc":"2.0",
"method": "setSipCallControlURL",
"params": {
"sip_id": "Ваш SIP ID (ном",
"url": "Ваш URL"
}
}
В нашем случае sip_id
– это номер телефона, на который будут звонить пользователи. А url
– это адрес метода, который скажет, что делать дальше с вызовом вида <ваш домен или IP>/access.
Дополнительно еще можно установить резервный номер, на случай если сервис будет недоступен, например, номер поста охраны.
Сервис для управления входящим вызовом и логирование запроса
Итак, пришло время браться за разработку сервиса. Я использовал Python v. 3.10 и fastapi v. 0.115, но думаю, что подойдут и другие версии.
Освежим в памяти, схему взаимодействия, рассмотрим её в формате диаграммы последовательности.

Создадим файловую структуру:
├── api
│ ├── access.py (открытие шлагбаума)
│ ├── config.py (общие константы и настройки)
│ ├── crud.py (методы для управления записями в БД)
│ ├── __init__.py (служебный файл для подключения модулей)
│ └── log.py (работа с записями в лог)
├── data.sqlite (база данных)
├── main.py (главный файл скрипта)
└── readme.md
└──app.log (появится после первого логируемого запроса)
Начнем с файла api/config.py
DB_NAME = 'data.sqlite'
ACCESS_TABLE = 'users_access'
BARIERS_TABLE = 'barriers'
USER_TABLE = 'users'
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
},
},
'handlers': {
'file': {
'class': 'logging.FileHandler',
'filename': 'app.log',
'formatter': 'simple'
},
},
'loggers': {
'myapp': {
'handlers': ['file'],
'level': 'INFO',
'propagate': False
},
}
}
all = ["DB_NAME", "ACCESS_TABLE", "BARIERS_TABLE", "USER_TABLE", "LOGGING"]
В нем мы храним константы, которые будем использовать в других скриптах, а также создаем типовую настройку логов для библиотеки logging.
Не отходя от кассы, перейдем к api/logging.py
from fastapi import APIRouter, Body
from typing import Any
from . import config
import logging.config
logging.config.dictConfig(config.LOGGING)
logger = logging.getLogger('myapp')
router = APIRouter(tags=["Logs"])
@router.post('/logs')
async def get_body(body: Any = Body(None)):
"""
Create log for MTC Exolve events.
"""
logger.info(f'log endpoint was called. body: {body}')
return body
В данном скрипте импортируем необходимые библиотеки, создаем экземпляр логгера, который будем использовать в других методах.
А также создаем группу дочерних API (router), в которой будет всего 1 POST метод
Данный метод при вызове просто пишет информационное сообщение с телом запроса.
Он пригодится нам либо для тестирования логов, либо для сохранения сведений о событии при запросе от МТС Exolve.
Пришло время основной бизнес-логики, ради которой все и затеял – скрипту api/access.py
Полный листинг скрипта спрячу под спойлером:
Скрытый текст
from fastapi import APIRouter, HTTPException, Body
from pydantic import BaseModel
from typing import List, Union, Any
import sqlite3
from .config import *
from .log import logger
EVENT_URL = "<url or IP>/logs"
CLIENT_ID = "MTC Exolve client ID"
DISPLAY_NUMBER = "MTC Exolve phone number"
class FollowMeRule(BaseModel):
I_FOLLOW_ORDER: str
ACTIVE: str
NAME: str
REDIRECT_NUMBER: str
PERIOD: str
PERIOD_DESCRIPTION: str
TIMEOUT: str
class FollowMeStruct(BaseModel):
List[FollowMeRule]
class Result(BaseModel):
redirect_type: int
event_URL: str
client_id: str
event_extended: str
masking: str
display_number: str
followme_struct: List[Union[int, List[FollowMeRule]]]
class ExolveResponse(BaseModel):
id: int
jsonrpc: str
result: Result
def open_barriers(phone):
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b
JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone
where u.user_phone = {phone} ''')
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
response = [dict(zip(column_names, row)) for row in rows]
connection.close()
return response
router = APIRouter(tags=["Access control"])
@router.post("/access/", response_model=ExolveResponse)
async def response_to_exolve(body: Any = Body(None)):
"""
Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))
"""
if 'params' in body and 'numberA' in body['params']:
user_phone = body["params"]["numberA"]
else:
raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")
barrier_phones = open_barriers(user_phone)
if (barrier_phones == None):
raise HTTPException(status_code=403, detail="Access to barriers not allowed")
followme_struct = []
for i in range(0, len(barrier_phones) ) :
row = barrier_phones[i]
followme_struct.append( {
"I_FOLLOW_ORDER": str(i+1),
"ACTIVE": "Y",
"NAME": "BARRIER_PHONE",
"REDIRECT_NUMBER": str(row["phone"]),
"PERIOD": "always",
"PERIOD_DESCRIPTION": "always",
"TIMEOUT": "30"
})
# Создаем объект Result
result_object = Result(
redirect_type="3",
event_URL=EVENT_URL ,
client_id= CLIENT_ID,
event_extended="N",
masking= "Y",
display_number= DISPLAY_NUMBER,
followme_struct=[len(followme_struct),followme_struct]
)
exolve_response_object = ExolveResponse(
id=1,
jsonrpc="2.0",
result=result_object
)
logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
return exolve_response_object
В самом начале мы импортируем необходимые модули и настраиваем константы:
from fastapi import APIRouter, HTTPException, Body
from pydantic import BaseModel
from typing import List, Union, Any
import sqlite3
from .config import *
from .log import logger
EVENT_URL = "<url or IP>/logs"
CLIENT_ID = "MTC Exolve client ID"
DISPLAY_NUMBER = "MTC Exolve phone number"
Дальше идет определение классов данных для метода. По сути мы как конструктор собираем из нескольких блоков структуру ожидаемого ответа, указанную в документации.
class FollowMeRule(BaseModel):
I_FOLLOW_ORDER: str
ACTIVE: str
NAME: str
REDIRECT_NUMBER: str
PERIOD: str
PERIOD_DESCRIPTION: str
TIMEOUT: str
class FollowMeStruct(BaseModel):
List[FollowMeRule]
class Result(BaseModel):
redirect_type: int
event_URL: str
client_id: str
event_extended: str
masking: str
display_number: str
followme_struct: List[Union[int, List[FollowMeRule]]]
class ExolveResponse(BaseModel):
id: int
jsonrpc: str
result: Result
Дальше определим функцию для поиска доступных к открытию шлагбаумов для пользователя, позвонившего на номер телефона.
Функция open_barriers проверяет по таблице users_access какие зоны безопасности доступны пользователю, склеивает их с номерами телефонов из таблицы barriers, и возвращает их в виде списка.
В принципе функция в скрипте вызывается один раз, и можно не выносить её отдельно, но я решил для удобства чтения сделать отдельно.
def open_barriers(phone):
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b
JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone
where u.user_phone = {phone} ''')
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
response = [dict(zip(column_names, row)) for row in rows]
connection.close()
return response
Осталось рассмотреть описание самого метода POST /access, который вызовет MTC Exolve при звонке.
router = APIRouter(tags=["Access control"])
@router.post("/access/", response_model=ExolveResponse)
async def response_to_exolve(body: Any = Body(None)):
"""
Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))
"""
if 'params' in body and 'numberA' in body['params']:
user_phone = body["params"]["numberA"]
else:
raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")
barrier_phones = open_barriers(user_phone)
if (barrier_phones == None):
raise HTTPException(status_code=403, detail="Access to barriers not allowed")
followme_struct = []
for i in range(0, len(barrier_phones) ) :
row = barrier_phones[i]
followme_struct.append( {
"I_FOLLOW_ORDER": str(i+1),
"ACTIVE": "Y",
"NAME": "BARRIER_PHONE",
"REDIRECT_NUMBER": str(row["phone"]),
"PERIOD": "always",
"PERIOD_DESCRIPTION": "always",
"TIMEOUT": "30"
})
# Создаем объект Result
result_object = Result(
redirect_type="3",
event_URL=EVENT_URL ,
client_id= CLIENT_ID,
event_extended="N",
masking= "Y",
display_number= DISPLAY_NUMBER,
followme_struct=[len(followme_struct),followme_struct]
)
exolve_response_object = ExolveResponse(
id=1,
jsonrpc="2.0",
result=result_object
)
logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
return exolve_response_object
В методе проверяется, указан ли в теле запроса номер телефона с которого совершался звонок. По идее он всегда будет указан при нормальной работы, но для удобства тестирования запросов отправляемых вручную, я добавил эту проверку.
Затем мы получаем список телефонов, на которые нужно сделать вызов с помощью функции open_barrier.
И уже на базе полученного списка создаем объект для ответа на запрос.
Я решил не усложнять логику, поскольку тело ответа в нашем случае по большей части одинаковое, мы просто изменяющиеся данные подставим в шаблон.
Пройдемся в цикле по каждой записи, которая вернулась из базы данных, и создадим структуру для каждой записи, изменяя лишь атрибуты I_FOLLOW_ORDER
и REDIRECT_NUMBER
followme_struct.append( {
"I_FOLLOW_ORDER": “ПОРЯДКОВЫЙ НОМЕР ТЕЛЕФОНА ДЛЯ ВЫЗОВА”,
"ACTIVE": "Y",
"NAME": "BARRIER_PHONE",
"REDIRECT_NUMBER": “НОМЕР ТЕЛЕФОНА В ШЛАГБАУМЕ”,
"PERIOD": "always",
"PERIOD_DESCRIPTION": "always",
"TIMEOUT": "30"
})
Затем соберем как матрешку следующую структуру result_object
.
В ней мы в основном подставляем константы, но отдельно стоит обратить внимание на этот фрагмент кода: followme_struct=[len(followme_struct),followme_struct]
len(followme_struct)
– количество номеров, на которые будем делать вызов.
Далее пакуем третий слой матрешки:
exolve_response_object = ExolveResponse(
id=1,
jsonrpc="2.0",
result=result_object
)
Создаем запись в лог о попытке открыть шлагбаум(ы):
logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
И возвращаем ответ:
return exolve_response_obje
ct
Осталось собрать все вместе в файле main.py и посмотреть код в деле
from fastapi import FastAPI
from api import crud, access, log
app = FastAPI( title="API for bariier control via MTC Exolve",
description="This API requires an API key in the X-API-Key header demo key is 12345",
version="1.0.0")
app.include_router(access.router) # Use the router
app.include_router(log.router) # Use the router
app.include_router(crud.router) # Use the router
По сути в данном файле мы создаем главную точку в API сервиса (app = FastAPI).
И далее подключаем к ней ранее созданные роутеры.
Пусть вас не смущает то, что мы еще не разобрали роутер «crud». Это опциональные функции для управления записями в базе данных, если они вам не нужны, вы можете просто удалить или закомментировать все, что связано с «crud».
Итоги
Для запуска сервиса локально воспользуйтесь командой
uvicorn app.main:app
Если вы как и я первый раз встречаетесь с FastAPI, то рекомендую более подробно ознакомится со статьей на Хабре, в которой очень хорошо расписаны азы работы с ним.
После того как сервис запустится, перейдите по адресу
Должен открыться Swagger.

Для теста создадим запись в логах:

В файле app.log должна появиться запись
2025-03-23 16:53:35,382 - myapp - INFO - log endpoint was called. body: Привет мир
Однако протестировать функцию звонка мы сможем только запустив сервис во внешний мир.
Лично я арендовал недорогой облачный сервер и настроил его согласно инструкции.
После этого если мы отправим на адрес
http:/<Ваш IP или домен>/access/
запрос со следующим body:
{
"id": "1",
"jsonrpc": "2.0",
"method": "getcontrolcallfollowme",
"params": {
"h323_conf_id": " BC5F236C 5AD211E9 81BA5CB9 01FED6FC",
"numberA": "79123456784",
"sip_id": "79123456785"
}
}
Вернется следующий ответ:
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"redirect_type": 3,
"event_URL": "http://62.113.44.250/logs",
"client_id": "1215882",
"event_extended": "N",
"masking": "Y",
"display_number": "79841860477",
"followme_struct": [
2,
[
{
"I_FOLLOW_ORDER": "1",
"ACTIVE": "Y",
"NAME": "BARRIER_PHONE",
"REDIRECT_NUMBER": "79123456782",
"PERIOD": "always",
"PERIOD_DESCRIPTION": "always",
"TIMEOUT": "30"
},
{
"I_FOLLOW_ORDER": "2",
"ACTIVE": "Y",
"NAME": "BARRIER_PHONE",
"REDIRECT_NUMBER": "79123456781",
"PERIOD": "always",
"PERIOD_DESCRIPTION": "always",
"TIMEOUT": "30"
}
]
]
}
}
Это означает, что MTC Exolve позвонит на оба шлагбаума одновременно, и они оба откроются.
Если у вас еще остались силы, быстро пробежимся по функциям CRUD.
Бонус – CRUD API
Еще раз напомню, что все материалы доступны на GitHub.
В файле /api/crud.py представлена дополнительная логика, с помощью которой можно выполнять стандартные операции, чтения, удаления, добавления и обновления записей в таблицах базы данных.

Полный листинг модуля под спойлером.
Скрытый текст
from fastapi import APIRouter, HTTPException, Depends, Body
from .config import *
from pydantic import BaseModel, Field
from typing import Optional, List, Annotated
import sqlite3
from .log import logger
from fastapi.security import APIKeyHeader
API_KEY = "12345" # Replace with your key
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True, description="API Key required for access")
async def get_api_key(api_key_header: Annotated[str, Depends(api_key_header)]):
"""
Dependency to validate the API key from the header.
"""
if api_key_header == API_KEY:
return api_key_header
else:
raise HTTPException(status_code=403, detail="Invalid API Key")
class Message(BaseModel):
message:str
class User(BaseModel):
id: int
phone: int
name: str
last_name: str
position: Optional[str] = None
class CreatedUser(BaseModel):
phone: int = Field(..., description="User's phone.")
name: str = Field(..., description="User's name.")
last_name: str = Field(..., description="User's last name.")
last_name: str
position: Optional[str] = Field(None, description="user position (optional).")
class CreatedBarrier(BaseModel):
zone: int = Field(..., description="Security zone (location of the barrier on the site plan).")
phone: int = Field(..., description="Phone of GSM module for opening barrier.")
class Barrier(BaseModel):
id: int
zone: int
phone: int
class Access(BaseModel):
user_phone: int
barrier_zone: int
def read_tables(table, id=None):
"""
Function for read users and barriers from db.
"""
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
where = ""
if id:
where = f" WHERE `id` = {id}"
cursor.execute(f'SELECT * FROM {table} {where}')
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
response = [dict(zip(column_names, row)) for row in rows]
connection.close()
if not response:
return None
return response[0] if id else response
def update_tables(table, operation, data=None, id=None):
"""
Function for read users, barriers, and accesses from DB.
"""
try:
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
sql_data = ()
fields = {
"users":("phone", "name", "last_name", "position"),
"barriers":("zone", "phone"),
"users_access":("user_phone", "barrier_zone")
}
if data:
match table:
case "users":
sql_data = (data.phone, data.name, data.last_name, data.position)
case "barriers":
sql_data = (data.zone, data.phone)
case "users_access":
sql_data = (data.user_phone, data.barrier_zone)
match operation:
case "create":
values=[]
for field in fields[table]:
values.append(f'?')
result_string = ', '.join(values)
sql = f'UPDATE {table} SET {result_string} Where id = {id}'
sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'
cursor.execute(sql, sql_data)
connection.commit()
case "delete":
sql =""
if id:
sql = f'DELETE FROM {table} WHERE id = {id}'
elif data:
sql = f'DELETE FROM {table} WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '
cursor.execute(sql)
connection.commit()
case "update":
update_fields = []
for field in fields[table]:
update_fields.append(f'{field} = ?')
result_string = ', '.join(update_fields)
sql = f'UPDATE {table} SET {result_string} Where id = {id}'
sql_data = tuple(data.model_dump().values())
cursor.execute(sql, sql_data)
connection.commit()
case None:
return None
except sqlite3.Error as e:
print(f"Error: {e}")
return None
finally:
if connection:
connection.close()
return cursor.lastrowid
router = APIRouter(tags=["CRUD"])
@router.get("/bariers/", response_model=List[Barrier])
async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):
"""
List all barriers.
"""
response = read_tables(BARIERS_TABLE)
return response
@router.get("/barrier/{barrier_id}", response_model=Barrier)
async def list_barrier(barrier_id: int, api_key: Annotated[str, Depends(get_api_key)]):
"""
Get a barrier with specific id.
"""
response = read_tables(BARIERS_TABLE, barrier_id)
if not response:
raise HTTPException(status_code=404, detail="Barrier not found")
return response
@router.post("/barriers/", response_model=Message)
async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):
"""
Creates a new barrier.
"""
result = update_tables(BARIERS_TABLE, "create", data=barrier)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'barrier created {barrier}')
return {"message": f"created barrier with id = {result}"}
@router.delete("/barriers/{barrier_id}", response_model=Message)
async def delete_barrier(barrier_id:int, api_key: Annotated[str, Depends(get_api_key)]):
"""
Delete a barrier with specific id.
"""
result = update_tables(BARIERS_TABLE, "delete", None, barrier_id)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'barrier {barrier_id} deleted')
return {"message":"ok"}
@router.patch("/barriers/{barrier_id}", response_model=Message)
async def edit_barrier(barrier_id:int, barrier:CreatedBarrier,api_key: Annotated[str, Depends(get_api_key)]):
"""
Update a barrier with specific id (fill in all the request attributes).
"""
result = update_tables(BARIERS_TABLE, "update", barrier, barrier_id)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'barrier {barrier_id} updated {barrier}')
return {"message":"ok"}
@router.get("/accesses/", response_model=List[Access])
async def list_accesses(api_key: Annotated[str, Depends(get_api_key)]):
"""
List accesses (user to barrier).
"""
response = read_tables(ACCESS_TABLE)
return response
@router.post("/accesses/", response_model=Message)
async def create_access(access: Access, api_key: Annotated[str, Depends(get_api_key)]):
"""
Creates a new access (user to barrier).
"""
result = update_tables(ACCESS_TABLE, "create", data=access)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'user access created {access}')
return {"message": f"created access with id = {result}"}
@router.delete("/accesses/", response_model=Message)
async def delete_user(access:Access, api_key: Annotated[str, Depends(get_api_key)]):
"""
Delete access (user to barrier).
"""
result = update_tables(ACCESS_TABLE, "delete",access, None)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'user access deleted {access}')
return {"message":"ok"}
@router.get("/users/", response_model=List[User])
async def list_users(api_key: Annotated[str, Depends(get_api_key)]):
"""
List all users.
"""
response = read_tables(USER_TABLE)
return response
@router.get("/users/{user_id}", response_model=User)
async def list_user(user_id: int, api_key: Annotated[str, Depends(get_api_key)]):
"""
Get a user with specific id.
"""
response = read_tables(USER_TABLE, user_id)
if not response:
raise HTTPException(status_code=404, detail="User not found")
return response
@router.post("/users/", response_model=Message)
async def create_user(user: CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):
"""
Creates a new user.
"""
result = update_tables(USER_TABLE, "create", data=user)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'user created {user}')
return {"message": f"created user with id {result}"}
@router.delete("/users/{user_id}", response_model=Message)
async def delete_user(user_id:int, api_key: Annotated[str, Depends(get_api_key)]):
"""
Delete a user with specific id.
"""
result = update_tables(USER_TABLE, "delete", None, user_id)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'user_id = {user_id} deleted')
return {"message":"ok"}
@router.patch("/users/{user_id}", response_model=Message)
async def edit_user(user_id:int, user:CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):
"""
Update a user
with specific id (fill in all the request attributes).
"""
result = update_tables(USER_TABLE, "update", user, user_id)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'user_id = {user_id} updated {user}')
return {"message":"ok"}
Мы не будем разбирать всю логику подробно, разберем лишь функции чтения и изменения данных и пару типовых методов, которые к ним обращаются.
Функция read_tables формирует в БД запрос на получение данных из таблицы. Я постарался сделать ее поведение универсальным, чтобы выбирать из какой таблицы читать, а также указать id, для которого нужно вернуть данные (это пригодится в методах чтения одной записи из выборки).
def read_tables(table, id=None):
"""
Function for read users and barriers from db.
"""
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
where = ""
if id:
where = f" WHERE `id` = {id}"
cursor.execute(f'SELECT * FROM {table} {where}')
column_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
response = [dict(zip(column_names, row)) for row in rows]
connection.close()
if not response:
return None
return response[0] if id else response
А вот функция update_tables более комплексная. Я тоже попытался сделать её универсальной, чтобы с помощью нее обрабатывать запросы на удаление, обновление и создание новых записей в таблицах.
def update_tables(table, operation, data=None, id=None):
"""
Function for read users, barriers, and accesses from DB.
"""
try:
connection = sqlite3.connect(DB_NAME)
cursor = connection.cursor()
sql_data = ()
fields = {
"users":("phone", "name", "last_name", "position"),
"barriers":("zone", "phone"),
"users_access":("user_phone", "barrier_zone")
}
if data:
match table:
case "users":
sql_data = (data.phone, data.name, data.last_name, data.position)
case "barriers":
sql_data = (data.zone, data.phone)
case "users_access":
sql_data = (data.user_phone, data.barrier_zone)
match operation:
case "create":
values=[]
for field in fields[table]:
values.append(f'?')
result_string = ', '.join(values)
sql = f'UPDATE {table} SET {result_string} Where id = {id}'
sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'
cursor.execute(sql, sql_data)
connection.commit()
case "delete":
sql =""
if id:
sql = f'DELETE FROM {table} WHERE id = {id}'
elif data:
sql = f'DELETE FROM {table} WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '
cursor.execute(sql)
connection.commit()
case "update":
update_fields = []
for field in fields[table]:
update_fields.append(f'{field} = ?')
result_string = ', '.join(update_fields)
sql = f'UPDATE {table} SET {result_string} Where id = {id}'
sql_data = tuple(data.model_dump().values())
cursor.execute(sql, sql_data)
connection.commit()
case None:
return None
except sqlite3.Error as e:
print(f"Error: {e}")
return None
finally:
if connection:
connection.close()
return cursor.lastrowid
И рассмотрим код пары методов, в которых используются вышеуказанные функции:
Начнем с чтения всех данных о шлагбаумах. метода GET /barriers/. По сути он лишь обращается к базе данных и возвращает список с данными по всем шлагбаумам.
router = APIRouter(tags=["CRUD"])
@router.get("/bariers/", response_model=List[Barrier])
async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):
"""
List all barriers.
"""
response = read_tables(BARIERS_TABLE)
return response
Обратите внимание на api_key: Annotated[str, Depends(get_api_key)
, я решил что методы CRUD должны быть с проверкой авторизации, поэтому для доступа к ним нужен будет код доступа 12345. Авторизацию сгенерировала нейросеть, поэтому я не буду её подробно разбирать, если вы изучите полный листинг, то быстро разберетесь в типовом коде.
Результат работы:

Пришло время метода для создания записи о барьере POST /barriers/. Метод принимает на вход номер зоны и телефон внутри GSM-модуля шлагбаума. В случае если запись успешно создана, вернет сообщение с id созданной записи в таблице barrires.
@router.post("/barriers/", response_model=Message)
async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):
"""
Creates a new barrier.
"""
result = update_tables(BARIERS_TABLE, "create", data=barrier)
if result == None:
raise HTTPException(status_code=400, detail="Bad request")
logger.info(f'barrier created {barrier}')
return {"message": f"created barrier with id = {result}"}
Результат работы:

Теперь у нас есть не только функционал для открытия шлагбаума, но и заготовка для админ-панели по управлению доступом.
Остальные методы вы сможете изучить самостоятельно, развернув сервис локально.
Спасибо, что дочитали до конца! Надеюсь, статья была интересной. Буду рад ознакомится с конструктивными комментариями.