В прошлый раз мы делали AI-официанта с помощью OpenAI Assistants API и Vector Store в Telegram.
В этот раз воспользуемся другим инструментом Assistants API от OpenAI под названием Code Interpreter.
Что такое Code Interpreter?
Языковая модель генерирует текст, она не может проводить сколько-нибудь сложные математические вычисления или анализ данных, она просто не предназначена для этого. Однако, модель может генерировать код и очень хорошо. Что, если давать модели задание, для выполнения которого она сгенерирует программный код, он исполнится в изолированной среде разработки, и полученный результат модель уже использует для генерации ответа? Именно эту задачу и выполняет Code Interpreter.
Задача
В этой статье разберем следующий пример: у меня есть простой CSV файл со всеми платежами для фейкового онлайн-магазина. Формат данных следующий:
"id","user_id","amount","created_at"
"4675","2251032","1837.00","2024-05-22 07:10:02"
"4676","7472836","2849.00","2024-05-22 07:27:45"
"4677","6271037","2999.00","2024-05-22 07:33:12"
"4678","6815010","2877.00","2024-05-22 08:01:58"
"4679","2565937","5000.00","2024-05-22 08:13:17"
"4680","7074300","299.00","2024-05-22 08:16:49"
"4681","7028029","5770.00","2024-05-22 08:33:42"
Я хочу задавать вопросы, требующие анализа этих данных. Например: сколько заработали в определенном месяце? Есть ли различие в поведении покупателей по дням недели? Результат я хочу получать в виде текста или графиков, где применимо.
Подготовка
Повторяем все шаги из оригинальной статьи. Единственная разница будет в коде для облачной функции.
requirements.txt
В редакторе сначала создадим новый файл, назовём его requirements.txt
и положим туда следующий код:
openai~=1.33.0
boto3~=1.34.122
pyTelegramBotAPI~=4.19.1
Это список зависимостей для Python, которые облако автоматически подгрузит во время сборки так, чтобы мы могли пользоваться ими в коде самой функции.
config.py
Теперь создадим файл, в котором будет код, который отвечает за конфигурацию проекта. Назовём его config.py
и запишем следующий код:
import json
import os
import boto3
import openai
YANDEX_KEY_ID = os.environ.get("YANDEX_KEY_ID")
YANDEX_KEY_SECRET = os.environ.get("YANDEX_KEY_SECRET")
YANDEX_BUCKET = os.environ.get("YANDEX_BUCKET")
PROXY_API_KEY = os.environ.get("PROXY_API_KEY")
ASSISTANT_MODEL = os.environ.get("ASSISTANT_MODEL")
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN")
TG_BOT_ADMIN = os.environ.get("TG_BOT_ADMIN")
def get_s3_client():
session = boto3.session.Session(
aws_access_key_id=YANDEX_KEY_ID, aws_secret_access_key=YANDEX_KEY_SECRET
)
return session.client(
service_name="s3", endpoint_url="https://storage.yandexcloud.net"
)
def get_config() -> dict:
s3client = get_s3_client()
try:
response = s3client.get_object(Bucket=YANDEX_BUCKET, Key="config.json")
return json.loads(response["Body"].read())
except:
return {}
def save_config(new_config: dict):
s3client = get_s3_client()
s3client.put_object(
Bucket=YANDEX_BUCKET, Key="config.json", Body=json.dumps(new_config)
)
proxy_client = openai.Client(
api_key=PROXY_API_KEY,
base_url="https://api.proxyapi.ru/openai/v1",
)
Здесь мы читаем все необходимые параметры из переменных окружения и прописываем функции для получения и сохранения дополнительных (динамических) параметров в бакете хранилища Яндекс Облака. Инициируем также клиент для ProxyAPI согласно документации, то есть переопределяем путь к API.
admin.py
Далее создаём файл admin.py
и кладём туда весь код, который отвечает за разные административные задачи проекта.
from config import ASSISTANT_MODEL, get_config, proxy_client, save_config
def create_assistant(name, instructions):
assistant_id = get_assistant_id()
if not assistant_id:
new_assistant = proxy_client.beta.assistants.create(
model=ASSISTANT_MODEL,
instructions=instructions,
name=name,
tools=[
{
"type": "code_interpreter",
}
],
)
config = get_config()
config["assistant_id"] = new_assistant.id
save_config(config)
else:
proxy_client.beta.assistants.update(
assistant_id=assistant_id, instructions=instructions
)
def get_assistant_id():
config = get_config()
return config["assistant_id"] if "assistant_id" in config else None
def upload_file(chat_id, filename, file):
config = get_config()
file_object = proxy_client.files.create(file=(filename, file), purpose="assistants")
thread = proxy_client.beta.threads.create(
messages=[
{
"role": "user",
"content": "Для всех вопросов, которые я буду задавать, используй эти данные для анализа",
"attachments": [
{"file_id": file_object.id, "tools": [{"type": "code_interpreter"}]}
],
}
]
)
if not "threads" in config:
config["threads"] = {}
config["threads"][chat_id] = thread.id
save_config(config)
def get_thread_id(chat_id: str):
config = get_config()
if "threads" in config and chat_id in config["threads"]:
return config["threads"][chat_id]
return None
create_assistant
Создаёт или обновляет настройки для ассистента на основе имени и инструкций. Сразу привязываем ассистента к нашему векторному хранилищу. Сохраняем идентификатор ассистента в конфигурации.
get_assistant_id
Возвращает идентификатор ассистента из конфигурации.
upload_file
Метод, который загружает файл в файловое (не векторное!) хранилище API и инициирует новый тред для этого чата. Идея здесь такая: для каждого нового загруженного файла начинаем новый тред, чтобы пользователь мог опрашивать модель именно по данным из этого файла. Старый тред «забывается».
get_thread_id
Возвращает текущий идентификатор треда, если хоть один файл уже был загружен.
chat.py
В файле chat.py (тоже создаём его) будем хранить методы для работы с сообщениями пользователей.
import os
from admin import get_assistant_id, get_thread_id
from config import proxy_client
def process_message(chat_id: str, message: str) -> list[dict]:
assistant_id = get_assistant_id()
thread_id = get_thread_id(chat_id)
if not thread_id:
raise ValueError("Thread not found")
proxy_client.beta.threads.messages.create(
thread_id=thread_id, content=message, role="user"
)
run = proxy_client.beta.threads.runs.create_and_poll(
thread_id=thread_id,
assistant_id=assistant_id,
)
answer = []
if run.status == "completed":
messages = proxy_client.beta.threads.messages.list(
thread_id=thread_id, run_id=run.id
)
for message in messages:
if message.role == "assistant":
for block in message.content:
if block.type == "text":
answer.insert(0, {"type": "text", "text": block.text.value})
if block.text.annotations:
for annotation in block.text.annotations:
if annotation.type == "file_path":
answer.insert(
0,
{
"type": "file",
"file": download_file(
annotation.file_path.file_id
),
"filename": os.path.basename(
annotation.text.split(":")[-1]
),
},
)
elif block.type == "image_file":
answer.insert(
0,
{
"type": "image",
"file": download_file(block.image_file.file_id),
},
)
return answer
def download_file(file_id: str) -> str:
file_content = proxy_client.files.content(file_id)
content = file_content.read()
with open(f"/tmp/{file_id}", "wb") as f:
f.write(content)
return f"/tmp/{file_id}"
process_message
Этот метод принимает сообщения пользователя, и отправляет его через ProxyAPI на обработку в Assistants API. Assistants API работает немного по-другому, в сравнении с обычной API. Здесь после добавления сообщения в тред ответ модели мы сразу не получим. Надо дополнительно запустить обработку всего треда с помощью метода run. Что мы и делаем, после чего получаем ответ, который может состоять из нескольких сообщений.
Code Interpreter, кроме текста, может еще генерировать другие файлы и изображения (например, графики). Так что мы дополнительно распознаем такие случаи с помощью block.type и annotations, загружаем содержимое таких файлов и добавляем в массив сообщений, которые вернул ассистент.
download_file
Загружает файл, сгенерированный ассистентом в локальную временную директорию. Он удалится автоматически, когда облачная функция закончит работу.
index.py
import json
import logging
import threading
import time
import telebot
from admin import create_assistant, get_assistant_id, get_thread_id, upload_file
from chat import process_message
from config import TG_BOT_ADMIN, TG_BOT_TOKEN
from telebot.types import InputFile
logger = telebot.logger
telebot.logger.setLevel(logging.INFO)
bot = telebot.TeleBot(TG_BOT_TOKEN, threaded=False)
is_typing = False
def start_typing(chat_id):
global is_typing
is_typing = True
typing_thread = threading.Thread(target=typing, args=(chat_id,))
typing_thread.start()
def typing(chat_id):
global is_typing
while is_typing:
bot.send_chat_action(chat_id, "typing")
time.sleep(4)
def stop_typing():
global is_typing
is_typing = False
def check_setup(message):
if not get_assistant_id():
if message.from_user.username != TG_BOT_ADMIN:
bot.send_message(
message.chat.id, "Бот еще не настроен. Свяжитесь с администратором."
)
else:
bot.send_message(
message.chat.id,
"Бот еще не настроен. Используйте команду /create для создания ассистента.",
)
return False
return True
def check_admin(message):
if message.from_user.username != TG_BOT_ADMIN:
bot.send_message(message.chat.id, "Доступ запрещен")
return False
return True
@bot.message_handler(commands=["help", "start"])
def send_welcome(message):
if not check_setup(message):
return
bot.send_message(
message.chat.id,
(
f"Привет! Я твой карманный дата-аналитик. Загрузи файл и задавай вопросы по нему. Я умею проводить анализ данных и строить графики."
),
)
@bot.message_handler(commands=["create"])
def create_assistant_command(message):
if not check_admin(message):
return
instructions = message.text.split("/create")[1].strip()
if len(instructions) == 0:
bot.send_message(
message.chat.id,
"""
Введите подробные инструкции для работы ассистента после команды /create и пробела.
Например:
/create Ты - дата-аналитик. Пользователь загружает файл для анализа, а ты, используя инструменты, отвечаешь на вопросы и, если нужно, строишь графики.
Если ассистент уже был ранее создан, инструкции будут обновлены.
""",
)
return
name = bot.get_me().full_name
create_assistant(name, instructions)
bot.send_message(
message.chat.id,
"Ассистент успешно создан. Теперь вы можете добавлять документы в базу знаний с помощью команды /upload.",
)
@bot.message_handler(commands=["upload"])
def upload_file_command(message):
if not check_setup(message):
return
return bot.send_message(message.chat.id, "Загрузите файл с данными для анализа.")
@bot.message_handler(content_types=["document"])
def upload_file_handler(message):
if not check_setup(message):
return
file_info = bot.get_file(message.document.file_id)
downloaded_file = bot.download_file(file_info.file_path)
try:
upload_file(message.chat.id, message.document.file_name, downloaded_file)
except Exception as e:
return bot.send_message(message.chat.id, f"Ошибка при загрузке файла: {e}")
return bot.send_message(
message.chat.id,
"Файл успешно загружен и новая сессия анализа начата. Задавайте ваши вопросы.",
)
@bot.message_handler(content_types=["text"])
def handle_message(message):
if not check_setup(message):
return
if not get_thread_id(str(message.chat.id)):
return bot.send_message(
message.chat.id,
"Для начала работы загрузите файл с данными для анализа с помощью команды /upload.",
)
start_typing(message.chat.id)
try:
answers = process_message(str(message.chat.id), message.text)
except Exception as e:
bot.send_message(message.chat.id, f"Ошибка при обработке сообщения: {e}")
return
stop_typing()
for answer in answers:
if answer["type"] == "text":
bot.send_message(message.chat.id, answer["text"])
elif answer["type"] == "image":
bot.send_photo(message.chat.id, InputFile(answer["file"]))
elif answer["type"] == "file":
bot.send_document(
message.chat.id,
InputFile(answer["file"]),
visible_file_name=answer["filename"],
)
def handler(event, context):
message = json.loads(event["body"])
update = telebot.types.Update.de_json(message)
if update.message is not None:
try:
bot.process_new_updates([update])
except Exception as e:
print(e)
return {
"statusCode": 200,
"body": "ok",
}
typing
Вспомогательная функция, которая посылает статус «набирает сообщение...», чтобы наши пользователи не скучали, пока модель готовит ответ.
check_setup
Вспомогательная функция, которая выводит ошибку, если бот ещё не настроен администратором.
check_admin
Вспомогательная функция, которая проверяет, является ли автор сообщения администратором бота.
send_welcome
Посылаем приветственное сообщение после команды /start.
create_assistant_command
Команда /create доступна только администратору, она создаёт или обновляет инструкции для ассистента.
upload_file_command
Команда /upload просто информирует пользователя, что для начала сессии анализа данных надо загрузить файл.
upload_file_handler
При загрузке файла сохраняем его на стороне API и сообщаем пользователю, что новая сессия открыта.
handle_message
Собственно обработчик входящих сообщений от пользователей. Здесь происходит общение с ассистентом.
handler
Точка входа для всей облачной функции. Сюда будут приходить все команды и сообщений от Telegram.
Для удобства я опубликовал весь исходный код функции на GitLab:
В будущем, если будут какие-то обновления, то они будут именно в репозитории.
Вернитесь к оригинальной статье и закончите остальные шаги, начиная с момента «Прописываем точку входа равной index.handler» и до конца, включая заполнение переменных окружения, создание шлюза API и установки Telegram WebHook.
Тест
Начнём с того, что настроим нашего ассистента. Для этого я запущу команду /create
от имени администратора:

Отлично, ассистент настроен. Теперь загружу мой CSV файл с данными о продажах.

Теперь, наконец, попробую узнать нужную мне информацию.




Ура! Всё работает!