Мотивация
Сегодня я наткнулся на статью за авторством @enamored_poc. Увидев заголовок, я был в предвкушении: наконец-то кто-то взялся за этот гайд — в своё время я как раз искал что-то подобное. Однако, дочитав статью до конца, понял, что автор по сути просто пересказал раздел Bigger applications из официальной документации и лишь добавил пару замечаний оттуда же.
С одной стороны уже есть куча видео, статей, где можно изучить как пишутся правильно сервисы по всем заветам дядюшки Боба (автора "Чистой Архитектуры"). Но, с другой стороны, всё это обычно показывают на примере стандартного интернет-магазина или списка TODO, и в какой-то момент перестаёшь понимать, а зачем всё это вообще нужно.
Поэтому в этом гайде мы возьмём достаточно нетривиальную тему, которую я регулярно встречаю в вакансиях и на «галерах», и попробуем реализовать её на практике.
Постановка задачи
Владелец сети магазинов хочет загружать видео с камер видеонаблюдения на некоторый сервер. Это видео должно сохраняться в хранилище и покадрово обрабатываться нейросетями. В результате мы должны получить метаданные о том, сколько людей было в кадре, координаты их местоположения в кадре, а также их пол. Но больше всего владельцу нужно получать сводную информацию о том, сколько людей и какого пола фиксируется в разные моменты времени, чтобы он мог например менять выкладку продуктов. После сдачи MVP-версии заказчик может выдать нам контракт на дальнейшее развитие системы: анализ видео-трафика в реальном времени, распознавание лиц, выявление нарушителей, и, не дай бог, слежкой за сотрудниками и т.д.
Немного мотивации: в этой задачке (правда, не в этой части) мы затронем работу с RabbitMQ, Redis, S3, Postgres, а также пощупаем YOLO, opencv. Тем самым закроем большинство типичных требований к современным вакансиям. А еще эту задачку можно переложить на любую предметную область, связанную с видеоаналитикой.
Анализ задачи
Да, всё начинается с анализа предметной области, а не с создания структуры папок. Так что разложим задачку по полочкам.
На вход поступает видеофайл; при этом в будущем этот видеофайл будет загружаться в потоковом режиме или напрямую транслироваться с камеры видеонаблюдения.
Эти видеофайлы должны где-то и как-то храниться. Мы пока не можем спрогнозировать, сколько таких видео будет храниться, какие серверы готов выделить заказчик, какие требования будут к времени записи и чтения, будут ли SSD на этих серверах.
Все видеофайлы должны быть разбиты на кадры и скормлены нейросетям. Сейчас мы ничего не знаем о нейросетях (мы перекладыватели JSON, а не аналитики): о том, как быстро они способны обработать все кадры, как часто эти кадры нужно подсовывать и многое другое. Зато мы можем предположить, что пользователю не нужно ждать результата обработки в рамках одного запроса, так как его интересуют только аналитические данные.
Для каждого кадра нам необходимо выявить человека, его координаты в кадре, время и пол. В будущем заказчик хочет и распознавание воришек, а может, потом захочет следить и за кассирами, или за тем, какие тележки берут. Всё это тоже должно куда-то и в каком-то виде сохраняться. Мы слышали про NoSQL и Postgres, но пока ��е знаем, что лучше применить для будущих аналитических задач.
Вероятно, нужно подготовить REST API, которое позволит делать запросы о количестве людей в кадре и их поле, а также показывать красивые графики и таблицы для заказчика. Держим в голове, что позже появится ещё множество данных, которые он захочет получать.
Выглядит жутко, куча неизвестных... Хм.. С чего бы начать?? Может набросать endpoint-ы для загрузки видео? Или сделать endpoint "загрузить картинку", а потом запустить нейросеть, которую советует ChatGPT, все-равно же разбивать покадрово видео придется? А может спроектировать БД, где будут сохранятся методанные с видео, и уже от данных плясать?
НЕТ! На самом деле у нас уже есть всё необходимое, чтобы проектировать архитектуру, а все «неизвестные» — это всего лишь детали, которые будут сбивать вас с пути. Даже слово «endpoint» здесь лишнее, не говоря уже о FastAPI. Так что же делать дальше?
Выделение сущностей и постановка задач доменного слоя
Как уже было сказано выше, у нас достаточно информации, чтобы выделить ключевые сущности домена. Начнём с самой базовой.
Источник записи. Нам важно сохранять информацию о том, откуда поступает видеофайл: связан ли он с одной конкретной камерой, группой камер или, например, с целой сетью магазинов. Пользователь может захотеть загружать как одиночные записи, так и десятки небольших роликов с одной и той же камеры.
Исходный код (domain/entity/video_source.py)
from dataclasses import dataclass, field from typing import Optional, Dict, Any from uuid import UUID, uuid4 @dataclass(frozen=True) class RecordingSourceId: """Уникальный идентификатор источника записи внутри нашей системы.""" value: UUID @staticmethod def new() -> "RecordingSourceId": return RecordingSourceId(uuid4()) def __str__(self) -> str: return str(self.value) @dataclass(frozen=True) class RecordingSourceExtra: """ Необязательные данные об источнике. Сейчас здесь почти ничего нет, но именно сюда позже поедут метаданные о местоположении камеры """ floor: Optional[int] = None location_label: Optional[str] = None meta: Dict[str, Any] = field(default_factory=dict) class RecordingSource: """ Источник записи (камера/логический канал). На уровне домена нам пока важно только то, что есть его идентификатор и необязательные доп.данные. """ idx: RecordingSourceId extra: RecordingSourceExtra = RecordingSourceExtra() def __init__( self, idx: RecordingSourceId extra: RecordingSourceExtra = RecordingSourceExtra() ) -> None: self.idx: RecordingSourceId = idx self.source_id: RecordingSourceExtra = extra
Видеофайл. Некоторый файл; при этом нам сейчас не важно, передан ли он нам сразу или записывается в потоковом режиме. Самое главное, что он точно должен содержать: уникальный идентификатор, время начала, уникальный идентификатор источника. При этом такие атрибуты, как время окончания, длительность, кодеки, размер кадра, уже будут опциональными и сейчас нам не особенно нужны.
Исходный код (domain/entity/video_file.py)
from dataclasses import dataclass, field from datetime import datetime from typing import Optional, Dict, Any from uuid import UUID, uuid4 from domain.entity.video_source import RecordingSourceId @dataclass(frozen=True) class VideoFileId: """Уникальный идентификатор видеозаписи внутри нашей системы.""" value: UUID @staticmethod def new() -> "VideoFileId": return VideoFileId(uuid4()) def __str__(self) -> str: return str(self.value) @dataclass class VideoFileExtra: """ Необязательные тех.детали видео. Сейчас они нам не критичны, но сюда можно будет добавить конец записи, кодеки, размер кадра и т.п. """ ended_at: Optional[datetime] = None duration_seconds: Optional[float] = None codec: Optional[str] = None frame_width: Optional[int] = None frame_height: Optional[int] = None meta: Dict[str, Any] = field(default_factory=dict) class VideoFile: """ Видеофайл/видеозапись. Нас сейчас интересует: - уникальный идентификатор видео; - время начала записи; - источник, с которого это видео пришло. Остальное — опционально и уезжает в VideoFileExtra. """ def __init__( self, idx: VideoFileId, source_id: "RecordingSourceId", started_at: datetime, extra: Optional[VideoFileExtra] = None, ) -> None: self.idx: VideoFileId = idx self.source_id: RecordingSourceId = source_id self.started_at: datetime = started_at self.extra: VideoFileExtra = extra or VideoFileExtra()
Кадр. Неважно, хранится ли он в виде картинки или нет, но у него точно будут следующие параметры: время, идентификатор видеофайла, сами данные в каком-то виде. Это всё, что нам пока нужно для идентификации; остальные атрибуты сейчас роли не играют.
Исходный код (domain/entity/frame.py)
from dataclasses import dataclass from datetime import datetime from typing import Any from domain.entity.video_file import VideoFileIdx @dataclass(frozen=True) class FrameData: """ Важный value object: «сырые» данные кадра. Сейчас нам не важно, это байты картинки, JPEG/PNG, numpy-массив или что-то ещё — главное, что это единый объект данных кадра. """ value: Any class Frame: """ Один кадр видеозаписи. Для идентификации кадра в домене нам сейчас достаточно: - времени кадра; - идентификатора видеозаписи; - самих данных кадра (в каком-то виде). Остальные атрибуты (номер кадра, размер, формат и т.п.) считаем деталями реализации и пока в модель не тащим. """ def __init__( self, video_idx: "VideoFileIdx", captured_at: datetime, data: FrameData, ) -> None: self.video_idx = video_idx self.captured_at = captured_at self.data = data
Объект. Как мы помним, заказчик на данном этапе требует просто находить людей в кадре, но в будущем это может быть что угодно (тележка, телефон в руках у кассира и т.д.). В кадрах мы будем идентифицировать именно объекты: у каждого объекта будет класс, дополнительные признаки (сейчас это только «пол»), а также ссылки на кадры, в которых этот объект был зафиксирован (заказчику ведь неинтересно, как клёво мы умеем работать с одной картинкой — ему нужны агрегированные данные).
Исходный код (domain/entity/detected_object.py)
from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Dict, Any, List, Optional from uuid import UUID, uuid4 from domain.entity.video_file import VideoFileIdx @dataclass(frozen=True) class ObjectIdx: """Уникальный идентификатор объекта (человек, тележка, телефон и т.д.).""" value: UUID @staticmethod def new() -> "ObjectIdx": return ObjectIdx(uuid4()) def __str__(self) -> str: return str(self.value) @dataclass(frozen=True) class ObjectClass: """ Класс объекта: - сейчас нас интересует 'person', - в будущем: 'cart', 'phone', 'employee', 'thief' и т.д. """ value: str # например: "person", "cart", "phone" @dataclass class ObjectAttributes: """ Дополнительные признаки объекта. Их мы будем заполнять пока в виде словаря, но потом мы сможем лучше конкретизировать эти объекты. Главное, что нам из-за этих изменений не придется переделывать саму сущность. """ meta: Dict[str, Any] = field(default_factory=dict) @dataclass(frozen=True) class Coordinate: """ Координаты объекта в кадре """ x: int y: int width: int height: int @dataclass(frozen=True) class FrameRef: """ Ссылка на кадр, в котором объект был зафиксирован. Мы не навязываем отдельный идентификатор кадра, а ссылаемся через пару (video_idx, captured_at), чего достаточно в домене, чтобы однозначно указать кадр. """ video_idx: "VideoFileIdx" captured_at: datetime coordinate: Coordinate # === Сущность === class DetectedObject: """ Объект, обнаруженный в видеопотоке. На уровне домена нас интересует: - уникальный идентификатор объекта; - его класс (person/cart/phone/…); - дополнительные признаки (сейчас — пол); - список кадров, в которых этот объект встречается (для дальнейшей агрегации и аналитики). """ def __init__( self, idx: ObjectIdx, object_class: ObjectClass, attributes: Optional[ObjectAttributes] = None, frames: Optional[List[FrameRef]] = None, ) -> None: self.idx = idx self.object_class = object_class self.attributes = attributes or ObjectAttributes() self.frames: List[FrameRef] = frames or []
Отчет. Результат для заказчика. Конечно, его можно формировать «на лету», но вдруг камеры и нейросети будут работать супербыстро, и мы получим 140 кадров в секунду. Тогда для формирования любого отчёта нам придётся каждый раз пережевывать большой объём данных. Поэтому набросаем, что нам нужно заранее: временные промежутки, вид отчёта (количество людей, половой состав). Ещё не хватает пруфов: заказчику нужно будет доказать работоспособность нашего сервиса, поэтому добавляем ссылки на источники, видеофайлы и объекты.
Исходный код (domain/entity/report.py)
from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, List from uuid import UUID, uuid4 from domain.entity.recording_source import RecordingSourceIdx from domain.entity.video_file import VideoFileIdx from domain.entity.detected_object import ObjectIdx @dataclass(frozen=True) class ReportIdx: """Уникальный идентификатор отчёта.""" value: UUID @staticmethod def new() -> "ReportIdx": return ReportIdx(uuid4()) def __str__(self) -> str: return str(self.value) @dataclass(frozen=True) class ReportType: """ Тип отчёта: - сейчас достаточно, например, 'people_count', 'gender_distribution'; - в будущем: 'traffic_heatmap', 'theft_suspects', 'queue_length' и т.д. """ value: str @dataclass(frozen=True) class TimeRange: """Временной промежуток, за который строится отчёт.""" start: datetime end: datetime @dataclass class ReportData: """ Агрегированные данные отчёта. Сейчас оставляем это в виде произвольного словаря, чтобы не блокировать развитие. Когда домен стабилизируется, из meta можно будет вытащить отдельные данные """ meta: Dict[str, Any] = field(default_factory=dict) # === Сущность === class Report: """ Отчёт для заказчика. """ def __init__( self, idx: ReportIdx, report_type: ReportType, time_range: TimeRange, data: ReportData, source_idx_list: List["RecordingSourceIdx"], video_idx_list: List["VideoFileIdx"], object_idx_list: List["ObjectIdx"], ) -> None: self.idx = idx self.report_type = report_type self.time_range = time_range self.data = data self.source_idx_list = list(source_idx_list) self.video_idx_list = list(video_idx_list) self.object_idx_list = list(object_idx_list)
Ну всё, вроде бы понятно, ведь у нас есть набор сущностей. Следующий шаг: написать вокруг этого эндпоинты или даже набросать БД, раз всё это уже можно сохранять. Опять нет!
Сначала попробуем выделить всё, что происходит вокруг этих сущностей: какие действия с ними выполняются и какие задачи решают разные части системы. На этом этапе мы будем определять сервисы. А чтобы описать их поведение абстрактно, нам поможет ABC из стандартной библиотеки, позволяющий задавать интерфейсы через абстрактные базовые классы. Начнём:
Сервис сохранения файлов. Задача нетривиальная: можно записывать файл в потоковом режиме, можно сразу копировать его на диск, но это детали — сейчас они нам не нужны. Самое главное — результат: файл сохранён или нет.
Исходный код (domain/service/file_storage.py)
from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Optional, BinaryIO from domain.entity.video_file import VideoFileIdx @dataclass(frozen=True) class FileLocation: """ Абстрактное местоположение файла. Это может быть путь на диске, ключ в объектном хранилище, URL и т.д. Конкретику решит инфраструктура. """ value: str @dataclass(frozen=True) class FileSaveResult: """ Результат сохранения файла. Главное — удалось или нет. Остальное опционально. """ success: bool location: Optional[FileLocation] = None error_message: Optional[str] = None class FileStorage(ABC): """ Абстрактный сервис сохранения файлов. Детали (стриминг, буферизация, ретраи, конкретное хранилище) остаются за пределами домена. Здесь нас интересует только: «попробуй сохранить» → FileSaveResult. """ @abstractmethod def save_video_content( self, video_idx: "VideoFileIdx", content: BinaryIO, ) -> FileSaveResult: """ Сохранить бинарное содержимое видеозаписи, связав его с доменной сущностью VideoFile. Аргумент `content` — абстрактный поток байт: это может быть открытый файл, сетевой стрим и т.п. """ raise NotImplementedError
Сервис нарезки кадров из видео. Опять же, мы не знаем, какие-либо библиотеки существуют для этого, какой размер кадра нам нужен на выходе и т.д. Но зато мы знаем, что в результате работы сервиса мы должны получить исчерпывающие данные для формирования сущности «Кадр».
Исходный код (domain/service/frame_extractor.py)
from abc import ABC, abstractmethod from typing import Iterable from domain.entity.video_file import VideoFileIdx from domain.entity.frame import Frame class FrameExtractor(ABC): """ Абстрактный сервис нарезки кадров из видео. На уровне домена нас интересует только то, что на вход подаётся идентификатор видеозаписи, а на выходе мы получаем набор сущностей Frame, достаточный для дальнейшей обработки и аналитики. """ @abstractmethod def extract_frames(self, video_idx: "VideoFileIdx") -> Iterable["Frame"]: """ Извлечь кадры для указанного видео. Конкретная реализация сама решает: - как прочитать файл (локальный диск, object storage и т.п.); - как именно и с какими параметрами вырезать кадры. """ raise NotImplementedError
Сервис обработки кадров. Мы не знаем, какие именно нейросети нужны; возможно, придётся запускать несколько моделей друг за другом. Но мы точно можем сказать, что на выходе должны быть получены списки классов, признаков и координат в кадре. Эти данные, в свою очередь, нужны для создания и обновления сущности «Объект».
Исходный код (domain/service/frame_processor.py)
from abc import ABC, abstractmethod from dataclasses import dataclass from typing import List from domain.entity.frame import Frame from domain.entity.detected_object import ObjectClass, ObjectAttributes, Coordinate @dataclass(frozen=True) class RawDetection: """ Результат работы нейросетей по одному объекту на кадре. На уровне сервиса обработки кадров нам достаточно: - класса объекта; - набора признаков (пока в виде ObjectAttributes); - координат объекта в кадре. """ object_class: "ObjectClass" attributes: "ObjectAttributes" coordinate: "Coordinate" class FrameProcessor(ABC): """ Абстрактный сервис обработки кадров нейросетями. Мы не знаем: - какие именно нейросети будут использоваться; - сколько их будет и в каком порядке они запустятся; - на каком железе это всё будет крутиться. """ @abstractmethod def process_frame(self, frame: "Frame") -> List[RawDetection]: """ Обработать кадр и вернуть список найденных объектов с их ��лассами, признаками и координатами. """ raise NotImplementedError
Сервис классификации объектов. Проблема в том, что кадров у нас много, и объекты на кадрах нужно как-то связать. Мы пока не знаем как, но, как подсказывает логика, следует анализировать предыдущие кадры и пытаться понять, нужно ли создавать новый объект или можно обновить существующий.
Исходный код (domain/service/object_classifier.py)
from abc import ABC, abstractmethod from dataclasses import dataclass from typing import List from domain.entity.frame import Frame from domain.entity.detected_object import ObjectIdx, FrameRef from domain.service.frame_processor import RawDetection @dataclass(frozen=True) class ObjectClassificationDecision: """ Результат классификации одного «сыро обнаруженного» объекта. На уровне домена нас интересует: - какой доменной сущности DetectedObject он соответствует (существующий объект или новый); - в каком кадре и с какими координатами он был обнаружен; - исходные данные детекции (класс, признаки, координаты). """ existing_object_idx: Optional["ObjectIdx"] # None, если объект новый frame_ref: "FrameRef" raw_detection: "RawDetection" @property def is_new(self) -> bool: """True, если по этой детекции нужно создать новый объект.""" return self.existing_object_idx is None class ObjectClassifier(ABC): """ Абстрактный сервис классификации/сопоставления объектов по кадрам. Его задача — только принять решение: - к какому уже существующему ObjectIdx отнести детекцию; - по каким детекциям нужно создать НОВЫЙ объект. Как именно это делается (трекеры, эвклидово расстояние, нейросети и т.п.) — детализация инфраструктуры, домен этого не знает. """ @abstractmethod def classify_objects( self, frame: "Frame", detections: List["RawDetection"], ) -> List[ObjectClassificationDecision]: """ На основании набора детекций на кадре решить, какие из них принадлежат уже существующим объектам, а для каких потребуется создать новый объект. """ raise NotImplementedError
Сервис анализа найденных объектов. Заказчику всё-таки нужно получать аналитические данные, и мы уже определились, что формировать их «по запросу» не стоит. Поэтому нам нужен сервис, который анализирует уникальные объекты и старается добыть данные, необходимые для построения отчёта.
Исходный код (domain/service/object_analyzer.py)
from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Dict, Any, List from domain.entity.detected_object import DetectedObject, ObjectIdx from domain.entity.video_file import VideoFileIdx from domain.entity.recording_source import RecordingSourceIdx @dataclass class ObjectAnalyticsResult: """ Результат анализа уникальных объектов. Здесь мы собираем всё, что нужно для дальнейшего построения отчёта: - агрегированные метрики (meta), - «пруфы» — ссылки на источники, видео и объекты. """ meta: Dict[str, Any] = field(default_factory=dict) source_idx_list: List["RecordingSourceIdx"] = field(default_factory=list) video_idx_list: List["VideoFileIdx"] = field(default_factory=list) object_idx_list: List["ObjectIdx"] = field(default_factory=list) class ObjectAnalyzer(ABC): """ Абстрактный сервис анализа найденных (уже классифицированных) объектов. Его задача: - взять набор уникальных объектов (DetectedObject), - «переварить» их, - выдать агрегированные данные, которые потом пойдут в построение доменной сущности отчёта. ВАЖНО: сервис ничего не знает о том, как будут в��глядеть конкретные отчёты, он лишь добывает сырые аналитические данные и пруфы. """ @abstractmethod def analyze_objects( self, objects: List["DetectedObject"], ) -> ObjectAnalyticsResult: """ Проанализировать набор уникальных объектов и вернуть агрегированную информацию, пригодную для построения отчёта. """ raise NotImplementedError
Теперь поговорим о сохранении, извлечении и поиске сущностей. И здесь нас снова выручает модуль abc, с помощью которого мы напишем абстрактный интерфейс для работы с хранилищем (Repository):
Репозиторий источников. Нам нужно всё-таки вести учёт источников и связывать их с видеофайлами.
Исходный код (domain/repository/recording_source_repository.py)
from abc import ABC, abstractmethod from typing import Optional, List from domain.entity.recording_source import RecordingSource, RecordingSourceIdx class RecordingSourceRepository(ABC): """ Абстрактный репозиторий источников записи. Детали хранилища (Postgres, NoSQL, файлики, in-memory) здесь не определяются. """ @abstractmethod def save(self, source: "RecordingSource") -> None: """ Сохранить или обновить источник записи. """ raise NotImplementedError @abstractmethod def get_by_idx(self, idx: "RecordingSourceIdx") -> Optional["RecordingSource"]: """ Получить источник по его доменному идентификатору. """ raise NotImplementedError @abstractmethod def list_all(self) -> List["RecordingSource"]: """ Вернуть все известные источники. (Пагинация уже как домашнее задание) """ raise NotImplementedError
Репозиторий видеофайлов. Должен позволять сохранять важные данные о сущности «Видеофайл», включая её идентификатор, чтобы мы могли легко находить видеофайлы и работать с ними из сервиса сохранения видеофайлов.
Исходный код (domain/repository/video_file_repository.py)
from abc import ABC, abstractmethod from typing import Optional, List from domain.entity.video_file import VideoFile, VideoFileIdx from domain.entity.recording_source import RecordingSourceIdx class VideoFileRepository(ABC): """ Абстрактный репозиторий видеозаписей. Задачи: - сохранить/обновить сущность VideoFile; - уметь по доменному идентификатору VideoFileIdx быстро её найти; - при необходимости находить все видео, привязанные к источнику. """ @abstractmethod def save(self, video: "VideoFile") -> None: """ Сохранить или обновить видеозапись. """ raise NotImplementedError @abstractmethod def get_by_idx(self, idx: "VideoFileIdx") -> Optional["VideoFile"]: """ Найти видеозапись по её доменному идентификатору. """ raise NotImplementedError @abstractmethod def list_by_source_idx(self, source_idx: "RecordingSourceIdx") -> List["VideoFile"]: """ Получить все видеозаписи, пришедшие с указанного источника. """ raise NotImplementedError
Репозиторий кадров. ММы хотим сохранять пруфы нашей работы, поэтому не забываем и про хранилище кадров, которые уже были проанализированы. При этом, нужно ли сохранять сами бинарные данные кадров, можно решить позже.
Исходный код (domain/repository/frame_repository.py)
from abc import ABC, abstractmethod from datetime import datetime from typing import Optional, List from domain.entity.frame import Frame from domain.entity.video_file import VideoFileIdx class FrameRepository(ABC): """ Абстрактный репозиторий кадров. Задачи: - сохранять проанализированные кадры как «пруф» работы системы; - уметь находить кадры по видеозаписи и времени; - уметь получать набор кадров по видеозаписи и/или диапазону времени. """ @abstractmethod def save(self, frame: "Frame") -> None: """ Сохранить или обновить кадр. """ raise NotImplementedError @abstractmethod def get_by_video_and_time( self, video_idx: "VideoFileIdx", captured_at: datetime, ) -> Optional["Frame"]: """ Найти конкретный кадр по видеозаписи и времени съёмки. Эта пара (video_idx, captured_at). """ raise NotImplementedError @abstractmethod def list_by_video_idx( self, video_idx: "VideoFileIdx", *, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, ) -> List["Frame"]: """ Получить кадры для указанного видео. """ raise NotImplementedError
Репозиторий объектов. Нужен набор методов для сохранения объектов и их поиска по различным параметрам, которые могут потребоваться сервисам.
Исходный код (domain/repository/object_repository.py)
from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import Optional, List, Dict, Any from domain.entity.detected_object import DetectedObject, ObjectIdx, ObjectClass from domain.entity.recording_source import RecordingSourceIdx from domain.entity.video_file import VideoFileIdx @dataclass class ObjectSearchCriteria: """ Критерии поиска объектов. Это абстрактный фильтр, который можно постепенно расширять по мере появления новых сценариев в сервисах. """ object_classes: Optional[List["ObjectClass"]] = None source_idx: Optional["RecordingSourceIdx"] = None video_idx: Optional["VideoFileIdx"] = None # Временной диапазон, в котором объект хоть раз появлялся appeared_from: Optional[datetime] = None appeared_to: Optional[datetime] = None # Фильтрация по признакам (ключи/значения в ObjectAttributes.meta) attributes: Dict[str, Any] = field(default_factory=dict) class ObjectRepository(ABC): """ Абстрактный репозиторий объектов (DetectedObject). """ @abstractmethod def save(self, obj: "DetectedObject") -> None: """ Сохранить или обновить объект. """ raise NotImplementedError @abstractmethod def get_by_idx(self, idx: "ObjectIdx") -> Optional["DetectedObject"]: """ Получить объект по доменному идентификатору. """ raise NotImplementedError @abstractmethod def search( self, criteria: ObjectSearchCriteria, *, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List["DetectedObject"]: """ Поиск объектов по набору критериев. """ raise NotImplementedError
Репозиторий отчетов. Сохраняем отчёты; при этом мы пока даже не выбираем временные промежутки их хранения — нам важен только результат. Также не забываем про методы извлечения отчётов.
Исходный код (domain/repository/report_repository.py)
from abc import ABC, abstractmethod from datetime import datetime from typing import Optional, List from domain.entity.report import Report, ReportIdx, ReportType class ReportRepository(ABC): """ Абстрактный репозиторий отчётов. """ @abstractmethod def save(self, report: "Report") -> None: """ Сохранить или обновить отчёт. """ raise NotImplementedError @abstractmethod def get_by_idx(self, idx: "ReportIdx") -> Optional["Report"]: """ Получить отчёт по его доменному идентификатору. """ raise NotImplementedError @abstractmethod def list_by_type( self, report_type: "ReportType", *, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List["Report"]: """ Получить отчёты указанного типа. Временной промежуток задаёт интересующий нас диапазон по времени, за который построены отчёты. Реализация сама решает: - как интерпретировать start_time/end_time относительно поля TimeRange у отчёта; - как оптимизировать выборку и пагинацию. """ raise NotImplementedError @abstractmethod def list_all( self, *, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List["Report"]: """ Вернуть все отчёты (с возможной пагинацией). """ raise NotImplementedError
Вообще, сущности хорошо бы не создавать «голыми руками». Поэтому лучше сразу позаботиться о фабриках и не конструировать их напрямую. Да, сначала это кажется лишней работой, но по мере усложнения сущностей вы всё равно к этому придёте: формат входных данных будет меняться. Давайте сразу заложим фабрики для всех ключевых сущностей.
Исходный код (domain/factory/recording_source_factory.py)
from dataclasses import dataclass from typing import Optional from domain.entity.recording_source import ( RecordingSource, RecordingSourceIdx, RecordingSourceExtra, ) @dataclass class RecordingSourceFactory: """ Фабрика источников записи. """ def create( self, extra: Optional["RecordingSourceExtra"] = None, ) -> "RecordingSource": idx = RecordingSourceIdx.new() return RecordingSource( idx=idx, extra=extra or "RecordingSourceExtra"() )
Исходный код (domain/factory/video_file_factory.py)
from dataclasses import dataclass from datetime import datetime from typing import Optional from domain.entity.video_file import VideoFile, VideoFileIdx, VideoFileExtra from domain.entity.recording_source import RecordingSourceIdx @dataclass class VideoFileFactory: """ Фабрика видеозаписей. """ def create( self, source_idx: RecordingSourceIdx, started_at: datetime, extra: Optional["VideoFileExtra"] = None, ) -> "VideoFile": video_idx = VideoFileIdx.new() return VideoFile( idx=video_idx, source_idx=source_idx, started_at=started_at, extra=extra or VideoFileExtra(), )
Исходный код (domain/factory/frame_factory.py)
from dataclasses import dataclass from datetime import datetime from typing import Any from domain.entity.frame import Frame, FrameData from domain.entity.video_file import VideoFileIdx @dataclass class FrameFactory: """ Фабрика кадров. """ def create( self, video_idx: VideoFileIdx, captured_at: datetime, raw_data: Any, ) -> "Frame": frame_data = FrameData(value=raw_data) return Frame( video_idx=video_idx, captured_at=captured_at, data=frame_data, )
Исходный код (domain/factory/detected_object_factory.py)
from dataclasses import dataclass from typing import Optional from domain.entity.detected_object import ( DetectedObject, ObjectIdx, ObjectClass, ObjectAttributes, FrameRef, ) @dataclass class DetectedObjectFactory: """ Фабрика доменных объектов (DetectedObject). """ def create_new( self, object_class: ObjectClass, attributes: Optional[ObjectAttributes] = None, first_frame: Optional[FrameRef] = None, ) -> DetectedObject: idx = ObjectIdx.new() frames = [first_frame] if first_frame is not None else None return DetectedObject( idx=idx, object_class=object_class, attributes=attributes or ObjectAttributes(), frames=frames, )
Исходный код (domain/factory/report_factory.py)
from dataclasses import dataclass from domain.entity.report import ( Report, ReportIdx, ReportType, TimeRange, ReportData, ) from domain.service.object_analyzer import ObjectAnalyticsResult @dataclass class ReportFactory: """ Фабрика отчётов. """ def create_from_analytics( self, report_type: ReportType, time_range: TimeRange, analytics: ObjectAnalyticsResult, ) -> Report: idx = ReportIdx.new() data = ReportData(meta=analytics.meta) return Report( idx=idx, report_type=report_type, time_range=time_range, data=data, source_idx_list=analytics.source_idx_list, video_idx_list=analytics.video_idx_list, object_idx_list=analytics.object_idx_list, )
Вроде всё готово, но наши сущности сейчас — это не просто «мешки с данными», которые сами по себе ничего не делают. Можно, конечно, добавить им простые сеттеры и геттеры, но тогда они останутся просто контейнерами для данных и не будут отражать поведение объектов и их возможности. А нам ведь нужно в будущем поддерживать систему, поэтому лучше сразу думать о том, какие операции должны жить внутри самих сущностей.
Видеоисточник.
domain/entity/video_source.py
class RecordingSource: # старый код def update_extra(self, extra: RecordingSourceExtra) -> None: """ Обновить дополнительные данные об источнике. """ self.extra = extra
Видеофайл.
domain/entity/video_file.py
class VideoFile: # старый код def mark_ended(self, ended_at: Optional[datetime] = None) -> None: """ Пометить видеозапись как завершённую. """ new_ended_at = ended_at or datetime.utcnow() if new_ended_at < self.started_at: raise ValueError("ended_at cannot be earlier than started_at") if self.extra.ended_at is not None and new_ended_at <= self.extra.ended_at: return self.extra.ended_at = new_ended_at self.extra.duration_seconds = (new_ended_at - self.started_at).total_seconds() def update_duration(self, duration_seconds: float) -> None: """ Обновить длительность видеозаписи в секундах. """ if duration_seconds < 0: raise ValueError("duration_seconds cannot be negative") self.extra.duration_seconds = duration_seconds def update_technical_info( self, *, codec: Optional[str] = None, frame_width: Optional[int] = None, frame_height: Optional[int] = None, meta: Optional[Dict[str, Any]] = None, ) -> None: """ Обновить технические параметры видеозаписи. """ if frame_width is not None and frame_width <= 0: raise ValueError("frame_width must be positive") if frame_height is not None and frame_height <= 0: raise ValueError("frame_height must be positive") if codec is not None: self.extra.codec = codec if frame_width is not None: self.extra.frame_width = frame_width if frame_height is not None: self.extra.frame_height = frame_height if meta: self.extra.meta.update(meta) def update_extra(self, extra: VideoFileExtra) -> None: """ Полностью заменить объект с дополнительными данными. """ self.extra = extra
Кадр
domain/entity/frame.py
class Frame: # старый код def update_data(self, data: FrameData) -> None: """ Заменить данные кадра. Смысл: - кадр мог быть перекодирован, - к нему могла быть применена анонимизация/маскирование, - данные могли быть уменьшены (даунскейл, JPEG и т.п.). Вместо прямого присваивания снаружи, всё изменение «сырых» данных кадра проходит через этот метод. """ self.data = data def is_in_time_range(self, start: datetime, end: datetime) -> bool: """ Проверить, попадает ли кадр во временной интервал [start, end). """ return start <= self.captured_at < end
Объект
domain/entity/detected_object.py
class DetectedObject: # старый код def add_frame(self, frame_ref: FrameRef) -> None: """ Зафиксировать, что объект был замечен ещё в одном кадре. """ self.frames.append(frame_ref) def last_seen_at(self) -> Optional[datetime]: """ Вернуть момент времени, когда объект был замечен последним. """ if not self.frames: return None return max(ref.captured_at for ref in self.frames) def was_seen_in_range(self, start: datetime, end: datetime) -> bool: """ Проверить, встречался ли объект в интервале [start, end). """ return any(start <= ref.captured_at < end for ref in self.frames) def update_attributes(self, meta: Dict[str, Any]) -> None: """ Обновить признаки объекта (attributes.meta). """ self.attributes.meta.update(meta)
Отчет
domain/entity/report.py
class Report: # старый код def covers_moment(self, moment: datetime) -> bool: """ Проверить, относится ли указанный момент ко времени этого отчёта. """ return self.time_range.start <= moment < self.time_range.end def overlaps_with(self, start: datetime, end: datetime) -> bool: """ Проверить, пересекается ли отчёт с временным интервалом [start, end). """ r_start = self.time_range.start r_end = self.time_range.end return not (end <= r_start or r_end <= start) def add_source_idx(self, source_idx: "RecordingSourceIdx") -> None: """ Добавить источник в список «пруфов» отчёта. """ if source_idx not in self.source_idx_list: self.source_idx_list.append(source_idx) def add_video_idx(self, video_idx: "VideoFileIdx") -> None: """ Добавить видеозапись в список «пруфов» отчёта. """ if video_idx not in self.video_idx_list: self.video_idx_list.append(video_idx) def add_object_idx(self, object_idx: "ObjectIdx") -> None: """ Добавить объект в список «пруфов» отчёта. """ if object_idx not in self.object_idx_list: self.object_idx_list.append(object_idx)
На данном этапе мы ужасно далеки от создания чего-то работоспособного. Тогда зачем всё это нужно?
Мы уже понимаем, что нам нужно делать, и можем прогнозировать, как все компоненты будут взаимодействовать друг с другом.
У нас сейчас нет привязки к конкретным технологиям, мы даже не приступали к их анализу, но ЗАТО мы уже можем поэтапно разобрать обработку одного видеофайла.
Наша архитектура может быть легко расширена под новые требования, когда заказчик согласится продолжить работу с нами.
Мы можем без труда написать реализации наших абстрактных сервисов и репозиториев, используя встроенные типы данных (
dict,list) или модули для работы с файлами.
Таким образом, у нас появился домен (Domain), который описывает предметную область нашей задачки и является центром всей системы: именно здесь задаются бизнес-правила и определяется поведение объектов. Далее нам предстоит описать, как компоненты домена взаимодействуют друг с другом (слой Application), — но об этом уже в следующей статье (если, конечно, она вам зайдёт).
P.S. Я намеренно полностью или почти опустил такие важные компоненты, как объекты-значения, доменные события, агрегаты, bounded context и т.п. По моему мнению, это тяжеловато даже для многих специалистов, а у нас всё-таки гайд для новичков. Но какие-то моменты постараюсь осветить в следующей статье.
P.P.S. Это моя первая статья, так что не бейте сильно. С удовольствием почитаю ваши пожелания и всё исправлю/дополню. Сразу оговорюсь: кода много, а старые проекты куда-то затерялись, поэтому в кодогенерации не обошлось без помощи GPT. Заранее спасибо за понимание.
