Рано или поздно маленькие приложения разрастаются до нагруженных production-решений, поэтому программисту необходимо заранее продумать стек технологий. Для Python концептуальный выбор стоит между синхронными и асинхронными фреймворками. После появления библиотеки asyncio популярность асинхронных Python-фреймворков сильно выросла, потеснив таких монстров, как Django и Flask, и стало намного проще писать веб-приложения, способные пережить высокий RPS.
В нашей компании по ряду причин перешли к асинхронным решениям, и ниже я опишу пример асинхронного веб-приложения с возможностью работы с брокером очередей RabbitMQ и запуском периодичных заданий. С кодом проекта можно ознакомиться по ссылке. Это приложение можно использовать как шаблон для новых проектов, в который достаточно будет добавить свою бизнес-логику, чтобы получилось полноценное production-решение.
Статья поможет новичкам посмотреть, как можно использовать вместе различные асинхронные библиотеки. А опытные программисты могут сравнить представленное ниже решение со своими наработками и конструктивно покритиковать.
![](https://habrastorage.org/r/w1560/webt/dr/vc/zm/drvczmptatcvgwv_dwaeahuys_g.png)
Перечисленные выше зависимости тянут за собой другие библиотеки, полный список зависимостей можно изучить в файле requirements.txt.
После того, как мы выбрали стек технологий, рассмотрим расположение основных модулей приложения.
![](https://habrastorage.org/r/w780q1/webt/m1/ge/8x/m1ge8xsdrvghdshkvrnprohy3ak.jpeg)
![](https://habrastorage.org/r/w1560/webt/c1/ku/ll/c1kullf_p9ssialf8v51f81zlt8.png)
Содержат миграции приложения и конфигурацию для их запуска.
Типовая миграция:
![](https://habrastorage.org/r/w1560/webt/i1/ix/jh/i1ixjh7bxv_etju8w_8fgdxs0pg.png)
В
Пример самого простого обработчика сообщений:
![](https://habrastorage.org/r/w1560/webt/8h/r6/38/8hr6380jqa2crhnlcepp3j9ox2g.png)
Инициализировать приложение можно вот так:
![](https://habrastorage.org/r/w1560/webt/os/fu/v0/osfuv03qsnbsftr56pkfxpf3sym.png)
Папка
![](https://habrastorage.org/r/w1560/webt/cq/4v/ur/cq4vury7v9zncujjk1lxvakzip8.png)
В этой папке лежат обработчики для API-методов. Маршрутизация к ним находится в файле
Пример API по созданию новой записи и получения её по ID:
![](https://habrastorage.org/r/w1560/webt/ag/9y/zz/ag9yzzl0wtzmyxy3manyd-lwyyu.png)
В
![](https://habrastorage.org/r/w1560/webt/4f/te/bc/4ftebcbuv-vb_sjqscr_a9fmnnc.png)
Папка
Модель «Студент» с набором базовых действий с этой сущностью:
![](https://habrastorage.org/r/w1560/webt/9r/kt/g9/9rktg9mj9fsotqofws2tcs5im-g.png)
В
![](https://habrastorage.org/r/w1560/webt/et/6e/rj/et6erjy79llg683sxxqpf3-trj0.png)
Расписание запускаемых «кронов» находится в
Самый простой «крон»:
![](https://habrastorage.org/r/w1560/webt/jn/ua/fo/jnuafoskf7vhmrvplz6drhfv7im.png)
В этой папке находятся marshmallow-схемы, используемые как в handlers, так и в остальных модулях проекта.
Пример валидации входных и выходных данных сущности «Студент»:
Теперь, когда мы разобрались со структурой и особенностями приложения, можно перейти к его запуску.
![](https://habrastorage.org/r/w1560/webt/xn/id/g5/xnidg5coxyk9gmxrq_c2c4nzzs0.png)
Перед запуском необходимо поднять базу и RabbitMQ через команду
Также прогоним миграции командой
Запуск приложения благодаря библиотеке manage.py похож на локальный запуск Django-проектов:
При работе в API-режиме в случае запуска приложения на многоядерном сервере можно установить количество внутренних процессов равным количеству ядер сервера с помощью переменной среды
При запуске в режиме консьюмера работу тестового консьюмера можно проверить через веб-панель RabbitMQ (находится по адресу
Работу планировщика задач можно увидеть по сообщениям в логах либо по изменению данных в базе, — если через API добавить несколько тестовых студентов.
![](https://habrastorage.org/r/w1560/webt/vf/80/ca/vf80caxcn_kr5v8e9q2n5s0f1bm.png)
Самым удобным, на мой взгляд, способом отображения документации является Swagger. Чтобы описанные в нём форматы данных не расходились с реальным кодом, необходимо сделать так, чтобы они генерировались из одних и тех же схем. Для этого мы будем использовать библиотеку sanic_openapi и самописную функцию
Документация приложения расположена по адресу
Для тестового проекта документация будет иметь вид:
![](https://habrastorage.org/r/w1560/webt/9o/p1/nq/9op1nqwrs-tcdfpo-we61ronmke.png)
![](https://habrastorage.org/r/w1560/webt/qy/va/pk/qyvapkr1sohj0ankoaxr_mnlfru.png)
Первый этап закончен, теперь приложение может принимать входящие запросы, читать сообщения из RabbitMQ и выполнять периодические задания. Но оно всё ещё не «production ready».
Чтобы это исправить, надо:
Если эти темы интересны, могу описать их в следующих статьях.
Исходный код библиотеки доступен на GitHub под MIT лицензией.
В комментариях было бы интересно узнать про конфигурации ваших асинхронных приложений, а также их достоинства и недостатки.
В нашей компании по ряду причин перешли к асинхронным решениям, и ниже я опишу пример асинхронного веб-приложения с возможностью работы с брокером очередей RabbitMQ и запуском периодичных заданий. С кодом проекта можно ознакомиться по ссылке. Это приложение можно использовать как шаблон для новых проектов, в который достаточно будет добавить свою бизнес-логику, чтобы получилось полноценное production-решение.
Статья поможет новичкам посмотреть, как можно использовать вместе различные асинхронные библиотеки. А опытные программисты могут сравнить представленное ниже решение со своими наработками и конструктивно покритиковать.
Выбор зависимостей:
![](https://habrastorage.org/webt/dr/vc/zm/drvczmptatcvgwv_dwaeahuys_g.png)
- sanic — веб-сервер. Выбран из-за простоты использования и наличия большого количества вспомогательных библиотек.
Альтернативы:
- aioHttp (но в качестве loop рекомендую использовать uvloop, либо какую-нибудь другую реализацию, написанную на C);
- FastApi
- aiopg — коннектор к базе данных. Есть более быстрые реализации, но aiopg поддерживает возможность работы с SQLAlchemy Core (подробнее в документации).
Альтернативы:
- asyncpg. Для поддержки работы с SQLAlchemy Core необходимо установить библиотеку databases либо gino.
- SQLAlchemy — библиотека для работы с базами данных. Всю её мощь в асинхронном приложении пока использовать не получится (много блокирующих операций), поэтому возьмём из неё модели и возможность написания запросов без использования raw SQL. Для асинхронного Python не имеет альтернатив.
- alembic — библиотека для миграции к базе данных. Поддерживает возможность создания автоматических миграций на основе SqlAlchemy, что полезно для новичков.
Альтернативы:
- yoyo-migrations
- migrate
- aioamqp — коннектор к RabbitMQ. Этот брокер сообщений используется в крупных приложениях для асинхронного отказоустойчивого обмена данными.
Альтернативы:
- aiorabbit
- aio-pika
- manage.py — библиотека для удобного запуска приложений с возможностью передачи дополнительных параметров из консоли.
Альтернативы:
- click
- Marshmallow — реализует удобную валидацию входящих данных.
Альтернативы:
- schematics
- jsonschema
- sanic-openapi — в связке с marshmallow позволяет генерировать документацию к API-методам в swagger.
Альтернативы:
- pyswagger
- envparse — утилита для парсинга конфигов приложения.
Альтернативы:
- использовать
os.environ.get()
из стандартной библиотеки.
- использовать
- APScheduler — асинхронная библиотека для запуска «кронов».
Альтернативы:
- async-cron
- AioHttp — фреймворк, часть функциональности которого будет использоваться для асинхронных HTTP-запросов.
Альтернативы:
- requests-async
Перечисленные выше зависимости тянут за собой другие библиотеки, полный список зависимостей можно изучить в файле requirements.txt.
После того, как мы выбрали стек технологий, рассмотрим расположение основных модулей приложения.
Структура проекта
![](https://habrastorage.org/webt/m1/ge/8x/m1ge8xsdrvghdshkvrnprohy3ak.jpeg)
alembic и alembic.ini
![](https://habrastorage.org/webt/c1/ku/ll/c1kullf_p9ssialf8v51f81zlt8.png)
Содержат миграции приложения и конфигурацию для их запуска.
Типовая миграция:
from alembic import op
import sqlalchemy as sa
revision = 'cee7a5698356'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table('student',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(), nullable=True),
sa.Column('active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('now()'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
def downgrade():
op.drop_table('student')
amqp
![](https://habrastorage.org/webt/i1/ix/jh/i1ixjh7bxv_etju8w_8fgdxs0pg.png)
В
consumers
перечислены обработчики входящих сообщений. amqp_config
содержит конфигурации для подключения к различным vhost. Prefix
в конфигурации задаёт имя канала для дальнейшего использования в приложении. В connection
перечислены базовые функции для удобного подключения к RabbitMQ с возможностью переподключения, а также биндинги exchange’ей и очередей (при старте приложения все необходимые для работы сущности должны автоматически создаваться из кода). В decorators
можно найти декоратор для отказоустойчивой работы с сообщениями из очередей. При получении ошибки сообщение упадёт в альтернативную очередь, и через заданное время снова попадёт в основную. Если же за несколько попыток сообщение не удалось обработать, то оно будет переложено в третью очередь для дальнейшего «ручного» анализа программистами.Пример самого простого обработчика сообщений:
import json
from sanic.log import logger
from models.student import Student
class ExampleConsumer:
def __init__(self, app):
self.app = app
async def handle(self, channel, body, envelope, properties):
logger.info('incoming msg {}'.format(body))
data = json.loads(body.decode())
async with self.app.db_engine.acquire() as conn:
await Student.create(conn=conn, values=data)
app
![](https://habrastorage.org/webt/8h/r6/38/8hr6380jqa2crhnlcepp3j9ox2g.png)
app.py
содержит инициализацию Sanic-приложения с обслуживающими функциями, а в server.py
находится его запуск.Инициализировать приложение можно вот так:
def init_app(api=False, consume=False, schedule=False) -> AppSanic:
app = AppSanic(__name__, log_config=LOG_SETTINGS, strict_slashes=True)
app.exception(NotFound)(handle_404)
app.listener('before_server_start')(setup_connect)
app.listener('before_server_stop')(close_connect)
if api:
set_router(app)
if consume:
app.listener('before_server_start')(setup_amqp_connection)
app.listener('after_server_start')(start_consume_amqp)
app.listener('before_server_stop')(close_amqp_connection)
if schedule:
app.listener('before_server_start')(schedule_initializer)
app.blueprint(routes.srv)
return app
core
![](https://habrastorage.org/webt/os/fu/v0/osfuv03qsnbsftr56pkfxpf3sym.png)
Папка
core
содержит вспомогательные функции для инициализации приложения.handler
![](https://habrastorage.org/webt/cq/4v/ur/cq4vury7v9zncujjk1lxvakzip8.png)
В этой папке лежат обработчики для API-методов. Маршрутизация к ним находится в файле
routes.py
в корне приложения.Пример API по созданию новой записи и получения её по ID:
async def create_student(request: Request):
data = request['validated_json']
async with request.app.db_engine.acquire() as conn:
result = await Student.create(conn=conn, values=data)
return http_created(StudentResultSchema().dump(result))
async def get_student_by_id(request: Request, student_id):
async with request.app.db_engine.acquire() as conn:
result = await Student.get_by_id(conn=conn, student_id=student_id)
if not result:
return http_bad_request('student not found')
return http_ok(StudentResultSchema().dump(result))
lib
![](https://habrastorage.org/webt/ag/9y/zz/ag9yzzl0wtzmyxy3manyd-lwyyu.png)
В
lib
представлены самописные классы и функции, используемые во всём проекте.models
![](https://habrastorage.org/webt/4f/te/bc/4ftebcbuv-vb_sjqscr_a9fmnnc.png)
Папка
models
содержит модели для доступа к данным. Alembic умеет через reflection
видеть изменения в этих моделях и создавать автоматические миграции через команду alembic revision --autogenerate
.Модель «Студент» с набором базовых действий с этой сущностью:
from datetime import datetime
from sqlalchemy import Column, String, Integer, Boolean, DateTime, text
from lib.db_mixin import DB
from . import Base
class Student(Base, DB):
__tablename__ = 'student'
id = Column(Integer, autoincrement=True, primary_key=True)
name = Column(String, nullable=True)
active = Column(Boolean, default=True, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, server_default=text('now()'))
updated_at = Column(DateTime, default=datetime.utcnow, server_default=text('now()'))
@classmethod
async def create(cls, conn, values):
cur = await Student.insert().values(
**values
).returning(Student.__table__).execute(conn)
result = await cur.first()
return dict(result)
@classmethod
async def update_by_id(cls, conn, student_id, values):
cur = await Student.update().values(
updated_at=datetime.utcnow(),
**values
).where(
Student.id == student_id
).returning(Student.__table__).execute(conn)
result = await cur.first()
return dict(result) if result else None
@classmethod
async def get_by_id(cls, conn, student_id):
result = await Student.select() \
.where(Student.id == student_id) \
.get(conn)
return dict(result) if result else None
@classmethod
async def get_all(cls, conn):
result = await Student.select() \
.where(Student.active.is_(True)) \
.all(conn)
return [dict(i) for i in result]
@classmethod
async def set_not_active_by_id(cls, conn, student_id):
await Student.update().values(active=False) \
.where(Student.id == student_id) \
.execute(conn)
@classmethod
async def delete_by_id(cls, conn, student_id):
await Student.delete() \
.where(Student.id == student_id) \
.execute(conn)
student_table = Student.__table__
modules
![](https://habrastorage.org/webt/9r/kt/g9/9rktg9mj9fsotqofws2tcs5im-g.png)
В
modules
представлен модуль с HTTP-клиентом для запросов к внешним сервисам, а также приведён пример клиента для получения данных с worldtimeapi.org.from modules.http_client.client import BaseClient
import settings
class TimezoneApiClient(BaseClient):
service_url = settings.TIMEZONE_API_HOST
request_hooks = [
'auth_request',
]
current_time_url = 'timezone/{area}/{city}'
def auth_request(self, kwargs):
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers'].update({
# 'Authorization': 'settings.EXAMPLE_TOKEN',
'Accept': 'application/json'
})
async def get_time(self, area, city):
response = await self.get(self.get_full_url('current_time_url', area=area, city=city))
return response
schedule
![](https://habrastorage.org/webt/et/6e/rj/et6erjy79llg683sxxqpf3-trj0.png)
Расписание запускаемых «кронов» находится в
__init__.py
, код периодических функций лежит в остальных .py-файлах папки schedule
.Самый простой «крон»:
from sanic import Sanic
from sanic.log import logger
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from models.student import Student
async def example_schedule(app):
logger.info('start example_schedule')
async with app.db_engine.acquire() as conn:
all_students = await Student.get_all(conn=conn)
for s in all_students:
await Student().update_by_id(
conn=conn,
student_id=s['id'],
values={
'name': s['name'] + '_new'
}
)
logger.info('end example_schedule')
def initialize_scheduler(app: Sanic, loop):
scheduler = AsyncIOScheduler({
'event_loop': loop,
'apscheduler.timezone': 'UTC',
})
scheduler.add_job(example_schedule, 'interval', minutes=1, kwargs={'app': app})
scheduler.add_job(example_schedule, 'cron', hour="*", minute=00, kwargs={'app': app})
return scheduler
schemas
![](https://habrastorage.org/webt/jn/ua/fo/jnuafoskf7vhmrvplz6drhfv7im.png)
В этой папке находятся marshmallow-схемы, используемые как в handlers, так и в остальных модулях проекта.
Пример валидации входных и выходных данных сущности «Студент»:
from marshmallow import fields
from lib.schema import BaseSchema
class CreateStudentSchema(BaseSchema):
name = fields.String(required=True)
class UpdateStudentSchema(BaseSchema):
name = fields.String(required=True)
class StudentResultSchema(BaseSchema):
id = fields.Integer(required=True)
name = fields.String(required=True)
active = fields.Boolean(required=True)
created_at = fields.DateTime(format='%Y-%m-%dT%H:%M:%S+00:00', required=True)
updated_at = fields.DateTime(format='%Y-%m-%dT%H:%M:%S+00:00', required=True)
Теперь, когда мы разобрались со структурой и особенностями приложения, можно перейти к его запуску.
Запуск
![](https://habrastorage.org/webt/xn/id/g5/xnidg5coxyk9gmxrq_c2c4nzzs0.png)
Перед запуском необходимо поднять базу и RabbitMQ через команду
docker-compose up
Также прогоним миграции командой
alembic upgrade head
Запуск приложения благодаря библиотеке manage.py похож на локальный запуск Django-проектов:
- Веб-сервер —
python manage.py run
- Консьюмеры —
python manage.py consume
- Кроны —
python manage.py schedule
При работе в API-режиме в случае запуска приложения на многоядерном сервере можно установить количество внутренних процессов равным количеству ядер сервера с помощью переменной среды
WORKERS
(либо через os.cpu_count()
). При запуске в режиме консьюмера работу тестового консьюмера можно проверить через веб-панель RabbitMQ (находится по адресу
http://localhost:15672/
, сервер слушает очередь example.main
).Работу планировщика задач можно увидеть по сообщениям в логах либо по изменению данных в базе, — если через API добавить несколько тестовых студентов.
Документация
![](https://habrastorage.org/webt/vf/80/ca/vf80caxcn_kr5v8e9q2n5s0f1bm.png)
Самым удобным, на мой взгляд, способом отображения документации является Swagger. Чтобы описанные в нём форматы данных не расходились с реальным кодом, необходимо сделать так, чтобы они генерировались из одних и тех же схем. Для этого мы будем использовать библиотеку sanic_openapi и самописную функцию
open_api_schemas
.@doc.summary('Обновление записи "Студент"')
@doc.consumes(open_api_schemas(UpdateStudentSchema()), location='body')
@doc.produces(open_api_schemas(StudentResultSchema()))
@validate_json(UpdateStudentSchema)
async def update_student_by_id(request: Request, student_id):
data = request['validated_json']
async with request.app.db_engine.acquire() as conn:
result = await Student.update_by_id(conn=conn, student_id=student_id, values=data)
if not result:
return http_bad_request('student not found')
return http_ok(StudentResultSchema().dump(result))
Документация приложения расположена по адресу
http://0.0.0.0:8090/swagger/#/
. Чтобы включить её отображение, необходимо проставить переменную среды DEBUG
в значение True
. Сделать это можно либо bash-командой export DEBUG=True
, либо создав файл .env
в корне проекта и написав туда аналогичную строчку.Для тестового проекта документация будет иметь вид:
![](https://habrastorage.org/webt/9o/p1/nq/9op1nqwrs-tcdfpo-we61ronmke.png)
А что дальше?
![](https://habrastorage.org/webt/qy/va/pk/qyvapkr1sohj0ankoaxr_mnlfru.png)
Первый этап закончен, теперь приложение может принимать входящие запросы, читать сообщения из RabbitMQ и выполнять периодические задания. Но оно всё ещё не «production ready».
Чтобы это исправить, надо:
- Покрыть код приложения тестами (рекомендую pytest).
- Разработать декораторы и middleware для авторизации и контроля доступа к данным.
- Настроить сборщик логов и подключить Sentry.
- Начать собирать метрики приложения (например, с помощью Prometheus) и выставить по ним оповещения.
- Подключить системы для кеширования данных (например, Redis).
- Собрать приложения в Docker, подготовить его к работе на тестовых и боевых стендах.
- Провести нагрузочное тестирование для выявления узких мест и правильного выставления лимитов по ресурсам.
Если эти темы интересны, могу описать их в следующих статьях.
Исходный код библиотеки доступен на GitHub под MIT лицензией.
В комментариях было бы интересно узнать про конфигурации ваших асинхронных приложений, а также их достоинства и недостатки.