Во время инференса LLM не выполняет побочных эффектов, вместо этого генерируется последовательность токенов, которые можно интерпретировать как намерение вызвать инструмент. Это напоминает мне ту часть шаблона transactional outbox, в которой намерение сущности (entity) отправить запрос внешней системе записывается в специальную таблицу, а не реализуется сущностью самостоятельно.

В статье приведен proof-of-concept модели выполнения, вдохновленной chat completion, в которой управление возвращается вызывающей стороне при необходимости выполнить побочный эффект.

Механика вызова инструментов LLM

Для начала рассмотрим, как LLM вызывает инструменты.

Вызов инструментов LLM
Вызов инструментов LLM
  1. Описания инструментов (tools) и список сообщений (messages) передаются в LLM.

  2. LLM генерирует ответ, содержащий вызов инструмента (tool call). Каждому вызову присваивается идентификатор.

  3. В список сообщений добавляется сообщение ассистента, содержащее вызов инструмента (tool call), а также результат вызова инструмента (tool message), который ссылается на tool call по идентификатору.

  4. LLM генерирует ответ, взаимодействие продолжается аналогично.

Параллели

  • Вызов инструментов LLM имеет нечто общее с шаблоном transactional outbox. Вместо выполнения побочного эффекта фиксируется только намерение. Реализация этого намерения делегируется вызывающей стороне.

  • С оговорками можно сказать, что состояние ассистента на базе LLM определяется последовательностью сообщений. Event sourcing же определяет состояние сущности через последовательность событий - исторический лог.

  • Инференс чем-то похож на восстановление после сбоя в event sourcing. LLM во время инференса обрабатывает все сообщения из контекста. Сущность при восстановлении применяет все накопившиеся события из исторического лога.

Определенно можно сказать, что параллели притянуты за уши. Тем не менее продолжим и сформулируем правила:

  1. Состояние сущности полностью определяется контекстом выполнения.

  2. Контекст выполнения - это последовательность сообщений.

  3. Запрос выполнения логики сущности является частью контекста выполнения.

  4. Запрос побочного эффекта является частью контекста выполнения.

  5. Запрос побочного эффекта возвращает управление вызывающей стороне.

  6. Выполнение побочного эффекта осуществляется вызывающей стороной.

  7. Результат выполнения побочного эффекта - это ответ на запрос.

  8. Результат выполнения логики сущности - это ответ на запрос.

  9. Ответ ссылается на запрос по идентификатору.

  10. Ответ на запрос является частью контекста выполнения.

Proof-of-concept

Для сравнения с механикой вызова инструментов LLM, предлагаю взглянуть на схематичное изображение концепции execution completion:

Выполнение побочного эффекта
Выполнение побочного эффекта
  1. Список сообщений, определяющий состояние сущности, передается в execution.

  2. Execution возвращает список сообщений, содержащий намерение (запрос) выполнить побочный эффект.

  3. В список сообщений добавляется намерение выполнить побочный эффект и результат его выполнения.

  4. Execution возвращает список сообщений, содержащий следующее намерение, выполнение продолжается аналогично.

Предметная модель

Предметная модель искусственная, игрушечная, но достаточная для демонстрации. Сущности:

  • Пользователь (User)

  • Статья (Article)

  • Комментарий (Comment)

Пользователь может написать статью или комментарий к статье.

Методы __getstate__ и __setstate__

В контексте выполнения предусмотрено специальное сообщение, фиксирующее состояние сущности после обработки запроса. Для того чтобы это работало, сущности реализуют методы __getstate__ и __setstate__. Подробнее об этом будет далее.

Код предметной модели:

from dataclasses import dataclass

from execution_completion.model import Entity


@dataclass
class UserState:
    name: str


class User(Entity):
    def __init__(self, name: str) -> None:
        self.name = name

    def __getstate__(self) -> UserState:
        return UserState(self.name)

    def __setstate__(self, state: UserState) -> None:
        self.name = state.name

    def write_article(self, text: str) -> Article:
        return Article(self, text)

    def write_comment(self, article: Article, text: str) -> Comment:
        comment = Comment(self, text)
        article.add_comment(comment)
        return comment


@dataclass
class ArticleState:
    author: User
    text: str
    comments: list[Comment]


class Article(Entity):
    def __init__(self, author: User, text: str) -> None:
        self.author = author
        self.text = text
        self.comments: list[Comment] = []

    def __getstate__(self) -> ArticleState:
        return ArticleState(self.author, self.text, self.comments)

    def __setstate__(self, state: ArticleState) -> None:
        self.author = state.author
        self.text = state.text
        self.comments = state.comments

    def add_comment(self, comment: Comment) -> None:
        self.comments.append(comment)


@dataclass
class CommentState:
    author: User
    text: str


class Comment(Entity):
    def __init__(self, author: User, text: str) -> None:
        self.author = author
        self.text = text

    def __getstate__(self) -> CommentState:
        return CommentState(self.author, self.text)

    def __setstate__(self, state: CommentState) -> None:
        self.author = state.author
        self.text = state.text

Получение запроса на создание сущности

Для идентификации сообщений используется целочисленный счетчик - поле offset. Ответ (response) имеет поле request_offset, ссылающееся на соответствующий запрос (request). Отправленный запрос имеет поле trace_offset, ссылающееся на запрос, в рамках которого он был отправлен.

Выполнение (execution) - основной компонент, отвечающий за выполнение логики сущности. Метод complete принимает на вход последовательность сообщений и возвращает последовательность сообщений.

def test_create_user_request_received() -> None:
    execution = Execution(User)

    input_messages: list[ContextMessage] = [
        # Получен запрос на создание сущности
        CreateEntityRequestReceived(
            offset=0,
            args=("Yura",),
            kwargs={},
        )
    ]

    output_messages = execution.complete(input_messages)

    assert output_messages == [
        # Отправлен ответ об успешном создании сущности
        CreateEntityResponseSent(
            offset=1,
            request_offset=0,
        ),
        # Зафиксировано состояние сущности
        EntityStateChanged(
            offset=2,
            state=UserState("Yura"),
        ),
    ]

В приведенном примере сущностью получен запрос на создание, в результате отправлен ответ об успешном создании и зафиксировано состояние сущности.

Удаление лишних сообщений из контекста

Для удаления из контекста выполнения обработанных сообщений реализован метод cleanup, возвращающий список удаленных сообщений.

def test_cleanup_execution_context() -> None:
    execution = Execution(User)

    # Т.к. логика детерминирована, выполнение пройдет по тому же пути
    input_messages: list[ContextMessage] = [
        CreateEntityRequestReceived(
            offset=0,
            args=("Yura",),
            kwargs={},
        ),
        CreateEntityResponseSent(
            offset=1,
            request_offset=0,
        ),
        EntityStateChanged(
            offset=2,
            state=UserState("Yura"),
        ),
    ]

    # Входные данные содержат и запрос и ответ, так что новых сообщений нет
    output_messages = execution.complete(input_messages)
    assert output_messages == []

    processed_messages = execution.cleanup()

    assert processed_messages == [
        # Полученный запрос удален из контекста
        CreateEntityRequestReceived(
            offset=0,
            args=("Yura",),
            kwargs={},
        ),
        # Отправленный ответ удален из контекста
        CreateEntityResponseSent(
            offset=1,
            request_offset=0,
        ),
    ]

    # Актуальный контекст выполнения
    assert execution.context == [
        # Зафиксировано состояние сущности
        EntityStateChanged(
            offset=2,
            state=UserState("Yura"),
        ),
    ]

В приведенном примере из контекста были удалены запрос на создание, а также ответ на него. Актуальный контекст выполнения содержит только зафиксированное состояние сущности. Этого достаточно, чтобы восстановить состояние и обработать следующий запрос.

Отправка запроса на создание сущности

def test_create_entity_request_sent() -> None:
    execution = Execution(User)

    input_messages: list[ContextMessage] = [
        # Зафиксировано состояние сущности
        EntityStateChanged(
            offset=2,
            state=UserState("Yura"),
        ),
        # Получен запрос на выполнение метода
        EntityMethodRequestReceived(
            offset=3,
            method=User.write_article,
            args=(),
            kwargs={"text": "Execution completion"},
        ),
    ]

    output_messages = execution.complete(input_messages)

    assert output_messages == [
        # Отправлен запрос на создание сущности типа `Article`
        CreateEntityRequestSent(
            offset=4,
            trace_offset=3,
            entity_type=Article,
            args=(execution.subject, "Execution completion"),
            kwargs={},
        )
    ]

На входе история сущности, содержащая зафиксированное состояние и полученный запрос на выполнение метода write_article. В результате управление вернулось вызывающей стороне с намерением отправить запрос на создание сущности типа Article.

Отправка запроса другой сущности

Предположим, пользователь написал статью, а другой пользователь написал комментарий к этой статье.

def test_entity_method_request_sent() -> None:
    yura = User("Yura")
    article = yura.write_article("Execution completion")

    execution = Execution(User)
    comment = Comment(execution.subject, "Bullshit")

    input_messages: list[ContextMessage] = [
        # Зафиксировано состояние
        EntityStateChanged(
            offset=31337,
            state=UserState("Guru"),
        ),
        # Получен запрос на выполнение метода `write_comment`
        EntityMethodRequestReceived(
            offset=31338,
            method=User.write_comment,
            args=(article,),
            kwargs={"text": "Bullshit"},
        ),
        # Отправлен запрос на создание сущности типа `Comment`
        CreateEntityRequestSent(
            offset=31339,
            trace_offset=31338,
            entity_type=Comment,
            args=(execution.subject, "Bullshit"),
            kwargs={},
        ),
        # Получен ответ о создании сущности
        CreateEntityResponseReceived(
            offset=31340,
            request_offset=31339,
            response=comment,
        ),
    ]

    output_messages = execution.complete(input_messages)

    assert output_messages == [
        # Отправлен запрос на выполнение метода `article.add_comment`
        EntityMethodRequestSent(
            offset=31341,
            trace_offset=31338,
            receiver=article,
            method=Article.add_comment,
            args=(comment,),
            kwargs={},
        )
    ]

На входе история сущности, содержащая зафиксированное состояние, полученный запрос на выполнение метода write_comment, отправленный запрос на создание сущности типа Comment и полученный ответ на этот запрос. В результате управление вернулось вызывающей стороне с намерением отправить запрос сущности article на выполнение метода add_comment.

Принципы работы

Сохранение и восстановление состояния сущности

Стандартная библиотека pickle использует методы __getstate__ и __setstate__ для сохранения и восстановления состояния объекта. Библиотека предусматривает, что методы могут быть не реализованы, но о том, что делается в каждом частном случае лучше прочитать в соответствующем разделе документации. Из библиотеки позаимствован упрощенный механизм работы с этими методами, но сама библиотека не используется.

В качестве разминки для пальцев сохраним и восстановим состояние счетчика:

class Counter:
    def __init__(self, value: int) -> None:
        self.value = value
    
    def __getstate__(self) -> int:
        return self.value
    
    def __setstate__(self, state: int) -> None:
        self.value = state


def test_restore_counter_state() -> None:
    counter = Counter(10)
    state = counter.__getstate__()
    del counter
    counter = Counter.__new__(Counter)
    counter.__setstate__(state)
    assert counter.value == 10
  1. При сохранении на существующем объекте вызывается метод __getstate__, возвращающий то самое состояние (state), которое можно хранить вне самого объекта.

  2. При восстановлении на классе вызывается метод __new__, выделяющий ресурсы под объект, но пропускается инициализация методом __init__.

  3. На неинициализированном объекте вызывается метод __setstate__, в который передается сохраненное ранее состояние.

Ключевой момент - это пропуск инициализации методом __init__. Именно это происходит, когда входные сообщения содержат сообщения типа EntityStateChanged.

Перехват побочных эффектов

Для перехвата побочных эффектов без изменения исходного кода предметной модели используется monkey-patching. На базовом классе Entity подменяются методы __new__ (для перехвата создания сущности) и __getattribute__ (для перехвата вызова методов).

Приостановка и возобновление выполнения

В современном Python для приостановки выполнения в ожидании ответа используется asyncio. Однако, я очень хотел оставить методы сущностей синхронными и использовал библиотеку greenlet, которая позволяет обернуть синхронные функции в легковесные корутины, а также явно переключаться между ними, передавая данные.

Работает это таким образом:

class Sender:
    def __init__(self, receiver: Receiver) -> None:
        self.receiver = receiver

    def send(self, message: str) -> str:
        return self.receiver.receive(message)


class Receiver:
    def __init__(self) -> None:
        self.messages: list[str] = []

    def receive(self, message: str) -> str:
        self.messages.append(message)
        return f"Received: {message!r}"


@dataclass
class EntityMethodRequestSent:
    receiver: Receiver
    method: Callable[..., Any]
    args: tuple[Any, ...]
    kwargs: dict[str, Any]


def test_suspend_and_resume_execution() -> None:
    receiver = Receiver()
    sender = Sender(receiver)

    def patched_receive(
        self: Receiver, *args: Any, **kwargs: Any
    ) -> Any:
        # Переключаемся в основной greenlet 
        # с намерением выполнить побочный эффект,
        # возвращаем ответ при переключении обратно.
        return main_greenlet.switch(
            EntityMethodRequestSent(
                receiver=self,
                method=not_patched_receive,
                args=args,
                kwargs=kwargs,
            )
        )

    # Оборачиваем метод отправителя в легковесную корутину
    main_greenlet = greenlet.getcurrent()
    send_greenlet = greenlet(sender.send)

    # Патчим метод получателя
    not_patched_receive = Receiver.receive
    setattr(Receiver, "receive", patched_receive)
    try:
        # Переключаемся в Sender.send, который переключается
        # обратно с намерением вызвать Receiver.receive
        output_message = send_greenlet.switch("Hello!")
    finally:
        setattr(Receiver, "receive", not_patched_receive)

    assert receiver.messages == []
    assert output_message == EntityMethodRequestSent(
        receiver=receiver,
        method=Receiver.receive,
        args=("Hello!",),
        kwargs={},
    )

    # Выполняем побочный эффект на вызывающей стороне
    receive_result = receiver.receive(
        *output_message.args,
        **output_message.kwargs,
    )

    # Переключаемся в Sender.send с результатом,
    # чтобы продолжить выполнение
    send_result = send_greenlet.switch(receive_result)

    assert send_greenlet.dead
    assert send_result == "Received: 'Hello!'"

Алгоритмы

Исходный код доступен на GitHub: https://github.com/returnnullptr/execution-completion

Я позволю себе ограничиться лишь словесным описанием основных алгоритмов.

Алгоритм работы метода complete

Алгоритм постоянно пытается выдать сообщения на выход, но при наличии следующего входного сообщения сверяется, что все идет по запланированному сценарию. Оставшиеся выходные сообщения (которых нет во входных сообщениях) возвращаются.

Во время выполнения поддерживаются в консистентном состоянии:

  • Экземпляр сущности (subject): инициализация, сохранение и восстановление состояния.

  • Отображение (mapping) из offset отправленного запроса в greenlet, приостановленный в ожидании ответа на этот запрос.

  • Контекст выполнения: новые сообщения добавляются в контекст автоматически.

Экземпляр Execution может быть переиспользован, чтобы продолжить выполнение.

Алгоритм
1. Проверяем, что список входных сообщений начинается с сообщений контекста выполнения и содержит их все. Если это не так, то продолжить выполнение невозможно.
2. Итерируемся по новым входным сообщениям и разбираемся с каждым по-порядку.
    1. Если сообщение - полученный запрос:
        1. Оборачиваем запрашиваемый метод в greenlet.
        2. Добавляем сообщение в контекст.
        3. Применяем monkey-patching для перехвата побочных эффектов.
        4. Переключаемся в greenlet.
        5. Если greenlet приостанановлен отправкой запроса:
            1. Добавляем отправленный запрос в очередь для выходных сообщений.
            2. Связываем отправленный запрос с приостановленным greenlet.
        6. Если выполнение greenlet завершается возвратом результата:
            1. Добавляем ответ в очередь для выходных сообщений.
            2. Получаем состояние сущности методом __getstate__.
            3. Добавляем факт изменения состояния в очередь для выходных сообщений.
    2. Если сообщение - отправленный запрос:
        1. Забираем сообщение из очереди для выходных сообщений.
        2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно.
        3. Добавляем сообщение в контекст.
    3. Если сообщение - полученный ответ:
        1. Получаем приостановленный greenlet по request_offset.
        2. Добавляем сообщение в контекст.
        3. Применяем monkey-patching для перехвата побочных эффектов.
        4. Переключаемся в greenlet.
        5. Если greenlet приостанановлен отправкой запроса:
            1. Добавляем отправленный запрос в очередь для выходных сообщений.   
            2. Связываем отправленный запрос с приостановленным greenlet.          
        6. Если выполнение greenlet завершается возвратом результата:
            1. Добавляем ответ в очередь для выходных сообщений.
            2. Получаем состояние сущности методом __getstate__.
            3. Добавляем факт изменения состояния в очередь для выходных сообщений.
    4. Если сообщение - отправленный ответ:
        1. Забираем сообщение из очереди для выходных сообщений.
        2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно.
        3. Добавляем сообщение в контекст.
    5. Если сообщение - факт изменения состояния:
        1. Если очередь для выходных сообщений не пуста:
            1. Забираем сообщение из очереди для выходных сообщений.
            2. Проверяем, что выполнение идет по запланированному сценарию. Если это не так, то продолжить выполнение невозможно.
        2. Восстанавливаем сущность в сохраненном состоянии методом __setstate__.
        3. Добавляем сообщение в контекст.
3. Пополняем контекст сообщениями из очереди для выходных сообщений.   
4. Возвращаем выходные сообщения списком.

Алгоритм работы метода cleanup

После обработки запросов в контексте остаются сообщения, которые не влияют на конечное состояние сущности:

  • Полученный запрос, на который был отправлен ответ.

  • Запрос, отправленный в рамках обработанного запроса.

  • Ответ, полученный в рамках обработанного запроса.

  • Отправленный ответ.

  • Идущие подряд сообщения об изменении состояния.

Алгоритм
1. Обходим контекст в обратном порядке, чтобы найти:
    1. Отправленные ответы.
    2. Запросы, на которые ссылаются эти ответы.
    3. Запросы, отправленные в рамках обработанных запросов.
2. Обходим контекст в прямом порядке, чтобы найти ответы, полученные в рамках обработанных запросов.
3. Обходим контекст в прямом порядке, чтобы найти факты изменения состояния, между которыми все сообщения обработаны.
4. Обходим контекст в прямом порядке, чтобы разделить его на два списка с сохранением порядка сообщений:
    1. Необработанные сообщения.
    2. Обработанные сообщения.
5. Заменяем контекст выполнения списком необработанных сообщений.
6. Возвращаем обработанные сообщения.

Ограничения

  1. Сущности должны реализовывать методы __getstate__ и __setstate__.

  2. Изменение исходного кода сущностей может привести к невозможности воспроизведения записанной истории.

  3. Допустима только детерминированная логика, без генерации случайных чисел или отправки HTTP запросов.

  4. Предметная модель монолитна, разделение на ограниченные контексты не предусмотрено.

  5. Для выполнения побочных эффектов требуется отдельный механизм.

  6. Требуется отдельный механизм сериализации и десериализации сообщений контекста с сохранением целостности ссылок на объекты.

  7. Для каждой сущности должна быть предусмотрена последовательная очередь сообщений на вход.

  8. Асинхронная обработка двух запросов возможна (когда первый приостановлен, а второй получен), но стандартные примитивы синхронизации не применимы.

Бонус

Соорудив механизм коммуникации между сущностями, можно записать всю историю их взаимодействия и визуализировать происходящее в предметной модели:

Послесловие

На самом деле, статья написана ради последних трех слов.

Если читая статью вы готовили возражения, осознавая ограничения подобного решения, но все-таки оценили проведенный эксперимент, вероятно, вы либо знаете человека, которого я ищу, либо сами им являетесь. Если в вашей уютной команде единомышленников есть место для еще одного пытливого ума, если вы используете event-sourcing, DDD, clean-architecture, port-adapters, CQRS, EDA и прочее кунг-фу, я сочту за честь работать с вами, перенимая опыт. Напишите мне. Я ищу наставника.