Как стать автором
Обновить

Оптимистическая блокировка коллекций Агрегата при использовании Doctrine

Время на прочтение9 мин
Количество просмотров5.3K

Проектирование своего проекта по DDD последнее время становится всё более популярным. Сейчас не будем углубляться в данную методологию с её принципами, плюсами и минусами. Я хочу рассказать с какой проблемой столкнулась наша команда при использовании данной методологии на PHP, а именно внедрении Data Mapper’а Doctrine ORM.

Чтобы более понятно донести нашу проблему я буду использовать известный всем Агрегат Заказа (Order) и позиции заказа (OrderLine), которые являются коллекциями Dortrine ORM. Так же сильно упростим данный агрегат, чтобы фокусироваться на самой проблеме. И так начинаем!

Для начала мы создадим наш агрегат заказа:

<?php
declare(strict_types=1);

namespace App\Order;

use App\Shared\Domain\AggregateRoot;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Entity
 * @ORM\Table(name="orders")
 */
class Order extends AggregateRoot
{
    private const STATUS_NEW = 'new';
    private const STATUS_CLOSED = 'closed';

    /**
     * @ORM\Id
     * @ORM\Column(type="guid")
     */
    private string $id;
    /**
     * @ORM\Column(type="string")
     */
    private string $status;
    /**
     * @var OrderLine[]|Collection
     * @ORM\OneToMany(targetEntity="OrderLine", mappedBy="order", orphanRemoval=true, cascade={"persist"})
     */
    private Collection $items;

    public function __construct(string $id, Collection $lineItems)
    {
        if ($lineItems->isEmpty()) {
            throw new \DomainException('Позиции заказа отсутствуют.');
        }
        $this->id = $id;
        $this->items = $lineItems;
        $this->status = self::STATUS_NEW;
    }
  	
    public function close(): void
    {
        $this->status = self::STATUS_CLOSED;
    }

    public function addLine(Line $line): void
    {
        $this->items->add(new OrderLine($this, $line));
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function getStatus(): string
    {
        return $this->status;
    }

    /**
     * @return OrderLine[]
     */
    public function getItems(): array
    {
        return $this->items->toArray();
    }
}

Затем создадим позиции заказа:

<?php
declare(strict_types=1);

namespace App\Order;

use Doctrine\ORM\Mapping as ORM;
use Ramsey\Uuid\Uuid;

/**
 * @ORM\Entity
 * @ORM\Table(name="order_lines")
 */
class OrderLine
{
    /**
     * @ORM\Id
     * @ORM\Column(type="guid")
     */
    private string $id;
    /**
     * @ORM\ManyToOne(targetEntity="Order", inversedBy="orderLines")
     * @ORM\JoinColumn(name="order_id")
     */
    private Order $order;
    /**
     * @ORM\Embedded(class="Line")
     */
    private Line $line;

    public function __construct(Order $order, Line $line)
    {
        $this->id = Uuid::uuid4()->toString();
        $this->order = $order;
        $this->line = $line;
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function getOrder(): Order
    {
        return $this->order;
    }

    public function getLine(): Line
    {
        return $this->line;
    }
}

Для удобства работы с позицией заказа вынесли из него Line в Value Object:

<?php
declare(strict_types=1);

namespace App\Order;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Embeddable
 */
final class Line
{
    /**
     * @var string
     *
     * @ORM\Column(type="string")
     */
    private $name;
    /**
     * @var int
     *
     * @ORM\Column(type="integer")
     */
    private $quantity;
    /**
     * @ORM\Column(type="float")
     */
    private $price;

    public function __construct(string $name, int $quantity, float $price)
    {
        $this->name = $name;
        $this->quantity = $quantity;
        $this->price = $price;
    }

    public function getQuantity(): int
    {
        return $this->quantity;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getPrice(): float
    {
        return $this->price;
    }
}

Чтобы иметь возможность хранить во всех агрегатах возникшие события, а так же публиковать их — мы создали супер класс AggregateRoot:

<?php

declare(strict_types=1);

namespace Atiline\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot
{
    /**
     * @var DomainEvent[]
     */
    private array $recordedEvents = [];

    abstract public function getId();

    protected function recordEvent(DomainEvent $event): void
    {
        $this->recordedEvents[] = $event;
    }

    public function releaseEvents(): array
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];
        return $events;
    }
}

Напомню, что в качестве Data Mapper'а мы выбрали Doctrine ORM, а для маппинга сущности будем использовать Doctrine Annotations, поэтому сразу настроили для всех Entity и Value Object необходимые аннотации.

Проблема

Чтобы предотвратить возникновение конфликтов между параллельными бизнес-транзакциями путем обнаружения конфликта и отката транзакции мы хотим использовать оптимистическую блокировку. Doctrine предоставляет решение данной проблемы путём добавляения свойства $version и аннотации @ORM\Version

Для того чтобы не добавлять данное свойство в каждый агрегат мы можем вынести его в наш супер класс ArggregateRoot и пометить его аннотацией @ORM\MappedSuperclass чтобы Doctrine смогла распознать аннотации в наследуемом абстрактном классе.

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot
{
    /**
     * @ORM\Version
     * @ORM\Column(type="integer")
     */
    protected int $version;
    
    //....
}

Теперь давайте проверим работу оптимистической блокировки. Для этого генерируем миграции и применим их. Затем создадим тестовый заказ, сохраним изменения в базе данных, а после получим из базы данных созданный заказ и попытаемся закрыть заказ.

<?php

final class OrderTestController extends AbstractController
{
    public function createAndClose(EntityManagerInterface $em): Response
    {
        // Создаём заказ
        $order = new Order('09597988-aa74-4bb3-9fb9-0e154dd7cdec');
        $em->persist($order);
        $em->flush();

        // Закзываем заказ
        $order = $em->getRepository(Order::class)->find('09597988-aa74-4bb3-9fb9-0e154dd7cdec');
        $order->close();
        $em->flush();
      
        return $this->json([], 201);
    }
}

После того как мы вызовем $order->close() наш заказ сменит статус на closed, а так же версия агрегата, которая записывается в поле version, увеличится на +1. Это значит, что оптимистическая блокировка будет работать.

Теперь давайте попробуем у данного агрегата добавить позицию заказа через метод $order->addLine($line). После вызова данного метода наш агрегат не увеличил версию в агрегате на +1, так как данная сущность-агрегат не изменилась. Следовательно мы встречаем проблему с работой оптимистической блокировки. Для изменений полей самого агрегата — она работает, но если изменяется сущность-коллекция внутри агрегата — блокировка перестаёт работать и версия не увеличивается на +1.

Для решение данной проблемы мы обратимся к документации Doctrine в надежде на то, что данная проблема возникала не только у нас и решение уже есть.

Почитав документацию, Doctrine предлагает пойти путём добавления агрегатного поля, которое будет изменяться при каждом изменении коллекции. Это может быть дата обновления, может общее количество позиций в заказе. Однако это не очень удобно по нескольким причинам:

  1. Всегда нужно придумывать какое-то поле для агрегации

  2. Есть риск забыть изменение агрегатного поля в каждом методе, изменяющим коллекцию из-за чего блокировка не сработает

  3. Нужно произвести много изменений, чтобы внедрить данный подход в существующий код

Сразу бы хотелось обозначить, что проблема не надуманная. Мы встретились с реальными ситуациями, когда событие возникало несколько раз.

Решение

Так как мы не хотим отказываться от уже созданной Doctrine оптимистической блокировки, то мы решили на основе этого сделать своё решение, которое имело бы данное агрегированное поле и менялось каждый раз при обновлении коллекции.

Для этого мы введём новый супер класс AggregateEntity, от которого всегда будем наследоваться нашими вложенными коллекциями. Пока что он будет пустым, но мы всегда можем добавить в него какую-то логику.

<?php
declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateEntity
{
}

Чтобы иметь возможность определять к какому агрегату принадлежит вложенная сущность можно пойти несколькими путями, но мы решили пойти более гибким - через аннотацию.

Создадим свою аннотацию, в которой будем указывать метод, вызывающий агрегат:

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

/**
 * @Annotation
 * @Target("CLASS")
 */
final class AggregateEntityAnnotation
{
    /**
     * Method for getting the Aggregated Root
     */
    public string $methodAggregateRoot;
}

И добавим данную аннотацию к нашей коллекции OrderLine, а также наследуем её от супер класса AggegateEntity :

<?php
declare(strict_types=1);


namespace App\Order;

use App\Shared\Domain\AggregateEntity;
use App\Shared\Domain\AggregateEntityAnnotation;
use Doctrine\ORM\Mapping as ORM;
use Ramsey\Uuid\Uuid;

/**
 * @ORM\Entity
 * @ORM\Table(name="order_lines")
 * @AggregateEntityAnnotation(methodAggregateRoot="getOrder")
 */
class OrderLine extends AggregateEntity
{
   	...
      
    /**
     * @ORM\ManyToOne(targetEntity="Order", inversedBy="items")
     * @ORM\JoinColumn(name="order_id")
     */
    private Order $order;
   
  	...

    public function getOrder(): Order
    {
        return $this->order;
    }
  	
  	...
}

После чего добавим наше агрегированное свойство $aggregateVersion в супер класс агрегата AggregateRoot и добавим метод updateAggregateVersion(), который будет изменять наше поле аналогично с полем version предоставляемой Doctrine. Таким образом у нас будет два изменяемых поля увеличивающееся на +1:

<?php

declare(strict_types=1);

namespace App\Shared\Domain;

use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\MappedSuperclass
 */
abstract class AggregateRoot extends AggregateEntity
{
    /**
     * @ORM\Column(type="integer", options={"default" : 1})
     */
    protected int $aggregateVersion = 1;
    
  	...
      
    public function updateAggregateVersion(): void
    {
        $this->aggregateVersion++;
    }
}

Нам остаётся придумать сервис, который будет изменять нашу версию. Займёмся его проектированием:

<?php

declare(strict_types=1);

namespace App\Shared\Doctrine;

use App\Shared\Domain\AggregateEntity;
use App\Shared\Domain\AggregateEntityAnnotation;
use App\Shared\Domain\AggregateRootEntity;
use Doctrine\Common\Annotations\Reader;
use Doctrine\ORM\EntityManagerInterface;
use LogicException;
use ReflectionClass;

final class DoctrineAggregateVersioning
{
    private Reader $reader;

    public function __construct(Reader $reader)
    {
        $this->reader = $reader;
    }

    public function update(EntityManagerInterface $em, AggregateEntity ...$entities): void
    {
        $aggregateRoot = $this->getAggregateRoot($entities);

        if (!$em->contains($aggregateRoot)) {
            $em->persist($aggregateRoot);
        }

      	//Изменяем версию агрегата
        $aggregateRoot->updateAggregateVersion();

        $uow = $em->getUnitOfWork();
        $classMetadata = $em->getClassMetadata(get_class($aggregateRoot));
      	//Применяем изменения
        $uow->recomputeSingleEntityChangeSet($classMetadata, $aggregateRoot);
    }

    /**
     * @param AggregateEntity[] $entities
     * @return AggregateRootEntity
     */
    private function getAggregateRoot(array $entities): AggregateRootEntity
    {
        $aggregateRoot = null;
  			
      	//Пытаемся получить сразу агрегат
        foreach ($entities as $entity) {
            if ($entity instanceof AggregateRootEntity) {
                $aggregateRoot = $entity;
            }
        }

      	//Если агрегат не нашёлся, то получаем его из аннотаций последней сущности
        if ($aggregateRoot === null) {
            $aggregateEntity = end($entities);
            $annotation = $this->getAggregateEntityAnnotation($aggregateEntity);

            if (!method_exists($aggregateEntity, $annotation->methodAggregateRoot)) {
                throw new LogicException(sprintf('Method "%s" not exists in class "%s".',
                    $annotation->methodAggregateRoot,
                    $aggregateEntity::class
                ));
            }

            $aggregateRoot = $aggregateEntity->{$annotation->methodAggregateRoot}();
        }

        return $aggregateRoot;
    }

    private function getAggregateEntityAnnotation(AggregateEntity $entity): AggregateEntityAnnotation
    {
        $reflectionClass = new ReflectionClass($entity);

        if (!$annotation = $this->reader->getClassAnnotation($reflectionClass, AggregateEntityAnnotation::class)) {
            throw new LogicException(sprintf('Aggregate "%s" must have AggregateEntityAnnotation!', $entity::class));
        }

        return $annotation;
    }
}

Теперь создадим подписчика, который будет вызывать созданный нами сервис перед тем как данные будут сохранены. Внести изменения мы можем только в момент события onFlush.

<?php

declare(strict_types=1);

namespace App\Infrastructure\Doctrine\Listener;

use App\Shared\Doctrine\DoctrineAggregateVersioning;
use App\Shared\Domain\AggregateEntity;
use Doctrine\Bundle\DoctrineBundle\EventSubscriber\EventSubscriberInterface;
use Doctrine\ORM\Event\OnFlushEventArgs;
use Doctrine\ORM\Events;

final class DoctrineAggregateVersioningSubscriber implements EventSubscriberInterface
{
    private DoctrineAggregateVersioning $aggregateVersioning;

    public function __construct(DoctrineAggregateVersioning $aggregateVersioning)
    {
        $this->aggregateVersioning = $aggregateVersioning;
    }

    public function getSubscribedEvents(): array
    {
        return [
            Events::onFlush,
        ];
    }

    public function onFlush(OnFlushEventArgs $args): void
    {
        $em = $args->getEntityManager();
        $uow = $em->getUnitOfWork();

        //Собираем все изменённые сущности
        $entities = array_merge(
            $uow->getScheduledEntityInsertions(),
            $uow->getScheduledEntityUpdates(),
            $uow->getScheduledEntityDeletions()
        );

    		//Получаем только агрегированные сущности
        $aggregateEntities = [];
        foreach ($entities as $entity) {
            if ($entity instanceof AggregateEntity) {
                $aggregateEntities[] = $entity;
            }
        }

        if (empty($aggregateEntities)) {
            return;
        }
      
				//Изменяем версию
        $this->aggregateVersioning->update($em, ...$aggregateEntities);
    }
}

Итоги

Для поиска решение данной проблемы мы потратили около 2-х дней, однако мы не смогли найти нужного нам решения, поэтому пришли к собственному. Конечно, мы понимаем, что данное решение не идеально и, в какой-то степени, «костыльное». Поэтому мы публикуем данное решение, в надежде придти к какому-то более лаконичному решению с помощью сообщества Хабр. Если данное решение окажется хорошим, то оно определённо поможет другим. А мы, в свою очередь готовы предоставить данное решение как публичную библиотеку.

Авторы:

Ворожцов Максим

PHP Developer

Захаров Илья

PHP Developer

Теги:
Хабы:
Всего голосов 10: ↑10 и ↓0+10
Комментарии11

Публикации

Истории

Работа

PHP программист
99 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань