Pull to refresh

Как сменить технологию и не закопаться в рефакторинге: опыт внедрения DDD в проект на FastAPI — Часть 1

Level of difficultyMedium
Reading time15 min
Views6.7K

Привет, хабравчане!

Я Дима, Python-разработчик из 21YARD, сервиса поиска строительных подрядчиков.

В серии статей расскажу, что такое DDD (domain-driven design) и какие у него преимущества и недостатки. Разберемся, когда применять подход и как сочетать его с FastAPI, популярным ASGI фреймворком на Python.

В первой части рассмотрим паттерны проектирования Repository и Unit of Work. С их помощью мы работаем через интерфейсы. Паттерны помогают в разделении кода на слои: основная логика приложения представляется внутренними слоями, а используемые технологии - внешними.

Во второй части обсудим, как привести проект к еще более масштабируемому и гибкому состоянию с помощью событийно-ориентированной архитектуры.

Оглавление

Для наглядности внедрим DDD в приложение для выставления оценок. В нем пользователь регистрируется, чтобы оценивать других людей, контролировать свой рейтинг. Пример кода ищите в репозитории по ссылке.

Пара слов о DDD

В основе DDD - Домен (Domain). Это модель предмета и его задач, под которые строится приложение. Счет, который оплачиваем, Сообщение, которое отправляем, или Пользователь, которому выставляем оценку.

Домены строятся на сущностях из реального мира и ложатся в центр приложения. От доменов зависят графический интерфейс, место хранения информации и другие составляющие приложения, связанные с реализациями и представленные во внешних слоях. В DDD нельзя спроектировать таблицы базы данных и потом подстроить под них домен - домен определяет реализацию БД.

DDD дает свободу изменения технологий. Сменить фреймворк или перейти с SQL на NoSQL-решение проще. Переход на новый инструмент меняет способ взаимодействия с доменом. Он сам, структура и логика приложения на его основе при этом не затрагиваются. Это работает потому, что домен - внутренний слой приложения, а база данных представлена в одном из внешних. В DDD внешние слои зависят от внутренних.

DDD diagram
DDD diagram

О паттернах Repository и Unit of Work

Изначально у приложения стандартная структура, которая соответствует лучшим практикам разработки с использованием FastAPI.

У нас три модели (src/users/models.py), которые отражают ключевые сущности программы:

  • Пользователь (UserModel);

  • Статистика пользователя (UserStatisticsModel);

  • История голосований (UserVoteModel).

class UserModel(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String, unique=True)
    password: Mapped[str] = mapped_column(String)
    username: Mapped[str] = mapped_column(String, unique=True)


class UserStatisticsModel(Base):
    __tablename__ = 'users_statistics'

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    )
    likes: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    dislikes: Mapped[int] = mapped_column(Integer, nullable=False, default=0)


class UserVoteModel(Base):
    __tablename__ = 'users_votes'

    id: Mapped[int] = mapped_column(primary_key=True)
    voted_for_user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    )
    voting_user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    )

Модели активно используются в приложении и проходят через слои сервиса, зависимостей и эндпоинтов. Код хороший, тесты проходят, программа работает.

Однако мы построили архитектуру приложения так, что связь с ORM в его внутренних слоях неизбежна. Например, сервисный слой (src/users/service.py), чтобы достать данные пользователей, обязательно использует SQLAlchemy:

class UsersService:

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        self._session_factory: async_sessionmaker = session_factory

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._session_factory() as session:
            session.add(user)
            await session.flush()
            session.add(UserStatisticsModel(user_id=user.id))
            await session.commit()
            return user

    async def get_user_by_email(self, email: str) -> UserModel:
        async with self._session_factory() as session:
            user: Optional[UserModel] = (await session.scalars(select(UserModel).filter_by(email=email))).one_or_none()
            if not user:
                raise UserNotFoundError

            return user

...

"Ребята, мы больше не можем использовать SQLAlchemy, заказчик против ORM. Нам срочно нужно перейти на чистый SQL!" - с такими словами к вам придет проектный менеджер. Придется пройтись граблями по всему приложению, потратить десятки часов на рефакторинг. Всего лишь для того, чтобы после смены технологии программа работала.

Переход не был бы таким болезненным, если бы использовался DDD. Домен не был бы связан с реализацией хранения информации о нем в лице ORM. Это позволило бы использовать его по всему приложению и не опасаться, что потребуется вносить исправления по всей кодовой базе. Теперь модели (src/users/domain/models.py) выглядят так:

@dataclass
class UserModel(AbstractModel):
    email: str
    password: str
    username: str

    # Optional args:
    id: int = 0


@dataclass
class UserStatisticsModel(AbstractModel):
    user_id: int

    # Optional args:
    id: int = 0
    likes: int = 0
    dislikes: int = 0


@dataclass
class UserVoteModel(AbstractModel):
    voting_user_id: int  # Who votes
    voted_for_user_id: int  # Votes for who

    # Optional args:
    id: int = 0

Модель (src/core/interfaces/models.py) - базовый класс, от которого наследуются все прочие модели в приложении. В классе реализован единственный метод, который напоминает model_dump() из BaseModel Pydantic и переводит модели в формат python-словаря. Это позволяет передавать представление домена в другие объекты с помощью распаковки.

@dataclass
class AbstractModel(ABC):
    """
    Base model, from which any domain model should be inherited.
    """

    async def to_dict(
            self,
            exclude: Optional[Set[str]] = None,
            include: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:

        """
        Create a dictionary representation of the model.

        exclude: set of model fields, which should be excluded from dictionary representation.
        include: set of model fields, which should be included into dictionary representation.
        """

        data: Dict[str, Any] = asdict(self)
        if exclude:
            for key in exclude:
                try:
                    del data[key]
                except KeyError:
                    pass

        if include:
            data.update(include)

        return data

Структура базы данных теперь описана в отдельном файле src/users/adapters/orm.py:

users_table = Table(
    'users',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column('email', String, nullable=False, unique=True),
    Column('password', String, nullable=False),
    Column('username', String, nullable=False, unique=True),
)

users_statistics_table = Table(
    'users_statistics',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column(
        'user_id',
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    ),
    Column('likes', Integer, nullable=False, default=0),
    Column('dislikes', Integer, nullable=False, default=0),
)

users_votes_table = Table(
    'users_votes',
    mapper_registry.metadata,
    Column('id', Integer, primary_key=True, autoincrement=True, nullable=False, unique=True),
    Column(
        'voted_for_user_id',
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    ),
    Column(
        'voting_user_id',
        Integer,
        ForeignKey('users.id', onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False
    )
)

Императивный стиль маппинга моделей в сравнении с декларативным кажется сложным. Однако в эффективности он не уступает. Нужно только привыкнуть.

Модели с SQLAlchemy теперь связываются за счет маппинга таким способом:

def start_mappers():
    """
    Map all domain models to ORM models, for purpose of using domain models directly during work with the database,
    according to DDD.
    """

    # Imports here not to ruin alembic logics. Also, only for mappers they needed:
    from src.users.domain.models import UserModel, UserStatisticsModel, UserVoteModel

    mapper_registry.map_imperatively(class_=UserModel, local_table=users_table)
    mapper_registry.map_imperatively(class_=UserStatisticsModel, local_table=users_statistics_table)
    mapper_registry.map_imperatively(class_=UserVoteModel, local_table=users_votes_table)

Функция вызывается на старте приложения в рамках указания жизненного цикла его составляющих (src/app.py):

@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator:
    """
    Runs events before application startup and after application shutdown.
    """

    # Startup events:
    engine: AsyncEngine = create_async_engine(DATABASE_URL)
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

    start_users_mappers()

    yield

    # Shutdown events:
    clear_mappers()

Реализуем паттерн Repository, который представляет собой объект для обращения к модели. Паттерн позволяет работать с базой данных через абстракции и инъекцию зависимостей. Это дает возможность менять реализацию взаимодействия с базой данных без необходимости внесения изменений в другой код. У нас будет интерфейс репозитория с самыми ходовыми методами (src/core/interfaces/repositories.py), от которого будут наследоваться репозитории с реализациями:

class AbstractRepository(ABC):
    """
    Interface for any repository, which would be used for work with domain model, according DDD.

    Main purpose is to encapsulate internal logic that is associated with the use of one or another data
    storage scheme, for example, ORM.
    """

    @abstractmethod
    async def add(self, model: AbstractModel) -> AbstractModel:
        raise NotImplementedError

    @abstractmethod
    async def get(self, id: int) -> Optional[AbstractModel]:
        raise NotImplementedError

    @abstractmethod
    async def update(self, id: int, model: AbstractModel) -> AbstractModel:
        raise NotImplementedError

    @abstractmethod
    async def delete(self, id: int) -> None:
        raise NotImplementedError

    @abstractmethod
    async def list(self) -> List[AbstractModel]:
        raise NotImplementedError

Мы работаем с SQLAlchemy и хотим реализовать репозитории с ее использованием. Создадим класс-наследник от абстрактного базового репозитория (src/core/database/interfaces/repositories.py):

class SQLAlchemyAbstractRepository(AbstractRepository, ABC):
    """
    Repository interface for SQLAlchemy, from which should be inherited all other repositories,
    which would be based on SQLAlchemy logics.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session: AsyncSession = session

В качестве интерфейса используем класс репозитория пользователей, чтобы не быть завязанными на SQLAlchemy (src/users/interfaces/repositories.py):

class UsersRepository(AbstractRepository, ABC):
    """
    An interface for work with users, that is used by users unit of work.
    The main goal is that implementations of this interface can be easily replaced in users unit of work
    using dependency injection without disrupting its functionality.
    """

    @abstractmethod
    async def get_by_email(self, email: str) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def get_by_username(self, username: str) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def add(self, model: AbstractModel) -> UserModel:
        raise NotImplementedError

    @abstractmethod
    async def get(self, id: int) -> Optional[UserModel]:
        raise NotImplementedError

    @abstractmethod
    async def update(self, id: int, model: AbstractModel) -> UserModel:
        raise NotImplementedError

    @abstractmethod
    async def list(self) -> List[UserModel]:
        raise NotImplementedError

Практическая реализация, связанная с SQLAlchemy (src/users/adapters/repositories.py):

class SQLAlchemyUsersRepository(SQLAlchemyAbstractRepository, UsersRepository):

    async def get(self, id: int) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(id=id))
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(email=email))
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> Optional[UserModel]:
        result: Result = await self._session.execute(select(UserModel).filter_by(username=username))
        return result.scalar_one_or_none()

    async def add(self, model: AbstractModel) -> UserModel:
        result: Result = await self._session.execute(
            insert(UserModel).values(**await model.to_dict(exclude={'id'})).returning(UserModel)
        )

        return result.scalar_one()

    async def update(self, id: int, model: AbstractModel) -> UserModel:
        result: Result = await self._session.execute(
            update(UserModel).filter_by(id=id).values(**await model.to_dict(exclude={'id'})).returning(UserModel)
        )

        return result.scalar_one()

    async def delete(self, id: int) -> None:
        await self._session.execute(delete(UserModel).filter_by(id=id))

    async def list(self) -> List[UserModel]:
        """
        Returning result object instead of converting to new objects by
                    [UserModel(**await r.to_dict()) for r in result.scalars().all()]
        to avoid sqlalchemy.orm.exc.UnmappedInstanceError lately.

        Checking by asserts, that expected return type is equal to fact return type.
        """

        result: Result = await self._session.execute(select(UserModel))
        users: Sequence[Row | RowMapping | Any] = result.scalars().all()

        assert isinstance(users, List)
        for user in users:
            assert isinstance(user, UserModel)

        return users

На каждую модель у нас отдельный репозиторий-интерфейс и репозиторий-реализация. На первый взгляд нарушается атомарность транзакций: в сервисном слое метод регистрации пользователя создавал объект пользователя и заготовку под статистику.

class UsersService:

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        self._session_factory: async_sessionmaker = session_factory

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._session_factory() as session:
            session.add(user)
            await session.flush()
            session.add(UserStatisticsModel(user_id=user.id))
            await session.commit()
            return user
...

На самом деле атомарность сохраняется, поскольку при инициализации репозитория, отнаследованного от SQLAlchemyAbstractRepository, передается объект сессии из SQLAlchemy. Можно передать одну сессию в оба репозитория и, если операции выполнены успешно, совершить коммит за пределами обоих репозиториев.

Такая инициализация репозитория заставит передавать в них сессию, что приведет к зависимости сервисного слоя от SQLAlchemy. Эту проблему решает паттерн Unit of Work.

Название паттерна Unit of Work намекает на его задачу управлять атомарностью операций.

В Python не беспокоиться о ручном освобождении ресурса позволяет контекстный менеджер. Используем его в реализации Unit of Work.

Логика работы с Unit of Work похожа на работу с репозиториями: есть базовый класс и классы реализации. Отличие в том, что меняются не методы, а атрибуты класса - репозитории. Посмотрим на базовый класс Unit of Work (src/core/interfaces/units_of_work.py):

class AbstractUnitOfWork(ABC):
    """
    Interface for any units of work, which would be used for transaction atomicity, according DDD.
    """

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.rollback()

    @abstractmethod
    async def commit(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def rollback(self) -> None:
        raise NotImplementedError

Помимо "магических" методов aenter и aexit, необходимых для реализации механизма асинхронного контекстного менеджера, есть всего два метода:

  • commit для подтверждения операций с репозиториями;

  • rollback для отката изменений, если что-то пошло не так.

Рассмотрим абстрактный класс Unit of Work, связанный с SQLAlchemy. В дальнейшем наследовать реализации Unit of Work будем от него (src/core/database/interfaces/units_of_work.py):

class SQLAlchemyAbstractUnitOfWork(AbstractUnitOfWork):
    """
    Unit of work interface for SQLAlchemy, from which should be inherited all other units of work,
    which would be based on SQLAlchemy logics.
    """

    def __init__(self, session_factory: async_sessionmaker = default_session_factory) -> None:
        super().__init__()
        self._session_factory: async_sessionmaker = session_factory

    async def __aenter__(self) -> Self:
        self._session: AsyncSession = self._session_factory()
        return await super().__aenter__()

    async def __aexit__(self, *args, **kwargs) -> None:
        await super().__aexit__(*args, **kwargs)
        await self._session.close()

    async def commit(self) -> None:
        await self._session.commit()

    async def rollback(self) -> None:
        """
        Rollbacks all uncommited changes.

        Uses self._session.expunge_all() to avoid sqlalchemy.orm.exc.DetachedInstanceError after session rollback,
        due to the fact that selected object is cached by Session. And self._session.rollback() deletes all Session
        cache, which causes error on Domain model, which is not bound now to the session and can not retrieve
        attributes.

        https://pythonhint.com/post/1123713161982291/how-does-a-sqlalchemy-object-get-detached
        """

        self._session.expunge_all()
        await self._session.rollback()

Особенность класса в том, что он связан с SQLAlchemy за счет объекта AsyncSession, необходимого репозиториями-реализациям.

Как и в случае с репозиториями, интерфейс, с которым будет работать сервисный слой, даст возможность не создавать связь с реализацией (src/users/interfaces/units_of_work.py):

class UsersUnitOfWork(AbstractUnitOfWork, ABC):
    """
    An interface for work with users, that is used by service layer of users module.
    The main goal is that implementations of this interface can be easily replaced in the service layer
    using dependency injection without disrupting its functionality.
    """

    users: UsersRepository
    users_statistics: UsersStatisticsRepository
    users_votes: UsersVotesRepository

Особенность таких интерфейсов - наличие классовых атрибутов для обращения к репозиториям при работе с Unit of Work.

Создадим конкретную реализацию, необходимую для работы приложения (src/users/units_of_work.py):

class SQLAlchemyUsersUnitOfWork(SQLAlchemyAbstractUnitOfWork, UsersUnitOfWork):

    async def __aenter__(self) -> Self:
        uow = await super().__aenter__()
        self.users: UsersRepository = SQLAlchemyUsersRepository(session=self._session)
        self.users_statistics: UsersStatisticsRepository = SQLAlchemyUsersStatisticsRepository(session=self._session)
        self.users_votes: UsersVotesRepository = SQLAlchemyUsersVotesRepository(session=self._session)
        return uow

В реализации инициализируем необходимые репозитории-реализации. При необходимости, при создании экземпляра SQLAlchemyUsersUnitOfWork мы можем передать отличный от дефолтного session_factory, экземпляр async_sessionmaker из SQLAlchemy, в метод init.

Рассмотрим сервисный слой, который больше не зависит от реализации и является обособленной боевой единицей (src/users/service.py):

class UsersService:
    """
    Service layer core according to DDD, which using a unit of work, will perform operations on the domain model.
    """

    def __init__(self, uow: UsersUnitOfWork) -> None:
        self._uow: UsersUnitOfWork = uow

    async def register_user(self, user: UserModel) -> UserModel:
        async with self._uow as uow:
            user = await uow.users.add(model=user)
            await uow.users_statistics.add(
                model=UserStatisticsModel(
                    user_id=user.id
                )
            )

            await uow.commit()
            return user

    async def get_user_by_email(self, email: str) -> UserModel:
        async with self._uow as uow:
            user: Optional[UserModel] = await uow.users.get_by_email(email)
            if not user:
                raise UserNotFoundError

            return user

Теперь сервисный слой работает с доменами через Unit of Work, который получает через внедрение зависимостей в момент инициализации. Мы можем подменить его, создав поддельный Unit of Work, что будет понятнее, чем магический MockObject со своей внутрянкой (tests/users/fake_objects.py):

class FakeUsersUnitOfWork(UsersUnitOfWork):

    def __init__(
            self,
            users_repository: UsersRepository,
            users_statistics_repository: UsersStatisticsRepository,
            users_votes_repository: UsersVotesRepository
    ) -> None:

        super().__init__()
        self.users: UsersRepository = users_repository
        self.users_statistics: UsersStatisticsRepository = users_statistics_repository
        self.users_votes: UsersVotesRepository = users_votes_repository
        self.committed: bool = False

    async def commit(self) -> None:
        self.committed = True

    async def rollback(self) -> None:
        pass

При тестировании сервиса передадим поддельный Unit of Work при инициализации (tests/users/unit/test_service.py):

@pytest.mark.anyio
async def test_users_service_register_user_success() -> None:
    users_unit_of_work: UsersUnitOfWork = FakeUsersUnitOfWork(
        users_repository=FakeUsersRepository(),
        users_statistics_repository=FakeUsersStatisticsRepository(),
        users_votes_repository=FakeUsersVotesRepository()
    )
    users_service: UsersService = UsersService(uow=users_unit_of_work)

    assert len(await users_repository.list()) == 0
    user: UserModel = UserModel(**FakeUserConfig().to_dict(to_lower=True))
    await users_service.register_user(user=user)
    assert len(await users_repository.list()) == 1
    assert len(await users_statistics_repository.list()) == 1

Репозитории тоже можем подделать, поскольку Unit of Work работает с интерфейсами, которые ему определили (tests/users/fake_objects.py):

class FakeUsersRepository(UsersRepository):

    def __init__(self, users: Optional[Dict[int, UserModel]] = None) -> None:
        self.users: Dict[int, UserModel] = users if users else {}

    async def get(self, id: int) -> Optional[UserModel]:
        return self.users.get(id)

    async def get_by_email(self, email: str) -> Optional[UserModel]:
        for user in self.users.values():
            if user.email == email:
                return user

        return None

    async def get_by_username(self, username: str) -> Optional[UserModel]:
        for user in self.users.values():
            if user.username == username:
                return user

        return None

    async def add(self, model: AbstractModel) -> UserModel:
        user: UserModel = UserModel(**await model.to_dict())
        self.users[user.id] = user
        return user

    async def update(self, id: int, model: AbstractModel) -> UserModel:
        user: UserModel = UserModel(**await model.to_dict())
        if id in self.users:
            self.users[id] = user

        return user

    async def delete(self, id: int) -> None:
        if id in self.users:
            del self.users[id]

    async def list(self) -> List[UserModel]:
        return list(self.users.values())

Подведем промежуточные итоги

Преимущества подхода:

  • Приложение не зависит от конкретной технологии, в связи с чем ее легко заменить без сложного рефакторинга;

  • Работа ведется через интерфейсы, а не реализации, что позволяет менять функционал приложения и не редактировать множество файлов;

  • Компоненты тестируются через интерфейсы и за счет внедрения зависимостей. Можно просто замокать внедряемый объект;

  • Проект масштабируется в едином стиле;

  • Код можно переиспользовать, поскольку он не привязан к конкретным реализациям.

Недостатки подхода:

  • Нужно написать и поддерживать много абстракций и интерфейсов;

  • Новым участникам сложно входить в проект, так как нужно разбираться во взаимодействии нескольких уровней абстракций;

  • Код работает медленнее из-за множественных абстракций, накладывающихся друг на друга.

DDD полезен в больших длительных проектах, когда требования меняются, технологии устаревают, приложение масштабируется. В статичных проектах, где задачи, их объем и решения не меняются, использовать подход нецелесообразно.

Изучить подробнее DDD можно по книге Персиваля Г., Грегори Б. - "Паттерны разработки на Python: TDD, DDD и событийно-ориентированная архитектура".

Услышимся во второй части.

Буду благодарен поддержке и конструктивной критике!

Tags:
Hubs:
Total votes 17: ↑16 and ↓1+17
Comments15

Articles