Несмотря на то, что я системный аналитик, проклятие, которое начинается с фразы  «тыж программист…», иногда преследует и меня. Недавно у меня спросили совета, как сделать так, чтобы для нужных людей шлагбаум удаленно открывался без транспондера и сложных систем. 

К сожалению, сходу я не придумал ответа на вопрос, но задачка показалась мне любопытной, и я немного изучил вопрос. 

Решение нашёл быстро — в качестве аппаратной части можно использовать GSM-реле, а контроль доступа реализовать с помощью микро сервиса для переадресации вызова.

И поскольку у меня наличествовал тестовый доступ к МТС Exolve, грех было им не воспользоваться. Тем более, что соответствующее API для управления входящим вызовом есть.

Оглавление

Если честно, я не знаток автоматических шлагбаумов, поэтому в статье я не буду привязываться к аппаратной части, мне кажется подойдёт практически любое устройство, которое предложит поисковик по запросу «GSM модуль для шлагбаума».  Как правило у многих моделей шлагбаумов есть контакт, который при замыкании (размыкании реле) будет поднимать и опускать шлагбаум.

Как видите решение очень простое, в принципе можно и не городить никаких сервисов, а просто раздать знакомым номер телефона SIM-карты в GSM-модуле. Но что делать, если этот номер станет известен третьим лицам? Нам явно нужен белый список разрешенных номеров. По хорошему такие проблемы решают комплексные системы безопасности. Но мне очень хотелось покрутить в руках FastAPI. Поэтому мы сделаем свой сервис по контролю звонящих с «картошными играми и прочими увеселениями».

Однако, прежде чем приступать к делу – традиционный дисклеймер. Я не программист.  Задачу я решал скорее концептуально и корнер-кейсы особо не продумывал. Поэтому, настоятельно не рекомендую относиться к материалам статьи, как к истине в последней инстанции и использовать код без изменений в продуктовой среде.

Что планируем сделать

Формальности улажены и можно переходить к делу.

Рассмотрим следующий сценарий:

Предусловия.

  1. Есть несколько (минимум две) зоны, защищенные шлагбаумами, в каждом стоит свой GSM-модуль. 

  2. Пользователям может быть разрешен доступ к одной или сразу нескольким зонам.

Мы будем прорабатывать следующий:

  1. Пользователь подъезжает к шлагбауму.

  2. Набирает номер.

  3. MTС Exolve делает запрос к нам в сервис

  4. Сервис проверяет в какие зоны разрешен проезд пользователю и делает обзвон соответствующих шлагбаумов.

  5. Шлагбаумы открываются.


Схема управления шлагбаумом

База данных

Начнем с базы данных. в которой будем хранить доступы пользователей из белого списка.

Для прототипа я взял SQlite ибо её функций нам хватит с головой, а ее поддержка уже давно реализована «из коробки» в Python 3.

Фактически нам нужно две таблицы:

  • Таблица для данных о пользователях, которые будут звонить на контрольный номер (users). 

  • Таблица для данных о номерах телефонах GSM-модулей в шлагбаумах и защищаемых ими зонах (barriers).

Однако, для реализации связи многие-ко-многим, мы сделаем третью табличку в которой непосредственно соединим пользователей и доступные им зоны для проезда (users_access).

В результате получим такую структуру.

ER-диаграмма
Структура БД

Поскольку все исходники я разместил на GitHub, воздержусь от подробного описания структуры таблиц..

Отмечу только, что в user_access составной первичный ключ из обоих полей, при этом оба поля это внешние ключи в соответствующих таблицах. 

В качестве тестового примера мы создадим два шлагбаума.

Один для обычных гостей (#1), а второй (№2) для сотрудников.

Данные таблицы barriers

Все имена и номера телефонов выдуманы, любые совпадения случайны. 

Создадим двух пользователей.

Одного гостя (Habr) и одного сотрудника (Habra).

Данные таблицы users

Осталось задать доступы. Пользователь Habr сможет проехать только через гостевой шлагбаум, а Habra может проехать через оба.

Данные таблицы users_access

Настройка УВВ API МТС Exolve

Прежде чем приступить к разработке сервиса, нам необходимо настроить управление входящим вызовов в МТС Exolve. Я надеюсь, что у вас есть к нему доступ.

Согласно документации, необходимо с помощью метода setSipCallControlU установить URL, к которому Exolve будет обращаться за дальнейшими инструкциями по обработке вызова.

Для этого необходимо отправить POST запрос на адрес:

https://api.mtt.ru/ipcr/ (указав в базовой авторизации логин и пароль).

Тело запроса:

{
    "id":"1",
    "jsonrpc":"2.0",
    "method": "setSipCallControlURL",
    "params": {
        "sip_id": "Ваш SIP ID (ном",
         "url": "Ваш URL"
    }
}

В нашем случае  sip_id – это номер телефона, на который будут звонить пользователи. А url – это адрес метода, который скажет, что делать дальше с вызовом вида <ваш домен или IP>/access.

Дополнительно еще можно установить резервный номер, на случай если сервис будет недоступен, например, номер поста охраны.

Сервис для управления входящим вызовом и логирование запроса

Итак, пришло время браться за разработку сервиса. Я использовал Python v. 3.10 и fastapi v. 0.115, но думаю, что подойдут и другие версии.

Освежим в памяти, схему взаимодействия, рассмотрим её в формате диаграммы последовательности.

Схема работы с Exolve

Создадим файловую структуру:

├── api

│   ├── access.py (открытие шлагбаума)

│   ├── config.py (общие константы и настройки)

│   ├── crud.py (методы для управления записями в БД)

│   ├── __init__.py (служебный файл для подключения модулей)

│   └── log.py (работа с записями в лог)

├── data.sqlite (база данных)

├── main.py (главный файл скрипта)

└── readme.md

└──app.log (появится после первого логируемого запроса)

Начнем с файла api/config.py

DB_NAME = 'data.sqlite'
ACCESS_TABLE = 'users_access'
BARIERS_TABLE = 'barriers'
USER_TABLE = 'users'
LOGGING = {
   'version': 1,
   'disable_existing_loggers': False,
   'formatters': {
       'simple': {
           'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
       },
   },
   'handlers': {
       'file': {
           'class': 'logging.FileHandler',
           'filename': 'app.log',
           'formatter': 'simple'
       },
   },

   'loggers': {
       'myapp': {
           'handlers': ['file'],
           'level': 'INFO',
           'propagate': False
       },
   }
}
all = ["DB_NAME", "ACCESS_TABLE", "BARIERS_TABLE", "USER_TABLE", "LOGGING"]

В нем мы храним константы, которые будем использовать в других скриптах, а также создаем типовую настройку логов для библиотеки logging.

Не отходя от кассы, перейдем к api/logging.py

from fastapi import APIRouter, Body
from typing import Any
from . import config
import logging.config

logging.config.dictConfig(config.LOGGING)
logger = logging.getLogger('myapp')
router = APIRouter(tags=["Logs"])

@router.post('/logs')
async def get_body(body: Any = Body(None)):
   """
   Create log for MTC Exolve events.
   """
   logger.info(f'log endpoint was called. body: {body}')
   return body

В данном скрипте импортируем необходимые библиотеки, создаем экземпляр логгера, который будем использовать в других методах.
А также создаем группу дочерних API (router), в которой будет всего 1 POST метод 

Данный метод при вызове просто пишет информационное сообщение с телом запроса.
Он пригодится нам либо для тестирования логов, либо для сохранения сведений о событии при запросе от МТС Exolve.

Пришло время основной бизнес-логики, ради которой все и затеял – скрипту api/access.py

Полный листинг скрипта спрячу под спойлером:

Скрытый текст
from fastapi import APIRouter, HTTPException,  Body
from pydantic import BaseModel
from typing import  List,  Union, Any
import sqlite3

from .config import *
from .log import logger

EVENT_URL = "<url or IP>/logs"
CLIENT_ID = "MTC Exolve client ID"
DISPLAY_NUMBER  = "MTC Exolve phone number"

class FollowMeRule(BaseModel):
   I_FOLLOW_ORDER: str
   ACTIVE: str
   NAME: str
   REDIRECT_NUMBER: str
   PERIOD: str
   PERIOD_DESCRIPTION: str
   TIMEOUT: str

class FollowMeStruct(BaseModel):
   List[FollowMeRule] 

class Result(BaseModel):
   redirect_type: int
   event_URL: str
   client_id: str
   event_extended: str
   masking: str
   display_number: str
   followme_struct: List[Union[int, List[FollowMeRule]]] 

class ExolveResponse(BaseModel):
   id: int
   jsonrpc: str
   result: Result

def open_barriers(phone):
   connection = sqlite3.connect(DB_NAME)
   cursor = connection.cursor()
   cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b
   JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone
   where u.user_phone = {phone} ''')
   column_names = [col[0] for col in cursor.description]
   rows = cursor.fetchall()
   response = [dict(zip(column_names, row)) for row in rows]
   connection.close()
   return response

router = APIRouter(tags=["Access control"])

@router.post("/access/", response_model=ExolveResponse)
async def response_to_exolve(body: Any = Body(None)):
   """
   Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))
   """
   if 'params' in body and 'numberA' in body['params']:
       user_phone = body["params"]["numberA"]
   else:
       raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")
   barrier_phones =  open_barriers(user_phone)
   if (barrier_phones == None):
       raise HTTPException(status_code=403, detail="Access to barriers not allowed")
   followme_struct = []
   for i in range(0, len(barrier_phones) ) :
       row = barrier_phones[i]
       followme_struct.append(            {
               "I_FOLLOW_ORDER": str(i+1),
               "ACTIVE": "Y",
               "NAME": "BARRIER_PHONE",
               "REDIRECT_NUMBER": str(row["phone"]),
               "PERIOD": "always",
               "PERIOD_DESCRIPTION": "always",
               "TIMEOUT": "30"
           })
       # Создаем объект Result
   result_object = Result(
       redirect_type="3",
       event_URL=EVENT_URL ,
       client_id= CLIENT_ID,
       event_extended="N",
       masking= "Y",
       display_number= DISPLAY_NUMBER,
       followme_struct=[len(followme_struct),followme_struct]
   )
   exolve_response_object = ExolveResponse(
           id=1,
           jsonrpc="2.0",
           result=result_object
   )
   logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
   return exolve_response_object

В самом начале мы импортируем необходимые модули и настраиваем константы:

from fastapi import APIRouter, HTTPException,  Body
from pydantic import BaseModel
from typing import  List,  Union, Any
import sqlite3

from .config import *
from .log import logger
EVENT_URL = "<url or IP>/logs"
CLIENT_ID = "MTC Exolve client ID"
DISPLAY_NUMBER  = "MTC Exolve phone number"

Дальше идет определение классов данных для метода. По сути мы как конструктор собираем из нескольких блоков структуру ожидаемого ответа, указанную в документации.

class FollowMeRule(BaseModel):
   I_FOLLOW_ORDER: str
   ACTIVE: str
   NAME: str
   REDIRECT_NUMBER: str
   PERIOD: str
   PERIOD_DESCRIPTION: str
   TIMEOUT: str


class FollowMeStruct(BaseModel):
   List[FollowMeRule] 


class Result(BaseModel):
   redirect_type: int
   event_URL: str
   client_id: str
   event_extended: str
   masking: str
   display_number: str
   followme_struct: List[Union[int, List[FollowMeRule]]] 


class ExolveResponse(BaseModel):
   id: int
   jsonrpc: str
   result: Result

Дальше определим функцию для поиска доступных к открытию шлагбаумов для пользователя, позвонившего на номер телефона.

Функция open_barriers проверяет по таблице users_access какие зоны безопасности доступны пользователю, склеивает их с номерами телефонов из таблицы barriers, и возвращает их в виде списка.

В принципе функция в  скрипте вызывается один раз, и можно не выносить её отдельно, но я решил для удобства чтения сделать отдельно.

def open_barriers(phone):
   connection = sqlite3.connect(DB_NAME)
   cursor = connection.cursor()
   cursor.execute(f'''SELECT phone from {BARIERS_TABLE} as b
   JOIN {ACCESS_TABLE} as u on b.zone = u.barrier_zone
   where u.user_phone = {phone} ''')
   column_names = [col[0] for col in cursor.description]
   rows = cursor.fetchall()
   response = [dict(zip(column_names, row)) for row in rows]
   connection.close()
   return response

Осталось рассмотреть описание самого метода POST /access, который вызовет MTC Exolve при звонке.

router = APIRouter(tags=["Access control"])
@router.post("/access/", response_model=ExolveResponse)
async def response_to_exolve(body: Any = Body(None)):
   """
   Get response to MTC Exolve for redirect phone call to barrier GSM \n (s.a [documentation](https://wiki.exolve.ru/pages/viewpage.action?pageId=106332539))
   """
   if 'params' in body and 'numberA' in body['params']:
       user_phone = body["params"]["numberA"]
   else:
       raise HTTPException(status_code=400, detail="Bad request.Filed params.numberA required")
   barrier_phones =  open_barriers(user_phone)
   if (barrier_phones == None):
       raise HTTPException(status_code=403, detail="Access to barriers not allowed")
   followme_struct = []
   for i in range(0, len(barrier_phones) ) :
       row = barrier_phones[i]
       followme_struct.append(            {
               "I_FOLLOW_ORDER": str(i+1),
               "ACTIVE": "Y",
               "NAME": "BARRIER_PHONE",
               "REDIRECT_NUMBER": str(row["phone"]),
               "PERIOD": "always",
               "PERIOD_DESCRIPTION": "always",
               "TIMEOUT": "30"
           })
       # Создаем объект Result
   result_object = Result(
       redirect_type="3",
       event_URL=EVENT_URL ,
       client_id= CLIENT_ID,
       event_extended="N",
       masking= "Y",
       display_number= DISPLAY_NUMBER,
       followme_struct=[len(followme_struct),followme_struct]
   )
   exolve_response_object = ExolveResponse(
           id=1,
           jsonrpc="2.0",
           result=result_object
   )
   logger.info(f'barriers {barrier_phones} try to open for {user_phone}')
   return exolve_response_object

В методе проверяется, указан ли в теле запроса номер телефона с которого совершался звонок. По идее он всегда будет указан при нормальной работы, но для удобства тестирования запросов отправляемых вручную, я добавил эту проверку.

Затем мы получаем список телефонов, на которые нужно сделать вызов с помощью функции open_barrier.

И уже на базе полученного списка создаем объект для ответа на запрос.

Я решил не усложнять логику, поскольку тело ответа в нашем случае по большей части одинаковое, мы просто изменяющиеся данные подставим в шаблон.

Пройдемся в цикле по каждой записи, которая вернулась из базы данных, и создадим структуру для каждой записи, изменяя лишь атрибуты I_FOLLOW_ORDER и REDIRECT_NUMBER

       followme_struct.append(            {
               "I_FOLLOW_ORDER": “ПОРЯДКОВЫЙ НОМЕР ТЕЛЕФОНА ДЛЯ ВЫЗОВА”,
               "ACTIVE": "Y",
               "NAME": "BARRIER_PHONE",
               "REDIRECT_NUMBER": “НОМЕР ТЕЛЕФОНА В ШЛАГБАУМЕ”,
               "PERIOD": "always",
               "PERIOD_DESCRIPTION": "always",
               "TIMEOUT": "30"
           })

Затем соберем как матрешку следующую структуру result_object.

В ней мы в основном подставляем константы, но отдельно стоит обратить внимание на этот фрагмент кода: followme_struct=[len(followme_struct),followme_struct]

len(followme_struct) – количество номеров, на которые будем делать вызов. 

Далее пакуем третий слой матрешки:

   exolve_response_object = ExolveResponse(
           id=1,
           jsonrpc="2.0",
           result=result_object
   )

Создаем запись в лог о попытке открыть шлагбаум(ы):

logger.info(f'barriers {barrier_phones} try to open for {user_phone}')

И возвращаем ответ:

return exolve_response_object

Осталось собрать все вместе в файле main.py и посмотреть код в деле

from fastapi import FastAPI
from api import crud, access, log
app = FastAPI( title="API for bariier control via MTC Exolve",
   description="This API requires an API key in the X-API-Key header demo key is 12345",
   version="1.0.0")

app.include_router(access.router) # Use the router
app.include_router(log.router) # Use the router
app.include_router(crud.router) # Use the router

По сути в данном файле мы создаем главную точку в API сервиса (app = FastAPI).

И далее подключаем к ней ранее созданные роутеры.

Пусть вас не смущает то, что мы еще не разобрали роутер «crud». Это опциональные функции для управления записями в базе данных, если они вам не нужны, вы можете просто удалить или закомментировать все, что связано с  «crud».

Итоги

Для запуска сервиса локально воспользуйтесь командой

uvicorn app.main:app

Если вы как и я первый раз встречаетесь с FastAPI, то рекомендую более подробно ознакомится со статьей на Хабре, в которой очень хорошо расписаны азы работы с ним.

После того как сервис запустится, перейдите по адресу

http://127.0.0.1:8000/docs//

Должен открыться Swagger.

Swagger сервиса

Для теста создадим запись в логах:

Вызов метода для логирования.

В файле app.log должна появиться запись

2025-03-23 16:53:35,382 - myapp - INFO - log endpoint was called. body: Привет мир

Однако протестировать функцию звонка мы сможем только запустив сервис во внешний мир.

Лично я арендовал недорогой облачный сервер и настроил его согласно инструкции.

После этого если мы отправим на адрес

http:/<Ваш IP или домен>/access/ запрос со следующим body:

{
   "id": "1",
   "jsonrpc": "2.0",
   "method": "getcontrolcallfollowme",
   "params": {
       "h323_conf_id": " BC5F236C 5AD211E9 81BA5CB9 01FED6FC",
       "numberA": "79123456784",
       "sip_id": "79123456785"
   }
}

Вернется следующий ответ:

{
  "id": 1,
  "jsonrpc": "2.0",
  "result": {
    "redirect_type": 3,
    "event_URL": "http://62.113.44.250/logs",
    "client_id": "1215882",
    "event_extended": "N",
    "masking": "Y",
    "display_number": "79841860477",
    "followme_struct": [
      2,
      [
        {
          "I_FOLLOW_ORDER": "1",
          "ACTIVE": "Y",
          "NAME": "BARRIER_PHONE",
          "REDIRECT_NUMBER": "79123456782",
          "PERIOD": "always",
          "PERIOD_DESCRIPTION": "always",
          "TIMEOUT": "30"
        },
        {
          "I_FOLLOW_ORDER": "2",
          "ACTIVE": "Y",
          "NAME": "BARRIER_PHONE",
          "REDIRECT_NUMBER": "79123456781",
          "PERIOD": "always",
          "PERIOD_DESCRIPTION": "always",
          "TIMEOUT": "30"
        }
      ]
    ]
  }
}

Это означает, что MTC Exolve позвонит на оба шлагбаума одновременно, и они оба откроются.

Если у вас еще остались силы, быстро пробежимся по функциям CRUD.

Бонус – CRUD API

Еще раз напомню, что все материалы доступны на GitHub.

В файле /api/crud.py представлена дополнительная логика, с помощью которой можно выполнять стандартные операции, чтения, удаления, добавления и обновления записей в таблицах базы данных.

Схема работы с API. Операции CRUD

Полный листинг модуля под спойлером.

Скрытый текст
from fastapi import APIRouter, HTTPException,  Depends,  Body
from .config import *
from pydantic import BaseModel, Field
from typing import Optional, List, Annotated
import sqlite3
from .log import logger
from fastapi.security import APIKeyHeader


API_KEY = "12345"  # Replace with your key

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=True, description="API Key required for access")


async def get_api_key(api_key_header: Annotated[str, Depends(api_key_header)]):
   """
   Dependency to validate the API key from the header.
   """
   if api_key_header == API_KEY:
       return api_key_header
   else:
       raise HTTPException(status_code=403, detail="Invalid API Key")

class Message(BaseModel):
   message:str


class User(BaseModel):
   id: int
   phone: int
   name: str
   last_name: str
   position: Optional[str] = None


class CreatedUser(BaseModel):
   phone: int = Field(...,  description="User's phone.")
   name: str = Field(...,  description="User's name.")
   last_name: str = Field(...,  description="User's last name.")
   last_name: str
   position: Optional[str] = Field(None, description="user position (optional).")


class CreatedBarrier(BaseModel):
   zone: int = Field(...,  description="Security zone (location of the barrier on the site plan).")
   phone: int = Field(...,  description="Phone of GSM module for opening barrier.")  

class Barrier(BaseModel):
   id: int
   zone: int
   phone: int


class Access(BaseModel):
   user_phone: int
   barrier_zone: int


def read_tables(table, id=None):
   """
   Function for read users and barriers from db.
   """
   connection = sqlite3.connect(DB_NAME)
   cursor = connection.cursor()
   where = ""
   if id:
       where = f" WHERE `id` = {id}"
   cursor.execute(f'SELECT * FROM {table} {where}')
   column_names = [col[0] for col in cursor.description]
   rows = cursor.fetchall()
   response = [dict(zip(column_names, row)) for row in rows]
   connection.close()
   if not response:
       return None
   return response[0] if id else response


def update_tables(table, operation, data=None, id=None):
   """
   Function for read users, barriers, and accesses from DB.
   """
   try:
       connection = sqlite3.connect(DB_NAME)
       cursor = connection.cursor()
       sql_data = ()
       fields = {
           "users":("phone", "name", "last_name", "position"),
           "barriers":("zone", "phone"),
           "users_access":("user_phone", "barrier_zone")
           }
       if data:
           match table:
               case "users":
                   sql_data = (data.phone, data.name, data.last_name, data.position)
               case "barriers":
                   sql_data = (data.zone, data.phone)
               case "users_access":
                   sql_data = (data.user_phone, data.barrier_zone)
       match operation:
           case "create":
               values=[]
               for field in fields[table]:
                    values.append(f'?')
               result_string = ', '.join(values)
               sql = f'UPDATE {table} SET {result_string} Where id = {id}'
               sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'
               cursor.execute(sql, sql_data)
               connection.commit()
           case "delete":
               sql =""
               if id:
                   sql = f'DELETE FROM {table}  WHERE id = {id}'
               elif data:
                   sql = f'DELETE FROM {table}  WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '
               cursor.execute(sql)
               connection.commit()
           case "update":
               update_fields = []
               for field in fields[table]:
                    update_fields.append(f'{field} = ?')
               result_string = ', '.join(update_fields)
               sql = f'UPDATE {table} SET {result_string} Where id = {id}'
               sql_data = tuple(data.model_dump().values())
               cursor.execute(sql, sql_data)
               connection.commit()
           case None:
               return None
   except sqlite3.Error as e:
           print(f"Error: {e}")
           return None
   finally:
           if connection:
               connection.close()
   return cursor.lastrowid

router = APIRouter(tags=["CRUD"])

@router.get("/bariers/", response_model=List[Barrier])
async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):
   """
   List all barriers.
   """
   response = read_tables(BARIERS_TABLE)
   return response


@router.get("/barrier/{barrier_id}", response_model=Barrier)
async def list_barrier(barrier_id: int, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Get a barrier with specific id.
   """
   response = read_tables(BARIERS_TABLE, barrier_id)
   if not response:
       raise HTTPException(status_code=404, detail="Barrier not found")
   return response

@router.post("/barriers/", response_model=Message)
async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Creates a new barrier.
   """
   result = update_tables(BARIERS_TABLE, "create", data=barrier)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'barrier created {barrier}')
   return {"message": f"created barrier with id = {result}"}


@router.delete("/barriers/{barrier_id}", response_model=Message)
async def delete_barrier(barrier_id:int, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Delete a barrier with specific id.
   """
   result = update_tables(BARIERS_TABLE, "delete", None, barrier_id)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'barrier {barrier_id} deleted')
   return {"message":"ok"}

@router.patch("/barriers/{barrier_id}", response_model=Message)
async def edit_barrier(barrier_id:int, barrier:CreatedBarrier,api_key: Annotated[str, Depends(get_api_key)]):
   """
   Update a barrier with specific id (fill in all the request attributes).
   """
   result = update_tables(BARIERS_TABLE, "update", barrier, barrier_id)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'barrier {barrier_id} updated {barrier}')
   return {"message":"ok"}


@router.get("/accesses/", response_model=List[Access])
async def list_accesses(api_key: Annotated[str, Depends(get_api_key)]):
   """
   List accesses (user to barrier).
   """
   response = read_tables(ACCESS_TABLE)
   return response

@router.post("/accesses/", response_model=Message)
async def create_access(access: Access, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Creates a new access (user to barrier).
   """
   result = update_tables(ACCESS_TABLE, "create", data=access)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'user access created {access}')
   return {"message": f"created access with id = {result}"}


@router.delete("/accesses/", response_model=Message)
async def delete_user(access:Access, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Delete access (user to barrier).
   """
   result = update_tables(ACCESS_TABLE, "delete",access, None)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'user access deleted {access}')
   return {"message":"ok"}

@router.get("/users/", response_model=List[User])
async def list_users(api_key: Annotated[str, Depends(get_api_key)]):
   """
   List all users.
   """
   response = read_tables(USER_TABLE)
   return response




@router.get("/users/{user_id}", response_model=User)
async def list_user(user_id: int, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Get a user with specific id.
   """
   response = read_tables(USER_TABLE, user_id)
   if not response:
       raise HTTPException(status_code=404, detail="User not found")
   return response


@router.post("/users/", response_model=Message)
async def create_user(user: CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Creates a new user.
   """
   result = update_tables(USER_TABLE, "create", data=user)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'user created {user}')
   return {"message": f"created  user with id {result}"}


@router.delete("/users/{user_id}", response_model=Message)
async def delete_user(user_id:int, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Delete a user with specific id.
   """
   result = update_tables(USER_TABLE, "delete", None, user_id)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'user_id = {user_id} deleted')
   return {"message":"ok"}




@router.patch("/users/{user_id}", response_model=Message)
async def edit_user(user_id:int, user:CreatedUser, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Update a user
     with specific id (fill in all the request attributes).
   """
   result = update_tables(USER_TABLE, "update", user, user_id)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'user_id = {user_id} updated {user}')
   return {"message":"ok"}

Мы не будем разбирать всю логику подробно, разберем лишь функции чтения и изменения данных и пару типовых методов, которые к ним обращаются.

Функция read_tables формирует в БД запрос на получение данных из таблицы. Я постарался сделать ее поведение универсальным, чтобы выбирать из какой таблицы читать, а также указать id, для которого нужно вернуть данные (это пригодится в методах чтения одной записи из выборки). 

def read_tables(table, id=None):
   """
   Function for read users and barriers from db.
   """
   connection = sqlite3.connect(DB_NAME)
   cursor = connection.cursor()
   where = ""
   if id:
       where = f" WHERE `id` = {id}"
   cursor.execute(f'SELECT * FROM {table} {where}')
   column_names = [col[0] for col in cursor.description]
   rows = cursor.fetchall()
   response = [dict(zip(column_names, row)) for row in rows]
   connection.close()
   if not response:
       return None
   return response[0] if id else response

А вот функция update_tables более комплексная. Я тоже попытался сделать её универсальной, чтобы с помощью нее обрабатывать запросы на удаление, обновление и создание новых записей в таблицах.

def update_tables(table, operation, data=None, id=None):
   """
   Function for read users, barriers, and accesses from DB.
   """
   try:
       connection = sqlite3.connect(DB_NAME)
       cursor = connection.cursor()
       sql_data = ()
       fields = {
           "users":("phone", "name", "last_name", "position"),
           "barriers":("zone", "phone"),
           "users_access":("user_phone", "barrier_zone")
           }
       if data:
           match table:
               case "users":
                   sql_data = (data.phone, data.name, data.last_name, data.position)
               case "barriers":
                   sql_data = (data.zone, data.phone)
               case "users_access":
                   sql_data = (data.user_phone, data.barrier_zone)
       match operation:
           case "create":
               values=[]
               for field in fields[table]:
                    values.append(f'?')
               result_string = ', '.join(values)
               sql = f'UPDATE {table} SET {result_string} Where id = {id}'
               sql = f'INSERT INTO {table} {fields[table]} VALUES ({result_string})'
               cursor.execute(sql, sql_data)
               connection.commit()
           case "delete":
               sql =""
               if id:
                   sql = f'DELETE FROM {table}  WHERE id = {id}'
               elif data:
                   sql = f'DELETE FROM {table}  WHERE user_phone = {data.user_phone} AND barrier_zone = {data.barrier_zone} '
               cursor.execute(sql)
               connection.commit()
           case "update":
               update_fields = []
               for field in fields[table]:
                    update_fields.append(f'{field} = ?')
               result_string = ', '.join(update_fields)
               sql = f'UPDATE {table} SET {result_string} Where id = {id}'
               sql_data = tuple(data.model_dump().values())
               cursor.execute(sql, sql_data)
               connection.commit()
           case None:
               return None
   except sqlite3.Error as e:
           print(f"Error: {e}")
           return None
   finally:
           if connection:
               connection.close()
   return cursor.lastrowid

И рассмотрим код пары методов, в которых используются вышеуказанные функции:

Начнем с чтения всех данных о шлагбаумах. метода GET /barriers/. По сути он лишь обращается к базе данных и возвращает список с данными по всем шлагбаумам.

router = APIRouter(tags=["CRUD"])


@router.get("/bariers/", response_model=List[Barrier])
async def list_bariers(api_key: Annotated[str, Depends(get_api_key)]):
   """
   List all barriers.
   """
   response = read_tables(BARIERS_TABLE)
   return response

Обратите внимание на  api_key: Annotated[str, Depends(get_api_key), я решил  что методы CRUD должны быть с проверкой авторизации, поэтому для доступа к ним нужен будет код доступа 12345. Авторизацию сгенерировала нейросеть, поэтому я не буду её подробно разбирать, если вы изучите полный листинг, то быстро разберетесь в типовом коде.

Результат работы:

Запуск метода GET /barriers в Swagger

Пришло время метода для создания записи о барьере POST /barriers/. Метод принимает на вход номер зоны и телефон внутри GSM-модуля шлагбаума. В случае если запись успешно создана, вернет сообщение с id созданной записи в таблице barrires.

@router.post("/barriers/", response_model=Message)
async def create_barrier(barrier: CreatedBarrier, api_key: Annotated[str, Depends(get_api_key)]):
   """
   Creates a new barrier.
   """
   result = update_tables(BARIERS_TABLE, "create", data=barrier)
   if result == None:
        raise HTTPException(status_code=400, detail="Bad request")
   logger.info(f'barrier created {barrier}')
   return {"message": f"created barrier with id = {result}"}

Результат работы:

Запуск метода POST /barriers в Swagger

Теперь у нас есть не только функционал для открытия шлагбаума, но и заготовка для админ-панели по управлению доступом.

Остальные методы вы сможете изучить самостоятельно, развернув сервис локально.

Спасибо, что дочитали до конца! Надеюсь, статья была интересной. Буду рад ознакомится с конструктивными комментариями.