Эта статья рассчитана в большинстве своём на новичков. Тут мы поговорим о том, как не упереться в лимиты подключений к базе, и чтобы приложение в продакшн не упало.
В этой статье будет идти речь о SQLAlchemy и частности PostgreSQL. В Django таких проблем по умолчанию я не видел, а вот в алхимии, приходится вручную мониторить подключения.
Стандартное создание engine и сессии для алхимии вида:
engine = create_async_engine(
SQLALCHEMY_DATABASE_URL,
**(
dict(pool_recycle=900, pool_size=100, max_overflow=3)
)
)
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False,
class_=AsyncSession
)
UPD. Здесь мы будем работать с асинхронной сессией алхимии, но у синхронной такой же механизм действий
И потом мы применяем это примерно так:
async def foo(some: Any):
# создаем транзакцию
db = SessionLocal()
some_do(some)
# делаем коммит
await db.commit()
# закрываем соединение
await db.close()
НО, что если на моменте some_do() случится ошибка? Тогда у нас не сделается коммит(ну в принципе логично), и что более важно, не закроется соединение, а это уже критично.
Разберемся почему это критично:
Превысив лимит на подключения к базе, другие запросы не смогут обрабатываться
Лишняя нагрузка на систему
Дополнительные открытые подключения в системе
И если лишняя нагрузка на систему и открытые подключения в системе это хоть и критично, но наше приложение от этого не перестанет работать(до какого то момента). То вот при лимите подключений, уже всё, наше приложение перестанет работать сразу и пользователи будут видеть ошибку
Мы разобрались в чем проблема не закрытых соединений, но как тогда с ними работать?
Можно сделать просто try: … except: …
async def foo(some: Any):
# создаем транзакцию
db = SessionLocal()
try:
some_do(some)
except Exception as e:
print(e)
await db.commit()
await db.close()
Скажем так, оно будет работать, но всегда есть одно но.. вы серьезно захотите каждый раз делать try except? Всё таки надо найти какой-то более просто способ, чтобы сократить однотипные действия в функции, и убрать вложенность.
Первое что пришло в голову это middlewares, но оно подойдет только если ваше приложение позволяет его реализовать(веб-сервер, телеграм бот), а что если нам нужен универсальный обработчик транзакций? Тут из тени выходит контекстный менеджер
async def foo(some: Any):
# создаем транзакцию
async with SessionLocal() as s:
some_do(some)
await db.commit()
Мы определенно сократили количество кода в нашей функции, но всё так же остался лишний уровень вложенности, как мы можем избавиться от него? Передать сессию в функцию из декоратора..? Давайте попробуем
def transaction():
def wrapper(cb):
async def wrapped(*args, **kwargs):
async with SessionLocal() as session:
result = await cb(session, *args, **kwargs)
await session.commit()
return result
return wrapped
return wrapper
@transaction()
async def foo(session: AsyncSession, some: Any):
some_do(session, some)
Хмм, кода стало больше, но зато в функции теперь без вложенности и минимальное количество строк. Но погодите-ка, мы каждый раз должны передавать в функцию объект сессии, что создает 1 дополнительную зависимость. Попробуем решить, а как? Просто получать из переменной объект сессии? Да, так в питоне можно, будем использовать contextvars. Можете посмотреть как там что работает, но если кратко, то, что мы обьявляем в родительской функции в переменной ContextVar, то значение передается в вызываемые функции.
Немного примеров:
import asyncio
from contextvars import ContextVar
var: ContextVar[int] = ContextVar('var') # Тайп хинт показывает, что мы планируем передавать числовой тип в переменную
async def bar():
value = var.get()
print(f"In context var value is {value}")
async def foo():
token = var.set(777)
await bar()
var.reset(token)
asyncio.run(foo()) # In context var value is 777
На выходе мы получаем строку: In context var value is 777
Как видно, мы не передаем явно в функцию никаких значений, но это значение доступно по переменной, и в этом вся магия.
Значит мы можем применить это и к нашей сессии:
P = ParamSpec("P")
T = TypeVar("T")
def require_session():
session = db_session_var.get()
assert session is not None, "Session context is not provided"
return session
def transaction():
def wrapper(
cb: Callable[P, Coroutine[Any, Any, T]]
) -> Callable[P, Coroutine[Any, Any, T]]:
@wraps(cb)
async def wrapped(*args: P.args, **kwargs: P.kwargs) -> T:
if db_session_var.get() is not None:
return await cb(*args, **kwargs)
async with cast(AsyncSession, SessionLocal()) as session:
with use_context_value(db_session_var, session):
result = await cb(*args, **kwargs)
await session.commit()
return result
return wrapped
return wrapper
@contextmanager
def use_context_value(context: ContextVar[T], value: T):
reset = context.set(value)
try:
yield
finally:
context.reset(reset)
db_session_var: ContextVar[AsyncSession | None] = ContextVar("db_session_var", default=None)
UPD. Это уже окончательный вид для нашего менеджера, с type hinting и всякими плюшками, чтобы было приятнее работать.
Получается огромный блок для менеджера транзакций, но зато теперь у нас есть простой доступ к нашему объекту сессии:
@transaction()
async def foo():
db = require_session()
some_do()
И, в чем основная прелесть этого декоратора, то что для под вызываемых функций мы так же их оборачиваем их в transaction(), но у нас используется одна и та же сессия, что дает нам 1 подключение на 1 обработку запроса.
Теперь даже если что-то случится во время выполнения запроса, то у нас автоматически закроется сессия, и у нас не создается очередь из подключений ожидающих закрытие.
Сам этот менеджер-декоратор я использую на своих проектах, он универсальный, будет работать и для тг-ботов, и для веба, да и просто для какого-либо обработчика(главное правильно поставить в порядке декораторов). Мне про этот менеджер-декоратор подсказал мой коллега, за что я ему чрезмерно благодарен.
Спасибо за то что прочитали статью! Если у вас, опытных разработчиков, есть какие-то замечания к статье, буду ждать в комментариях вас