Comments 8
А можно узнать, что это за символ в самом конце и его значение?:return await handler(event, data)
Хорошая статья наконец-то увидел на примере работу с БД через middleware
В геттере сессия бд с миддлвари передается в кваргах, ее не нужно брать с диалог_менеджера. А вот в хендлере только с диалог_менеджера.
Спасибо за статью, есть ли пример работы с репозиторием, и unit of work, который прокидывается через мидлварь?
Спасибо за хороший отзыв, но увы с unit of work нету у меня работ((
Суть та же, что и с сессией, только передаёшь unit of work.
def create_sqlmodel_engine(settings: DatabaseSettings, **kwargs) -> Engine:
return create_engine(str(settings.dsn), **kwargs)
def sqlmodel_sessionmaker(settings: DatabaseSettings, engine: Engine) -> Callable[[], Session]:
return lambda: Session(bind=engine, autoflush=settings.autoflush)
class BaseSQLModel(SQLModel):
"""Базовый класс для всех моделей данных"""
__table_args__ = {'extend_existing': True}
class Config:
alias_generator = to_camel
validate_by_name = True
arbitrary_types_allowed = True
T = TypeVar('T', bound=BaseSQLModel)
class GenericRepository(Generic[T], ABC):
@abstractmethod
def get(self, id: EntityId) -> T | None: ...
@abstractmethod
def list(self, **filters) -> Sequence[T]: ...
@abstractmethod
def add(self, entity: T) -> T: ...
@abstractmethod
def update(self, entity_id: EntityId, entity: T) -> T: ...
@abstractmethod
def delete(self, id: EntityId) -> None: ...
class GenericSQLRepository(GenericRepository[T], ABC):
def __init__(self, session: Session, model_cls: Type[T]):
self._session = session
self._model_cls = model_cls
def _build_get_stmt(self, entity_id: EntityId) -> SelectOfScalar[T]:
return select(self._model_cls).where(self._model_cls.id == entity_id)
def get(self, id: EntityId) -> T | None:
return self._session.exec(self._build_get_stmt(id)).one_or_none()
# тут дальше реализация методов list/add/update/delete, где мы описываем
# повторяющуюся для всех репозиториев логику. Суть та же, работаем с model_cls
class UserBaseRepository(GenericRepository[User], ABC):
@abstractmethod
def get_by_telegram_id(self, telegram_id: int) -> User | None: ...
class UserRepository(GenericSQLRepository[User], UserBaseRepository):
def __init__(self, session: Session) -> None:
super().__init__(session, User)
def get_by_telegram_id(self, telegram_id: int) -> User | None:
stmt = select(User).where(User.telegram_id == telegram_id)
return self._session.exec(stmt).one_or_none()
class UnitOfWorkBase(ABC):
users: UserBaseRepository # здесь добавляются репозитории
def __enter__(self) -> 'UnitOfWorkBase':
return self
@abstractmethod
def __exit__(self, exc_type, exc_value, traceback) -> None: ...
@abstractmethod
def commit(self) -> None: ...
@abstractmethod
def rollback(self) -> None: ...
class UnitOfWork(UnitOfWorkBase):
def __init__(self, session_factory: Callable[[], Session]) -> None:
self._session_factory = session_factory
def __enter__(self) -> UnitOfWorkBase:
self._session = self._session_factory()
self.users = UserRepository(self._session) # И здесь добавляются репозитории
return super().__enter__()
def __exit__(self, exc_type, exc_value, traceback) -> None:
if exc_type:
self._session.rollback()
try:
self._session.commit()
except Exception:
self._session.rollback()
raise
def commit(self) -> None:
self._session.commit()
def rollback(self) -> None:
self._session.rollback()
class UnitOfWorkMiddleware(BaseMiddleware):
def __init__(self, settings: DatabaseSettings):
self.settings = settings
async def __call__(
self,
handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: dict[str, Any],
) -> Any:
engine = create_sqlmodel_engine(self.settings)
sessionmaker = sqlmodel_sessionmaker(self.settings, engine)
with UnitOfWork(session_factory=sessionmaker) as uow:
data['uow'] = uow
return await handler(event, data)
Здесь используется SQLModel, но с SQLAlchemy будет аналогично. Если что, то могу скинуть код описания репозиториев и unit of work для алхимии.
Подумываю прикрутить сюда CQRS, но пока ещё не решился. Да и потребности пока такой нет.
Пример использования:
from aiogram.types import User as TelegramUser,
from m4dn355.state.sql.models import User as DBUser
async def create_user_from_telegram_bot(uow: UnitOfWorkBase, telegram_user: TelegramUser) -> DBUser:
new_user = DBUser(
username=telegram_user.username,
telegram_id=telegram_user.id,
telegram_username=telegram_user.username,
is_verified=True,
)
new_db_user = uow.users.add(new_user)
return new_db_user
async def get_or_create_telegram_user(
uow: UnitOfWorkBase,
telegram_user: TelegramUser,
) -> DBUser:
db_user = uow.users.get_by_telegram_id(telegram_user.id)
if db_user:
return db_user
return await create_user_from_telegram_bot(uow, telegram_user)
@commands_router.message(CommandStart())
async def command_start_handler(message: Message, uow: UnitOfWorkBase, dialog_manager: DialogManager) -> None:
"""/start - Приветственное сообщение"""
await get_or_create_telegram_user(uow, message.from_user)
await dialog_manager.start(MainState.START, mode=StartMode.RESET_STACK)
Надеюсь будет полезно.
Реализация взаимодействия с БД через Middleware в Telegram-ботах