Однажды тимлид поставил передо мной задачу реализовать механизм взаимодействия пользователя через веб-интерфейс с микросервисами через единую точку входа с использованием FastAPI и RabbitMQ. Спешу поделиться с тобой, мой читатель, тем, что у меня получилось. По мере повествования дам пояснения по представленному коду. И, да, сделаю интересные отступления по вопросам валидации и хранения, в т.ч. приватных, данных.
Задача
Пользователь через единый веб-интерфейс («Front» side) должен обращаться к микросервисной архитектуре, расположенной в закрытом контуре («Back» side). Взаимодействие должно быть построено посредством очередей RabbitMQ. Все работает под управлением Python, FastAPI и Gunicorn. В графическом виде получается что‑то такое:

Планирование
API-шлюз будет принимать запрос от пользователя и исходя из частностей поставленной задачи интерпретировать. Далее будет отправлять в определенном формате сообщение в очередь RabbitMQ и переходить в режим «ожидания». С другой стороны RabbitMQ так же в режиме ожидания сообщений будет работать RPC-сервер. При получении запроса он определит, в какое из приложений его перенаправить, запулит его в сторону соответствующего Gunicorn, который распределит нагрузку между экземплярами микросервиса и полученный ответ вернет RPC-серверу, а тот в свою очередь прокинет его дальше в специально созданную очередь. С той стороны «ждущий» шлюз получит это сообщение, выйдет из режима ожидания и сформированный ответ вернет пользователю.
Исполнение
Объединим все модули реализуемой схемы в одном проекте, подразумевая при этом, что у каждой составляющей может быть свое хранилище, виртуальное пространство, способы и команды запуска и т.д. Сервисы будут простые простейшие, но достаточные для представления и понимания общей концепции. Полный исходный код в моем репозитории.
Создадим корневой каталог и перейдем в него:
mkdir api-gateway && cd api-gateway
Сформируем файл с основными используемыми библиотеками и назовем его традиционно requirements.txt, остальные подтянутся зависимостями:
echo " aiohttp==3.8.3 aio-pika==8.2.5 fastapi==0.88.0 gunicorn==20.1.0 path==16.5.0 PyYAML==6.0 pyyaml-tags==1.0.4 six==1.16.0 SQLAlchemy==1.4.45 uvicorn==0.20.0 " > requirements.txt
Создадим виртуальное окружение в каталоге.venv3.10 и активируем его. Кстати, я использую Python 3.10.6 под Ubuntu 20.04:
python3 -m venv .venv3.10 && source .venv3.10/bin/activate
Здесь и далее все скрипты будем запускать под этой средой.
Также установим RabbitMQ. Я предпочитаю разворачивать его в Docker‑контейнере. Хорошая подробная инструкция по установке здесь.
Начнем с... Конца.:‑)
1. Сервис
Создадим каталог первого микросервиса и перейдем в него:
mkdir srv_clients && cd srv_clients
Создадим структуру файлов сервиса так же одной командой:
touch __init__.py main.py schemas.py database.py models.py querysets.py
__init__.py — служебный файл пакета python будет пуст
database.py — инициализирует подключение к БД
Код
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import declarative_base, sessionmaker engine = create_async_engine("sqlite+aiosqlite:///clients.sqlite") sm = sessionmaker( engine, autocommit=False, autoflush=False, class_=AsyncSession ) Base = declarative_base()
Для работы используем асинхронные сессии и SQLite в качестве СУБД.
models.py — реализует модели структур данных
Код
from sqlalchemy import Column, Integer, Text from sqlalchemy.orm.collections import InstrumentedList from srv_clients.database import Base class BaseModel(Base): __abstract__ = True id = Column(Integer, primary_key=True, comment="Идентификатор") def to_dict(self) -> dict: data = dict() for k, v in self.__dict__.items(): if k.startswith("_"): continue elif isinstance(v, Base): v = v.to_dict() elif isinstance(v, InstrumentedList): v = [item.to_dict() for item in v] data[k] = v return data
Здесь мы создали базовую абстрактную модель. В ней объявлено поле уникального идентификатора, являющееся также первичным ключом. Кроме того, реализовали простейший рекурсивный метод to_dict для преобразования экземпляра класса в словарь. Да, он не охватывает все ситуации и может вызывать кучу ошибок и требует более детальной проработки, но для примера его более чем достаточно.
class Client(BaseModel): __tablename__ = "client" __table_args = {"comment": "Таблица клиентов"} surname = Column(Text(length=255), nullable=False, comment="Фамилия") name = Column(Text(length=255), nullable=False, comment="Имя") country = Column(Text(length=255), comment="Страна")
Создадим унаследованную от BaseModel модель Клиента, добавим к ней комментарий и несколько текстовых полей длиной до 255 символов так же с пояснениями.
querysets.py — содержит наборы функций для работы с данными
Код
from typing import List from sqlalchemy import delete, select, update from sqlalchemy.ext.asyncio import AsyncSession from srv_clients.models import Client class ClientQueryset: model = Client @classmethod async def create(cls, session: AsyncSession, **kwargs) -> int: created = cls.model(**kwargs) session.add(created) await session.flush([created]) return created.id
Метод создания записи будет возвращать идентификатор созданной записи. Так как идентификатор является автоинкрементом и в простейших условиях генерируется только после закрытия транзакции, принудительно вызовем метод сессии flush, который сбросит данные в БД и атрибуту id присвоится значение.
@classmethod async def get_by_id(cls, session: AsyncSession, id_: int) -> Client: return await session.scalar( select(cls.model).where(cls.model.id == id_) ) @classmethod async def get_multiple( cls, session: AsyncSession, limit: int = None, offset: int = None, **filters, ) -> List[Client]: where = list() if surname := filters.get("surname"): where.append(cls.model.surname == surname) if name := filters.get("name"): where.append(cls.model.name == name) if country := filters.get("country"): where.append(cls.model.country == country) return await session.scalars( select(cls.model).where(*where).limit(limit).offset(offset) )
Для получения клиента по идентификатору и отфильтрованного списка записей используем методы сессии scalar и scalars соответственно. Условия фильтрации ожидаем в виде динамических именованных параметров. Используем простое сравнение полей и добавим возможность реализации пагинации с помощью параметров limit и offset.
@classmethod async def update(cls, session: AsyncSession, id_: int, **kwargs) -> None: await session.execute( update(cls.model).where(cls.model.id == id_).values(**kwargs) ) @classmethod async def delete(cls, session: AsyncSession, id_: int) -> None: await session.execute(delete(cls.model).where(cls.model.id == id_))
Все методы обернуты в декоратор @classmethod для возможности вызова без создания экземпляра класса.
schemas.py — описывает входные/выходные схемы данных.
Код
from pydantic import BaseModel, Field class ClientSchema(BaseModel): surname: str = Field(..., description="Фамилия") name: str = Field(..., description="Имя") country: str = Field(..., description="Страна") class ClientViewSchema(ClientSchema): id: int = Field(..., description="Идентификатор")
Все атрибуты сделаем обязательными (в качестве значения по умолчанию передадим тип ellipsis — ...). К модели просмотра добавим атрибут идентификатора.
main.py — содержит исполняемый код микросервиса.
Код
from typing import List from fastapi import FastAPI, HTTPException, Request from fastapi.params import Query from starlette import status from srv_clients.database import Base, engine from srv_clients.database import sm as session_maker from srv_clients.querysets import ClientQueryset from srv_clients.schemas import ClientSchema, ClientViewSchema app = FastAPI() app.state.engine = engine app.state.session_maker = session_maker
Создаем приложение, добавляем к нему в качестве атрибутов подключение к базе данных и инициализатор сессий. Делаем это для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest.
@app.on_event("startup") async def on_startup(): async with app.state.engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) @app.on_event("shutdown") async def on_shutdown(): # async with app.state.engine.begin() as conn: # await conn.run_sync(Base.metadata.drop_all) await app.state.engine.dispose()
Добавим два обработчика событий приложения:
startup— переводим модели SQLAlchemy в таблицы БД;shutdown— очищаем БД путем удаления всех таблиц (закомментировано) и корректно завершаем соединение с БД.
@app.get("/clients", response_model=List[ClientViewSchema]) async def get_filtered_clients( request: Request, surname: str = Query(None, description="Фамилия"), name: str = Query(None, description="Имя"), country: str = Query(None, description="Страна"), limit: int = Query( None, description="Количество записей на странице результатов" ), offset: int = Query( None, description="Смещение страницы результатов относительно первой записи", ), ): async with request.app.state.session_maker() as session: rows = await ClientQueryset.get_multiple( session, surname=surname, name=name, country=country, limit=limit, offset=offset, ) return [row.to_dict() for row in rows.all()] @app.post( "/clients", response_model=ClientViewSchema, status_code=status.HTTP_201_CREATED, ) async def create_client(request: Request, data: ClientSchema): sm = request.app.state.session_maker async with sm.begin() as session: id_ = await ClientQueryset.create(session, **data.dict()) async with sm() as session: client = await ClientQueryset.get_by_id(session, id_) return client.to_dict() @app.get("/clients/{client_id}", response_model=ClientViewSchema) async def get_client(request: Request, client_id: int): async with request.app.state.session_maker() as session: client = await ClientQueryset.get_by_id(session, client_id) if not client: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Client not found", ) return client.to_dict() @app.patch( "/clients/{client_id}", response_model=ClientViewSchema, status_code=status.HTTP_202_ACCEPTED, ) async def update_client(request: Request, client_id: int, data: ClientSchema): sm = request.app.state.session_maker async with sm.begin() as session: client = await ClientQueryset.get_by_id(session, client_id) if not client: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Client not found" ) await ClientQueryset.update(session, client_id, **data.dict()) async with sm() as session: client = await ClientQueryset.get_by_id(session, client_id) return client.to_dict() @app.delete( "/clients/{client_id}", status_code=status.HTTP_202_ACCEPTED, ) async def delete_client(request: Request, client_id: int): sm = request.app.state.session_maker async with sm.begin() as session: client = await ClientQueryset.get_by_id(session, client_id) if not client: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) await ClientQueryset.delete(session, client_id)
Реализуем здесь простой CRUD с транзакциями. Для операций, вносящих изменения в БД, откроем транзакцию (создадим точку сохранения — savepoint) с помощью метода begin и менеджера контекста, при выходе из которого она будет успешно закрыта (под капотом будет вызван оператор commit). В случае возникновения каких‑либо ошибок внутри блока with транзакция завершится откатом (будет вызван оператор rollback).
Для разнообразия добавим фильтрацию —get_filtered_clients.
Для случаев, когда запись из БД обязательно должна быть получена (например, при изменении или удалении), выбросим исключение HTTPException когда она не найдена.
Для преобразования ответа в экземпляр схемы, передаваемой в параметре response_modelэндпоинта, или их список, реализующая его функция должна возвращать соответственно словарь или список словарей. Тут нам и пригодится метод to_dict базовой модели данных.
Можно запускать! Напишем команду:
python -m gunicorn --bind 0.0.0.0:8001 --workers 4 -k uvicorn.workers.UvicornWorker main:app
Здесь мы указываем, что сервис будет работать по адресу 0.0.0.0 и на 8001 порте, будет запущено 4 воркера класса UvicornWorker (необязательно — документация в первоисточнике). Приложение инициализируется в переменную app модуля main (main:app).
Имеем примерно такой вывод:
[2023-01-11 21:02:00 +0300] [29518] [INFO] Starting gunicorn 20.1.0 [2023-01-11 21:02:00 +0300] [29518] [INFO] Listening at: http://0.0.0.0:8001 (29518) [2023-01-11 21:02:00 +0300] [29518] [INFO] Using worker: uvicorn.workers.UvicornWorker [2023-01-11 21:02:00 +0300] [29520] [INFO] Booting worker with pid: 29520 [2023-01-11 21:02:00 +0300] [29521] [INFO] Booting worker with pid: 29521 [2023-01-11 21:02:00 +0300] [29522] [INFO] Booting worker with pid: 29522 [2023-01-11 21:02:00 +0300] [29523] [INFO] Booting worker with pid: 29523 [2023-01-11 21:02:01 +0300] [29521] [INFO] Started server process [29521] [2023-01-11 21:02:01 +0300] [29521] [INFO] Waiting for application startup. [2023-01-11 21:02:01 +0300] [29523] [INFO] Started server process [29523] [2023-01-11 21:02:01 +0300] [29523] [INFO] Waiting for application startup. [2023-01-11 21:02:01 +0300] [29520] [INFO] Started server process [29520] [2023-01-11 21:02:01 +0300] [29520] [INFO] Waiting for application startup. [2023-01-11 21:02:01 +0300] [29522] [INFO] Started server process [29522] [2023-01-11 21:02:01 +0300] [29522] [INFO] Waiting for application startup. [2023-01-11 21:02:01 +0300] [29521] [INFO] Application startup complete. [2023-01-11 21:02:01 +0300] [29522] [INFO] Application startup complete. [2023-01-11 21:02:01 +0300] [29523] [INFO] Application startup complete. [2023-01-11 21:02:01 +0300] [29520] [INFO] Application startup complete.
2. RPC-сервер
Реализуем функционал, который будет получать запросы из очереди, перенаправлять их в соответствующий сервис и возвращать обратно в эту же очередь. На этом этапе осветим два дополнительных вопроса: работу с переменными окружения, в которых будем хранить пароль для подключения к очереди, и собственно работу с очередью RabbitMQ. Кстати, здесь есть хорошая статья о его основных понятия.
Начнем с создания структуры пакета. Вернемся в корень проекта, создадим каталог rpc сервера и перейдем в него:
cd .. && mkdir rpc && cd rpc
Создадим структуру файлов так же одной командой:
touch __init__.py common.py resolver.py server.py
Вложим сюда каталог с настройками и .env файлом:
mkdir config && touch config/.env config/local.yaml
Далее так же по порядку.
__init__.py— служебный файл пакета python будет пуст.
common.py — общие функции работы сервера.
Код
Здесь отступление от первичной цели — хранение и получение настроек в yaml‑файле, а также обращение к переменным окружения из конфигурационного файла. Для этого используем библиотеку PyYaml‑Tags. Пока просто объявим класс и переопределим приватный метод _from_yaml. Вызов будет чуть ниже.
import os from pathlib import Path import yaml from dotenv import load_dotenv from yaml_tags import BaseTag, tag_registry @tag_registry.register("env_tag") class EnvTag(BaseTag): def _from_yaml( self, _loader, _work_dir, _prefix, _suffix, param=None, *args, **kwargs, ): result = os.environ.get(param, "") return f"{_prefix}{result}{_suffix}"
Регистрируем свой тэг env_tag и метод его и формирования.
def load_config(config_path: Path) -> dict: tag_registry.require("env_tag") env_path = f"{config_path.absolute().parent}/.env" load_dotenv(dotenv_path=env_path) with open(config_path) as f: return yaml.load(f, Loader=yaml.Loader) def get_config_path(file_name: str) -> Path: current_dir = Path(__file__).absolute().parent return current_dir / "config" / file_name
Тут мы считаем .env файл. Будем ожидать его в каталоге с конфигурационным(‑ми) файлом(‑ами).
local.yaml:
rmq: url: amqp://gateway:<% env_tag(param="RMQ_PASS") %>@127.0.0.1:5672/gateway
В данном случае храним только строку подключения к RabbitMQ.
.env:
RMQ_PASS=gateway
Обращение к зарегистрированному тэгу простое: <% [имя_тэга]([имя_именованного_параметра]=[имя_переменной_окружения]) %>
resolver.py— хранит функции распределения запросов между сервисам.
Код
from typing import Tuple import aiohttp SERVICE_MAP = { "clients": "0.0.0.0:8001", "goods": "0.0.0.0:8002", } async def resolve( method: str, path: str, params: dict = None, data: dict = None, headers: dict = None, ) -> Tuple[dict, int]: """ Функция определяет по первой части url-запроса к какому сервису происходит обращение, перенаправляет в него запрос и возвращает ответ в виде словаря и код статуса. """ _, service_name, *_ = path.split("/") service_host = SERVICE_MAP[service_name] url = f"http://{service_host}{path}" response, status_code = await make_request( url, method, params, data, headers ) return response, status_code async def make_request( url: str, method: str, params: dict = None, data: dict = None, headers: dict = None, ) -> Tuple[dict, int]: """ Выполнение запроса, тип которого определяется параметром method, по url-адресу. Возвращает ответ в виде словаря и код статуса. """ if not headers: headers = {} if not data: data = {} async with aiohttp.ClientSession() as session: request = getattr(session, method) async with await request( url, params=params, json=data, headers=headers ) as response: response_json = await response.json() return response_json, response.status
server.py— исполняемый код RPC‑сервера.
Код
import asyncio from aio_pika import connect_robust from aio_pika.patterns import RPC from common import get_config_path, load_config from resolver import resolve async def server() -> None: config = load_config(get_config_path("local.yaml")) rmq = config["rmq"] connection = await connect_robust(rmq["url"]) async with connection: channel = await connection.channel() rpc = await RPC.create(channel) await rpc.register(resolve.__name__, resolve, auto_delete=True) try: await asyncio.Future() finally: await connection.close() if __name__ == "__main__": asyncio.run(server())
Здесь все совсем просто. Считываем конфигурационный файл, преобразовываем его в словарь и получаем настройки подключения к очереди RabbitMQ. Создаем это подключение, канал и регистрируем обработчик. Кстати, в пакете aio_pika есть и другие шаблоны, кроме RPC.
Тут также все готово. Можно запускать:
python server.py
Сервер можно распараллелить, просто выполнив команду запуска необходимое количество раз в разных терминалах. Запросы будут автоматически последовательно распределяться между каналами очередей.
Заглянув в графический интерфейс RabbtiMQ, увидим что создается один обменник rpc.dlx:

И две очереди:

Схема работы будет следующая:

Когда клиент запускается, он создает анонимную эксклюзивную очередь обратного вызова (в нашем случае
amq_0x4e0bc6f2bc4e3df862a47b2b772bec3d);Для запроса RPC клиент отправляет сообщение с двумя свойствами:
answer_to, для которого задана очередь обратного вызова, иcorellation_id, для которого установлено уникальное значение для каждого запроса;Запрос отправляется в очередь
resolve;RPC‑сервер ожидает запросов в этой очереди. Когда появляется запрос, он выполняет работу и отправляет сообщение с результатом обратно Клиенту, используя очередь из поля
reply_to;Клиент ожидает данных в очереди обратного вызова. Когда появляется сообщение, оно проверяет свойство
corellation_id. Если он соответствует значению из запроса, он возвращает ответ приложению.
3. Gateway
Наш шлюз должен повторять структуру конечных точек всех обслуживаемых сервисов, а также схем входных и выходных данных. Поэтому выделим для каждого сервиса отдельный каталог.
Начнем с создания структуры. Вернемся в корень проекта, создадим каталог gateway и перейдем в него:
cd .. && mkdir gateway && cd gateway
Создадим структуру файлов и каталогов так же одной командой:
touch __init__.py rabbit.py common.py main.py && mkdir config && touch config/.env config/local.yaml && mkdir srv_clients && touch srv_clients/__init__.py srv_clients/routes.py srv_clients/schemas.py
Далее снова по порядку.
__init__.py — служебный файл пакета python традиционно пуст.
rabbit.py — функции работы с очередями RabbitMQ.
Код
from typing import Tuple from aio_pika import connect_robust from aio_pika.patterns import RPC async def send_request_to_queue( config: dict, message: dict ) -> Tuple[dict, int]: rmq = config["rmq"] connection = await connect_robust(rmq["url"]) async with connection: channel = await connection.channel() rpc = await RPC.create(channel) result, status_code = await rpc.proxy.resolve(**message) return result, status_code
Из настроек получаем конфигурацию для работы с очередью и открываем соединение. Обозначаем, что хотим вызвать функцию resolve. Она, как известно, возвращает словарь и числовой код статуса.
common.py — общие функции работы сервера.
Код
По большей части он будет повторять код rpc‑сервера с одним дополнением — функцией router.
import os from functools import wraps from pathlib import Path from typing import Any, Optional import yaml from dotenv import load_dotenv from fastapi import HTTPException, Request, Response from starlette import status from yaml_tags import BaseTag, tag_registry from gateway.rabbit import send_request_to_queue @tag_registry.register("env_tag") class EnvTag(BaseTag): def _from_yaml( self, _loader, _work_dir, _prefix, _suffix, param=None, *args, **kwargs, ) -> str: result = os.environ.get(param, "") return f"{_prefix}{result}{_suffix}" def load_config(config_path: Path) -> dict: tag_registry.require("env_tag") env_path = f"{config_path.absolute().parent}/.env" load_dotenv(dotenv_path=env_path) with open(config_path) as f: return yaml.load(f, Loader=yaml.Loader) def get_config_path(file_name: str) -> Path: current_dir = Path(__file__).absolute().parent return current_dir / "config" / file_name def router( method, path: str, data_key: Optional[str] = None, status_code: Optional[int] = status.HTTP_200_OK, response_model: Optional[Any] = None, ): """ Обертка функции эндпоинта, реализующая обращение к RPC-серверу. :param method: Вызываемый объект (функция) реализующая тот или иной метод http-запроса. :param path: Адрес эндпоинта. :param data_key: Имя ключа, в котором передаются данные на эндпоинт. :param status_code: Ожидаемый код ответа сервера. :param response_model: Модель данных для преобразования ответа. """ app_method = method( path, status_code=status_code, response_model=response_model ) def wrapper(endpoint): @app_method @wraps(endpoint) async def decorator(request: Request, response: Response, **kwargs): request_method = request.scope["method"].lower() data = kwargs.get(data_key) data = data.dict() if data else {} response_data, response_status_code = await send_request_to_queue( config=request.app.config, message={ "method": request_method, "path": request.scope["path"], "params": dict(request.query_params), "data": data, "headers": {}, }, ) if response_status_code >= status.HTTP_400_BAD_REQUEST: raise HTTPException( status_code=response_status_code, detail=response_data ) response.status_code = response_status_code return response_data return wrapper
Функция router реализует декоратор, который в свою очередь традиционным методом оборачивает эндпоинт в параметризированный декоратор. Из переданных параметров собирается объект сообщения для направления в один из сервисов. Сообщение направляется в очередь и ожидается ответ. Если код ответа информирует об ошибке (>= 400), выбрасываем исключение с полученным статусом и сообщением. В случае успеха, подменяем статус объекта response кодом, полученным от сервиса.
На данном этапе функционал можно дополнить, например, проверкой учетных данных, каким‑то логированием и всяческими плюшками. Для примера ограничимся простыми запросом‑ответом.
main.py — исполняемый код шлюза.
Код
from fastapi import FastAPI from common import get_config_path, load_config from gateway.srv_clients.routes import clients_router app = FastAPI() app.config = load_config(get_config_path("local.yaml")) app.include_router(clients_router)
Здесь мы инициализируем приложение, добавляем к нему в качестве атрибута config словарь с настройками. Делаем это опять же для того, чтобы иметь возможность обращаться к ним внутри эндпоинтов через экземплярRequest, а именно в декораторе route.
Как я говорил выше, описываем сервисы, частично повторяя их структуру.
routes.py — конечные точки входа в сервис (endpoints).
Код
from typing import List from fastapi import APIRouter, Response from fastapi.params import Query from starlette import status from starlette.requests import Request from gateway.common import router from gateway.srv_clients.schemas import ClientSchema, ClientViewSchema clients_router = APIRouter(prefix="/clients") @router( method=clients_router.get, path="", response_model=List[ClientViewSchema], ) async def get_filtered_clients( request: Request, response: Response, surname: str = Query(None, description="Фамилия"), name: str = Query(None, description="Имя"), country: str = Query(None, description="Страна"), limit: int = Query( None, description="Количество записей на странице результатов" ), offset: int = Query( None, description="Смещение страницы результатов относительно первой записи", ), ): pass @router( method=clients_router.post, path="", response_model=ClientViewSchema, status_code=status.HTTP_201_CREATED, data_key="data", ) async def create_client( request: Request, response: Response, data: ClientSchema ): pass @router( method=clients_router.get, path="/{client_id}", response_model=ClientViewSchema, ) async def get_client(request: Request, response: Response, client_id: int): pass @router( method=clients_router.patch, path="/{client_id}", response_model=ClientViewSchema, status_code=status.HTTP_202_ACCEPTED, data_key="data", ) async def update_client( request: Request, response: Response, client_id: int, data: ClientSchema ): pass @router( method=clients_router.delete, path="/{client_id}", status_code=status.HTTP_202_ACCEPTED, ) async def delete_client(request: Request, response: Response, client_id: int): pass
Все эндпоинты не содержат код. Запросы выполняются внутри декоратора, в который мы оборачиваем функцию.
schemas.py — Схемы входных и выходных данных.
Код
from pydantic import BaseModel, Field class ClientSchema(BaseModel): surname: str = Field(..., description="Фамилия") name: str = Field(..., description="Имя") country: str = Field(..., description="Страна") class ClientViewSchema(ClientSchema): id: int = Field(..., description="Идентификатор")
Здесь полная копия схем сервиса.
Готово! Запустим шлюз на 8000-м порте:
python -m gunicorn --bind 0.0.0.0:8000 --workers 4 -k uvicorn.workers.UvicornWorker main:app
Пробы
Вся связка готова. Можно запускать все вместе и применять функционал на практике.
Создадим клиента:
POST http://localhost:8000/clients Content-Type: application/json { "surname": "John", "name": "Malkovitch", "country": "USA" }
И получим ответ:
HTTP/1.1 201 Created date: Mon, 06 Feb 2023 10:43:10 GMT server: uvicorn content-length: 61 content-type: application/json { "surname": "John", "name": "Malkovitch", "country": "USA", "id": 1 }
Запросим список клиентов:
GET http://localhost:8000/clients Content-Type: application/json
Результат будет таким:
HTTP/1.1 200 OK date: Mon, 06 Feb 2023 10:45:12 GMT server: uvicorn content-length: 63 content-type: application/json [ { "surname": "John", "name": "Malkovitch", "country": "USA", "id": 1 } ]
Ожидаемо получаем список json‑объектов.
Так же запросим запись по идентификатору:
GET http://localhost:8000/clients/1 Content-Type: application/json
Результат будет таким:
HTTP/1.1 200 OK date: Mon, 06 Feb 2023 10:48:26 GMT server: uvicorn content-length: 61 content-type: application/json { "surname": "John", "name": "Malkovitch", "country": "USA", "id": 1 }
Изменим нашего Джона Малковича из США на Джона Смита из Британии:
PATCH http://localhost:8000/clients/1 Content-Type: application/json { "surname": "John", "name": "Smith", "country": "Great Britain" }
В ответ получаем измененную запись:
HTTP/1.1 202 Accepted date: Mon, 06 Feb 2023 10:52:02 GMT server: uvicorn content-length: 66 content-type: application/json { "surname": "John", "name": "Smith", "country": "Great Britain", "id": 1 }
А теперь удалим его:
DELETE http://localhost:8000/clients/1 Content-Type: application/json
В ответе теперь нас интересует лишь статус и он 202-й:
HTTP/1.1 202 Accepted date: Mon, 06 Feb 2023 10:52:37 GMT server: uvicorn content-length: 4 content-type: application/json null
Вот и всё. Проделано много работы и написано много кода. RPC достаточно старая, но хорошо себя зарекомендовавшая технология. Данный вариант взаимодействия пользователя с сервисами не ультимативен и может быть реализован с использованием иных технологий. Спасибо, что дочитали до конца. До скорых встреч!
