Во время инференса LLM не выполняет побочных эффектов, вместо этого генерируется последовательность токенов, которые можно интерпретировать как намерение вызвать инструмент. Это напоминает мне ту часть шаблона transactional outbox, в которой намерение сущности (entity) отправить запрос внешней системе записывается в специальную таблицу, а не реализуется сущностью самостоятельно.
В статье приведен proof-of-concept модели выполнения, вдохновленной chat completion, в которой управление возвращается вызывающей стороне при необходимости выполнить побочный эффект.
Механика вызова инструментов LLM
Для начала рассмотрим, как LLM вызывает инструменты.

Описания инструментов (tools) и список сообщений (messages) передаются в LLM.
LLM генерирует ответ, содержащий вызов инструмента (tool call). Каждому вызову присваивается идентификатор.
В список сообщений добавляется сообщение ассистента, содержащее вызов инструмента (tool call), а также результат вызова инструмента (tool message), который ссылается на tool call по идентификатору.
LLM генерирует ответ, взаимодействие продолжается аналогично.
Параллели
Вызов инструментов LLM имеет нечто общее с шаблоном transactional outbox. Вместо выполнения побочного эффекта фиксируется только намерение. Реализация этого намерения делегируется вызывающей стороне.
С оговорками можно сказать, что состояние ассистента на базе LLM определяется последовательностью сообщений. Event sourcing же определяет состояние сущности через последовательность событий - исторический лог.
Инференс чем-то похож на восстановление после сбоя в event sourcing. LLM во время инференса обрабатывает все сообщения из контекста. Сущность при восстановлении применяет все накопившиеся события из исторического лога.
Определенно можно сказать, что параллели притянуты за уши. Тем не менее продолжим и сформулируем правила:
Состояние сущности полностью определяется контекстом выполнения.
Контекст выполнения - это последовательность сообщений.
Запрос выполнения логики сущности является частью контекста выполнения.
Запрос побочного эффекта является частью контекста выполнения.
Запрос побочного эффекта возвращает управление вызывающей стороне.
Выполнение побочного эффекта осуществляется вызывающей стороной.
Результат выполнения побочного эффекта - это ответ на запрос.
Результат выполнения логики сущности - это ответ на запрос.
Ответ ссылается на запрос по идентификатору.
Ответ на запрос является частью контекста выполнения.
Proof-of-concept
Для сравнения с механикой вызова инструментов LLM, предлагаю взглянуть на схематичное изображение концепции execution completion:

Список сообщений, определяющий состояние сущности, передается в execution.
Execution возвращает список сообщений, содержащий намерение (запрос) выполнить побочный эффект.
В список сообщений добавляется намерение выполнить побочный эффект и результат его выполнения.
Execution возвращает список сообщений, содержащий следующее намерение, выполнение продолжается аналогично.
Предметная модель
Предметная модель искусственная, игрушечная, но достаточная для демонстрации. Сущности:
Пользователь (User)
Статья (Article)
Комментарий (Comment)
Пользователь может написать статью или комментарий к статье.
Методы __getstate__ и __setstate__
В контексте выполнения предусмотрено специальное сообщение, фиксирующее состояние сущности после обработки запроса. Для того чтобы это работало, сущности реализуют методы __getstate__ и __setstate__. Подробнее об этом будет далее.
Код предметной модели:
from dataclasses import dataclass from execution_completion.model import Entity @dataclass class UserState: name: str class User(Entity): def __init__(self, name: str) -> None: self.name = name def __getstate__(self) -> UserState: return UserState(self.name) def __setstate__(self, state: UserState) -> None: self.name = state.name def write_article(self, text: str) -> Article: return Article(self, text) def write_comment(self, article: Article, text: str) -> Comment: comment = Comment(self, text) article.add_comment(comment) return comment @dataclass class ArticleState: author: User text: str comments: list[Comment] class Article(Entity): def __init__(self, author: User, text: str) -> None: self.author = author self.text = text self.comments: list[Comment] = [] def __getstate__(self) -> ArticleState: return ArticleState(self.author, self.text, self.comments) def __setstate__(self, state: ArticleState) -> None: self.author = state.author self.text = state.text self.comments = state.comments def add_comment(self, comment: Comment) -> None: self.comments.append(comment) @dataclass class CommentState: author: User text: str class Comment(Entity): def __init__(self, author: User, text: str) -> None: self.author = author self.text = text def __getstate__(self) -> CommentState: return CommentState(self.author, self.text) def __setstate__(self, state: CommentState) -> None: self.author = state.author self.text = state.text
Получение запроса на создание сущности
Для идентификации сообщений используется целочисленный счетчик - поле offset. Ответ (response) имеет поле request_offset, ссылающееся на соответствующий запрос (request). Отправленный запрос имеет поле trace_offset, ссылающееся на запрос, в рамках которого он был отправлен.
Выполнение (execution) - основной компонент, отвечающий за выполнение логики сущности. Метод complete принимает на вход последовательность сообщений и возвращает последовательность сообщений.
def test_create_user_request_received() -> None: execution = Execution(User) input_messages: list[ContextMessage] = [ # Получен запрос на создание сущности CreateEntityRequestReceived( offset=0, args=("Yura",), kwargs={}, ) ] output_messages = execution.complete(input_messages) assert output_messages == [ # Отправлен ответ об успешном создании сущности CreateEntityResponseSent( offset=1, request_offset=0, ), # Зафиксировано состояние сущности EntityStateChanged( offset=2, state=UserState("Yura"), ), ]
В приведенном примере сущностью получен запрос на создание, в результате отправлен ответ об успешном создании и зафиксировано состояние сущности.
Удаление лишних сообщений из контекста
Для удаления из контекста выполнения обработанных сообщений реализован метод cleanup, возвращающий список удаленных сообщений.
def test_cleanup_execution_context() -> None: execution = Execution(User) # Т.к. логика детерминирована, выполнение пройдет по тому же пути input_messages: list[ContextMessage] = [ CreateEntityRequestReceived( offset=0, args=("Yura",), kwargs={}, ), CreateEntityResponseSent( offset=1, request_offset=0, ), EntityStateChanged( offset=2, state=UserState("Yura"), ), ] # Входные данные содержат и запрос и ответ, так что новых сообщений нет output_messages = execution.complete(input_messages) assert output_messages == [] processed_messages = execution.cleanup() assert processed_messages == [ # Полученный запрос удален из контекста CreateEntityRequestReceived( offset=0, args=("Yura",), kwargs={}, ), # Отправленный ответ удален из контекста CreateEntityResponseSent( offset=1, request_offset=0, ), ] # Актуальный контекст выполнения assert execution.context == [ # Зафиксировано состояние сущности EntityStateChanged( offset=2, state=UserState("Yura"), ), ]
В приведенном примере из контекста были удалены запрос на создание, а также ответ на него. Актуальный контекст выполнения содержит только зафиксированное состояние сущности. Этого достаточно, чтобы восстановить состояние и обработать следующий запрос.
Отправка запроса на создание сущности
def test_create_entity_request_sent() -> None: execution = Execution(User) input_messages: list[ContextMessage] = [ # Зафиксировано состояние сущности EntityStateChanged( offset=2, state=UserState("Yura"), ), # Получен запрос на выполнение метода EntityMethodRequestReceived( offset=3, method=User.write_article, args=(), kwargs={"text": "Execution completion"}, ), ] output_messages = execution.complete(input_messages) assert output_messages == [ # Отправлен запрос на создание сущности типа `Article` CreateEntityRequestSent( offset=4, trace_offset=3, entity_type=Article, args=(execution.subject, "Execution completion"), kwargs={}, ) ]
На входе история сущности, содержащая зафиксированное состояние и полученный запрос на выполнение метода write_article. В результате управление вернулось вызывающей стороне с намерением отправить запрос на создание сущности типа Article.
Отправка запроса другой сущности
Предположим, пользователь написал статью, а другой пользователь написал комментарий к этой статье.
def test_entity_method_request_sent() -> None: yura = User("Yura") article = yura.write_article("Execution completion") execution = Execution(User) comment = Comment(execution.subject, "Bullshit") input_messages: list[ContextMessage] = [ # Зафиксировано состояние EntityStateChanged( offset=31337, state=UserState("Guru"), ), # Получен запрос на выполнение метода `write_comment` EntityMethodRequestReceived( offset=31338, method=User.write_comment, args=(article,), kwargs={"text": "Bullshit"}, ), # Отправлен запрос на создание сущности типа `Comment` CreateEntityRequestSent( offset=31339, trace_offset=31338, entity_type=Comment, args=(execution.subject, "Bullshit"), kwargs={}, ), # Получен ответ о создании сущности CreateEntityResponseReceived( offset=31340, request_offset=31339, response=comment, ), ] output_messages = execution.complete(input_messages) assert output_messages == [ # Отправлен запрос на выполнение метода `article.add_comment` EntityMethodRequestSent( offset=31341, trace_offset=31338, receiver=article, method=Article.add_comment, args=(comment,), kwargs={}, ) ]
На входе история сущности, содержащая зафиксированное состояние, полученный запрос на выполнение метода write_comment, отправленный запрос на создание сущности типа Comment и полученный ответ на этот запрос. В результате управление вернулось вызывающей стороне с намерением отправить запрос сущности article на выполнение метода add_comment.
Принципы работы
Сохранение и восстановление состояния сущности
Стандартная библиотека pickle использует методы __getstate__ и __setstate__ для сохранения и восстановления состояния объекта. Библиотека предусматривает, что методы могут быть не реализованы, но о том, что делается в каждом частном случае лучше прочитать в соответствующем разделе документации. Из библиотеки позаимствован упрощенный механизм работы с этими методами, но сама библиотека не используется.
В качестве разминки для пальцев сохраним и восстановим состояние счетчика:
class Counter: def __init__(self, value: int) -> None: self.value = value def __getstate__(self) -> int: return self.value def __setstate__(self, state: int) -> None: self.value = state def test_restore_counter_state() -> None: counter = Counter(10) state = counter.__getstate__() del counter counter = Counter.__new__(Counter) counter.__setstate__(state) assert counter.value == 10
При сохранении на существующем объекте вызывается метод
__getstate__, возвращающий то самое состояние (state), которое можно хранить вне самого объекта.При восстановлении на классе вызывается метод
__new__, выделяющий ресурсы под объект, но пропускается инициализация методом__init__.На неинициализированном объекте вызывается метод
__setstate__, в который передается сохраненное ранее состояние.
Ключевой момент - это пропуск инициализации методом __init__. Именно это происходит, когда входные сообщения содержат сообщения типа EntityStateChanged.
Перехват побочных эффектов
Для перехвата побочных эффектов без изменения исходного кода предметной модели используется monkey-patching. На базовом классе Entity подменяются методы __new__ (для перехвата создания сущности) и __getattribute__ (для перехвата вызова методов).
Приостановка и возобновление выполнения
В современном Python для приостановки выполнения в ожидании ответа используется asyncio. Однако, я очень хотел оставить методы сущностей синхронными и использовал библиотеку greenlet, которая позволяет обернуть синхронные функции в легковесные корутины, а также явно переключаться между ними, передавая данные.
Работает это таким образом:
class Sender: def __init__(self, receiver: Receiver) -> None: self.receiver = receiver def send(self, message: str) -> str: return self.receiver.receive(message) class Receiver: def __init__(self) -> None: self.messages: list[str] = [] def receive(self, message: str) -> str: self.messages.append(message) return f"Received: {message!r}" @dataclass class EntityMethodRequestSent: receiver: Receiver method: Callable[..., Any] args: tuple[Any, ...] kwargs: dict[str, Any] def test_suspend_and_resume_execution() -> None: receiver = Receiver() sender = Sender(receiver) def patched_receive( self: Receiver, *args: Any, **kwargs: Any ) -> Any: # Переключаемся в основной greenlet # с намерением выполнить побочный эффект, # возвращаем ответ при переключении обратно. return main_greenlet.switch( EntityMethodRequestSent( receiver=self, method=not_patched_receive, args=args, kwargs=kwargs, ) ) # Оборачиваем метод отправителя в легковесную корутину main_greenlet = greenlet.getcurrent() send_greenlet = greenlet(sender.send) # Патчим метод получателя not_patched_receive = Receiver.receive setattr(Receiver, "receive", patched_receive) try: # Переключаемся в Sender.send, который переключается # обратно с намерением вызвать Receiver.receive output_message = send_greenlet.switch("Hello!") finally: setattr(Receiver, "receive", not_patched_receive) assert receiver.messages == [] assert output_message == EntityMethodRequestSent( receiver=receiver, method=Receiver.receive, args=("Hello!",), kwargs={}, ) # Выполняем побочный эффект на вызывающей стороне receive_result = receiver.receive( *output_message.args, **output_message.kwargs, ) # Переключаемся в Sender.send с результатом, # чтобы продолжить выполнение send_result = send_greenlet.switch(receive_result) assert send_greenlet.dead assert send_result == "Received: 'Hello!'"
Алгоритмы
Исходный код доступен на GitHub: https://github.com/returnnullptr/execution-completion
Я позволю себе ограничиться лишь словесным описанием основных алгоритмов.
Алгоритм работы метода complete
Алгоритм постоянно пытается выдать сообщения на выход, но при наличии следующего входного сообщения сверяется, что все идет по запланированному сценарию. Оставшиеся выходные сообщения (которых нет во входных сообщениях) возвращаются.
Во время выполнения поддерживаются в консистентном состоянии:
Экземпляр сущности (subject): инициализация, сохранение и восстановление состояния.
Отображение (mapping) из
offsetотправленного запроса вgreenlet, приостановленный в ожидании ответа на этот запрос.Контекст выполнения: новые сообщения добавляются в контекст автоматически.
Экземпляр Execution может быть переиспользован, чтобы продолжить выполнение.
Алгоритм
1. Проверяем, что список входных сообщений начинается с сообщений контекста выполнения и содержит их все. Если это не так, то продолжить выполнение невозможно. 2. Итерируемся по новым входным сообщениям и разбираемся с каждым по-порядку. 1. Если сообщение - полученный запрос: 1. Оборачиваем запрашиваемый метод в greenlet. 2. Добавляем сообщение в контекст. 3. Применяем monkey-patching для перехвата побочных эффектов. 4. Переключаемся в greenlet. 5. Если greenlet приостанановлен отправкой запроса: 1. Добавляем отправленный запрос в очередь для выходных сообщений. 2. Связываем отправленный запрос с приостановленным greenlet. 6. Если выполнение greenlet завершается возвратом результата: 1. Добавляем ответ в очередь для выходных сообщений. 2. Получаем состояние сущности методом __getstate__. 3. Добавляем факт изменения состояния в очередь для выходных сообщений. 2. Если сообщение - отправленный запрос: 1. Забираем сообщение из очереди для выходных сообщений. 2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно. 3. Добавляем сообщение в контекст. 3. Если сообщение - полученный ответ: 1. Получаем приостановленный greenlet по request_offset. 2. Добавляем сообщение в контекст. 3. Применяем monkey-patching для перехвата побочных эффектов. 4. Переключаемся в greenlet. 5. Если greenlet приостанановлен отправкой запроса: 1. Добавляем отправленный запрос в очередь для выходных сообщений. 2. Связываем отправленный запрос с приостановленным greenlet. 6. Если выполнение greenlet завершается возвратом результата: 1. Добавляем ответ в очередь для выходных сообщений. 2. Получаем состояние сущности методом __getstate__. 3. Добавляем факт изменения состояния в очередь для выходных сообщений. 4. Если сообщение - отправленный ответ: 1. Забираем сообщение из очереди для выходных сообщений. 2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно. 3. Добавляем сообщение в контекст. 5. Если сообщение - факт изменения состояния: 1. Если очередь для выходных сообщений не пуста: 1. Забираем сообщение из очереди для выходных сообщений. 2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно. 2. Восстанавливаем сущность в сохраненном состоянии методом __setstate__. 3. Добавляем сообщение в контекст. 3. Пополняем контекст сообщениями из очереди для выходных сообщений. 4. Возвращаем выходные сообщения списком.
Алгоритм работы метода cleanup
После обработки запросов в контексте остаются сообщения, которые не влияют на конечное состояние сущности:
Полученный запрос, на который был отправлен ответ.
Запрос, отправленный в рамках обработанного запроса.
Ответ, полученный в рамках обработанного запроса.
Отправленный ответ.
Идущие подряд сообщения об изменении состояния.
Алгоритм
1. Обходим контекст в обратном порядке, чтобы найти: 1. Отправленные ответы. 2. Запросы, на которые ссылаются эти ответы. 3. Запросы, отправленные в рамках обработанных запросов. 2. Обходим контекст в прямом порядке, чтобы найти ответы, полученные в рамках обработанных запросов. 3. Обходим контекст в прямом порядке, чтобы найти факты изменения состояния, между которыми все сообщения обработаны. 4. Обходим контекст в прямом порядке, чтобы разделить его на два списка с сохранением порядка сообщений: 1. Необработанные сообщения. 2. Обработанные сообщения. 5. Заменяем контекст выполнения списком необработанных сообщений. 6. Возвращаем обработанные сообщения.
Ограничения
Сущности должны реализовывать методы
__getstate__и__setstate__.Изменение исходного кода сущностей может привести к невозможности воспроизведения записанной истории.
Допустима только детерминированная логика, без генерации случайных чисел или отправки HTTP запросов.
Предметная модель монолитна, разделение на ограниченные контексты не предусмотрено.
Для выполнения побочных эффектов требуется отдельный механизм.
Требуется отдельный механизм сериализации и десериализации сообщений контекста с сохранением целостности ссылок на объекты.
Для каждой сущности должна быть предусмотрена последовательная очередь сообщений на вход.
Асинхронная обработка двух запросов возможна (когда первый приостановлен, а второй получен), но стандартные примитивы синхронизации не применимы.
Бонус
Соорудив механизм коммуникации между сущностями, можно записать всю историю их взаимодействия и визуализировать происходящее в предметной модели:

Послесловие
На самом деле, статья написана ради последних трех слов.
Если читая статью вы готовили возражения, осознавая ограничения подобного решения, но все-таки оценили проведенный эксперимент, вероятно, вы либо знаете человека, которого я ищу, либо сами им являетесь. Если в вашей уютной команде единомышленников есть место для еще одного пытливого ума, если вы используете event-sourcing, DDD, clean-architecture, port-adapters, CQRS, EDA и прочее кунг-фу, я сочту за честь работать с вами, перенимая опыт. Напишите мне. Я ищу наставника.
