Как стать автором
Обновить

Делаем карманного аналитика данных с помощью OpenAI Assistants API и Code Interpreter в Telegram

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров3.7K

В прошлый раз мы делали AI-официанта с помощью OpenAI Assistants API и Vector Store в Telegram.

В этот раз воспользуемся другим инструментом Assistants API от OpenAI под названием Code Interpreter. 

Что такое Code Interpreter?

Языковая модель генерирует текст, она не может проводить сколько-нибудь сложные математические вычисления или анализ данных, она просто не предназначена для этого. Однако, модель может генерировать код и очень хорошо. Что, если давать модели задание, для выполнения которого она сгенерирует программный код, он исполнится в изолированной среде разработки, и полученный результат модель уже использует для генерации ответа? Именно эту задачу и выполняет Code Interpreter.

Задача

В этой статье разберем следующий пример: у меня есть простой CSV файл со всеми платежами для фейкового онлайн-магазина. Формат данных следующий:

"id","user_id","amount","created_at"
"4675","2251032","1837.00","2024-05-22 07:10:02"
"4676","7472836","2849.00","2024-05-22 07:27:45"
"4677","6271037","2999.00","2024-05-22 07:33:12"
"4678","6815010","2877.00","2024-05-22 08:01:58"
"4679","2565937","5000.00","2024-05-22 08:13:17"
"4680","7074300","299.00","2024-05-22 08:16:49"
"4681","7028029","5770.00","2024-05-22 08:33:42"

Я хочу задавать вопросы, требующие анализа этих данных. Например: сколько заработали в определенном месяце? Есть ли различие в поведении покупателей по дням недели? Результат я хочу получать в виде текста или графиков, где применимо.

Подготовка

Повторяем все шаги из оригинальной статьи. Единственная разница будет в коде для облачной функции.

requirements.txt

В редакторе сначала создадим новый файл, назовём его requirements.txt и положим туда следующий код:

openai~=1.33.0
boto3~=1.34.122
pyTelegramBotAPI~=4.19.1

Это список зависимостей для Python, которые облако автоматически подгрузит во время сборки так, чтобы мы могли пользоваться ими в коде самой функции.

config.py

Теперь создадим файл, в котором будет код, который отвечает за конфигурацию проекта. Назовём его config.py и запишем следующий код:

import json
import os

import boto3
import openai

YANDEX_KEY_ID = os.environ.get("YANDEX_KEY_ID")
YANDEX_KEY_SECRET = os.environ.get("YANDEX_KEY_SECRET")
YANDEX_BUCKET = os.environ.get("YANDEX_BUCKET")
PROXY_API_KEY = os.environ.get("PROXY_API_KEY")
ASSISTANT_MODEL = os.environ.get("ASSISTANT_MODEL")
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN")
TG_BOT_ADMIN = os.environ.get("TG_BOT_ADMIN")


def get_s3_client():
    session = boto3.session.Session(
        aws_access_key_id=YANDEX_KEY_ID, aws_secret_access_key=YANDEX_KEY_SECRET
    )
    return session.client(
        service_name="s3", endpoint_url="https://storage.yandexcloud.net"
    )


def get_config() -> dict:
    s3client = get_s3_client()
    try:
        response = s3client.get_object(Bucket=YANDEX_BUCKET, Key="config.json")
        return json.loads(response["Body"].read())
    except:
        return {}


def save_config(new_config: dict):
    s3client = get_s3_client()
    s3client.put_object(
        Bucket=YANDEX_BUCKET, Key="config.json", Body=json.dumps(new_config)
    )


proxy_client = openai.Client(
    api_key=PROXY_API_KEY,
    base_url="https://api.proxyapi.ru/openai/v1",
)

Здесь мы читаем все необходимые параметры из переменных окружения и прописываем функции для получения и сохранения дополнительных (динамических) параметров в бакете хранилища Яндекс Облака. Инициируем также клиент для ProxyAPI согласно документации, то есть переопределяем путь к API.

admin.py

Далее создаём файл admin.py и кладём туда весь код, который отвечает за разные административные задачи проекта.

from config import ASSISTANT_MODEL, get_config, proxy_client, save_config


def create_assistant(name, instructions):
    assistant_id = get_assistant_id()
    if not assistant_id:
        new_assistant = proxy_client.beta.assistants.create(
            model=ASSISTANT_MODEL,
            instructions=instructions,
            name=name,
            tools=[
                {
                    "type": "code_interpreter",
                }
            ],
        )
        config = get_config()
        config["assistant_id"] = new_assistant.id
        save_config(config)
    else:
        proxy_client.beta.assistants.update(
            assistant_id=assistant_id, instructions=instructions
        )


def get_assistant_id():
    config = get_config()
    return config["assistant_id"] if "assistant_id" in config else None


def upload_file(chat_id, filename, file):
    config = get_config()
    file_object = proxy_client.files.create(file=(filename, file), purpose="assistants")
    thread = proxy_client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": "Для всех вопросов, которые я буду задавать, используй эти данные для анализа",
                "attachments": [
                    {"file_id": file_object.id, "tools": [{"type": "code_interpreter"}]}
                ],
            }
        ]
    )
    if not "threads" in config:
        config["threads"] = {}

    config["threads"][chat_id] = thread.id
    save_config(config)


def get_thread_id(chat_id: str):
    config = get_config()

    if "threads" in config and chat_id in config["threads"]:
        return config["threads"][chat_id]

    return None

create_assistant

Создаёт или обновляет настройки для ассистента на основе имени и инструкций. Сразу привязываем ассистента к нашему векторному хранилищу. Сохраняем идентификатор ассистента в конфигурации.

get_assistant_id

Возвращает идентификатор ассистента из конфигурации.

upload_file

Метод, который загружает файл в файловое (не векторное!) хранилище API и инициирует новый тред для этого чата. Идея здесь такая: для каждого нового загруженного файла начинаем новый тред, чтобы пользователь мог опрашивать модель именно по данным из этого файла. Старый тред «забывается».

get_thread_id

Возвращает текущий идентификатор треда, если хоть один файл уже был загружен.

chat.py

В файле chat.py (тоже создаём его) будем хранить методы для работы с сообщениями пользователей.

import os

from admin import get_assistant_id, get_thread_id
from config import proxy_client


def process_message(chat_id: str, message: str) -> list[dict]:
    assistant_id = get_assistant_id()
    thread_id = get_thread_id(chat_id)
    if not thread_id:
        raise ValueError("Thread not found")

    proxy_client.beta.threads.messages.create(
        thread_id=thread_id, content=message, role="user"
    )

    run = proxy_client.beta.threads.runs.create_and_poll(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )

    answer = []

    if run.status == "completed":
        messages = proxy_client.beta.threads.messages.list(
            thread_id=thread_id, run_id=run.id
        )
        for message in messages:
            if message.role == "assistant":
                for block in message.content:
                    if block.type == "text":
                        answer.insert(0, {"type": "text", "text": block.text.value})
                        if block.text.annotations:
                            for annotation in block.text.annotations:
                                if annotation.type == "file_path":
                                    answer.insert(
                                        0,
                                        {
                                            "type": "file",
                                            "file": download_file(
                                                annotation.file_path.file_id
                                            ),
                                            "filename": os.path.basename(
                                                annotation.text.split(":")[-1]
                                            ),
                                        },
                                    )
                    elif block.type == "image_file":
                        answer.insert(
                            0,
                            {
                                "type": "image",
                                "file": download_file(block.image_file.file_id),
                            },
                        )

    return answer


def download_file(file_id: str) -> str:
    file_content = proxy_client.files.content(file_id)
    content = file_content.read()
    with open(f"/tmp/{file_id}", "wb") as f:
        f.write(content)
    return f"/tmp/{file_id}"

process_message

Этот метод принимает сообщения пользователя, и отправляет его через ProxyAPI на обработку в Assistants API. Assistants API работает немного по-другому, в сравнении с обычной API. Здесь после добавления сообщения в тред ответ модели мы сразу не получим. Надо дополнительно запустить обработку всего треда с помощью метода run. Что мы и делаем, после чего получаем ответ, который может состоять из нескольких сообщений.

Code Interpreter, кроме текста, может еще генерировать другие файлы и изображения (например, графики). Так что мы дополнительно распознаем такие случаи с помощью block.type и annotations, загружаем содержимое таких файлов и добавляем в массив сообщений, которые вернул ассистент.

download_file

Загружает файл, сгенерированный ассистентом в локальную временную директорию. Он удалится автоматически, когда облачная функция закончит работу.

index.py

import json
import logging
import threading
import time

import telebot
from admin import create_assistant, get_assistant_id, get_thread_id, upload_file
from chat import process_message
from config import TG_BOT_ADMIN, TG_BOT_TOKEN
from telebot.types import InputFile

logger = telebot.logger
telebot.logger.setLevel(logging.INFO)

bot = telebot.TeleBot(TG_BOT_TOKEN, threaded=False)


is_typing = False


def start_typing(chat_id):
    global is_typing
    is_typing = True
    typing_thread = threading.Thread(target=typing, args=(chat_id,))
    typing_thread.start()


def typing(chat_id):
    global is_typing
    while is_typing:
        bot.send_chat_action(chat_id, "typing")
        time.sleep(4)


def stop_typing():
    global is_typing
    is_typing = False


def check_setup(message):
    if not get_assistant_id():
        if message.from_user.username != TG_BOT_ADMIN:
            bot.send_message(
                message.chat.id, "Бот еще не настроен. Свяжитесь с администратором."
            )
        else:
            bot.send_message(
                message.chat.id,
                "Бот еще не настроен. Используйте команду /create для создания ассистента.",
            )
        return False
    return True


def check_admin(message):
    if message.from_user.username != TG_BOT_ADMIN:
        bot.send_message(message.chat.id, "Доступ запрещен")
        return False
    return True


@bot.message_handler(commands=["help", "start"])
def send_welcome(message):
    if not check_setup(message):
        return

    bot.send_message(
        message.chat.id,
        (
            f"Привет! Я твой карманный дата-аналитик. Загрузи файл и задавай вопросы по нему. Я умею проводить анализ данных и строить графики."
        ),
    )


@bot.message_handler(commands=["create"])
def create_assistant_command(message):
    if not check_admin(message):
        return

    instructions = message.text.split("/create")[1].strip()
    if len(instructions) == 0:
        bot.send_message(
            message.chat.id,
            """
Введите подробные инструкции для работы ассистента после команды /create и пробела.

Например: 
/create Ты - дата-аналитик. Пользователь загружает файл для анализа, а ты, используя инструменты, отвечаешь на вопросы и, если нужно, строишь графики.

Если ассистент уже был ранее создан, инструкции будут обновлены.
            """,
        )
        return

    name = bot.get_me().full_name
    create_assistant(name, instructions)

    bot.send_message(
        message.chat.id,
        "Ассистент успешно создан. Теперь вы можете добавлять документы в базу знаний с помощью команды /upload.",
    )


@bot.message_handler(commands=["upload"])
def upload_file_command(message):
    if not check_setup(message):
        return

    return bot.send_message(message.chat.id, "Загрузите файл с данными для анализа.")


@bot.message_handler(content_types=["document"])
def upload_file_handler(message):
    if not check_setup(message):
        return

    file_info = bot.get_file(message.document.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    try:
        upload_file(message.chat.id, message.document.file_name, downloaded_file)
    except Exception as e:
        return bot.send_message(message.chat.id, f"Ошибка при загрузке файла: {e}")

    return bot.send_message(
        message.chat.id,
        "Файл успешно загружен и новая сессия анализа начата. Задавайте ваши вопросы.",
    )


@bot.message_handler(content_types=["text"])
def handle_message(message):
    if not check_setup(message):
        return

    if not get_thread_id(str(message.chat.id)):
        return bot.send_message(
            message.chat.id,
            "Для начала работы загрузите файл с данными для анализа с помощью команды /upload.",
        )

    start_typing(message.chat.id)

    try:
        answers = process_message(str(message.chat.id), message.text)
    except Exception as e:
        bot.send_message(message.chat.id, f"Ошибка при обработке сообщения: {e}")
        return

    stop_typing()

    for answer in answers:
        if answer["type"] == "text":
            bot.send_message(message.chat.id, answer["text"])
        elif answer["type"] == "image":
            bot.send_photo(message.chat.id, InputFile(answer["file"]))
        elif answer["type"] == "file":
            bot.send_document(
                message.chat.id,
                InputFile(answer["file"]),
                visible_file_name=answer["filename"],
            )


def handler(event, context):
    message = json.loads(event["body"])
    update = telebot.types.Update.de_json(message)

    if update.message is not None:
        try:
            bot.process_new_updates([update])
        except Exception as e:
            print(e)

    return {
        "statusCode": 200,
        "body": "ok",
    }

typing

Вспомогательная функция, которая посылает статус «набирает сообщение...», чтобы наши пользователи не скучали, пока модель готовит ответ.

check_setup

Вспомогательная функция, которая выводит ошибку, если бот ещё не настроен администратором.

check_admin

Вспомогательная функция, которая проверяет, является ли автор сообщения администратором бота.

send_welcome

Посылаем приветственное сообщение после команды /start.

create_assistant_command

Команда /create доступна только администратору, она создаёт или обновляет инструкции для ассистента.

upload_file_command

Команда /upload просто информирует пользователя, что для начала сессии анализа данных надо загрузить файл.

upload_file_handler

При загрузке файла сохраняем его на стороне API и сообщаем пользователю, что новая сессия открыта.

handle_message

Собственно обработчик входящих сообщений от пользователей. Здесь происходит общение с ассистентом.

handler

Точка входа для всей облачной функции. Сюда будут приходить все команды и сообщений от Telegram.


Для удобства я опубликовал весь исходный код функции на GitLab:

https://gitlab.com/evrovas/data-analyst-telegram-bot

В будущем, если будут какие-то обновления, то они будут именно в репозитории.


Вернитесь к оригинальной статье и закончите остальные шаги, начиная с момента «Прописываем точку входа равной index.handler» и до конца, включая заполнение переменных окружения, создание шлюза API и установки Telegram WebHook.

Тест

Начнём с того, что настроим нашего ассистента. Для этого я запущу команду /create от имени администратора:

Отлично, ассистент настроен. Теперь загружу мой CSV файл с данными о продажах.

Теперь, наконец, попробую узнать нужную мне информацию.

Ура! Всё работает!

Теги:
Хабы:
+5
Комментарии9

Публикации

Истории

Работа

Python разработчик
131 вакансия
Data Scientist
83 вакансии

Ближайшие события