Привет, Хабр! Меня зовут Владимир и это продолжение статьи про разработку локального кодер-агента.

В первой части мы создали инфраструктуру: подняли контейнер с моделью, настроили Langfuse для трассировки и написали простейшего агента с доступом к MCP-инструментам.

Во второй части мы добавили агенту мозги: планировщик, оценщики, защиту от зацикливания и суммаризацию контекста.

Сегодня мы превратим этот набор в работающий граф. Добавим чекпоинтеры, прерывания и доработаем интерфейс под обновлённую архитектуру.

Сборка графа

Доработку графа начнем с конструктора - в него необходимо добавить чекпоинтер. Дополнительно, для упорядочивания трассировок добавим session_id к LangFuse. Его будем формировать на основе текущей даты и времени, для чего в конструктор добавим временную метку создания экземпляра агента:

class MCPAgent:
    def __init__(self):
        self.llm_with_tools: ChatOpenAI | None = None
        self.tools: list[BaseTool] | None = None
        self.graph = None
        self.checkpointer = InMemorySaver()
        self.lf_handler = CallbackHandler(public_key=settings.langfuse.public_key)
        self.init_time = datetime.now().strftime('%Y%m%d_%H%M%S')

Инициализация графа не менялась - инициализируем инструменты, биндим на LLM и собираем граф (код далее)

class MCPAgent:
    # Предыдущий код
    async def init_graph(self):
        try:
            self.tools = await init_tools()
        except Exception as e:
            logger.error(f'Ошибка инициализации инструментов: {e}')
            raise
        self.llm_with_tools = settings.llm.chat.llm.bind_tools(self.tools, parallel_tool_calls=False)
        self.graph = self._compile_graph()

Перейдём к условным рёбрам. Начнём с рёбер маршрутиризации подтверждения плана и выполнения шага. В них проверяем флаг is_approved. Если подтверждение есть, то переходим к исполнению плана или суммаризации соответственно. Если нет, то возвращаем на доработку.

class MCPAgent:
    # Предыдущий код    
    @staticmethod
    def need_adjust_plan_router(state: AgentState) -> Literal['injector', 'planer']:
        if state.get('is_approved', False):
            return 'injector'
        return 'planer'

    @staticmethod
    def need_modify_step_router(state: AgentState) -> Literal['agent_node', 'compressor']:
        if state.get('is_approved', False):
            return 'compressor'
        return 'agent_node'

Теперь маршрутизатор перехода по шагам. В нём мы должны проверить, что план ещё в процессе выполнения. Проверяем сравнением номера шага и длины плана

class MCPAgent:
    # Предыдущий код
    @staticmethod
    def next_step_router(state: AgentState) -> Literal['injector', 'finalizer']:
        if state['current_step'] < len(state['plan']):
            return 'injector'
        return 'finalizer'

Соберём граф. Инициализируем узлы и соединим их согласно схеме из второй части статьи:

class MCPAgent:
    # Предыдущий код
    def _compile_graph(self):
        workflow = StateGraph(AgentState)

        workflow.add_node('agent_node', AgentNode(llm=self.llm_with_tools).node)
        workflow.add_node('compressor', ContextCompressorNode().node)
        workflow.add_node('finalizer', FinalizerNode(llm=settings.llm.chat.llm).node)
        workflow.add_node('planer', PlanerNode(llm=settings.llm.chat.llm).node)
        workflow.add_node('plan_solver', PlanSolverNode().node)
        workflow.add_node('agent_solver', AgentSolverNode().node)
        workflow.add_node('injector', StepInjectorNode().node)
        workflow.add_node('tools', ToolNode(self.tools))

        workflow.set_entry_point(key='planer')
        workflow.add_edge(start_key='planer', end_key='plan_solver')
        workflow.add_conditional_edges(source='plan_solver', path=self.need_adjust_plan_router)
        workflow.add_edge(start_key='injector', end_key='agent_node')
        workflow.add_conditional_edges(source='agent_node', path=self.agent_router)
        workflow.add_edge(start_key='tools', end_key='agent_node')
        workflow.add_conditional_edges(source='agent_solver', path=self.need_modify_step_router)
        workflow.add_conditional_edges(source='compressor', path=self.next_step_router)
        workflow.set_finish_point(key='finalizer')
        graph = workflow.compile(
            checkpointer=self.checkpointer,
            interrupt_after=['planer'],
            interrupt_before=['agent_solver'],
        )
        return graph

В компилятор передаём два останова - после формирования плана и до "решателя" узла-агента.

Для того, чтобы граф работал с остановами, нам надо два метода - один для первоначального запуска графа, второй - для продолжения общения. Начнем с запуска:

class MCPAgent:
    # Предыдущий код
    async def run(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]:
        self._check_graph_available()
        trace_id = Langfuse.create_trace_id()
        return await self._ainvoke_with_tracing(
            data={'user_request': user_messages, 'user_input': user_messages, 'trace_id': trace_id},
            request_id=request_id, trace_id=trace_id, span_name='agent_run')

Служебные методы разберу позже, пока сам алгоритм. Для начала проверяем, что не забыли инициализировать граф. Далее формируем trace_id для Langfuse средствами Langfuse. Без передачи trace_id Langfuse будет создавать отдельные трейсы для каждого возврата в граф. Ну и вызываем служебный метод вызова LLM.

Теперь метод возобновления общения:

class MCPAgent:
    # Предыдущий код
    async def resume(self, user_messages: str, request_id: str | None = None) -> dict[str, Any]:
        self._check_graph_available()
        trace_id = self._get_trace_id(request_id)
        return await self._ainvoke_with_tracing(
            data=Command(update={'user_input': user_messages}),
            request_id=request_id, trace_id=trace_id, span_name='agent_resume')

Основные отличия - номер трейса в Langfuse мы берём из стейта и наше сообщение пользователя оборачиваем в Command. Переходим к служебным методам.

Первый метод проверяет наличие агента:

class MCPAgent:
    # Предыдущий код
    def _check_graph_available(self):
        if self.graph is None:
            raise RuntimeError(
                'Агент не инициализирован. Запустите `initialize()`.')

Второй нужен для извлечения trace_id из сохраненного стейта:

class MCPAgent:
    # Предыдущий код
    def _get_trace_id(self, request_id: str) -> str | None:
        return self.graph.get_state(
            {'configurable': {'thread_id': request_id}}
        ).values.get('trace_id', None)

Про thread_id писал в части про чекпоинтер. Он нужен для того, чтобы граф знал, стейт какой именно сессии надо извлекать из памяти. Формируется на стороне веб-интерфейса. Теперь метод вызова LLM:

class MCPAgent:
    # Предыдущий код
    async def _ainvoke_with_tracing(
            self, data: dict[str, Any] | Command, request_id: str, trace_id: str, span_name: str
    ) -> dict[str, Any]:
        with settings.langfuse.client.start_as_current_observation(
                as_type='span',
                name=span_name,
                trace_context={'trace_id': trace_id},
        ) as span:
            result = await self.graph.ainvoke(data, config=self._create_config(request_id))
            return result['messages']

Первым делом создаём кастомное наблюдение типа спан и передаём в него параметром наш trace_id. Langfuse сам сгруппирует все наблюдения по trace_id. Далее вызываем метод ainvoke графа, передав в него конфиг:

class MCPAgent:
    # Предыдущий код
    def _create_config(self, request_id: str) -> dict[str, Any]:
        return {
            'callbacks': [self.lf_handler],
            'metadata': {
                'langfuse_session_id': f'docker_session_{self.init_time}',
            },
            'configurable': {'thread_id': request_id}
        }

В конфиге мы прописываем Langfuse CallbackHandler для организации наблюдения, в метаданных передаём параметр langfuse_session_id, который в дальнейшем можно использовать для фильтрации трейсов и thread_id для сохранения стейта в чекпоинтере.

Граф готов. Осталось доработать Gradio интерфейс под новые функции и можно релизить)

Интерфейс пользователя

При изменении архитектуры агента (добавление подтверждения пользователем) я пошел на один компромис - подтверждение простым словом “Продолжить”. И чтобы каждый раз его не писать, я решил добавить в интерфейс кнопку “Продолжить”. Для этого пришлось отказаться от gr.ChatInterface и переписать интерфейс на gr.Chatbot:

class MCPCodingAgentApp:
    def build_interface(self):
        with gr.Blocks(title='MCP Coding Agent', fill_height=True) as self.demo:
            gr.Markdown('# MCP Coding Agent')
            gr.Markdown('Помощник разработчика с доступом к файлам, Git и документации')

            chatbot = gr.Chatbot(label='Чат с агентом', height=700)
            request_id_state = gr.State('')

            with gr.Row():
                msg = gr.Textbox(
                    label='Ваше сообщение',
                    placeholder='Введите сообщение или нажмите "Продолжить"',
                    scale=8,
                    container=False
                )
                submit_btn = gr.Button('Отправить', variant='primary')
                continue_btn = gr.Button('▶ Продолжить', variant='secondary')

            submit_btn.click(
                fn=self._respond,
                inputs=[msg, chatbot, request_id_state],
                outputs=[chatbot, msg, request_id_state])
            msg.submit(
                fn=self._respond,
                inputs=[msg, chatbot, request_id_state],
                outputs=[chatbot, msg, request_id_state])
            continue_btn.click(
                fn=self._continue,
                inputs=[chatbot, request_id_state],
                outputs=[chatbot, request_id_state])

После поля чата создаём строку с полем для ввода сообщения и двумя кнопками - “Отправить” и “Продолжить”. Далее назначаем функции-обработчики для наших элементов (для поля ввода тоже, чтобы была отправка по нажатию Enter). Параметры inputs и outputs связывают входы-выходы функции-обработчика с объектами Gradio.

Объект gr.State('') нужен для хранения request_id в рамках сессии.

Так как чат у нас теперь самодельный, то и управлять всем в обработчиках мы должны руками:

class MCPCodingAgentApp:
    # Предыдущий код
    async def _respond(self, message: str, history: list, request_id: str):
        if not request_id:
            request_id = str(uuid.uuid4())

        if not message or not message.strip():
            return history, '', request_id

        phase = self.agent.get_phase(request_id)
        if phase is None or phase == 'done':
            result = await self.agent.run(user_messages=message, request_id=request_id)
        else:
            result = await self.agent.resume(user_messages=message, request_id=request_id)

        history.append({'role': 'user', 'content': message})
        agent_response = result[-1].content if result else 'Нет ответа'
        history.append({'role': 'assistant', 'content': agent_response})
        return history, '', request_id

В обработчике кнопки “Отправить” и нажатия Enter мы должны предусмотреть, что пользователь решил напечатать слово “Продолжить” вместо нажатия отдельной кнопки. Для этого мы анализируем фазу выполнения агента и, в зависимости от неё, вызываем соответствующий метод агента. Дополнительно создаём первичный request_id, защищаемся от пустого сообщения (как в запросе, так и в ответе) и наполняем историю.

Обработчик продолжения общения немного покороче:

class MCPCodingAgentApp:
    # Предыдущий код
    async def _continue(self, history: list, request_id: str):
        phase = self.agent.get_phase(request_id)

        if phase is None or phase == 'done':
            history.append({
                'role': 'assistant',
                'content': 'Нет активных задач для продолжения. Задайте новый вопрос.'
            })
            return history, request_id

        result = await self.agent.resume(user_messages='Продолжить', request_id=request_id)

        history.append({'role': 'user', 'content': '▶ Продолжить'})
        agent_response = result[-1].content if result else 'Нет ответа'
        history.append({'role': 'assistant', 'content': agent_response})
        return history, request_id

Убеждаемся по фазе, что агент в процессе работы, после чего направляем ему сообщение “Продолжить”, иначе сообщаем пользователю, что вы не в цикле. Также сохраняем историю.

Код инициализации и запуска остаётся без изменений.

С кодом всё.

Проверка работоспособности

Запускаем make up.

Теперь можно расслабиться и откинуться на спинку кресла (есть тут те, кто ставил Windows98?😄)

Ждём закачки образов, модели. Для скачивания модели можно использовать uvx (ИМХО в разы быстрее):

HF_TOKEN=<ТОКЕН> HF_HOME=<ПАПКА ДЛЯ МОДЕЛИ> uvx hf download nvidia/Qwen3.6-35B-A3B-NVFP4

В HF_HOME нужно указать туже папку, которую указываем как том для vLLM.

Простой вопрос

Проверку начал с задания попроще, но в несколько шагов:

Напиши функцию проверки на четность. Сохрани её в файл и сделай коммит

Приемлемо. Глянем трейс задачи

Всё как надо - два шага выполнения, три вызова инструментов (write file, git add, git commit).

Вопрос посложнее

Тут нашел в интернете какую-то задачу на знание FastAPI, помеченную как средней сложности

Создай асинхронный dependency get_db, который открывает транзакцию.

Реализуй модель для эндпоинта User с id:uuid и nickname: str

Реализуй эндпоинт GET /users/{user_id}/posts, который возвращает пользователя и его посты в одном запросе к БД (без N+1 проблемы).

Сохрани результаты в отдельные файлы

Сделай коммит изменений

На запрос получил следующий план действий:

Глянем схему трейса

Красота) При выполнении агент даже вызывал инструменты для просмотра содержимого директории в поисках файла model.py. Ну и результат работы агента в рабочей директории:

Итоги

Финальным экспериментом я решил проверить, сможет ли агент написать документацию для самого себя. Дал ему доступ к собственной папке и попросил:

Проанализируй файлы в рабочей папке. Необходимо составить файл readme.md. Файл должен содержать краткое описание проекта и способ запуска

Получилось неожиданно хорошо - агент описал структуру проекта, перечислил основные компоненты и добавил команды для запуска через Docker. Результат можно посмотреть в репозитории: ссылка на README.md. Ручного в файле только про параметр WS.

Код проекта доступен тут

Первая часть с базой и простым ReAct агентом с MCP инструментами тут

Вторая часть в которой показана реализация узлов тут