❝ Паспорта для того и устроены, чтобы мешать честным людям и помогать мошенникам. 

Жюль Верн. Вокруг света за восемьдесят дней

Всем привет! Меня зовут Станислав, я работаю в небольшой ИТ команде, занимаюсь тестированием и автоматизацией процесса нагрузочного тестирования, пишу прикладной софт на python.

Вашему вниманию продолжение статьи в которой я постарался описать процесс масштабирования бэкенд приложения на основе flask_restx и OpenApi. В этой части покажу как реализовать авторизацию в приложении и обеспечить базовый уровень безопасности.

Дисклеймер: Решения приведенные в статье не претендуют на единственно правильные. При этом являются рабочими и, возможно, будут полезны в вашем проекте. Конструктивная критика категорически приветствуется!

В первой части мы описали структуру проекта которая позволит нам относительно просто масштабировать web приложение. Не смотря на свою простоту проект получится весьма объемным по количеству строк и файлов. Но благодаря однообразности подходов и простому неймингу файлов в нем будет просто разобраться. В статье приведены примеры кода, который, на мой взгляд наиболее интересен. А полный листинг будет опубликован на GitHub. Довольно прелюдий, приступаем!

Давайте вспомним, на чем мы остановились в прошлый раз: получили веб сервер в котором реализован всего-навсего один API запрос.
Тем не менее это хорошая заготовка под проект любого масштаба. Следующее, чего не хватает всякому уважающему себя приложению - это ограничение доступа для неавторизованных пользователей. Это объемная задача, для ее успешной реализации разделим на несколько частей:

  1. Создание пользователей

  2. Хранение пользователей

  3. Верификация пользователей

Создание пользователей

Реализуем новое пространство имен в директории /web_plugin/api_package/users_ns, основной целью которого будет объединение методов операций с пользователями.

📦 py_api_template
├── 📁 env
├── 📁 src
└── 📁 web_plugin
│......└── 📁 api_package
│......│......├── 📁 default_ns
│......│......├── 📁 exceptions
│......│......└── 📁 users_ns
│......│......│......├── 🐍 const.py
│......│......│......├── 🐍 ns.py
│......│......│......├── 🐍 ns_schemas.py
│......│......│......└── 🐍 urls.py
├── 🐍 api_extension.py
└── 🐍 main.py

В файле /web_plugin/api_package/users_ns/ns.py опишем эндпоинты. На текущем этапе нам пока достаточно будет проверить работоспособность запросов, а логику опишем чуть позже. Опишем POST запрос для создания пользователя. В теле запроса укажем имя пользователя, пароль и уровень доступа. В целях демонстрации метод будет возвращать то, что принял в запросе.

users_ns = Namespace("Users", description="Users methods")

@users_ns.route(CREATE_USER_URL)
class CreateUser(Resource):
    """HTTP method described in self doc interface swagger"""

    @cross_origin()
    @users_ns.doc("Create a new user")
    @users_ns.expect(users_ns.model("create_user_post_schema", CREATE_USER_POST_SCHEMA))
    @users_ns.marshal_with(users_ns.model("common_return_schema", COMMON_RETURN_SCHEMA))
    def post(self) -> tuple[dict, int]:
        """request to create user in db"""

        username = request.get_json().get(LOGIN_FIELD)
        password = request.get_json().get(PASSWORD_FIELD)
        role = request.get_json().get(ROLE_FIELD)
        return {"msg": f"Successfully created user '{username}'", "err": False}, 200        

Текстовые константы CREATE_USER_URL помещаем в файл /web_plugin/api_package/users_ns/urls.py.LOGIN_FIELD, PASSWORD_FIELD, ROLE_FIELD в /web_plugin/api_package/users_ns/const.py. В будущем - это сильно упростит поддержание кода.
Декоратор метода post() @users_ns.expect добавляет в документацию модель тела запроса. Опишем ее в файле /web_plugin/api_package/users_ns/ns_schemas.py

"""Users namespace openApi schemas"""

from flask_restx import fields
from web_plugin.api_package.users_ns.const import LOGIN_FIELD, PASSWORD_FIELD, ROLE_FIELD

CREATE_USER_POST_SCHEMA = {
    LOGIN_FIELD: fields.String(),
    PASSWORD_FIELD: fields.String(),
    ROLE_FIELD: fields.String(),
}  

Схема представляет собой обычный словарь, где при помощи модуля fields из пакета flask_restx, все значения ключей обозначаются как строки.

Как вы помните, у нас простой для масштабирования проект... Простота заключается в том что новое пространство добавляется фактически одной строкой в файле api_extension.py.

"""Init api self doc service"""

from flask_restx import Api

from web_plugin.api_package.default_ns.ns import default_ns
from web_plugin.api_package.users_ns.ns import users_ns

open_api = Api(
    version="1.0",
    title="web app template API",
    description="swagger doc service",
    doc="/swagger",
)

open_api.add_namespace(default_ns, path="/api/v1")
open_api.add_namespace(users_ns, path="/api/v1")

Давайте посмотрим, что получилось!
Запускаем веб сервер через консоль в корне проекта командой python main.py и в браузере заходим на страницу документации http://127.0.0.1:5555/swagger

Видим, появился новый раздел Users в котором будет реализованы методы управления пользователями приложения.

Обращу ваше внимание на поле role модели POST запроса /create_user. В нем мы должны указать тип пользователя который хотим создать. Причем тип данных - строка. В дальнейшем мы его оптимизируем.

Продолжим развивать функциональность нашей документации. Пусть на начальном этапе жизни нашего приложения будет достаточно всего 2 роли: Администратор и рядовой Пользователь.

Давайте сообщим другим разработчикам, пользователям нашего API, как именно передать данные о роли пользователя. Создадим в папке /src отдельный модуль посвященный пользователям:

📦 src
└── 📁 users
..........└── 🐍 roles.py

"""Roles of users"""
from enum import Enum

class Roles(int, Enum):
    """Roles of users"""

    ADMIN = 1
    USER = 2

Это обычный enum который весьма удобно вписывается в код моделей. Теперь немного доработаем схему в файле /web_plugin/api_package/users_ns/ns_schemas.py

"""Users namespace openApi schemas"""

from flask_restx import fields

from src.users.roles import Roles
from web_plugin.api_package.users_ns.const import (
    LOGIN_FIELD,
    PASSWORD_FIELD,
    ROLE_FIELD,
)

roles_enum = {item.name: item.value for item in Roles}
CREATE_USER_POST_SCHEMA = {
    LOGIN_FIELD: fields.String(),
    PASSWORD_FIELD: fields.String(),
    ROLE_FIELD: fields.Integer(description=f"Possible roles:{roles_enum}"),
}

Сохраняем файл, flask сервер в режиме debug перезапускается автоматически, поэтому надо достаточно обновить страницу в браузере. И получаем красивую схему тела POST запроса с описанием возможных вариантов данных для поля role. При дальнейшем расширении количества ролей схема обновится автоматически.

Схема ответа сервера достаточно тривиальна, пример был рассмотрен в предыдущей статье и ради экономии времени подробно рассматриваться не будет.

Теперь достаточно нажать кнопку Try It Out в SwaggerUI и проверить, что эндпоинт /create_user доступен и работает... Заполняем форму запроса:

{
  "login": "admin",
  "password": "admin",
  "role": 1
}

Нажимаем Execute, получаем ответ вида.

Выглядит как успех. К сожалению, тут есть одно НО. Описанный способ формирования схемы для API запроса годится только лишь для демонстрации в SwaggerUI и будет корректно валидироваться исключительно на данной веб странице. Для решения задачи валидации получаемых данных на более высоком уровне существует библиотека pydantic о которой упоминалось ранее в первой части.

Хранение учетных данных пользователей

Приступим к следующему акту повествования: описание логики сохранения данных учетных записей.
В крупных проектах данные пользователей (хотелось бы в это верить) хранятся в зашифрованном виде в защищенных от взлома базах данных. В нашем же проекте реализуем максимально простую логику которая позволит продемонстрировать общие принципы работы авторизации.
В рамках нашего проекта механизм ограничения доступа условно можно разделить на несколько составных операций:
1. Определение уровня доступа учетной записи
2. Сохранение в базе данных сведений об учетной записи
3. Извлечение из базы данных сведений об учетной записи
4. Верификация логина и пароля на этапе авторизации

Все перечисленное реализуем в директории проекта /src:

📦 py_api_template
└── 📁 src
│......├── 📁 hashmap
│......│......├── 🐍 encryption.py
│......│......├── 🐍 hash_map.py
│......│......└── 🐍 password_hash_map.py
│......├── 📁 users
│......│......├── 🐍 roles.py
│......│......└── 🐍 users.py
│......└── 📁 utils
│......│......└── 🐍 rw.py

Теперь подробнее о каждом этапе.
Очевидно, что хранить данные авторизации пользователей в открытом виде заведомо плохая идея.
Реализуем функции шифрования в файле /src/hashmap/encryption.py.
Python содержит замечательный встроенный модуль hashlib для задач шифрования паролей. Он позволяет максимально обезопасить от подбора хранимый пароль в БД посредством использования "соли" - случайных данных, которые добавляются к паролю перед хешированием.

"""Encryption methods for hashing passwords."""

import hashlib
import os


def get_salt_password_hash(password) -> tuple[bytes, bytes]:
    """Generate salt and hash password."""

    salt = os.urandom(32)

    password_hash = hashlib.pbkdf2_hmac(
        hash_name="sha256",
        password=password.encode("utf-8"),
        salt=salt,
        iterations=100000,
        dklen=128,
    )
    return salt, password_hash


def verify_password(
    stored_salt: bytes, stored_password_hash: bytes, provided_password: str
) -> bool:
    """Verify provided password against stored password."""

    password_hash = hashlib.pbkdf2_hmac(
        hash_name="sha256",
        password=provided_password.encode("utf-8"),
        salt=stored_salt,
        iterations=100000,
        dklen=128,
    )

    return password_hash == stored_password_hash

Для удобной работы с учетными записями и взаимодействия с локальной БД реализуем структуру данных - хеш-мапу. Добавим следующий код в файл /src/hashmap/hash_map.py

from typing import Any

class HashMap:
    """Hash Map template"""

    def __init__(self, size=10):
        self.size = size
        self.buckets = [[] for _ in range(size)]

    def _hash(self, key: str) -> int:
        """Returns hash from input string"""

        return hash(key) % self.size

    def put(self, key: str, value: Any) -> None:
        """Put pair key/value into hashmap or update existing key"""

        index = self._hash(key)
        bucket = self.buckets[index]

        for i, k in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)
                return

        if index <= self.size:
            bucket.append((key, value))
        else:
            self.size *= 2
            bucket.append((key, value))

    def get(self, key) -> Any:
        """Get item"""

        index = self._hash(key)
        bucket = self.buckets[index]

        for k, v in bucket:
            if k == key:
                return v
        raise KeyError(f"Key '{key}' not found")

    def remove(self, key):
        """remove pair key/value from hash map"""

        index = self._hash(key)
        bucket = self.buckets[index]

        for i, (k, _) in enumerate(bucket):
            if k == key:
                del bucket[i]
                return
        raise KeyError(f"Key '{key}' not found")

    def __contains__(self, key):
        """Check existing key in hashmap object"""

        index = self._hash(key)
        bucket = self.buckets[index]

        for k, _ in bucket:
            if k == key:
                return True
        return False

Пример всего лишь описывает абстрактную структуру данных. А теперь добавим немного магии в файл /src/hashmap/password_hash_map.py:

"""Hashing passwords for authorization"""

import dataclasses

from env.env_constants import USERS_STORAGE
from src.hashmap.hash_map import HashMap
from src.users.roles import Roles
from src.utils.rw import read_from_csv_file, write_data_to_csv_file


@dataclasses.dataclass
class UserPasswordHash:
    """Hashing passwords for authorization"""

    role: Roles
    salt: bytes
    hashed_password: bytes


class UserPasswordHashMap(HashMap):
    """Hash map for hide passwords"""

    def update(self, file_path: str = USERS_STORAGE):
        """Update users local storage"""


    def store(self, file_path: str = USERS_STORAGE):
        """save hashmap in local storage"""

        password_hash_list = []
        for password in self.buckets:
            if password and isinstance(password[0][1].salt, bytes):
                password_hash_list.append(
                    [
                        password[0][0],
                        password[0][1].role.value,
                        password[0][1].salt.hex(),
                        password[0][1].hashed_password.hex(),
                    ]
                )
            if password and isinstance(password[0][1].salt, str):
                password_hash_list.append(
                    [
                        password[0][0],
                        password[0][1].role.value,
                        password[0][1].salt,
                        password[0][1].hashed_password,
                    ]
                )


        write_data_to_csv_file(file_path, password_hash_list)

    def load_credentials(self, file_path: str = USERS_STORAGE):
        """Restore hash map from local storage"""

        # read external storage
        data = read_from_csv_file(file_path)

        # revoke buckets
        self.buckets = [[] for _ in range(self.size)]

        for item in data:
            key, role, salt, hash_password = item
            index = self._hash(key)
            bucket = self.buckets[index]

            # Check if key already exists
            for i, (k, _) in enumerate(bucket):
                if k == key:
                    bucket[i] = (
                        key,
                        UserPasswordHash(
                            Roles(int(role)),
                            salt,
                            hash_password
                        ),
                    )
                    return

            # Add new key-value pair
            if index <= self.size:
                bucket.append(
                    (
                        key,
                        UserPasswordHash(
                            Roles(int(role)),
                            salt,
                            hash_password

                        ),
                    )
                )
            else:
                self.size *= 2
                bucket.append(
                    (
                        key,
                        UserPasswordHash(
                            Roles(int(role)),
                            salt,
                            hash_password
                        ),
                    )
                )

Что тут произошло? В начале описали объект UserPasswordHash в котором хранится роль пользователя, соль, и хеш пароля пользователя. Имя пользователя (или логин) будет ключом в структуре хеш-мапе. Далее унаследовав все стандартные методы оригинальной HashMap создали новую структуру UserPasswordHashMap в которой реализованы методы взаимодействия с БД (в данном конкретном случае реализовано сохранение данных в csv файл). Соответственно, получить пару логин/пароль возможно только в том случае, если получить доступ к алгоритмам хеширования хеш-мапы, соль, и хеш пароля пользователя. В целом получается достаточно безопасно.

В файле /src/utils/rw.py описаны функции взаимодействия с локальной БД - чтение и запись данных в csv файл. Они достаточно просты, при необходимости загляните в полный листинг на гитхабе. Надо отметить, что в более продвинутом варианте приложения, методы UserPasswordHashMap.store() и UserPasswordHashMap.load_credentials() должны взаимодействовать с полноценной БД

Теперь все готово для того, что-бы создавать учетные записи!
Модифицируем класс CreateUser в файле /web_plugin/api_package/users_ns/ns.py так, что бы при его вызове подгружалась база данных пользователей и помещалась в нее новая запись о пользователе. В случае успешного выполнения операции возвращается http-код 200 .

@users_ns.route(CREATE_USER_URL)
class CreateUser(Resource):
    """HTTP method described in self doc interface swagger"""

    @cross_origin()
    @users_ns.doc("Create a new user")
    @users_ns.expect(users_ns.model("create_user_post_schema", CREATE_USER_POST_SCHEMA))
    @users_ns.marshal_with(users_ns.model("common_return_schema", COMMON_RETURN_SCHEMA))
    def post(self) -> tuple[dict, int]:
        """request to create user in db"""

        try:
            username = request.get_json().get(LOGIN_FIELD)
            password = request.get_json().get(PASSWORD_FIELD)
            role = request.get_json().get(ROLE_FIELD)

            role = Roles(int(role))
            s, h = get_salt_password_hash(password)

            users_db = UserPasswordHashMap()
            users_db.load_credentials()
            users_db.put(username, UserPasswordHash(role, s, h))
            users_db.store()
            return {"msg": f"Successfully created user '{username}'",
                    "err": False}, 200

        except ApiExceptions as err:
            print(err)
            return { "msg": "failed to create user",
                     "err": True,}, 400

Проверим!... Вновь отправляем запрос на эндпоинт с той же самой формой

{
  "login": "admin",
  "password": "admin",
  "role": 1
}

После успешного ответа сервера в локальной БД сохранится csv файл примерно со следующим содержанием:

admin,1,2978a41e77....,81e1a461d118cace...

Запись через запятую означает логин пользователя, роль (код enum), соль, хеш пароля.

Авторизация (верификация пользователей)

Замечательно, половина дела сделана! У нас есть база, в которую можем добавлять пользователей.
Следующим шагом реализуем авторизацию пользователя в нашем API. Создадим еще одно пространство имен API под названием Authorization:

📦 web_plugin
├── 📁 api_package
│......├── 📁 auth_ns
│......│......└── 📁 utils
│......│......│......└── 🐍 check_pas.py
│......│......├── 🐍 const.py
│......│......├── 🐍 ns.py
│......│......├── 🐍 ns_schemas.py
│......│......└── 🐍 urls.py

В нем реализуем два эндпоинта /login и /logout. Принципиально нового ничего нового в реализации методов нет, описываются аналогичным образом как и рассмотренные ранее типы запросов с одной оговоркой: /logout запрос типа DELETE, но описывается аналогично POST запросу с токеном авторизации.

Остановимся на важных моментах. Рассмотрим листинг эндпоинта /login:

"""Authorization namespace"""

from flask import request
from flask_cors import cross_origin
from flask_jwt_extended import create_access_token, jwt_required
from flask_restx import Namespace, Resource

from src.users.users import is_user_exists, save_session
from web_plugin.api_package.auth_ns.const import (
    IS_SUCCESS_FIELD,
    LOGIN_FIELD,
    MESSAGE_FIELD,
    PASSWORD_FIELD,
    TOKEN_FIELD,
)
from web_plugin.api_package.auth_ns.ns_schemas import (
    LOGIN_POST_SCHEMA,
    LOGIN_RESP_SCHEMA,
    LOGOUT_DELETE_SCHEMA,
)
from web_plugin.api_package.auth_ns.urls import (
    DELETE_LOGOUT_URL,
    POST_LOGIN_URL,
)
from web_plugin.api_package.auth_ns.utils.check_pas import check_pas
from web_plugin.api_package.exceptions.exceptions import ApiExceptions
from web_plugin.api_package.utils.errors_enum import Errors

auth_ns = Namespace("Authorization", description="Authorization methods")


@auth_ns.route(POST_LOGIN_URL)
class Login(Resource):
    """HTTP method described in self doc interface swagger"""

    @cross_origin()
    @auth_ns.doc("returns authorization model")
    @auth_ns.expect(auth_ns.model("login_post_schema", LOGIN_POST_SCHEMA))
    @auth_ns.marshal_with(auth_ns.model("authorization model", LOGIN_RESP_SCHEMA))
    def post(self) -> tuple[dict, int]:
        """request returns authorization token"""

        username = request.get_json().get(LOGIN_FIELD)
        password = request.get_json().get(PASSWORD_FIELD)

        if is_user_exists(username):
            correct_password = check_pas(username, password)
            if correct_password:
                access_token = create_access_token(identity=username)
                save_session(username, access_token)

                return {
                    LOGIN_FIELD: username,
                    TOKEN_FIELD: access_token,
                    IS_SUCCESS_FIELD: True,
                    MESSAGE_FIELD: None,
                }, 200

            if not correct_password:

                return {
                    LOGIN_FIELD: username,
                    TOKEN_FIELD: None,
                    IS_SUCCESS_FIELD: False,
                    MESSAGE_FIELD: Errors.INVALID_PASSWORD.text(),
                }, 401

        if not is_user_exists(username):

            return {
                LOGIN_FIELD: username,
                TOKEN_FIELD: None,
                IS_SUCCESS_FIELD: False,
                MESSAGE_FIELD: Errors.USER_NOT_FOUND.text(),
            }, 401

        return {}, 400

В приведенном примере сначала мы проверяем существование пользователя в базе данных при помощи функции is_user_exists() которая реализована в модуле src.users, если пользователь существует в созданной нами ранее базе данных, но проверяем идентичность введенного пароля и сохраненного в БД при помощи функции correct_password().
Если проверки прошли успешно выдаем токен авторизации при помощи функции create_access_token() из пакета flask_jwt_extended.

А что представляет собой токен авторизации и как это работает в рамках нашего проекта?

Flask-JWT-Extended обеспечивает решение задачи аутентификации на основе токенов. объект JWTManager конфигурируется с секретным ключом, который записывается например в конфигурационном файле приложения, а к эндпоинту применяется декоратор @jwt_required(), который требует, от получаемых сервером запросов, заголовка содержащий действительный JWT. А функция get_jwt_identity() (из пакета flask_jwt_extended) извлекает информацию о пользователе непосредственно из токена при вызове того или иного защищенного эндпоинта.
Согласно документации, для повышения безопасности рекомендуется периодически менять секретный ключ JWT. Кроме того, необходимо задать время истечения срока действия токена, для снижения риска авторизации с помощью подобранных или скомпрометированных токенов.

Все эти настройки подгружаются из файла /env/flask_config.py на этапе запуска веб сервера:

# Authorisation settings
# Change this to a secure secret key
app.config["JWT_SECRET_KEY"] = "FooBar"
app.config["JWT_ALGORITHM"] = "HS256"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(days=2)

Надеюсь, принцип работы JWT токенов, стал более понятным... Идем дальше.

Подключим JWTManager из пакета flask_jwt_extended к веб серверу в файле main.py в упрощенном виде это будет выглядеть так:

from flask_jwt_extended import JWTManager

app = Flask(__name__)
jwt = JWTManager(app)

@jwt.additional_claims_loader
def add_claims_to_access_token(identity):
    users_db = UserPasswordHashMap()
    users_db.load_credentials()
    user = users_db.get(identity)
    if user:
        return {
            "role": user.role.value,
        }
    return {}

Тут нужно пояснить неочевидную вещь. Функция add_claims_to_access_token() и декоратор @jwt.additional_claims_loader объекта JWTManager добавляют в токен сведения о роли пользователя, которые нам пригодятся для дальнейшего разделения уровней доступа пользователей.

Следующий очевидный вопрос а как передать заголовок авторизации через SwaggerUI для дальнейшей демонстрации работы API?
Для этой цели добавим функциональную кнопку Authorize. Модифицируем файл api_extension.py в корне проекта следующим образом:

authorizations = {
    "Bearer": {
        "type": "apiKey",
        "in": "header",
        "name": "Authorization",
        "description": "Enter authorization header like this: Bearer <token>",
    }
}

open_api = Api(
    version="1.0",
    title="web app template API",
    description="swagger doc service",
    doc="/swagger",
    authorizations=authorizations,
    security="Bearer",
)

Проверим что получилось! Уже привычным образом перезагружаем страницу http://127.0.0.1:5555/swagger , кнопка Authorize отобразилась в правой верхней части страницы:

Нажатие на кнопку авторизации открывает поле для ввода токена:

Превосходно! теперь у нас есть полноценная авторизация в приложении. Дальше рассмотрим как ее применить для защиты наших эндпоинтов.

Роли пользователей

Теперь когда у нас есть механизм создания пользователей, база для их хранения и авторизация мы можем ограничивать доступ в эднпоинтам веб приложения.

Очевидно, что создавать других пользователей может пользователь с уровнем доступа администратор. Реализуем эту идею!

Модифицируем код в файле /web_plugin/api_package/users_ns/ns.py:

@users_ns.route(CREATE_USER_URL)
class CreateUser(Resource):
    """HTTP method described in self doc interface swagger"""

    @jwt_required()
    @cross_origin()
    @users_ns.doc("Create a new user")
    @users_ns.expect(users_ns.model("create_user_post_schema", CREATE_USER_POST_SCHEMA))
    @users_ns.marshal_with(users_ns.model("common_return_schema", COMMON_RETURN_SCHEMA))
    def post(self) -> tuple[dict, int]:
        """request to create user in db"""

        try:
            username = request.get_json().get(LOGIN_FIELD)
            password = request.get_json().get(PASSWORD_FIELD)
            role = request.get_json().get(ROLE_FIELD)

            access_level = get_jwt()['role']
            if access_level != Roles.ADMIN.value:
                raise ApiExceptions(Errors.NOT_ALLOWED.text())

            role = Roles(int(role))
            s, h = get_salt_password_hash(password)

            users_db = UserPasswordHashMap()
            users_db.load_credentials()
            users_db.put(username, UserPasswordHash(role, s, h))
            users_db.store()
            return {"msg": f"Successfully created user '{username}'", "err": False}, 200

        except ApiExceptions as err:
            return {
                "msg": f"failed to create user. Error: {err}.",
                "err": True,
            }, 400

Что тут изменилось? Первое, добавлен декоратор @jwt_required(), который требует наличия токена авторизации в заголовке запроса. Второе, проверка уровня доступа пользователя (строки 18-20). В случае если пользователь не имеет соответствующих прав доступа, то веб сервер вернет ошибку.

Теперь проверим как работает наш код. Получим токен авторизации при помощи эндпоинта /login. Добавим заголовок авторизации в запросы через окно ввода токена под кнопкой Authorize:

И попробуем создать еще одного пользователя, например с логином и паролем user/user.

Формируем тело для запроса /api/v1/create_user, и отправляем данные на сервер.

{
  "login": "user",
  "password": "user",
  "role": 2
}

Вуаля, получаем ответ от сервера:

{
  "err": false,
  "msg": "Successfully created user 'user'"
}

Проверим что записалось в базу данных пользователей...

admin,1,7274c19d54...,d14f8583906d396f6e3773c279bc1fe2e0...
user,2,865a5e5d698...,c7795bcc01c3ba56b5ad69faa0889e6e8b...

Отлично, теперь попробуем создать еще одного пользователя но от имени user с ограниченными правами. Получаем токен, авторизуемся, формируем тело запроса равно как немногим ранее. И получаем ответ:

{
  "err": true,
  "msg": "failed to create user. Error: not allowed for the user's access level."
}

Превосходно, ожидаемый ответ получен и в БД новый пользователь не был добавлен.

Уйти не по-английски

Мы на финишной прямой! На данный момент почти реализован механизм авторизации... Почему почти? Потому что в момент когда пользователь закончил работать с приложением в целях безопасности необходимо завершить сессию и произвести некоторые манипуляции с токеном.
В документации к модулю flask_jwt_extended говорится о необходимости организовать хранение отозванных токенов, например в redis для чего имеются даже встроенные функции. Но в рамках статьи для демонстрации информацию от токенах будем хранить в csv файле по аналогии с хешами паролей пользователей. Эта задача реализована при помощи функции save_session() в файле /src/users/users.py

def save_session(username: str, access_token: str) -> None:
    """Saves a user's session"""

    db = SessionInfoHashMap()
    session = SessionInfo(
        login=username,
        token=access_token,
        expires_in=(datetime.now() + timedelta(days=2)).isoformat(),
        is_revoked=1,
    )
    db.restore_hash_map()

    if username in db:
        print(
            f"{username} is already present in the database. Update token expiration time"
        )
        db.remove(username)
        db.put(username, session)
    if username not in db:
        db.put(username, session)
    db.store()

Объект SessionInfoHashMap описан в файле /src/hashmap/session_hash_map.py. Он содержит информацию о логине, токене, дате истечения срока действия токена и флаг is_revoked, обозначающего что сессия пользователя активна. Важное замечание: периодически необходимо очищать базу данных от токенов, срок действия которых истек (отличительной особенностью redis является автоматическое выполнение этой операции).

@dataclass
class SessionInfo:
    """Session info"""

    login: str
    token: str
    expires_in: str
    is_revoked: int

Хранение и загрузка из csv файла реализована похожим образом как это было показано выше для объекта UserPasswordHash. В случае успешной авторизации пользователей в наш csv файл сохранится информация: имя пользователя, логин (логином кстати может выступать номер телефона), токен, дате истечения срока действия токена и флаг активности сессии (0 - токен не отозван, 1 - токен отозван):

user,user,eyJhbGciOiJIUzI1...,2026-02-06T00:21:42.402407,0
admin,admin,eyJhbGciOiJIUz...,2026-02-06T00:06:05.277779,0

Добавим функцию завершения сессии пользователя finish_session() и проверку токена на то что он отозван is_token_revoked() в файл /src/users/users.py

"""Users module"""

from datetime import datetime, timedelta

from src.hashmap.password_hash_map import UserPasswordHashMap
from src.hashmap.session_hash_map import SessionInfo, SessionInfoHashMap


def is_user_exists(user_name) -> bool:
    """Checks if a user is present in the database"""
    db = UserPasswordHashMap()
    db.load_credentials()
    return user_name in db


def save_session(username: str, access_token: str) -> None:
    """Saves a user's session"""

    db = SessionInfoHashMap()
    session = SessionInfo(
        username, access_token, (datetime.now() + timedelta(days=2)).isoformat(), False
    )
    db.restore_hash_map()

    if username in db:
        print(
            f"{username} is already present in the database. Update token expiration time"
        )
        db.remove(username)
        db.put(username, session)
    if username not in db:
        db.put(username, session)
    db.store()

def finish_session(username: str) -> None:
    """Revoke user's access token"""

    db = SessionInfoHashMap()
    db.restore_hash_map()
    if username in db:
        session = db.get(username)
        session.is_revoked = True
        db.store()

def is_token_revoked(username: str) -> bool:
    """Check is token is revoked or not"""

    db = SessionInfoHashMap()
    db.restore_hash_map()
    if username in db:
        session = db.get(username)
        return bool(session.is_revoked)
    return True

И вот теперь можно говорить о завершении реализации задачи авторизации в приложении, но с некоторыми замечаниями. Во-первых необходима реализация логики удаления пользователей, которая тоже должна содержать завершение сессий, во-вторых дальнейшее развитие приложения никак не обойдется без подключения какой либо базы данных, в-третьих если приложение рассчитано на глобальную аудиторию в сети, то имеет смысл рассмотреть современный подход к авторизации с помощью протокола OAuth.

И как я обещал в начале статьи ссылка на полный листинг.

Спасибо за внимание! Буду рад комментариям и замечаниям к коду.