Эта статья рассчитана в большинстве своём на новичков. Тут мы поговорим о том, как не упереться в лимиты подключений к базе, и чтобы приложение в продакшн не упало.

В этой статье будет идти речь о SQLAlchemy и частности PostgreSQL. В Django таких проблем по умолчанию я не видел, а вот в алхимии, приходится вручную мониторить подключения.
Стандартное создание engine и сессии для алхимии вида:
engine = create_async_engine( SQLALCHEMY_DATABASE_URL, **( dict(pool_recycle=900, pool_size=100, max_overflow=3) ) ) SessionLocal = sessionmaker( autocommit=False, autoflush=False, bind=engine, expire_on_commit=False, class_=AsyncSession )
UPD. Здесь мы будем работать с асинхронной сессией алхимии, но у синхронной такой же механизм действий
И потом мы применяем это примерно так:
async def foo(some: Any): # создаем транзакцию db = SessionLocal() some_do(some) # делаем коммит await db.commit() # закрываем соединение await db.close()
НО, что если на моменте some_do() случится ошибка? Тогда у нас не сделается коммит(ну в принципе логично), и что более важно, не закроется соединение, а это уже критично.
Разберемся почему это критично:
Превысив лимит на подключения к базе, другие запросы не смогут обрабатываться
Лишняя нагрузка на систему
Дополнительные открытые подключения в системе
И если лишняя нагрузка на систему и открытые подключения в системе это хоть и критично, но наше приложение от этого не перестанет работать(до какого то момента). То вот при лимите подключений, уже всё, наше приложение перестанет работать сразу и пользователи будут видеть ошибку
Мы разобрались в чем проблема не закрытых соединений, но как тогда с ними работать?
Можно сделать просто try: … except: …
async def foo(some: Any): # создаем транзакцию db = SessionLocal() try: some_do(some) except Exception as e: print(e) await db.commit() await db.close()
Скажем так, оно будет работать, но всегда есть одно но.. вы серьезно захотите каждый раз делать try except? Всё таки надо найти какой-то более просто способ, чтобы сократить однотипные действия в функции, и убрать вложенность.
Первое что пришло в голову это middlewares, но оно подойдет только если ваше приложение позволяет его реализовать(веб-сервер, телеграм бот), а что если нам нужен универсальный обработчик транзакций? Тут из тени выходит контекстный менеджер
async def foo(some: Any): # создаем транзакцию async with SessionLocal() as s: some_do(some) await db.commit()
Мы определенно сократили количество кода в нашей функции, но всё так же остался лишний уровень вложенности, как мы можем избавиться от него? Передать сессию в функцию из декоратора..? Давайте попробуем
def transaction(): def wrapper(cb): async def wrapped(*args, **kwargs): async with SessionLocal() as session: result = await cb(session, *args, **kwargs) await session.commit() return result return wrapped return wrapper @transaction() async def foo(session: AsyncSession, some: Any): some_do(session, some)
Хмм, кода стало больше, но зато в функции теперь без вложенности и минимальное количество строк. Но погодите-ка, мы каждый раз должны передавать в функцию объект сессии, что создает 1 дополнительную зависимость. Попробуем решить, а как? Просто получать из переменной объект сессии? Да, так в питоне можно, будем использовать contextvars. Можете посмотреть как там что работает, но если кратко, то, что мы обьявляем в родительской функции в переменной ContextVar, то значение передается в вызываемые функции.
Немного примеров:
import asyncio from contextvars import ContextVar var: ContextVar[int] = ContextVar('var') # Тайп хинт показывает, что мы планируем передавать числовой тип в переменную async def bar(): value = var.get() print(f"In context var value is {value}") async def foo(): token = var.set(777) await bar() var.reset(token) asyncio.run(foo()) # In context var value is 777
На выходе мы получаем строку: In context var value is 777
Как видно, мы не передаем явно в функцию никаких значений, но это значение доступно по переменной, и в этом вся магия.
Значит мы можем применить это и к нашей сессии:
P = ParamSpec("P") T = TypeVar("T") def require_session(): session = db_session_var.get() assert session is not None, "Session context is not provided" return session def transaction(): def wrapper( cb: Callable[P, Coroutine[Any, Any, T]] ) -> Callable[P, Coroutine[Any, Any, T]]: @wraps(cb) async def wrapped(*args: P.args, **kwargs: P.kwargs) -> T: if db_session_var.get() is not None: return await cb(*args, **kwargs) async with cast(AsyncSession, SessionLocal()) as session: with use_context_value(db_session_var, session): result = await cb(*args, **kwargs) await session.commit() return result return wrapped return wrapper @contextmanager def use_context_value(context: ContextVar[T], value: T): reset = context.set(value) try: yield finally: context.reset(reset) db_session_var: ContextVar[AsyncSession | None] = ContextVar("db_session_var", default=None)
UPD. Это уже окончательный вид для нашего менеджера, с type hinting и всякими плюшками, чтобы было приятнее работать.
Получается огромный блок для менеджера транзакций, но зато теперь у нас есть простой доступ к нашему объекту сессии:
@transaction() async def foo(): db = require_session() some_do()
И, в чем основная прелесть этого декоратора, то что для под вызываемых функций мы так же их оборачиваем их в transaction(), но у нас используется одна и та же сессия, что дает нам 1 подключение на 1 обработку запроса.
Теперь даже если что-то случится во время выполнения запроса, то у нас автоматически закроется сессия, и у нас не создается очередь из подключений ожидающих закрытие.
Сам этот менеджер-декоратор я использую на своих проектах, он универсальный, будет работать и для тг-ботов, и для веба, да и просто для какого-либо обработчика(главное правильно поставить в порядке декораторов). Мне про этот менеджер-декоратор подсказал мой коллега, за что я ему чрезмерно благодарен.
Спасибо за то что прочитали статью! Если у вас, опытных разработчиков, есть какие-то замечания к статье, буду ждать в комментариях вас
