Салют Хабр! Меня зовут Влад и я хотел бы осветить довольно важную тему, которую я не видел в обучающих материалах, а именно сложные запросы через паттерн Repository.

Проблема: построение гибких SQLAlchemy запросов на основе данных, переданных из бизнес логики. Более масштабное использование паттерна Repository.

Реше��ие(то, о чём говорится в этой статье): все запросы формируются посредством конфигов, переданных из бизнес логики, далее конфиги обрабатываются и из них составляется ORM запрос.

Схема работы реализации:

Добавляются 3 новых слоя: Configs, ORM Query Builder, Operator Agregate.
Добавляются 3 новых слоя: Configs, ORM Query Builder, Operator Agregate.

Configs - это правила, по которым в бизнес логику должны вернуться данные.

Актуальность для читателя: я думаю, что эта статья способна показать работу с паттерном Repository под другим углом, показать как можно строить гибкие запросы без создания кучи методов в имплементациях и потом описывания этих же методов в интерфейсах.

Пожалуй, можно начинать.

Все вы видели те самые классы, которые мелькают в видео на Youtube.

from typing import Generic, TypeVar, Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

DTO = TypeVar("DTO")

class SQLAlchemyRepository(Generic[DTO]):
     
     def __init__(self, session, model):
          self._model = model
          self._session = session

     async def get_by_filter(self, filters: dict[str, Any]) -> DTO | None:
          query = select(self._model).filter_by(**filters)
          data = await self._session.execute(query)
          
          data_scalar = data.scalar()
          if data_scalar is None:
               return None
          
          return data_scalar.to_dto()
     
     async def get_many_by_filter(self, filters: dict[str, Any]) -> list[DTO]:
          query = select(self._model).filter_by(**filters)
          data = await self._session.execute(query)
          
          data_all = data.scalars().all()
          
          if not data_all:
               return []
          return [sqal_model.to_dto() for sqal_model in data_all]

При росте вашего проекта вам понадобятся более кастомизированные запросы в бд и логики этих методов хватать не будет, соответственно вам придётся расширяться.

Как уже упоминалось в Решение используются конфиги, думаю, сейчас лучший момент, чтобы их показать, но для начала нужно объявить структуру проекта.

repository/
  builder_configs/
      __init__.py
      configs.py
      types.py

Рассмотрим содержимое файла configs.py

from dataclasses import dataclass, field
from typing import Any
from .types import FilterType, OrderByType, LazyLoadType, FilterLogicType, UNSET


@dataclass
class Filter:
     column: str
     value: Any
     filter_type: FilterType = FilterType.EQ


@dataclass
class FilterConfig:
     filters: list[Filter]
     mode: FilterLogicType = FilterLogicType.NULL

@dataclass
class OrderByConfig:
     column: str
     mode: OrderByType = OrderByType.ASC


@dataclass
class ColumnConfig:
     column: str
     label: str | UNSET = UNSET
     value: Any = UNSET
     filter_type: FilterType = FilterType.EQ
     value_is_column: bool = False


@dataclass
class JoinConfig:
     table_name: str
     columns: list[ColumnConfig] = field(default_factory=list)
     filters: list[FilterConfig] = field(default_factory=list)
     order_by: list[OrderByConfig] = field(default_factory=list)


@dataclass
class LazyLoadConfig:
     relationship_strategy: str
     load_type: LazyLoadType = LazyLoadType.JOINEDLOAD

Хочу поподробней остановиться на конфигах.

  • Filter - простой в реализации dataclass, который принимает на вход поле, значение и необходимый оператор.

  • FilterConfig - в него можно передавать несколько фильтров и применять к ним логическое условие OR, AND. В случае если условие передано не будет (NULL), то все агрегированные объекты будут передавать в where позиционно.

  • OrderByConfig - сортировка поля по заданному режиму (ASC, DESC)

  • ColumnConfig - этот dataclass позволяет включать в SQL запрос выборку отдельных колонок, задавать им label, также он позволяет делать булевые выборки, также есть поддержка добаdления в булевую выборку полей из таблиц. Для этого нужно: value=table_name.column, value_is_column=True.

  • JoinConfig - позволяет присоединять другие таблицы. Переданные атрибуты columns, filters, order_by будут использованы для конкретной таблицы table_name.

  • LazyLoadConfig - поддержка подгрузки relationship. Чтобы подгрузить нужно указать название relationhip атрибута. Если же вы хотите получить вложенное отношение, то нужно указывать relationship по цепочке через .

    • Пример вложенных отношений:

      Допустим, есть модель User и она имеет отношение с таблицей Item через атрибут item, также Item связана с другой таблицей Collections посредством отношения collections и вот здесь и появляются вложенные отношения. Представим, что выполняется запрос в таблицу User и понадобилось подтянуть отношение item и к нему collections. В таком случае relationship_strategy=item.collections. Более подробно это будет рассмотрено при разборе реализации метода load класса SQLAlchemyQueryBuilder.

Теперь можно посмотреть на содержание файла types.py

from typing import NewType
from enum import Enum

UNSET = NewType("UNSET", None)

class FilterLogicType(Enum):
     AND = "and"
     OR = "or"
     NULL = "null"

class FilterType(Enum):
     EQ = "="
     GT = ">"
     GE = ">="
     LT = "<"
     LE = "<="
     IN = "in"
     VECTOR_SORT = "vector_sort"

  
class OrderByType(Enum):
     ASC = "asc"
     DESC = "desc"


class LazyLoadType(Enum):
     JOINEDLOAD = "joinedload"
     SELECTINLOAD = "selectinload"

Всё, что находится в FilterType должен содержать Operator Agregate.

Теперь я хотел бы затронуть сам ORM Query Builder. Структура папок в свою очередь принимает такой вид:

repository/
  builder_configs/
      __init__.py
      configs.py
      types.py

  query_builder/
      __init__.py
      interface.py
      impl/
          __init__.py
          alchemy.py

Интерфейс для Query Builder (query_builder/interface.py)

from typing import Protocol, Any
from typing_extensions import Self
from repository.builder_configs.configs import (
  FilterConfig,
  ColumnConfig,
  JoinConfig,
  LazyLoadConfig,
  OrderByConfig
)
from .agregator.interface import AgregateFilterTypeProtocol


class QueryBuilderProtocol(Protocol):
     
     def init(
          self, 
          filter_agregate: AgregateFilterTypeProtocol, 
          *args, 
          **kwargs
     ) -> None:
          self._filter_agregate = filter_agregate
          
     def count(self) -> Self:
          raise NotImplementedError
     
     def columns(self, columns: list[ColumnConfig], *args, *kwargs) -> Self:
          raise NotImplementedError
     
     def filter(self, filters: list[FilterConfig], *args, *kwargs) -> Self:
          raise NotImplementedError
     
     def join(self, joins: list[JoinConfig]) -> Self:
          raise NotImplementedError
     
     def load(self, loads: list[LazyLoadConfig], *args, *kwargs) -> Self:
          raise NotImplementedError
     
     def order_by(self, order_by: list[OrderByConfig], *args, *kwargs) -> Self:
          raise NotImplementedError
     
     def values(self, data: dict[str, Any] | list[dict[str, Any]]) -> Self:
          raise NotImplementedError
     
     def limit(self, value: int | None) -> Self:
          raise NotImplementedError
     
     def offset(self, value: int | None) -> Self:
          raise NotImplementedError
     
     def build(self) -> Any:
          raise NotImplementedError

В магическом методе __init__ вы можете заметить AgregateFilterTypeProtocol данный протокол нужен для агрегации операторов.

С появлением агрегатора структура папок примет такой вид:

repository/
  builder_configs/
      __init__.py
      configs.py
      types.py

  query_builder/
      __init__.py
      interface.py
      impl/
          __init__.py
          alchemy.py
      agregator/
          __init__.py
          interface.py
          impl/
              __init__.py
              alchemy.py

Ниже вы можете увидеть интерфейс агрегатора (query_builder/agregator/interface.py)

from typing import Protocol, Any
from repository.builder_configs.types import FilterType


class AgregateFilterTypeProtocol(Protocol):
     
     def filter_agregate(
          self, 
          value: Any, 
          filter_type: FilterType, 
          *args,
          **kwargs
     ) -> Any:
          raise NotImplementedError

Реализация агрегатора (query_builder/agregator/impl/alchemy.py)

from typing import Any
from sqlalchemy import func
from sqlalchemy.orm.properties import MappedColumn
from repository.builder_configs.types import FilterType


class SQLAlchemyAgregateFilterType:
     
     def filter_agregate(
          self, 
          value: Any, 
          filter_type: FilterType, 
          mapped_column: MappedColumn[Any],
          *args,
          **kwargs
     ) -> Any:
          operator_with_lambda = {
               "=": lambda column, value: column == value,
               ">": lambda column, value: column > value,
               ">=": lambda column, value: column >= value,
               "<": lambda column, value: column < value,
               "<=": lambda column, value: column <= value,
               "in": lambda column, value: column.in_(value),
               "vector_sort": lambda column, value: column.op("@@")(func.websearch_to_tsquery(str(value)))
          }
          if filter_type.value not in operator_with_lambda:
               raise ValueError(f"Not found operator {filter_type.value}")
          return operator_with_lambda[filter_type.value](mapped_column, value)

В реализации вам может быть непонятен аргумент mapped_column - это экземпляр класса, полученный из атрибута ORM модели.

Ну и теперь гвоздь программы SQLAlchemyQueryBuilder. Я буду объяснять каждый метод постепенно.

from typing import Callable, Any, TypeVar
from typing_extensions import Self

from sqlalchemy.orm.properties import MappedColumn
from sqlalchemy.sql.elements import ColumnElement, Label, UnaryExpression
from sqlalchemy.orm.strategy_options import _AbstractLoad
from sqlalchemy import or_, and_, func, asc, desc
from sqlalchemy.orm import MappedColumn, joinedload, selectinload

from db.models import Base
from repository.builder_configs.configs import (
     FilterConfig,
     ColumnConfig,
     JoinConfig,
     OrderByConfig,
     LazyLoadConfig
)
from repository.builder_configs.types import LazyLoadType, OrderByType, FilterLogicType, UNSET
from repository.query_builder.agregator.impl.alchemy import SQLAlchemyAgregateFilterType


ALCHEMYMODEL = TypeVar("ALCHEMYMODEL", bound=Base)


class SQLAlchemyQueryBuilder:
     def __init__(
          self,
          query_type: Callable,
          model: type[ALCHEMYMODEL],
          filter_agregate: SQLAlchemyAgregateFilterType,
          *args,
          **kwargs
     ) -> None:
          self._query_type: Callable = query_type
          self._filter_agregate: SQLAlchemyAgregateFilterType = filter_agregate
          self._model: type[ALCHEMYMODEL] = model
          
          self._columns: list[MappedColumn[Any] | ColumnElement[bool] | Label[Any]] = []
          self._limit: int | None = None
          self._offset: int | None = None
          self._joins: list[type[ALCHEMYMODEL]] = []
          self._filter: list[ColumnElement[bool]] = []
          self._order_by: list[UnaryExpression] = []
          self._values: list[dict[str, Any]] = []
          self._loads: list[_AbstractLoad] = []
          self._count: bool = False
          self._orm_models: dict[str, type["Base"]] = model.orm_models()

query_type - тип выполняемого запроса: select, update и тп.

model - orm модель

После создания экземпляра этого класса и при вызове методов данные накапливаются в атрибутах и при вызове метода build строится ORM запрос.

Здесь можно обратить внимание на атрибут _orm_models, исходя из его аннотации можно сказать, что он хранит все зарегистрированные sqlalchemy модели.

Реализация метода orm_models

class Base(DeclarativeBase):
     _cached_orm_models: dict[str, type["Base"]] = {}
     
     @classmethod
     def orm_models(cls) -> dict[str, type["Base"]]:
          if not cls._cached_orm_models:
               models = {}
               for mapper in cls.registry.mappers:
                    models[mapper.class_.__tablename__] = mapper.class_
               cls._cached_orm_models.update(models)
          return cls._cached_orm_models

Теперь можно перейти к самим методам класса SQLAlchemyQueryBuilder.

count

def count(self) -> Self:
    self._count = True
    return self

Позволяет получать количество записей.

columns

def columns(
     self, 
     columns: list[ColumnConfig] = [], 
     model: type[ALCHEMYMODEL] | None = None,
     *args,
     **kwargs
) -> Self:
     if columns:
          if model is None:
               model = self._model
                    
          for config in columns:
               obj = getattr(model, config.column, None)
               if (obj is None):
                    raise ValueError(f"model {model.__name__} has not column {config.column}")
                    
               if config.value is not UNSET:
                    if config.value_is_column is True:
                         table_name, column_name = config.value.split(".")
                         orm_object = self._orm_models.get(table_name)

                         if (hasattr(orm_object, column_name) is False):
                              raise ValueError(f"model {orm_object} has not column {column_name}")

                         config.value = getattr(orm_object, column_name)
                              
                    obj = self._filter_agregate.filter_agregate(
                         mapped_column=obj,
                         value=config.value,
                         filter_type=config.filter_type
                    )
                         
               if config.label is not None:
                    obj = obj.label(config.label)
               self._columns.append(obj)
          return self

В начале функции проверяется, была ли передана модель в функцию, это нужно, чтобы переиспользовать метод для разных моделей. Дальше - итерация, в которой из объекта model достаётся атрибут config.column. После идёт проверка на переданное value и если оно является полем в другой таблице, то строка распиливается по . и из _orm_models достаётся нужная orm модель, после в config.value записывается экземпляр класса MappedColumn полученный из атрибута column_name. Дальше работает агрегатор и как вы можете заметить в config.value может лежать как произвольное значение так и MappedColumn, далее это добавление label ну и окончательное действие - запись получившегося объекта в список _columns.

filter

def filter(
     self, 
     filters: list[FilterConfig] = [],
     model: type[ALCHEMYMODEL] | None = None,
     *args,
     **kwargs
) -> Self:
     if filters:
          if model is None:
               model = self._model
                    
          configs = []
          for filter in filters:
               mode = None
               if filter.mode == FilterLogicType.OR:
                    mode = or_
               elif filter.mode == FilterLogicType.AND:
                    mode = and_
                    
               elements_objects = []
               for filter_ in filter.filters:
                    obj = getattr(model, filter_.column, None)
                    if (obj is None):
                         raise ValueError(f"model {model.__name__} has not column {filter_.column}")
                         
                    column_element = self._filter_agregate.filter_agregate(
                         mapped_column=obj,
                         filter_type=filter_.filter_type,
                         value=filter_.value
                    )
                    elements_objects.append(column_element)
                         
               if mode is None:
                    configs.extend(elements_objects)
               else:
                    configs.append(mode(*elements_objects))
                         
          if configs:
               self._filter.extend(configs)
     return self

Итерация по FilterConfig, определение логического условия, после чего происходит итерация по Filter, в этом блоке кода также достаётся MappedColumn и отправляется в агрегатор. В element_objects остаются все аргегированные Filter, в configs - FilterConfig. После завершения основного цикла изменения отправляются в атрибут _filters.  

order_by

def order_by(
     self, 
     order_by: list[OrderByConfig] = [], 
     model: type[ALCHEMYMODEL] | None = None,
     *args,
     **kwargs
) -> Self:
     if order_by:
          if model is None:
               model = self._model
                    
          for oby in order_by:
               oby_mode = asc
               if oby.mode == OrderByType.DESC:
                    oby_mode = desc
                    
               obj = getattr(model, oby.column, None)
               if (obj is None):
                    raise ValueError(f"model {model.__name__} has not column {oby.column}")
                    
               self._order_by.append(oby_mode(obj))
     return self

Я думаю, что здесь объяснения излишни так как используются всё те же принципы, что и в предыдущих методах.

join

def join(self, joins: list[JoinConfig]) -> Self:
     if joins:
          for join in joins:
               orm_object = self._orm_models.get(join.table_name)
               
               if join.columns:
                    self.columns(
                         columns=join.columns,
                         model=orm_object
                    )
               if join.filters:
                    self.filter(
                         filters=join.filters,
                         model=orm_object
                    )
               if join.order_by:
                    self.order_by(
                         order_by=join.order_by,
                         model=orm_object
                    )
               self._joins.append(orm_object)
     return self

Здесь можно заметить, что переиспользуются методы filter, columns, order_by, только

вместо изначально переданной в init orm модели, будет использована модель, название

которой указано в конфиге, объект этой модели достаётся из _orm_models.

load

def load(
     self, 
     loads: list[LazyLoadConfig], 
     model: type[Base] | None = None,
     *args,
     **kwargs
) -> Self:
     if loads:
          for load in loads:
               load_mode = joinedload
               if load.load_type == LazyLoadType.SELECTINLOAD:
                    load_mode = selectinload
                     
               current_model = model  
               if model is None:
                    current_model = self._model
               
               current_load = None
               relationship_path = load.relationship_strategy.split(".")
                    
               for part in relationship_path:
                    relationship_declared = getattr(current_model, part, None)
                    if (relationship_declared is None):
                         raise ValueError(f"Error in relationship path: model {current_model.__name__} has no relationship {part}")
                    
                    current_model = relationship_declared.mapper.class_
                    if current_load is None:
                         current_load = load_mode(relationship_declared)
                    else:
                         current_load = getattr(current_load, load_mode.__name__)(relationship_declared)
                              
               if current_load:
                    self._loads.append(current_load)
          return self

Здесь я бы хотел объяснить как именно обрабатывается relationship_strategy и как собираются вложенные relationship. Переменная relationship_path хранит все отношения, из которых составляется цепочка, итерируясь по ним, relationship_declared получает экземпляр класса _RelationshipDeclared и после current_model перезаписывается на модель, с которой связано отношение. После чего формируется current_load, если оно не было до этого записано, то просто передаём отношение в функцию load_mode, иначе мы вызываем функцию load_mode у current_load. Пример: joinedload(part1_relationship).joinedload(part2_relationship). Именно так и строятся вложенные relationship.

Теперь покажу несколько легковесных методов, в которых я думаю вы уже можете разобраться самостоятельно.

values, limit, offset

def values(
     self,
     data: dict[str, Any] | list[dict[str, Any]]
) -> Self:
     if isinstance(data, dict):
          self._values.append(data)
     elif isinstance(data, list):
          self._values.extend(data)
     return self

    
def limit(
     self,
     value: int | None = None
) -> Self:
     if value:
          self._limit = value
     return self
     
     
def offset(
     self,
     value: int | None = None
) -> Self:
     if value:
          self._offset = value
     return self

build

def build(
     self
) -> Any:
     query = self._query_type(self._model)

     if self._columns:
          query = self._query_type(*self._columns).select_from(self._model)
               
     if self._count:
          count = func.count()
          if self._columns:
               count = func.count(*self._columns)
                    
          query = self._query_type(count).select_from(self._model)
               
     if self._values:
          query = query.values(self._values)
               
     if self._filter:
               query = query.filter(*self._filter)
               
     if self._joins:
          for join in self._joins:
               query = query.join(join)
                    
     if self._order_by:
          query = query.order_by(*self._order_by)
               
     if self._loads:
          query = query.options(*self._loads)
               
     if self._limit:
          query = query.limit(self._limit)
          
     if self._offset:
          query = query.offset(self._offset)
     return query

Данный метод собирает ORM запрос.

Больше сказать мне нечего, всё показал, всё рассказал. Надеюсь, что эта статья даст читающим новые мысли по поводу паттерна Repository и то как с ним можно работать. Очень надеюсь на конструктивную критику и буду рад, если вы предложите свои реализации решения данной проблемы.

В этом репозитории вы можете посмотреть примеры использования.

Спасибо тебе, дорогой читатель, за интерес к моей статье!