Как стать автором
Обновить

YandexGPT API быстро и без труда с Python SDK. Делимся опытом интеграции

Уровень сложностиСредний
Время на прочтение25 мин
Количество просмотров12K
Оценка времени интеграции нового функционала(источник: https://pikabu.ru/story/otsenil_tak_otsenil_4799433)
Оценка времени интеграции нового функционала
(источник: https://pikabu.ru/story/otsenil_tak_otsenil_4 799 433)

Приветствую! Меня зовут Григорий, и я главный по спецпроектам в команде AllSee.

YandexGPT API — сервис для доступа к генеративным языковым моделям, хоть и является мощным инструментом для решения многих задач, однако может озадачить разработчика, решившего внедрить его в свои проекты, отсутствием официального SDK, разнообразием способов авторизации, видов моделей и эндпоинтов API. В данной статье я рассказываю, как мы внедряли YandexGPT в свои проекты, а в конце делюсь всеми наработками в формате Python SDK.

Мотивация

Зачем YandexGPT, когда есть ChatGPT?

Ответ короткий: каждой задаче — свой инструмент. Сейчас объясню на примере.

Представим, что мы решаем задачу выделения из текста списка упомянутых категорий, оценивая качество алгоритмов по заранее установленной метрике от 0 до 100%, при этом нас устраивает результат выше 90%. Мы попробовали применить к решению задачи YandexGPT, а также ChatGPT и получили следующие результаты:

  • YandexGPT — метрика 95%

  • ChatGPT — метрика 97%

Оба решения показывают хорошие результаты, но давайте посмотрим на стоимость:

  • YandexGPT — 0,6 рублей за 1000 токенов

  • ChatGPT — 1,9 рублей за 1000 токенов + аренда proxy‑сервера

Выходит, что в задачах, где все модели показываю сравнительно похожий результат по метрикам, целесообразно выбрать более экономичный вариант. В нашем случае, YandexGPT.

Проблемы интеграции YandexGPT

А теперь представим, что мы уже провели сравнительный анализ опций, как в примере выше и решили, что YandexGPT — идеально вписывается в наш проект. Начинаем интегрировать.

F(p) = \mathcal{L}[1(t)] = \int_0^{+\infty} 1(t) e^{-pt} \, dt = \frac{1}{p} \left. e^{-pt} \right|_0^{+\infty} = \frac{1}{p}.

С интегрированием закончили, возвращаемся к написанию кода. Если наш проект предполагает только синхронные запросы, то взаимодействие с YandexGPT API становится относительно простым: хорошо этот процесс описан в данной статье. Однако стоит нам посмотреть в сторону асинхронных запросов или IAM‑токенов, объём кодовой базы вырастает в разы и начинает выглядеть как‑то так:

Пример кодовой базы с одного из проектов
Работа с настройкми и авторизацией
import base64
import os
import time
from typing import Any, Dict, Optional

import jwt
import requests
from dotenv import load_dotenv

load_dotenv()


class YandexGPTConfigManager:
    available_models: list[str] = ["yandexgpt", "yandexgpt-lite", "summarization"]

    def __init__(
        self,
        model_type: str = "yandexgpt",
        iam_token: Optional[str] = None,
        catalog_id: Optional[str] = None,
    ) -> None:
        """
        Ths class is used to manage configuration for YandexGPT.
        You can: \n
        1) Provide model type, IAM token, and catalog ID directly \n
        2) Use MODEL_TYPE, IAM_TOKEN, CATALOG_ID environment variables for direct initialization \n
        3) Use SERVICE_ACCOUNT_ID, SERVICE_ACCOUNT_KEY_ID, CATALOG_ID, PRIVATE_KEY_BASE64 for generating IAM token
        :param model_type: model type to use. Supported values: 'yandexgpt', 'yandexgpt-lite', 'summarization'
        :param iam_token: IAM token to use
        :param catalog_id: Catalog ID on YandexCloud to use
        """
        self.model_type: str = model_type
        self.iam_token: Optional[str] = iam_token
        self.catalog_id: Optional[str] = catalog_id
        self._initialize_params()
        self._check_params()

    def _initialize_params(self) -> None:
        """
        Initializes model type, IAM token, and catalog id. Or tries to initialize from environment variables.
        """
        if self.iam_token and self.catalog_id:
            # if both IAM token and catalog id are already set, do nothing
            return
        else:
            # trying to initialize from environment variables
            self._initialize_from_env_vars()

    @staticmethod
    def _generate_jwt_token(
        service_account_id: str,
        private_key: str,
        key_id: str,
        url: str = "https://iam.api.cloud.yandex.net/iam/v1/tokens",
    ) -> str:
        """
        Generates JWT token from service account id, private key, and key id.
        :param service_account_id: service account id
        :param private_key: private key
        :param key_id: key id
        :param url: url for swapping JWT token to IAM request
        :return: encoded JWT token
        """
        # generating JWT token
        now: int = int(time.time())
        payload: Dict[str, Any] = {
            "aud": url,
            "iss": service_account_id,
            "iat": now,
            "exp": now + 360,
        }
        encoded_token: str = jwt.encode(
            payload, private_key, algorithm="PS256", headers={"kid": key_id}
        )
        return encoded_token

    @staticmethod
    def _swap_jwt_to_iam(
        jwt_token: str, url: str = "https://iam.api.cloud.yandex.net/iam/v1/tokens"
    ) -> str:
        """
        Swaps JWT token to IAM token.
        :param jwt_token: encoded JWT token
        :param url: url for swapping JWT token to IAM request
        :return: IAM token
        """
        headers: Dict[str, str] = {"Content-Type": "application/json"}
        data: Dict[str, str] = {"jwt": jwt_token}
        # swapping JWT token to IAM
        response: requests.Response = requests.post(url, headers=headers, json=data)
        if response.status_code == 200:
            # if succeeded to get IAM token
            return response.json()["iamToken"]
        else:
            # if failed to get IAM token
            raise Exception(
                f"Failed to get IAM token. Status code: {response.status_code}\n{response.text}"
            )

    def _initialize_from_env_vars(self) -> None:
        """
        Initializes model type, IAM token, and catalog id from environment variables. Or to generate IAM token from environment variables.
        """
        # trying to initialize from environment variables
        self._set_iam_from_env()
        self._set_model_type_from_env()
        self._set_catalog_id_from_env()
        if not self.iam_token:
            # if IAM token is not set, trying to initialize from config and private key
            self._set_iam_from_env_config_and_private_key()

    def _set_iam_from_env(self) -> None:
        self.iam_token = os.getenv("IAM_TOKEN", self.iam_token)

    def _set_model_type_from_env(self) -> None:
        self.model_type = os.getenv("MODEL_TYPE", self.model_type)

    def _set_catalog_id_from_env(self) -> None:
        self.catalog_id = os.getenv("CATALOG_ID", self.catalog_id)

    def _set_iam_from_env_config_and_private_key(self) -> None:
        """
        Generates and sets IAM token from environment variables.
        """
        # getting environment variables
        iam_url: str = os.getenv(
            "IAM_URL", "https://iam.api.cloud.yandex.net/iam/v1/tokens"
        )
        service_account_id: Optional[str] = os.getenv("SERVICE_ACCOUNT_ID")
        service_account_key_id: Optional[str] = os.getenv("SERVICE_ACCOUNT_KEY_ID")
        catalog_id: Optional[str] = os.getenv("CATALOG_ID")
        private_key_base64: Optional[str] = os.getenv("PRIVATE_KEY_BASE64")
        # checking environment variables
        if not all(
            [
                iam_url,
                service_account_id,
                service_account_key_id,
                catalog_id,
                private_key_base64,
            ]
        ):
            raise ValueError(
                "One or more environment variables for IAM token generation are missing."
            )
        # decoding private key
        private_key_bytes: bytes = base64.b64decode(private_key_base64)
        private_key: str = private_key_bytes.decode("utf-8")
        # generating JWT token
        jwt_token: str = self._generate_jwt_token(
            service_account_id=service_account_id,
            private_key=private_key,
            key_id=service_account_key_id,
            url=iam_url,
        )
        # swapping JWT token to IAM
        self.iam_token = self._swap_jwt_to_iam(jwt_token, iam_url)

    def _check_params(self) -> None:
        """
        Checks if model type, IAM token and catalog id are set.
        """
        if not self.iam_token:
            raise ValueError("IAM token is not set")
        if not self.catalog_id:
            raise ValueError("Catalog ID is not set")
        if self.model_type not in self.available_models:
            raise ValueError(f"Model type must be one of {self.available_models}")

Запросы к YandexGPT API
import asyncio
from typing import Union, Dict, Any, TypedDict

import aiohttp

from .yandex_gpt_config_manager import YandexGPTConfigManager


class YandexGPTMessage(TypedDict):
    role: str
    text: str


class YandexGPT:
    def __init__(
            self,
            config_manager: Union[YandexGPTConfigManager, Dict[str, Any]],
    ) -> None:
        """
        This class is used to interact with the Yandex GPT API. You can provide either a config manager or a dictionary
        containing model type, IAM token, and catalog ID. You can learn more about config manager setup in yandex_gpt_config_manager.py.
        :param config_manager: Config manager or a dictionary containing model type, IAM token, and catalog ID.
        """
        self.config_manager = config_manager

    @staticmethod
    async def _send_async_completion_request(
            headers: Dict[str, str],
            payload: Dict[str, Any],
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completionAsync"
    ) -> str:
        """
        Sends async request to completion API.
        :param headers: dict with authorization token (IAM), content type, and x-folder-id (YandexCloud catalog ID)
        :param payload: dict with model URI, completion options, and messages
        :param completion_url:
        :return: ID of the completion operation to poll
        """
        async with aiohttp.ClientSession() as session:
            async with session.post(completion_url, headers=headers, json=payload) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    return data['id']
                else:
                    raise Exception(f"Failed to send async request, status code: {resp.status}")

    @staticmethod
    async def _poll_async_completion(
            operation_id: str,
            headers: Dict[str, str],
            timeout: int = 5,
            poll_url: str = 'https://llm.api.cloud.yandex.net/operations/'
    ) -> Dict[str, Any]:
        """
        Polls async completion operation.
        :param operation_id: ID of the completion operation to poll
        :param headers: dict with authorization token (IAM)
        :param timeout: time after which the operation is considered timed out
        :param poll_url: poll URL
        :return: completion result
        """
        async with aiohttp.ClientSession() as session:
            end_time = asyncio.get_event_loop().time() + timeout
            while True:
                if asyncio.get_event_loop().time() > end_time:
                    raise TimeoutError(f"Operation timed out after {timeout} seconds")
                async with session.get(f"{poll_url}{operation_id}", headers=headers) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        if data.get('done', False):
                            return data
                    else:
                        raise Exception(f"Failed to poll operation status, status code: {resp.status}")
                await asyncio.sleep(1)

    async def get_completion(
            self,
            messages: YandexGPTMessage,
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False,
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completionAsync",
            timeout: int = 5
    ) -> str:
        """
        Sends async completion request to the Yandex GPT API and polls the result.
        :param messages:
        :param temperature:
        :param max_tokens:
        :param stream: IDK whould it work in current realization (keep it False)
        :param completion_url:
        :param timeout: time after which the operation is considered timed out
        :return: text of the completion
        """
        # checking config manager
        if not all([
            getattr(self.config_manager, 'model_type', None),
            getattr(self.config_manager, 'iam_token', None),
            getattr(self.config_manager, 'catalog_id', None)
        ]):
            raise ValueError("Model type, IAM token, and catalog ID must be set in config manager to send a "
                             "completion request.")

        # making request
        headers: Dict[str, str] = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.config_manager.iam_token}",
            "x-folder-id": self.config_manager.catalog_id
        }
        payload: Dict[str, Any] = {
            "modelUri": f"gpt://"
                        f"{self.config_manager.catalog_id}"
                        f"/{self.config_manager.model_type}"
                        f"/latest",
            "completionOptions": {
                "stream": stream,
                "temperature": temperature,
                "maxTokens": max_tokens
            },
            "messages": messages
        }

        completion_request_id = await self._send_async_completion_request(
            headers=headers,
            payload=payload,
            completion_url=completion_url
        )

        completion_response = await self._poll_async_completion(
            operation_id=completion_request_id,
            headers=headers,
            timeout=timeout
        )

        if completion_response.get('error', None):
            raise Exception(f"Failed to get completion: {completion_response['error']}")

        return completion_response['response']['alternatives'][0]['message']['text']

Чтобы помочь разработчикам познакомиться со всеми возможностями YandexGPT API без танцев с бубнами и написания сотен строк кода мы делимся всеми нашими наработками в формате Python SDK. А далее я расскажу вам, как устроен подкапотный мир SDK.

Решение

Конфигурация и аутентификация

Для выполнения REST‑запросов к YandexGPT API, нам необходмо указывать данные для аутентификации, ID каталога Yandex Cloud и URI нужной нам модели. Подробнее об этом можно почитать в статье или в официальной документации YandexGPT API.

Автоматизируем генерацию данных полей. Реализуем базовый класс, который будет уметь хранить и предоставлять необходимые нам данные.

YandexGPTConfigManagerBase
available_models: List[str] = [
        "yandexgpt",
        "yandexgpt-lite",
        "summarization"
    ]


class YandexGPTConfigManagerBase:
    """
    Base class for YaGPT configuration management. It handles configurations related to model type, catalog ID, IAM
    token, and API key for authorization when making requests to the completion endpoint.
    """
    def __init__(
            self,
            model_type: Optional[str] = None,
            catalog_id: Optional[str] = None,
            iam_token: Optional[str] = None,
            api_key: Optional[str] = None,
    ) -> None:
        """
        Initializes a new instance of the YandexGPTConfigManagerBase class.

        Parameters
        ----------
        model_type : Optional[str], optional
            Model type to use.
        catalog_id : Optional[str], optional
            Catalog ID on YandexCloud to use.
        iam_token : Optional[str], optional
            IAM token for authorization.
        api_key : Optional[str], optional
            API key for authorization.
        """
        self.model_type: Optional[str] = model_type
        self.catalog_id: Optional[str] = catalog_id
        self.iam_token: Optional[str] = iam_token
        self.api_key: Optional[str] = api_key

    @property
    def completion_request_authorization_field(self) -> str:
        """
        Returns the authorization field for the completion request header based on the IAM token or API key.

        Raises
        ------
        ValueError
            If neither IAM token nor API key is set.

        Returns
        -------
        str
            The authorization field for the completion request header in the form of "Bearer {iam_token}" or
            "Api-Key {api_key}".
        """
        # Checking if either iam_token or api_key is set and returning the authorization field string
        if self.iam_token:
            return f"Bearer {self.iam_token}"
        elif self.api_key:
            return f"Api-Key {self.api_key}"
        else:
            raise ValueError("IAM token or API key is not set")

    @property
    def completion_request_catalog_id_field(self) -> str:
        """
        Returns the catalog ID field for the completion request header.

        Raises
        ------
        ValueError
            If catalog_id is not set.

        Returns
        -------
        str
            The catalog ID field for the completion request header.
        """
        # Checking if catalog_id is set and returning the catalog id field string
        if self.catalog_id:
            return self.catalog_id
        else:
            raise ValueError("Catalog ID is not set")

    @property
    def completion_request_model_type_uri_field(self) -> str:
        """
        Returns the model type URI field for the completion request payload.

        Raises
        ------
        ValueError
            If model_type or catalog_id is not set or if model_type is not in the available models.

        Returns
        -------
        str
            The model type URI field for the completion request header in the URI format.
        """
        global available_models

        # Checking if model_type is in available_models
        if self.model_type not in available_models:
            raise ValueError(f"Model type {self.model_type} is not supported. Supported values: {available_models}")

        # Checking if model_type and catalog_id are set and returning the model type URI field string
        if self.model_type and self.catalog_id:
            return f"gpt://{self.catalog_id}/{self.model_type}/latest"
        else:
            raise ValueError("Model type or catalog ID is not set")

В зависимости от выбранного метода (API‑ключ или IAM‑токен), реализуем соответствующие классы для авторизации. Эти классы будут автоматически выполнять все необходимые запросы к API, а нам останется только задать нужные параметры при инициализации или через переменные окружения.

YandexGPTConfigManagerForAPIKey и YandexGPTConfigManagerForIAMToken
class YandexGPTConfigManagerForAPIKey(YandexGPTConfigManagerBase):
    """
    Class for configuring the YandexGPT model using an API key. It supports setting model type, catalog ID, and API key
    directly or through environment variables. The class allows for configuration flexibility by providing the option to
    use environmental variables for model type (`YANDEX_GPT_MODEL_TYPE`), catalog ID (`YANDEX_GPT_CATALOG_ID`), and API
    key (`YANDEX_GPT_API_KEY`), which can override the constructor values if set.
    """
    def __init__(
            self,
            model_type: Optional[str] = None,
            catalog_id: Optional[str] = None,
            api_key: Optional[str] = None,
    ) -> None:
        """
        Initializes a new instance of the YandexGPTConfigManagerForAPIKey class.

        Parameters
        ----------
        model_type : Optional[str], optional
            Model type to use.
        catalog_id : Optional[str], optional
            Catalog ID on YandexCloud to use.
        api_key : Optional[str], optional
            API key for authorization.
        """
        # Setting model type, catalog ID and API key from the constructor
        super().__init__(
            model_type=model_type,
            catalog_id=catalog_id,
            api_key=api_key
        )

        # Setting model type, catalog ID and API key from the environment variables if they are set
        self._set_config_from_env_vars()

        # Checking if model type, catalog ID and API key are set
        self._check_config()

    def _set_config_from_env_vars(self) -> None:
        """
        Sets configuration parameters from environment variables if they are not provided in the constructor.
        """
        self.model_type = os.environ.get("YANDEX_GPT_MODEL_TYPE", self.model_type)
        self.catalog_id = os.environ.get("YANDEX_GPT_CATALOG_ID", self.catalog_id)
        self.api_key = os.environ.get("YANDEX_GPT_API_KEY", self.api_key)

    def _check_config(self) -> None:
        """
        Ensures that the necessary configuration parameters are set, raising a ValueError if any are missing.
        """
        if not self.model_type:
            raise ValueError(
                "Model type is not set. You can ether provide it in the constructor or set in YANDEX_GPT_MODEL_TYPE "
                "environment variable"
            )
        elif not self.catalog_id:
            raise ValueError(
                "Catalog ID is not set. You can ether provide it in the constructor or set in YANDEX_GPT_CATALOG_ID "
                "environment variable"
            )
        elif not self.api_key:
            raise ValueError(
                "API key is not set. You can ether provide it in the constructor or set in YANDEX_GPT_API_KEY "
                "environment variable"
            )


class YandexGPTConfigManagerForIAMToken(YandexGPTConfigManagerBase):
    """
    Class for configuring the YandexGPT model using an IAM token. It handles configurations involving model type,
    catalog ID, and IAM token, with options for direct input or initialization via environment variables. The class
    provides several pathways for initializing these configurations:

    1. Directly through constructor parameters.
    2. Through environment variables `YANDEX_GPT_MODEL_TYPE`, `YANDEX_GPT_CATALOG_ID`, and `YANDEX_GPT_IAM_TOKEN`.
    3. Automatically generating the IAM token using the environment variables `YANDEX_GPT_IAM_URL`,
       `YANDEX_GPT_SERVICE_ACCOUNT_ID`, `YANDEX_GPT_SERVICE_ACCOUNT_KEY_ID`, `YANDEX_GPT_CATALOG_ID`, and
       `YANDEX_GPT_PRIVATE_KEY`.
    """
    def __init__(
            self,
            model_type: Optional[str] = None,
            catalog_id: Optional[str] = None,
            iam_token: Optional[str] = None,
    ) -> None:
        """
        Initializes a new instance of the YandexGPTConfigManagerForIAMToken class.

        Parameters
        ----------
        model_type : Optional[str], optional
            Model type to use.
        catalog_id : Optional[str], optional
            Catalog ID on YandexCloud to use.
        iam_token : Optional[str], optional
            IAM token for authorization.
        """
        # Setting model type, catalog ID and IAM token from the constructor
        super().__init__(
            model_type=model_type,
            catalog_id=catalog_id,
            iam_token=iam_token
        )

        # Setting model type, catalog ID and IAM token using one of options
        self._set_config()

        # Checking if model type, catalog ID and API key are set
        self._check_config()

    def _set_config(self) -> None:
        """
        Sets model type, IAM token, and catalog id or tries to initialize them from environment variables.
        """
        if self.iam_token and self.catalog_id and self.model_type:
            # If all parameters are set, do nothing
            return
        else:
            # Trying to initialize from environment variables
            self._set_config_from_env_vars()

    def _set_config_from_env_vars(self) -> None:
        """
        Sets config from environment variables or tries to generate the IAM token using additional environment variables
        if not directly provided.
        """
        self.model_type = os.environ.get("YANDEX_GPT_MODEL_TYPE", self.model_type)
        self.catalog_id = os.environ.get("YANDEX_GPT_CATALOG_ID", self.catalog_id)
        self.iam_token = os.environ.get("YANDEX_GPT_IAM_TOKEN", self.iam_token)

        if not self.iam_token:
            # If IAM token is not set, trying to initialize from config and private key
            self._set_iam_from_env_config_and_private_key()

    def _set_iam_from_env_config_and_private_key(self) -> None:
        """
        Generates and sets IAM token from environment variables if not provided.
        """
        # Getting environment variables
        iam_url: str = os.getenv("YANDEX_GPT_IAM_URL", "https://iam.api.cloud.yandex.net/iam/v1/tokens")
        service_account_id: Optional[str] = os.getenv("YANDEX_GPT_SERVICE_ACCOUNT_ID", None)
        service_account_key_id: Optional[str] = os.getenv("YANDEX_GPT_SERVICE_ACCOUNT_KEY_ID", None)
        catalog_id: Optional[str] = os.getenv("YANDEX_GPT_CATALOG_ID", None)
        private_key: Optional[str] = os.getenv("YANDEX_GPT_PRIVATE_KEY", None)

        # Checking environment variables
        if not all([iam_url, service_account_id, service_account_key_id, catalog_id, private_key]):
            raise ValueError("One or more environment variables for IAM token generation are missing.")

        # Generating JWT token
        jwt_token: str = self._generate_jwt_token(
            service_account_id=service_account_id,
            private_key=private_key,
            key_id=service_account_key_id,
            url=iam_url,
        )

        # Swapping JWT token to IAM
        self.iam_token = self._swap_jwt_to_iam(jwt_token, iam_url)

    @staticmethod
    def _generate_jwt_token(
            service_account_id: str,
            private_key: str,
            key_id: str,
            url: str = "https://iam.api.cloud.yandex.net/iam/v1/tokens",
    ) -> str:
        """
        Generates and swaps a JWT token to an IAM token.

        Parameters
        ----------
        service_account_id : str
            Service account ID
        private_key : str
            Private key
        key_id : str
            Service account key ID
        url : str
            IAM URL for token request

        Returns
        -------
        str
            The IAM token
        """
        # Generating JWT token
        now: int = int(time.time())
        payload: Dict[str, Any] = {
            "aud": url,
            "iss": service_account_id,
            "iat": now,
            "exp": now + 360,
        }
        encoded_token: str = jwt.encode(
            payload,
            private_key,
            algorithm="PS256",
            headers={"kid": key_id}
        )
        return encoded_token

    @staticmethod
    def _swap_jwt_to_iam(
            jwt_token: str, url: str = "https://iam.api.cloud.yandex.net/iam/v1/tokens"
    ) -> str:
        """
        Swaps a JWT token for an IAM token by making a POST request to the Yandex IAM service.

        Parameters
        ----------
        jwt_token : str
            The JWT token to be swapped.
        url : str, optional
            The URL to send the JWT token to, by default set to Yandex IAM token service endpoint.

        Returns
        -------
        str
            The IAM token received in response.

        Raises
        ------
        Exception
            If the request fails or does not return a successful response.
        """
        headers: Dict[str, str] = {"Content-Type": "application/json"}
        data: Dict[str, str] = {"jwt": jwt_token}
        # Swapping JWT token to IAM
        response: requests.Response = requests.post(
            url,
            headers=headers,
            json=data
        )
        if response.status_code == 200:
            # If succeeded to get IAM token return it
            return response.json()["iamToken"]
        else:
            # If failed to get IAM token raise an exception
            raise Exception(f"Failed to get IAM token. Status code: {response.status_code}\n{response.text}")

    def _check_config(self) -> None:
        """
        Ensures that the necessary configuration parameters are set, raising a ValueError if any are missing.
        """
        if not self.model_type:
            raise ValueError(
                "Model type is not set. You can ether provide it in the constructor or set in YANDEX_GPT_MODEL_TYPE "
                "environment variable"
            )
        elif not self.catalog_id:
            raise ValueError(
                "Catalog ID is not set. You can ether provide it in the constructor or set in YANDEX_GPT_CATALOG_ID "
                "environment variable"
            )
        elif not self.iam_token:
            raise ValueError(
                "IAM token is not set. You can ether provide it in the constructor or set in YANDEX_GPT_IAM_TOKEN "
                "environment variable or generate it automatically by setting YANDEX_GPT_SERVICE_ACCOUNT_ID, "
                "YANDEX_GPT_SERVICE_ACCOUNT_KEY_ID, YANDEX_GPT_CATALOG_ID and YANDEX_GPT_PRIVATE_KEY environment "
                "variables"
            )


class YandexGPTConfigManagerForIAMTokenWithBase64Key(YandexGPTConfigManagerForIAMToken):
    """
    A specialized configuration manager for YandexGPT that handles base64-encoded private keys. This is particularly
    useful in environments like Docker where special characters (e.g., newline) in environment variables can cause
    issues. The private key is expected to be set in the YANDEX_GPT_PRIVATE_KEY_BASE64 environment variable.

    Inherits attributes from YandexGPTConfigManagerForIAMToken.
    """
    def _set_iam_from_env_config_and_private_key(self) -> None:
        """
        Overrides the base method to generate and set the IAM token using a base64-encoded private key from
        environment variables.

        Raises
        ------
        ValueError
            If any required environment variables are missing.
        """
        # Getting environment variables
        iam_url: str = os.getenv("YANDEX_GPT_IAM_URL", "https://iam.api.cloud.yandex.net/iam/v1/tokens")
        service_account_id: Optional[str] = os.getenv("YANDEX_GPT_SERVICE_ACCOUNT_ID", None)
        service_account_key_id: Optional[str] = os.getenv("YANDEX_GPT_SERVICE_ACCOUNT_KEY_ID", None)
        catalog_id: Optional[str] = os.getenv("YANDEX_GPT_CATALOG_ID", None)
        private_key_base64: Optional[str] = os.getenv("YANDEX_GPT_PRIVATE_KEY_BASE64", None)

        # Checking environment variables
        if not all([iam_url, service_account_id, service_account_key_id, catalog_id, private_key_base64]):
            raise ValueError("One or more environment variables for IAM token generation are missing.")

        # Decoding private key
        private_key_bytes: bytes = base64.b64decode(private_key_base64)
        private_key: str = private_key_bytes.decode("utf-8")

        # Generating JWT token
        jwt_token: str = self._generate_jwt_token(
            service_account_id=service_account_id,
            private_key=private_key,
            key_id=service_account_key_id,
            url=iam_url,
        )

        # Swapping JWT token to IAM
        self.iam_token = self._swap_jwt_to_iam(jwt_token, iam_url)

YandexGPT

Для работы с YandexGPT API создадим специализированный базовый класс, который будет поддерживать как синхронные, так и асинхронные запросы.

YandexGPTBase
class YandexGPTBase:
    """
    This class is used to interact with the Yandex GPT API, providing asynchronous and synchronous methods to send requests and poll for their completion.
    """
    @staticmethod
    async def send_async_completion_request(
            headers: Dict[str, str],
            payload: Dict[str, Any],
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completionAsync"
    ) -> str:
        """
        Sends an asynchronous request to the Yandex GPT completion API.

        Parameters
        ----------
        headers : Dict[str, str]
            Dictionary containing the authorization token (IAM), content type, and x-folder-id (YandexCloud catalog ID).
        payload : Dict[str, Any]
            Dictionary with the model URI, completion options, and messages.
        completion_url : str
            URL of the completion API.

        Returns
        -------
        str
            ID of the completion operation to poll.
        """
        # Making the request
        async with aiohttp.ClientSession() as session:
            async with session.post(completion_url, headers=headers, json=payload) as resp:
                # If the request was successful, return the ID of the completion operation
                # Otherwise, raise an exception
                if resp.status == 200:
                    data = await resp.json()
                    return data['id']
                else:
                    raise Exception(f"Failed to send async request, status code: {resp.status}")

    @staticmethod
    async def poll_async_completion(
            operation_id: str,
            headers: Dict[str, str],
            timeout: int = 5,
            poll_url: str = 'https://llm.api.cloud.yandex.net/operations/'
    ) -> Dict[str, Any]:
        """
        Polls the status of an asynchronous completion operation until it completes or times out.

        Parameters
        ----------
        operation_id : str
            ID of the completion operation to poll.
        headers : Dict[str, str]
            Dictionary containing the authorization token (IAM).
        timeout : int
            Time in seconds after which the operation is considered timed out.
        poll_url : str
            Poll URL.

        Returns
        -------
        Dict[str, Any]
            Completion result.
        """
        # Polling the completion operation for the specified amount of time
        async with aiohttp.ClientSession() as session:
            end_time = asyncio.get_event_loop().time() + timeout
            while True:
                # Check if the operation has timed out and if so, raise an exception
                if asyncio.get_event_loop().time() > end_time:
                    raise TimeoutError(f"Operation timed out after {timeout} seconds")
                # Polling the operation
                async with session.get(f"{poll_url}{operation_id}", headers=headers) as resp:
                    # If the request was successful, return the completion result
                    # Otherwise, raise an exception
                    if resp.status == 200:
                        data = await resp.json()
                        if data.get('done', False):
                            return data
                    else:
                        raise Exception(f"Failed to poll operation status, status code: {resp.status}")
                await asyncio.sleep(1)

    @staticmethod
    def send_sync_completion_request(
            headers: Dict[str, str],
            payload: Dict[str, Any],
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
    ) -> Dict[str, Any]:
        """
        Sends a synchronous request to the Yandex GPT completion API.

        Parameters
        ----------
        headers : Dict[str, str]
            Dictionary containing the authorization token (IAM), content type, and x-folder-id (YandexCloud catalog ID).
        payload : Dict[str, Any]
            Dictionary with the model URI, completion options, and messages.
        completion_url : str
            URL of the completion API.

        Returns
        -------
        Dict[str, Any]
            Completion result.
        """
        # Making the request
        response = requests.post(completion_url, headers=headers, json=payload)
        # If the request was successful, return the completion result
        # Otherwise, raise an exception
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to send sync request, status code: {response.status_code}")

А теперь создадим класс‑обёртку с автоматической обработкой запросов на генерацию и менеджментом данных для авторизации.

YandexGPT
class YandexGPT(YandexGPTBase):
    """
    Extends the YandexGPTBase class to interact with the Yandex GPT API using a simplified configuration manager.
    This class allows for easier configuration of API requests and includes both synchronous and asynchronous methods.
    """
    def __init__(
            self,
            config_manager: Union[YandexGPTConfigManagerBase, Dict[str, Any]]
    ) -> None:
        """
        Initializes the YandexGPT class with a configuration manager.

        Parameters
        ----------
        config_manager : Union[YandexGPTConfigManagerBase, Dict[str, Any]]
            Config manager or a dictionary containing:
            1) completion_request_model_type_uri_field
               ("gpt://{self.config_manager.catalog_id}/{self.config_manager.model_type}/latest")
            2) completion_request_catalog_id_field (self.config_manager.catalog_id)
            3) completion_request_authorization_field ("Bearer {iam_token}" or "Api-Key {api_key}")
        """
        self.config_manager = config_manager

    def _create_completion_request_headers(self) -> Dict[str, str]:
        """
        Creates headers for sending a completion request to the API.

        Returns
        -------
        Dict[str, str]
            Dictionary with authorization credentials, content type, and x-folder-id (YandexCloud catalog ID).
        """
        return {
            "Content-Type": "application/json",
            "Authorization": self.config_manager.completion_request_authorization_field,
            "x-folder-id": self.config_manager.completion_request_catalog_id_field
        }

    def _create_completion_request_payload(
            self,
            messages: Union[List[YandexGPTMessage], List[Dict[str, str]]],
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False
    ) -> Dict[str, Any]:
        """
        Creates the payload for sending a completion request.

        Parameters
        ----------
        messages : Union[List[YandexGPTMessage], List[Dict[str, str]]]
            List of messages with roles and texts.
        temperature : float
            Controls the randomness of the completion, from 0 (deterministic) to 1 (random).
        max_tokens : int
            Maximum number of tokens to generate.
        stream : bool
            Stream option for the API, currently not supported in this implementation.

        Returns
        -------
        Dict[str, Any]
            Dictionary containing the model URI, completion options, and messages.
        """
        return {
            "modelUri": self.config_manager.completion_request_model_type_uri_field,
            "completionOptions": {
                "stream": stream,
                "temperature": temperature,
                "maxTokens": max_tokens
            },
            "messages": messages
        }

    async def get_async_completion(
            self,
            messages: Union[List[YandexGPTMessage], List[Dict[str, str]]],
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False,
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completionAsync",
            timeout: int = 5
    ) -> str:
        """
        Sends an asynchronous completion request to the Yandex GPT API and polls for the result.

        Parameters
        ----------
        messages : Union[List[YandexGPTMessage], List[Dict[str, str]]]
            List of messages with roles and texts.
        temperature : float
            Randomness of the completion, from 0 (deterministic) to 1 (most random).
        max_tokens : int
            Maximum number of tokens to generate.
        stream : bool
            Indicates whether streaming is enabled; currently not supported in this implementation.
        completion_url : str
            URL to the Yandex GPT asynchronous completion API.
        timeout : int
            Time in seconds after which the operation is considered timed out.

        Returns
        -------
        str
            The text of the completion result.

        Raises
        ------
        Exception
            If the completion operation fails or times out.
        """
        # Making the request and obtaining the ID of the completion operation
        headers: Dict[str, str] = self._create_completion_request_headers()
        payload: Dict[str, Any] = self._create_completion_request_payload(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream
        )

        completion_request_id: str = await self.send_async_completion_request(
            headers=headers,
            payload=payload,
            completion_url=completion_url
        )

        # Polling the completion operation
        completion_response: Dict[str, Any] = await self.poll_async_completion(
            operation_id=completion_request_id,
            headers=headers,
            timeout=timeout
        )

        # If the request was successful, return the completion result
        # Otherwise, raise an exception
        if completion_response.get('error', None):
            raise Exception(f"Failed to get completion: {completion_response['error']}")
        else:
            return completion_response['response']['alternatives'][0]['message']['text']

    def get_sync_completion(
            self,
            messages: Union[List[YandexGPTMessage], List[Dict[str, str]]],
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False,
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion",
    ):
        """
        Sends a synchronous completion request to the Yandex GPT API and returns the result.

        Parameters
        ----------
        messages : Union[List[YandexGPTMessage], List[Dict[str, str]]]
            List of messages with roles and texts.
        temperature : float
            Randomness of the completion, from 0 (deterministic) to 1 (most random).
        max_tokens : int
            Maximum number of tokens to generate.
        stream : bool
            Indicates whether streaming is enabled; currently not supported in this implementation.
        completion_url : str
            URL to the Yandex GPT synchronous completion API.

        Returns
        -------
        str
            The text of the completion result.

        Raises
        ------
        Exception
            If the completion request fails.
        """
        # Making the request
        headers: Dict[str, str] = self._create_completion_request_headers()
        payload: Dict[str, Any] = self._create_completion_request_payload(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream
        )

        completion_response: Dict[str, Any] = self.send_sync_completion_request(
            headers=headers,
            payload=payload,
            completion_url=completion_url
        )

        # If the request was successful, return the completion result
        # Otherwise, raise an exception
        if completion_response.get('error', None):
            raise Exception(f"Failed to get completion: {completion_response['error']}")
        else:
            return completion_response['result']['alternatives'][0]['message']['text']

Threads

А теперь самый сок: для хранения и управления отдельными чатами создадим класс YandexGPTThread. Он позволит хранить сообщения, управлять перепиской и генерировать ответы любым удобным способом: синхронно или асинхронно.

YandexGPTThread
class YandexGPTThreadStatus(TypedDict):
    status: str
    last_error_message: Optional[str]


class YandexGPTThread(YandexGPT):
    """
    A thread-based interface for interacting with the Yandex GPT model.

    This class manages asynchronous messaging and maintains the state of conversation threads.
    """
    def __init__(
            self,
            config_manager: Union[YandexGPTConfigManagerBase, Dict[str, Any]],
            messages: Optional[List[YandexGPTMessage]] = None,
    ) -> None:
        """
        Initializes a new instance of the YandexGPTThread.

        Parameters
        ----------
        config_manager : Union[YandexGPTConfigManagerBase, Dict[str, Any]]
            Configuration manager for the Yandex GPT model.
        messages : Optional[List[YandexGPTMessage]], optional
            Initial list of messages within the thread, by default None.
        """
        super().__init__(config_manager=config_manager)

        if messages:
            self.messages = messages
        else:
            self.messages = []

        self.status = YandexGPTThreadStatus(
            status="created",
            last_error_message=None
        )

    def add_message(
            self,
            role: str,
            text: str
    ) -> None:
        """
        Appends a new message to the conversation thread.

        Parameters
        ----------
        role : str
            The role of the message, typically 'user' or 'assistant'.
        text : str
            The content of the message.
        """
        self.messages.append(YandexGPTMessage(role=role, text=text))

    def __getitem__(self, index):
        """
        Allows retrieval of a message by index from the conversation thread.

        Parameters
        ----------
        index : int
            Index of the message to retrieve.

        Returns
        -------
        YandexGPTMessage
            The message at the specified index.
        """
        return self.messages[index]

    def __len__(self):
        """
        Returns the number of messages in the conversation thread.

        Returns
        -------
        int
            The number of messages.
        """
        return len(self.messages)

    async def run_async(
            self,
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False,
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completionAsync",
            timeout: int = 15
    ):
        """
        Runs the thread asynchronously, requesting and appending completion from the Yandex GPT model.

        Parameters
        ----------
        temperature : float
            Sampling temperature, scales the likelihood of less probable tokens. Value from 0 to 1.
        max_tokens : int
            Maximum number of tokens to generate.
        stream : bool
            Stream responses from the API (not currently supported).
        completion_url : str
            URL of the asynchronous completion API.
        timeout : int
            Timeout in seconds for the asynchronous call.

        Raises
        ------
        Exception
            If the thread is already running.
        """
        if self.status["status"] == "running":
            raise Exception("Thread is already running")
        else:
            self.status["status"] = "running"

            try:
                completion_text = await self.get_async_completion(
                    messages=self.messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    stream=stream,
                    completion_url=completion_url,
                    timeout=timeout
                )
                self.add_message(role="assistant", text=completion_text)
            except Exception as e:
                self.status["status"] = "error"
                self.status["last_error_message"] = str(e)
            finally:
                self.status["status"] = "idle"

    def run_sync(
            self,
            temperature: float = 0.6,
            max_tokens: int = 1000,
            stream: bool = False,
            completion_url: str = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
    ):
        """
        Runs the thread synchronously, requesting and appending completion from the Yandex GPT model.

        Parameters
        ----------
        temperature : float
            Sampling temperature, scales the likelihood of less probable tokens. Value from 0 to 1.
        max_tokens : int
            Maximum number of tokens to generate.
        stream : bool
            Stream responses from the API (not currently supported).
        completion_url : str
            URL of the synchronous completion API.

        Raises
        ------
        Exception
            If the thread is already running.
        """
        if self.status["status"] == "running":
            raise Exception("Thread is already running")
        else:
            self.status["status"] = "running"

            try:
                completion_text = self.get_sync_completion(
                    messages=self.messages,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    stream=stream,
                    completion_url=completion_url
                )
                self.add_message(role="assistant", text=completion_text)
            except Exception as e:
                self.status["status"] = "error"
                self.status["last_error_message"] = str(e)
            finally:
                self.status["status"] = "idle"

Результат

Посмотрим на простой пример использования нашего решения:

from yandex_gpt import YandexGPTConfigManagerForIAMToken, YandexGPTThread
import asyncio

yandex_gpt_thread = YandexGPTThread(config_manager=YandexGPTConfigManagerForIAMToken(), messages=[{'role': 'system', 'text': 'You are a helpful assistant.'}, {'role': 'user', 'text': 'Hello!'}])

asyncio.run(yandex_gpt_thread.run_async())

print(yandex_gpt_thread[-1]['text'])

В результате всего в несколько строк кода получаем вот такой прекрасный результат:

>> Hello! How can I help you?

Заключение

Мы обсудили, как и когда стоит использовать YandexGPT, какие проблемы могут возникнуть у разработчиков, решивших работать с YandexGPT API, и как их можно решить с помощью небольшого Python SDK.

Все описанные выше наработки находятся в открытом доступе и подробно задокументированы. Вы можете поддержать проект своим участием: за это полагаются хорошая карма и позитивные космические волны — попробуйте! А я буду рад принять ваш Pull Request.

Удачи и будем на связи✌️

Если после прочтения вам хочется узнать, какие еще решения на базе ИИ могут быть полезны бизнесу — посмотрите подборку 85 решений с ИИ, которые мы собрали в AllSee и с радостью готовы поделиться с вами.

Теги:
Хабы:
Всего голосов 10: ↑9 и ↓1+10
Комментарии5

Публикации

Истории

Работа

Python разработчик
103 вакансии
Data Scientist
67 вакансий

Ближайшие события

28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
11 – 13 декабря
Международная конференция по AI/ML «AI Journey»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань