All streams
Search
Write a publication
Pull to refresh

Comments 8

А можно узнать, что это за символ в самом конце и его значение?:
return await handler(event, data)

Он случайно поставился когда я копировал код с VsCode, он не какую смысловую нагрузку не несёт. Я их удалю чтобы не мешали другим, спасибо что указали на ошибку

UFO landed and left these words here

Хорошая статья наконец-то увидел на примере работу с БД через middleware

В геттере сессия бд с миддлвари передается в кваргах, ее не нужно брать с диалог_менеджера. А вот в хендлере только с диалог_менеджера.

Спасибо за статью, есть ли пример работы с репозиторием, и unit of work, который прокидывается через мидлварь?

Суть та же, что и с сессией, только передаёшь unit of work.

def create_sqlmodel_engine(settings: DatabaseSettings, **kwargs) -> Engine:
    return create_engine(str(settings.dsn), **kwargs)


def sqlmodel_sessionmaker(settings: DatabaseSettings, engine: Engine) -> Callable[[], Session]:
    return lambda: Session(bind=engine, autoflush=settings.autoflush)


class BaseSQLModel(SQLModel):
    """Базовый класс для всех моделей данных"""
    __table_args__ = {'extend_existing': True}

    class Config:
        alias_generator = to_camel
        validate_by_name = True
        arbitrary_types_allowed = True


T = TypeVar('T', bound=BaseSQLModel)


class GenericRepository(Generic[T], ABC):
    @abstractmethod
    def get(self, id: EntityId) -> T | None: ...

    @abstractmethod
    def list(self, **filters) -> Sequence[T]: ...

    @abstractmethod
    def add(self, entity: T) -> T: ...

    @abstractmethod
    def update(self, entity_id: EntityId, entity: T) -> T: ...

    @abstractmethod
    def delete(self, id: EntityId) -> None: ...


class GenericSQLRepository(GenericRepository[T], ABC):
    def __init__(self, session: Session, model_cls: Type[T]):
        self._session = session
        self._model_cls = model_cls

    def _build_get_stmt(self, entity_id: EntityId) -> SelectOfScalar[T]:
        return select(self._model_cls).where(self._model_cls.id == entity_id)

    def get(self, id: EntityId) -> T | None:
        return self._session.exec(self._build_get_stmt(id)).one_or_none()
    # тут дальше реализация методов list/add/update/delete, где мы описываем
    # повторяющуюся для всех репозиториев логику. Суть та же, работаем с model_cls



class UserBaseRepository(GenericRepository[User], ABC):
    @abstractmethod
    def get_by_telegram_id(self, telegram_id: int) -> User | None: ...


class UserRepository(GenericSQLRepository[User], UserBaseRepository):
    def __init__(self, session: Session) -> None:
        super().__init__(session, User)

    def get_by_telegram_id(self, telegram_id: int) -> User | None:
        stmt = select(User).where(User.telegram_id == telegram_id)
        return self._session.exec(stmt).one_or_none()


class UnitOfWorkBase(ABC):
    users: UserBaseRepository  # здесь добавляются репозитории

    def __enter__(self) -> 'UnitOfWorkBase':
        return self

    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...

    @abstractmethod
    def commit(self) -> None: ...

    @abstractmethod
    def rollback(self) -> None: ...


class UnitOfWork(UnitOfWorkBase):
    def __init__(self, session_factory: Callable[[], Session]) -> None:
        self._session_factory = session_factory

    def __enter__(self) -> UnitOfWorkBase:
        self._session = self._session_factory()
        self.users = UserRepository(self._session)  # И здесь добавляются репозитории
        return super().__enter__()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        if exc_type:
            self._session.rollback()
        try:
            self._session.commit()
        except Exception:
            self._session.rollback()
            raise

    def commit(self) -> None:
        self._session.commit()

    def rollback(self) -> None:
        self._session.rollback()


class UnitOfWorkMiddleware(BaseMiddleware):
    def __init__(self, settings: DatabaseSettings):
        self.settings = settings

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        engine = create_sqlmodel_engine(self.settings)
        sessionmaker = sqlmodel_sessionmaker(self.settings, engine)

        with UnitOfWork(session_factory=sessionmaker) as uow:
            data['uow'] = uow
            return await handler(event, data)

Здесь используется SQLModel, но с SQLAlchemy будет аналогично. Если что, то могу скинуть код описания репозиториев и unit of work для алхимии.

Подумываю прикрутить сюда CQRS, но пока ещё не решился. Да и потребности пока такой нет.

Пример использования:

from aiogram.types import User as TelegramUser,
from m4dn355.state.sql.models import User as DBUser


async def create_user_from_telegram_bot(uow: UnitOfWorkBase, telegram_user: TelegramUser) -> DBUser:
    new_user = DBUser(
        username=telegram_user.username,
        telegram_id=telegram_user.id,
        telegram_username=telegram_user.username,
        is_verified=True,
    )
    new_db_user = uow.users.add(new_user)
    return new_db_user


async def get_or_create_telegram_user(
    uow: UnitOfWorkBase,
    telegram_user: TelegramUser,
) -> DBUser:
    db_user = uow.users.get_by_telegram_id(telegram_user.id)
    if db_user:
        return db_user
    return await create_user_from_telegram_bot(uow, telegram_user)

  
@commands_router.message(CommandStart())
async def command_start_handler(message: Message, uow: UnitOfWorkBase, dialog_manager: DialogManager) -> None:
    """/start - Приветственное сообщение"""
    await get_or_create_telegram_user(uow, message.from_user)
    await dialog_manager.start(MainState.START, mode=StartMode.RESET_STACK)

Надеюсь будет полезно.

Sign up to leave a comment.

Articles