Салют Хабр! Меня зовут Влад и я хотел бы осветить довольно важную тему, которую я не видел в обучающих материалах, а именно сложные запросы через паттерн Repository.
Проблема: построение гибких SQLAlchemy запросов на основе данных, переданных из бизнес логики. Более масштабное использование паттерна Repository.
Реше��ие(то, о чём говорится в этой статье): все запросы формируются посредством конфигов, переданных из бизнес логики, далее конфиги обрабатываются и из них составляется ORM запрос.
Схема работы реализации:

Configs, ORM Query Builder, Operator Agregate.Configs - это правила, по которым в бизнес логику должны вернуться данные.
Актуальность для читателя: я думаю, что эта статья способна показать работу с паттерном Repository под другим углом, показать как можно строить гибкие запросы без создания кучи методов в имплементациях и потом описывания этих же методов в интерфейсах.
Пожалуй, можно начинать.
Все вы видели те самые классы, которые мелькают в видео на Youtube.
from typing import Generic, TypeVar, Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession DTO = TypeVar("DTO") class SQLAlchemyRepository(Generic[DTO]): def __init__(self, session, model): self._model = model self._session = session async def get_by_filter(self, filters: dict[str, Any]) -> DTO | None: query = select(self._model).filter_by(**filters) data = await self._session.execute(query) data_scalar = data.scalar() if data_scalar is None: return None return data_scalar.to_dto() async def get_many_by_filter(self, filters: dict[str, Any]) -> list[DTO]: query = select(self._model).filter_by(**filters) data = await self._session.execute(query) data_all = data.scalars().all() if not data_all: return [] return [sqal_model.to_dto() for sqal_model in data_all]
При росте вашего проекта вам понадобятся более кастомизированные запросы в бд и логики этих методов хватать не будет, соответственно вам придётся расширяться.
Как уже упоминалось в Решение используются конфиги, думаю, сейчас лучший момент, чтобы их показать, но для начала нужно объявить структуру проекта.
repository/ builder_configs/ __init__.py configs.py types.py
Рассмотрим содержимое файла configs.py
from dataclasses import dataclass, field from typing import Any from .types import FilterType, OrderByType, LazyLoadType, FilterLogicType, UNSET @dataclass class Filter: column: str value: Any filter_type: FilterType = FilterType.EQ @dataclass class FilterConfig: filters: list[Filter] mode: FilterLogicType = FilterLogicType.NULL @dataclass class OrderByConfig: column: str mode: OrderByType = OrderByType.ASC @dataclass class ColumnConfig: column: str label: str | UNSET = UNSET value: Any = UNSET filter_type: FilterType = FilterType.EQ value_is_column: bool = False @dataclass class JoinConfig: table_name: str columns: list[ColumnConfig] = field(default_factory=list) filters: list[FilterConfig] = field(default_factory=list) order_by: list[OrderByConfig] = field(default_factory=list) @dataclass class LazyLoadConfig: relationship_strategy: str load_type: LazyLoadType = LazyLoadType.JOINEDLOAD
Хочу поподробней остановиться на конфигах.
Filter - простой в реализации dataclass, который принимает на вход поле, значение и необходимый оператор.
FilterConfig - в него можно передавать несколько фильтров и применять к ним логическое условие OR, AND. В случае если условие передано не будет (NULL), то все агрегированные объекты будут передавать в where позиционно.
OrderByConfig - сортировка поля по заданному режиму (ASC, DESC)
ColumnConfig - этот dataclass позволяет включать в SQL запрос выборку отдельных колонок, задавать им label, также он позволяет делать булевые выборки, также есть поддержка добаdления в булевую выборку полей из таблиц. Для этого нужно:
value=table_name.column,value_is_column=True.JoinConfig - позволяет присоединять другие таблицы. Переданные атрибуты
columns,filters,order_byбудут использованы для конкретной таблицыtable_name.LazyLoadConfig - поддержка подгрузки relationship. Чтобы подгрузить нужно указать название relationhip атрибута. Если же вы хотите получить вложенное отношение, то нужно указывать relationship по цепочке через
.Пример вложенных отношений:
Допустим, есть модель User и она имеет отношение с таблицей Item через атрибут
item, также Item связана с другой таблицей Collections посредством отношенияcollectionsи вот здесь и появляются вложенные отношения. Представим, что выполняется запрос в таблицу User и понадобилось подтянуть отношение item и к нему collections. В таком случаеrelationship_strategy=item.collections. Более подробно это будет рассмотрено при разборе реализации методаloadкласса SQLAlchemyQueryBuilder.
Теперь можно посмотреть на содержание файла types.py
from typing import NewType from enum import Enum UNSET = NewType("UNSET", None) class FilterLogicType(Enum): AND = "and" OR = "or" NULL = "null" class FilterType(Enum): EQ = "=" GT = ">" GE = ">=" LT = "<" LE = "<=" IN = "in" VECTOR_SORT = "vector_sort" class OrderByType(Enum): ASC = "asc" DESC = "desc" class LazyLoadType(Enum): JOINEDLOAD = "joinedload" SELECTINLOAD = "selectinload"
Всё, что находится в FilterType должен содержать Operator Agregate.
Теперь я хотел бы затронуть сам ORM Query Builder. Структура папок в свою очередь принимает такой вид:
repository/ builder_configs/ __init__.py configs.py types.py query_builder/ __init__.py interface.py impl/ __init__.py alchemy.py
Интерфейс для Query Builder (query_builder/interface.py)
from typing import Protocol, Any from typing_extensions import Self from repository.builder_configs.configs import ( FilterConfig, ColumnConfig, JoinConfig, LazyLoadConfig, OrderByConfig ) from .agregator.interface import AgregateFilterTypeProtocol class QueryBuilderProtocol(Protocol): def init( self, filter_agregate: AgregateFilterTypeProtocol, *args, **kwargs ) -> None: self._filter_agregate = filter_agregate def count(self) -> Self: raise NotImplementedError def columns(self, columns: list[ColumnConfig], *args, *kwargs) -> Self: raise NotImplementedError def filter(self, filters: list[FilterConfig], *args, *kwargs) -> Self: raise NotImplementedError def join(self, joins: list[JoinConfig]) -> Self: raise NotImplementedError def load(self, loads: list[LazyLoadConfig], *args, *kwargs) -> Self: raise NotImplementedError def order_by(self, order_by: list[OrderByConfig], *args, *kwargs) -> Self: raise NotImplementedError def values(self, data: dict[str, Any] | list[dict[str, Any]]) -> Self: raise NotImplementedError def limit(self, value: int | None) -> Self: raise NotImplementedError def offset(self, value: int | None) -> Self: raise NotImplementedError def build(self) -> Any: raise NotImplementedError
В магическом методе __init__ вы можете заметить AgregateFilterTypeProtocol данный протокол нужен для агрегации операторов.
С появлением агрегатора структура папок примет такой вид:
repository/ builder_configs/ __init__.py configs.py types.py query_builder/ __init__.py interface.py impl/ __init__.py alchemy.py agregator/ __init__.py interface.py impl/ __init__.py alchemy.py
Ниже вы можете увидеть интерфейс агрегатора (query_builder/agregator/interface.py)
from typing import Protocol, Any from repository.builder_configs.types import FilterType class AgregateFilterTypeProtocol(Protocol): def filter_agregate( self, value: Any, filter_type: FilterType, *args, **kwargs ) -> Any: raise NotImplementedError
Реализация агрегатора (query_builder/agregator/impl/alchemy.py)
from typing import Any from sqlalchemy import func from sqlalchemy.orm.properties import MappedColumn from repository.builder_configs.types import FilterType class SQLAlchemyAgregateFilterType: def filter_agregate( self, value: Any, filter_type: FilterType, mapped_column: MappedColumn[Any], *args, **kwargs ) -> Any: operator_with_lambda = { "=": lambda column, value: column == value, ">": lambda column, value: column > value, ">=": lambda column, value: column >= value, "<": lambda column, value: column < value, "<=": lambda column, value: column <= value, "in": lambda column, value: column.in_(value), "vector_sort": lambda column, value: column.op("@@")(func.websearch_to_tsquery(str(value))) } if filter_type.value not in operator_with_lambda: raise ValueError(f"Not found operator {filter_type.value}") return operator_with_lambda[filter_type.value](mapped_column, value)
В реализации вам может быть непонятен аргумент mapped_column - это экземпляр класса, полученный из атрибута ORM модели.
Ну и теперь гвоздь программы SQLAlchemyQueryBuilder. Я буду объяснять каждый метод постепенно.
from typing import Callable, Any, TypeVar from typing_extensions import Self from sqlalchemy.orm.properties import MappedColumn from sqlalchemy.sql.elements import ColumnElement, Label, UnaryExpression from sqlalchemy.orm.strategy_options import _AbstractLoad from sqlalchemy import or_, and_, func, asc, desc from sqlalchemy.orm import MappedColumn, joinedload, selectinload from db.models import Base from repository.builder_configs.configs import ( FilterConfig, ColumnConfig, JoinConfig, OrderByConfig, LazyLoadConfig ) from repository.builder_configs.types import LazyLoadType, OrderByType, FilterLogicType, UNSET from repository.query_builder.agregator.impl.alchemy import SQLAlchemyAgregateFilterType ALCHEMYMODEL = TypeVar("ALCHEMYMODEL", bound=Base) class SQLAlchemyQueryBuilder: def __init__( self, query_type: Callable, model: type[ALCHEMYMODEL], filter_agregate: SQLAlchemyAgregateFilterType, *args, **kwargs ) -> None: self._query_type: Callable = query_type self._filter_agregate: SQLAlchemyAgregateFilterType = filter_agregate self._model: type[ALCHEMYMODEL] = model self._columns: list[MappedColumn[Any] | ColumnElement[bool] | Label[Any]] = [] self._limit: int | None = None self._offset: int | None = None self._joins: list[type[ALCHEMYMODEL]] = [] self._filter: list[ColumnElement[bool]] = [] self._order_by: list[UnaryExpression] = [] self._values: list[dict[str, Any]] = [] self._loads: list[_AbstractLoad] = [] self._count: bool = False self._orm_models: dict[str, type["Base"]] = model.orm_models()
query_type - тип выполняемого запроса: select, update и тп.
model - orm модель
После создания экземпляра этого класса и при вызове методов данные накапливаются в атрибутах и при вызове метода build строится ORM запрос.
Здесь можно обратить внимание на атрибут _orm_models, исходя из его аннотации можно сказать, что он хранит все зарегистрированные sqlalchemy модели.
Реализация метода orm_models
class Base(DeclarativeBase): _cached_orm_models: dict[str, type["Base"]] = {} @classmethod def orm_models(cls) -> dict[str, type["Base"]]: if not cls._cached_orm_models: models = {} for mapper in cls.registry.mappers: models[mapper.class_.__tablename__] = mapper.class_ cls._cached_orm_models.update(models) return cls._cached_orm_models
Теперь можно перейти к самим методам класса SQLAlchemyQueryBuilder.
count
def count(self) -> Self: self._count = True return self
Позволяет получать количество записей.
columns
def columns( self, columns: list[ColumnConfig] = [], model: type[ALCHEMYMODEL] | None = None, *args, **kwargs ) -> Self: if columns: if model is None: model = self._model for config in columns: obj = getattr(model, config.column, None) if (obj is None): raise ValueError(f"model {model.__name__} has not column {config.column}") if config.value is not UNSET: if config.value_is_column is True: table_name, column_name = config.value.split(".") orm_object = self._orm_models.get(table_name) if (hasattr(orm_object, column_name) is False): raise ValueError(f"model {orm_object} has not column {column_name}") config.value = getattr(orm_object, column_name) obj = self._filter_agregate.filter_agregate( mapped_column=obj, value=config.value, filter_type=config.filter_type ) if config.label is not None: obj = obj.label(config.label) self._columns.append(obj) return self
В начале функции проверяется, была ли передана модель в функцию, это нужно, чтобы переиспользовать метод для разных моделей. Дальше - итерация, в которой из объекта model достаётся атрибут config.column. После идёт проверка на переданное value и если оно является полем в другой таблице, то строка распиливается по . и из _orm_models достаётся нужная orm модель, после в config.value записывается экземпляр класса MappedColumn полученный из атрибута column_name. Дальше работает агрегатор и как вы можете заметить в config.value может лежать как произвольное значение так и MappedColumn, далее это добавление label ну и окончательное действие - запись получившегося объекта в список _columns.
filter
def filter( self, filters: list[FilterConfig] = [], model: type[ALCHEMYMODEL] | None = None, *args, **kwargs ) -> Self: if filters: if model is None: model = self._model configs = [] for filter in filters: mode = None if filter.mode == FilterLogicType.OR: mode = or_ elif filter.mode == FilterLogicType.AND: mode = and_ elements_objects = [] for filter_ in filter.filters: obj = getattr(model, filter_.column, None) if (obj is None): raise ValueError(f"model {model.__name__} has not column {filter_.column}") column_element = self._filter_agregate.filter_agregate( mapped_column=obj, filter_type=filter_.filter_type, value=filter_.value ) elements_objects.append(column_element) if mode is None: configs.extend(elements_objects) else: configs.append(mode(*elements_objects)) if configs: self._filter.extend(configs) return self
Итерация по FilterConfig, определение логического условия, после чего происходит итерация по Filter, в этом блоке кода также достаётся MappedColumn и отправляется в агрегатор. В element_objects остаются все аргегированные Filter, в configs - FilterConfig. После завершения основного цикла изменения отправляются в атрибут _filters.
order_by
def order_by( self, order_by: list[OrderByConfig] = [], model: type[ALCHEMYMODEL] | None = None, *args, **kwargs ) -> Self: if order_by: if model is None: model = self._model for oby in order_by: oby_mode = asc if oby.mode == OrderByType.DESC: oby_mode = desc obj = getattr(model, oby.column, None) if (obj is None): raise ValueError(f"model {model.__name__} has not column {oby.column}") self._order_by.append(oby_mode(obj)) return self
Я думаю, что здесь объяснения излишни так как используются всё те же принципы, что и в предыдущих методах.
join
def join(self, joins: list[JoinConfig]) -> Self: if joins: for join in joins: orm_object = self._orm_models.get(join.table_name) if join.columns: self.columns( columns=join.columns, model=orm_object ) if join.filters: self.filter( filters=join.filters, model=orm_object ) if join.order_by: self.order_by( order_by=join.order_by, model=orm_object ) self._joins.append(orm_object) return self
Здесь можно заметить, что переиспользуются методы filter, columns, order_by, только
вместо изначально переданной в init orm модели, будет использована модель, название
которой указано в конфиге, объект этой модели достаётся из _orm_models.
load
def load( self, loads: list[LazyLoadConfig], model: type[Base] | None = None, *args, **kwargs ) -> Self: if loads: for load in loads: load_mode = joinedload if load.load_type == LazyLoadType.SELECTINLOAD: load_mode = selectinload current_model = model if model is None: current_model = self._model current_load = None relationship_path = load.relationship_strategy.split(".") for part in relationship_path: relationship_declared = getattr(current_model, part, None) if (relationship_declared is None): raise ValueError(f"Error in relationship path: model {current_model.__name__} has no relationship {part}") current_model = relationship_declared.mapper.class_ if current_load is None: current_load = load_mode(relationship_declared) else: current_load = getattr(current_load, load_mode.__name__)(relationship_declared) if current_load: self._loads.append(current_load) return self
Здесь я бы хотел объяснить как именно обрабатывается relationship_strategy и как собираются вложенные relationship. Переменная relationship_path хранит все отношения, из которых составляется цепочка, итерируясь по ним, relationship_declared получает экземпляр класса _RelationshipDeclared и после current_model перезаписывается на модель, с которой связано отношение. После чего формируется current_load, если оно не было до этого записано, то просто передаём отношение в функцию load_mode, иначе мы вызываем функцию load_mode у current_load. Пример: joinedload(part1_relationship).joinedload(part2_relationship). Именно так и строятся вложенные relationship.
Теперь покажу несколько легковесных методов, в которых я думаю вы уже можете разобраться самостоятельно.
values, limit, offset
def values( self, data: dict[str, Any] | list[dict[str, Any]] ) -> Self: if isinstance(data, dict): self._values.append(data) elif isinstance(data, list): self._values.extend(data) return self def limit( self, value: int | None = None ) -> Self: if value: self._limit = value return self def offset( self, value: int | None = None ) -> Self: if value: self._offset = value return self
build
def build( self ) -> Any: query = self._query_type(self._model) if self._columns: query = self._query_type(*self._columns).select_from(self._model) if self._count: count = func.count() if self._columns: count = func.count(*self._columns) query = self._query_type(count).select_from(self._model) if self._values: query = query.values(self._values) if self._filter: query = query.filter(*self._filter) if self._joins: for join in self._joins: query = query.join(join) if self._order_by: query = query.order_by(*self._order_by) if self._loads: query = query.options(*self._loads) if self._limit: query = query.limit(self._limit) if self._offset: query = query.offset(self._offset) return query
Данный метод собирает ORM запрос.
Больше сказать мне нечего, всё показал, всё рассказал. Надеюсь, что эта статья даст читающим новые мысли по поводу паттерна Repository и то как с ним можно работать. Очень надеюсь на конструктивную критику и буду рад, если вы предложите свои реализации решения данной проблемы.
В этом репозитории вы можете посмотреть примеры использования.
Спасибо тебе, дорогой читатель, за интерес к моей статье!
