Продолжение статьи https://habr.com/ru/articles/1028290/

В прошлой серии
Мы поняли, что скелетом агента является долговременное состояние (durable state). Именно оно должно позволить ответить на скучные, но жизненно важные вопросы: какой ход активен, какой шаг уже выполнен, какой job держит lease, какой файл был исходным, какой результат можно выдать пользователю, какое подтверждение еще действительно.
В первой части мы разложили durable state на ход агента, шаг плана и событие. У нас появились такие сущности, как AgentTurn, AgentPlanItem, AgentEvent, и агент уже перестает быть нервным генератором текста, который живет ровно до первого рестарта процесса.
Но трех таблиц мало. Нужны еще разрешения, состояние диалоги/сессии, состояние проекта, фоновые задачи, механизм обработки фоновых задач (lease), счетчик и политика повторов, закладка событий (event cursor) и санитарная обработка payload-ов (payload sanitizer).
Что добавляется после AgentTurn, AgentPlanItem и AgentEvent
Минимальный набор первой части можно расширить так:
Сущность | Что хранит | Зачем нужна |
ApprovalGrant | Выданное пользователем разрешение | Не спрашивать повторно одно и то же действие в рамках допустимого scope |
SessionContext | Активный turn, профиль агента, краткую историю, pending approval | Восстановить диалог и текущую сцену сессии |
ProjectContext | Активный проект, файлы, настройки, текущую операцию | Не дать двум тяжелым операциям одновременно менять один проект |
BackgroundJob | Длинную операцию вне HTTP-запроса | Например, парсинг, workbook-операции, retry, progress, cancellation |
WorkerHeartbeat | Присутствие и занятость исполнителя | Отличить долгую работу от умершего worker-а |
Durable payload policy | Правила сохранения payload-ов | Не складывать base64, секреты и гигантские строки в event log |
ApprovalGrant: подтверждение тоже должно быть durable
Approval - это юридически важная запись о том, что пользователь разрешил действие с конкретным scope. Если подтверждение живет только в памяти процесса, то после рестарта агент снова спросит то же самое или, что хуже, решит продолжить без понятного основания.
Разрешения привязаны к session_id, project_id, tool_name, mode, scope и expires_at. Это правильная форма: подтверждение не становится вечным. Пользователь мог разрешить править файлы на этом проекте, но это не значит, что агент получил право трогать все проекты, все файлы и все будущие операции.
Хороший approval grant должен быть узким. В идеале scope описывает не человеческую фразу «можно», а машинно проверяемые границы: project_id, tool_name, режим only_missing, срок действия. Тогда executor может принять решение без повторного похода к LLM.
Класс AprrovalGrant, она же таблица для хранения разрешений (прав доступа) на выполнение определенных действий или использование инструментов в какой-то системе, может выглядеть следующим образом
class ApprovalGrant(Base): __tablename__ = "approval_grants" grant_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) session_id: Mapped[str] = mapped_column(String(200), index=True) project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) tool_name: Mapped[str] = mapped_column(String(120), index=True) mode: Mapped[str] = mapped_column(String(40), index=True) scope: Mapped[dict] = mapped_column(JSON, default=dict) reason: Mapped[str | None] = mapped_column(Text, nullable=True) expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
Разбор полей (колонок):
grant_id: Уникальный идентификатор каждой записи (UUID). Генерируется автоматически.session_id: ID сессии пользователя. Позволяет понять, в рамках какого сеанса выдано разрешение.project_id: Связь с конкретным проектом (может быть пустым).tool_name: Название инструмента, к которому запрашивается доступ (например, "база_данных", "отправка_email").mode: Режим доступа (например, "чтение", "запись" или "админ").scope: Дополнительные параметры в формате JSON. Позволяет хранить сложные настройки доступа в виде словаря.reason: Текстовое описание того, зачем это разрешение было выдано.expires_at: Срок годности разрешения. Если время вышло, доступ аннулируется.created_at: Время создания записи (автоматически ставится текущее время UTC).
SessionContext: агент должен помнить, где он находится

SessionContext — это durable-состояние диалога. Не transcript, не полный лог сообщений и не “вся память агента”, а компактная техническая карточка текущей сессии.
Если AgentTurn отвечает на вопрос “какой запрос сейчас выполняется”, то SessionContext отвечает на вопрос “в какой сцене находится пользователь и агент”.
Например:
какой turn_id сейчас активен;
есть ли незавершенное подтверждение;
какой проект открыт;
какой профиль агента выбран;
какой краткий summary уже построен;
с какого события UI нужно продолжить чтение после reconnect;
какие операции сейчас нельзя запускать параллельно.
То есть SessionContext нужен не для философской “памяти”, а для скучной инженерной магии: закрыли вкладку, обновили страницу, перезапустили backend, worker умер, пользователь вернулся через час — и система все еще понимает, что происходит.
Примерная структура:
class SessionContext(Base): __tablename__ = "session_contexts" session_id: Mapped[str] = mapped_column(String(200), primary_key=True) user_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) active_turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) active_job_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) status: Mapped[str] = mapped_column(String(40), default="idle", index=True) agent_profile: Mapped[str] = mapped_column(String(80), default="default") summary: Mapped[str | None] = mapped_column(Text, nullable=True) pending_approval: Mapped[dict | None] = mapped_column(JSON, nullable=True) event_cursor: Mapped[int] = mapped_column(default=0) context_version: Mapped[int] = mapped_column(default=1) last_user_message_at: Mapped[datetime | None] = mapped_column(nullable=True) last_agent_event_at: Mapped[datetime | None] = mapped_column(nullable=True) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, onupdate=datetime.utcnow, )
Поле | Что хранит | Зачем нужно |
|---|---|---|
session_id | ID сессии | Главный ключ состояния диалога |
user_id | Пользователь | Изоляция сессий и прав |
active_project_id | Текущий проект | Понимать рабочий контекст |
active_turn_id | Текущий ход агента | Восстановить незавершенный turn |
agent_profile | Режим/персона/настройки агента | Например, “код-ревьюер”, “переводчик”, “аналитик” |
summary | Сжатая история | Не тащить весь transcript в каждый prompt |
pending_approval | Ожидаемое подтверждение | Не потерять confirm после рестарта |
event_cursor | Последнее доставленное событие | Догнать UI после reconnect |
status | active, waiting_user, running, idle | Быстро понять состояние сессии |
updated_at | Время обновления | Отладка, TTL, чистка старых сессий |
Важно: SessionContext не должен превращаться в помойку. В него не надо складывать весь prompt, все ответы модели, base64 файлов и простыню traceback-ов. Для этого есть AgentEvent, файловое хранилище, blob storage и отдельные job-таблицы.
Хороший SessionContext маленький, скучный и восстанавливаемый.
ProjectContext: агент не должен пилить один проект двумя руками одновременно
Еще одна сущность, которая быстро становится необходимой, — ProjectContext.
Сессия отвечает за диалог. Проект отвечает за рабочую область.
Пользователь может открыть один проект в нескольких вкладках,, потом еще что-то сделать, потом нажать “повторить”. Если система не хранит durable-состояние проекта, два job-а могут одновременно начать менять одни и те же файлы.
И получится не AI-agent, а кибердеревенский комбайн, который одной рукой чинит забор, второй рукой уже его сносит.
сlass ProjectContext(Base): __tablename__ = "project_contexts" project_id: Mapped[uuid.UUID] = mapped_column(primary_key=True) owner_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) active_operation_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) operation_lock: Mapped[dict | None] = mapped_column(JSON, nullable=True) latest_output_file_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) settings: Mapped[dict | None] = mapped_column(JSON, nullable=True) status: Mapped[str] = mapped_column(String(40), default="idle", index=True) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, onupdate=datetime.utcnow, )
ProjectContext хранит:
Поле | Что хранит |
|---|---|
project_id | ID проекта |
owner_id | Владелец |
active_operation_id | Текущая тяжелая операция |
operation_lock | Мягкая блокировка проекта |
latest_output_file_id | Последний результат |
settings | Настройки проекта |
status | idle, processing, needs_review, failed |
updated_at | Последнее изменение |
Это не обязательно жесткий database lock. Чаще достаточно прикладной блокировки: “в этом проекте уже идет операция типа workbook_write, вторую такую же не запускаем”.
Например, можно разрешить читать файл и строить preview, но запретить одновременно две операции, которые пишут результат в один и тот же output slot.
BackgroundJob: долгая работа не должна жить внутри HTTP-запроса

Если операция может занять больше пары секунд, она должна стать job-ом.
HTTP-запрос может принять задачу, проверить права, создать AgentTurn, положить BackgroundJob в очередь и вернуть пользователю состояние: “задача принята”. А дальше работает worker.
Класс джоб будет таким
class Job(Base): tablename = “jobs”
class Job(Base): __tablename__ = "operation_jobs" job_id: Mapped[uuid.UUID] = mapped_column(primary_key=True) turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) type: Mapped[str] = mapped_column(String(80), index=True) status: Mapped[str] = mapped_column(String(40), default="queued", index=True) attempt: Mapped[int] = mapped_column(default=0) max_attempts: Mapped[int] = mapped_column(default=3) next_attempt_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True) lease_owner: Mapped[str | None] = mapped_column(String(200), nullable=True, index=True) lease_expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True) progress_seq: Mapped[int] = mapped_column(default=0) input: Mapped[dict | None] = mapped_column(JSON, nullable=True) output: Mapped[dict | None] = mapped_column(JSON, nullable=True) error: Mapped[dict | None] = mapped_column(JSON, nullable=True) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, onupdate=datetime.utcnow, )
Поле | Что хранит |
|---|---|
job_id | ID задачи |
turn_id | К какому turn относится |
project_id | В каком проекте выполняется |
type | translate, parse_workbook, render, export |
status | queued, running, completed, failed, cancelled |
attempt | Номер попытки |
max_attempts | Лимит повторов |
next_attempt_at | Когда можно retry |
lease_owner | Какой worker забрал задачу |
lease_expires_at | Когда lease протухает |
progress_seq | Монотонный номер progress-события |
input | Санитизированный input |
output | Ссылка на результат |
error | Классифицированная ошибка |
Ключевой момент — worker не просто берет задачу. Он атомарно claim-ит ее:
UPDATE background_jobs SET status = 'running', lease_owner = :worker_id, lease_expires_at = :now + interval '2 minutes' WHERE job_id = :job_id AND status = 'queued' AND (next_attempt_at IS NULL OR next_attempt_at <= :now);
Если обновилась одна строка — worker владеет задачей. Если ноль строк — кто-то уже забрал.
Lease нужен, потому что worker может умереть. Не “вернуть ошибку”, не “аккуратно завершиться”, а просто исчезнуть. После истечения lease_expires_at другой worker может подобрать задачу и продолжить или перезапустить ее с учетом idempotency.
EventCursor: UI должен догонять события, а не молиться на websocket
Live stream — это приятно, но websocket не является durable state.
Пользователь закрыл ноутбук, сеть моргнула, вкладка перезагрузилась. Если события жили только в памяти процесса, прогресс потерян. Поэтому UI должен читать события из AgentEvent по cursor-у.
Условный сценарий:
UI подписался на события turn-а.
Получил события до event_seq = 42.
Соединение оборвалось.
UI reconnect-ится и говорит: “дай события после 42”.
Backend читает durable event log и отдает 43, 44, 45....
Так интерфейс перестает зависеть от идеальной сети.
class EventCursor(Base): __tablename__ = "event_cursors" cursor_id: Mapped[uuid.UUID] = mapped_column(primary_key=True) session_id: Mapped[str] = mapped_column(String(200), index=True) turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True) consumer_id: Mapped[str] = mapped_column(String(200), index=True) last_event_seq: Mapped[int] = mapped_column(BigInteger, default=0) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, onupdate=datetime.utcnow, ) __table_args__ = ( UniqueConstraint( "session_id", "turn_id", "consumer_id", name="uq_event_cursor_consumer", ), )
event_seq лучше делать монотонным внутри turn_id или session_id. Не надо использовать только timestamp: у двух событий может быть одинаковое время, а порядок все равно важен.

Durable payload policy: event log не мусорный бак
Отдельно стоит прописать политику payload-ов.
Почти в каждом агенте рано или поздно появляется соблазн: “давайте просто положим весь JSON в event payload”. Через месяц в event log лежат base64-файлы, токены доступа, гигантские HTML-страницы, персональные данные и ответы модели на 300 килобайт.
Правило простое: durable event должен хранить факт, ссылку и короткий summary, а не весь мир.
Плохо:
{ "event_type": "file_processed", "payload": { "file_base64": "UEsDBBQAAAA...", "openai_api_key": "sk-...", "full_html": "<html>..." } }
Хорошо:
{ "event_type": "file_processed", "payload": { "file_id": "file_123", "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "rows_count": 1842, "result_file_id": "file_456" } }
Payload sanitizer должен вырезать:
секреты;
base64 и бинарные данные;
слишком длинные строки;
полные prompt-ы без необходимости;
персональные данные, если они не нужны для восстановления процесса.
Идеально, если sanitizer применяется централизованно перед записью AgentEvent, а не “по договоренности между разработчиками”. Договоренность живет до первого пятничного hotfix-а.
Все это ложится в один централизованный слой перед добавлением AgentEvent
import json import re from typing import Any SECRET_KEY_RE = re.compile( r"(api[_-]?key|token|secret|password|authorization|cookie|access[_-]?token|refresh[_-]?token)", re.IGNORECASE, ) BASE64_RE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$") class EventPayloadSanitizer: max_depth = 8 max_string_length = 2_000 max_list_items = 100 max_payload_bytes = 32_000 def sanitize(self, payload: dict[str, Any] | None) -> dict[str, Any] | None: if payload is None: return None sanitized = self._sanitize_value(payload, depth=0) encoded = json.dumps(sanitized, ensure_ascii=False, default=str) if len(encoded.encode("utf-8")) > self.max_payload_bytes: return { "summary": "payload_too_large", "original_size_bytes": len(encoded.encode("utf-8")), } return sanitized def _sanitize_value(self, value: Any, depth: int) -> Any: if depth > self.max_depth: return "[redacted:max_depth]" if isinstance(value, dict): result = {} for key, item in value.items(): key_str = str(key) if SECRET_KEY_RE.search(key_str): result[key_str] = "[redacted:secret]" continue result[key_str] = self._sanitize_value(item, depth + 1) return result if isinstance(value, list): items = value[: self.max_list_items] result = [self._sanitize_value(item, depth + 1) for item in items] if len(value) > self.max_list_items: result.append(f"[truncated:{len(value) - self.max_list_items}_items]") return result if isinstance(value, str): return self._sanitize_string(value) return value def _sanitize_string(self, value: str) -> str: if self._looks_like_base64(value): return "[redacted:base64]" if len(value) > self.max_string_length: return value[: self.max_string_length] + f"...[truncated:{len(value)} chars]" return value def _looks_like_base64(self, value: str) -> bool: compact = value.strip() if len(compact) < 256: return False if len(compact) % 4 != 0: return False return bool(BASE64_RE.fullmatch(compact))
Как выглядит путь одного запроса
Соберем все вместе на сценарии: пользователь просит, например, обработать файл и применить проектные настройки.
API принимает запрос и нормализует session_id. Если есть client_turn_id или idempotency_key, система проверяет, не запускали ли такой turn раньше.
AgentService создает AgentTurn и строит TurnPlan. Шаги плана попадают в AgentPlanItem.
ApprovalService оценивает риск. Если нужен confirm, в SessionContext сохраняется pending_approval, а в agent_events появляется approval_requested.
Пользователь подтверждает. Система создает ApprovalGrant с ограниченным scope и снимает pending_approval.
Executor запускает шаг. Для долгой операции создается TranslateJob или WorkbookJob, а turn получает события tool_started и job_queued.
Worker атомарно claim-ит job, выставляет lease_owner и lease_expires_at, обновляет progress_seq и публикует progress.
UI читает live stream. Если вкладка закрылась, после reconnect он догоняет события из БД.
При успехе job получает completed и output_file_id, проект обновляет last_translated_file_id, turn получает финальное событие.
При retryable-ошибке job возвращается в queued с next_attempt_at. При terminal-ошибке сохраняются error и классификация отказа.
В коде контракт между слоями можно выразить вот так
class AgentRequestHandler: def handle(self, request: AgentRequest) -> AgentTurn: session_id = normalize_session_id(request.session_id) with self.db.transaction(): existing_turn = self.turns.find_by_idempotency_key( session_id=session_id, idempotency_key=request.idempotency_key or request.client_turn_id, ) if existing_turn is not None: return existing_turn turn = self.agent_service.create_turn( session_id=session_id, project_id=request.project_id, user_input=request.input, idempotency_key=request.idempotency_key, client_turn_id=request.client_turn_id, ) plan = self.agent_service.build_plan(turn) self.agent_service.save_plan_items(turn, plan) approval = self.approval_service.assess(turn, plan) if approval.required: self.sessions.set_pending_approval( session_id=session_id, approval=approval.to_pending_payload(), ) self.events.write( turn_id=turn.turn_id, session_id=session_id, type="approval_requested", payload=approval.to_event_payload(), ) return turn self.executor.enqueue_ready_steps(turn, plan) return turn
И важная часть дял воркера:
class JobWorker: def run_once(self) -> None: job = self.jobs.claim_next( lease_owner=self.worker_id, lease_seconds=60, ) if job is None: return try: self.events.write( turn_id=job.turn_id, job_id=job.job_id, project_id=job.project_id, type="job_started", payload={"job_id": str(job.job_id), "type": job.type}, ) output = self.execute(job) self.jobs.complete(job.job_id, output=output) self.events.write( turn_id=job.turn_id, job_id=job.job_id, project_id=job.project_id, type="job_completed", payload=output, ) except RetryableJobError as exc: self.jobs.schedule_retry(job.job_id, error=exc.to_payload()) except TerminalJobError as exc: self.jobs.fail(job.job_id, error=exc.to_payload())
Телеграм канал автора, где он что‑то пишет про ML, NLP и разработку
