Привет, Хабр! На связи Артём Петров, я занимаюсь разработкой ПО в центре технологий VK. Хочу рассказать о важной задаче обработки больших объёмов данных с использованием нескольких экземпляров одной и той же модели машинного обучения. Этот процесс называется batch inference («пакетный инференс») и позволяет значительно повысить производительность системы, особенно когда речь идёт о таблицах большого размера.
Open Inference Protocol (OIP)
Существует много различных инструментов для инференса моделей машинного обучения. Например, библиотека CatBoost предлагает собственный специализированный формат для эффективной обработки данных. Несколько лет назад также появился Open Inference Protocol, который предназначен для унифицированной записи входных данных различных моделей в стандартизированном виде, включая разнообразные эмбеддинги.
Так выглядят входные данные:
{
"inputs": [
{
"name": "IAB_TEXT",
"shape": [
1
],
"datatype": "BYTES",
"data": [
""
]
}
]
}
У inputs
может быть имя, размерность, тип данных и сами данные.
А так выглядит ответ:
{
"model_name": "banner-classifier-python",
"model_version": "1",
"outputs": [
{
"name": "SCORE",
"shape": [
1,
34
],
"datatype": "FP64",
"data": [
0.16790169891977166,
0.12961032360575966,
0.05175606230880863,
0.04470961327023595,
0.1444450118377472,
0.09656370402784009,
0.1787367952925338,
0.19052070898630957,
0.22135750180980127,
0.05315560371866009,
0.058228992477796775,
0.10493222226161725,
0.07665099811086777,
0.05640374156249024,
0.03818363270376569,
0.06162369427523705,
0.08013533940834351,
0.12504546550503864,
0.017400716033072968,
0.05009478320309705,
0.0784720983460859,
0.11254104392238658,
0.15487643434177747,
0.07010075919855611,
0.06598170891575378,
0.028824552053010277,
0.13164396641373022,
0.15785238316985242,
0.09407360275114268,
0.20113965508675122,
0.09949341314285333,
0.156285839927019,
0.21333526549889067,
0.1807703644287332
]
},
{
"name": "PRED_IDX",
"shape": [
1,
0
],
"datatype": "FP64"
}
]
}
Можно добавлять и другие поля, если в этом есть необходимость.
Мы сейчас используем две основные реализации протокола — MLServer и Triton. Triton — это реализация от Nvidia, корни которой уходят глубоко в С++ и для которой есть бэкенды для Python, ONNX, TensorRT, PyTorch, Tensorflow и так далее. А MLServer — это более простой формат, поэтому сначала расскажу о нём.
MLServer
Пользователи присылают модели в виде .tar-файла:
├── assets
├── models
│ └── spam
│ ├── model-settings.json
│ ├── package
│ │ ├── init.py
│ │ └── models.py
│ ├── settings.json
│ └── spam.cbm
└── requirements.txt
Requirements.txt — описание требований к пакетам на Python, если что-то надо установить.
MLServer позволяет загружать лишь одну модель одновременно. Файл settings.json служит базовой конфигурацией сервиса, где можно настроить вывод отладочной информации. Обычно этот файл не содержит критически важных настроек.
[0] % cat models/spam/settings.json
{"debug": "true"}
В model-settings.json хранится метаинформация о том, как мы должны общаться с моделью. У inference-сервера есть специальная ручка, просто get HTTP, имя модели и так далее.
[0] % cat models/spam/model-settings.json
{"name": "spam", "implementation": "package.CatboostModel", "parameters": {"uri": "spam.cbm"}, "platform": "python3.10", "inputs": [{"name": "value", "shape": [-1], "datatype": "FP32", "parameters": {"content_type": "np"}}], "outputs": [{"name": "predict", "shape": [2, 1], "datatype": "FP64", "parameters": {"content_type": "np"}}]}
Чтобы запустить inference-сервер модели в MLServer, необходимо реализовать базовый класс MLModel, наследоваться от него и сделать один основной метод — predict
. Остальные — по желанию.
[0] % cat models/spam/package/models.py
from catboost import CatBoostClassifier
import numpy as np
from mlserver import types
from mlserver.model import MLModel
from mlserver.utils import get_model_uri
from mlserver.codecs import NumpyCodec, NumpyRequestCodec
WELLKNOWN_MODEL_FILENAMES = ["model.cbm", "model.bin"]
class CatboostModel(MLModel):
"""
Implementation of the MLModel interface to load and serve `catboost` models.
"""
async def load(self) -> bool:
model_uri = await get_model_uri(
self._settings, wellknown_filenames=WELLKNOWN_MODEL_FILENAMES
)
self._model = CatBoostClassifier()
self._model.load_model(model_uri)
self.ready = True
return self.ready
async def predict(self, payload: types.InferenceRequest) -> types.InferenceResponse:
decoded = self.decode_request(payload, default_codec=NumpyRequestCodec)
prediction = np.array(self._model.predict(decoded, prediction_type="Probability"))
print(prediction)
return types.InferenceResponse(
model_name=self.name,
model_version=self.version,
outputs=[NumpyCodec.encode_output(name="predict", payload=prediction)],
)
В этом примере мы загружаем бинарный формат Catboost и подготавливаем сервер для predict
.
Triton
Triton — это inference сервер моделей глубокого обучения, разработанный компанией NVIDIA. Ядро сервера написано на C++, однако пользователям доступен интерфейс на Python, что делает его удобным инструментом для интеграции в существующие пайплайны. В нём поддерживаются специфические форматы, вроде TensorRT и Onnx.
Одной из ключевых особенностей Triton является возможность создавать ансамбли моделей. Это означает, что несколько моделей могут быть объединены в единый конвейер, что упрощает создание сложных архитектур.
[0] % tree
├── assets
│ └── sentencepiece.bpe.model
├── models
│ ├── embed64
│ │ ├── 1
│ │ │ └── model.onnx
│ │ └── config.pbtxt
│ ├── me5_ensemble
│ │ ├── 1
│ │ └── config.pbtxt
│ └── tokenizer
│ ├── 1
│ │ └── model.py
│ └── config.pbtxt
└── requirements.txt
В качестве базовых моделей здесь используются Embed64 и tokenizer.
[0] % cat ./models/embed64/config.pbtxt
name: "embed64"
platform: "onnxruntime_onnx"
max_batch_size: 16
input [
{
name: "inp"
data_type: TYPE_INT64
dims: [ -1 ]
}
]
output [
{
name: "out"
data_type: TYPE_FP32
dims: [ -1 ]
}
]
instance_group [
{
count: 1
kind: KIND_GPU
gpus: [ 0 ]
}
]
dynamic_batching {
}
Эта модель принимает на вход тензоры целочисленных значений (TYPE_INT64) с неопределённой размерностью, что позволяет обрабатывать данные различной длины. Выход представляет собой тензор с плавающей точкой (TYPE_FP32), также с неопределённой размерностью. Параметры динамического батчинга обеспечивают эффективное использование ресурсов GPU.
Всё это можно собрать в ансамбль:
[0] % cat ./models/me5_ensemble/config.pbtxt
name: "me5_ensemble"
platform: "ensemble"
max_batch_size: 0
input [
{
name: "query"
data_type: TYPE_STRING
dims: [ 1 ]
},
{
name: "id"
data_type: TYPE_INT64
dims: [ 1 ]
},
{
name: "version"
data_type: TYPE_INT64
dims: [ 1 ]
}
]
output [
{
name: "community_embed_e5"
data_type: TYPE_FP32
dims: [ -1 ]
}
]
ensemble_scheduling {
step [
{
model_name: "tokenizer"
model_version: -1
input_map {
key: "query"
value: "query"
}
input_map {
key: "id"
value: "id"
}
input_map {
key: "version"
value: "version"
}
output_map {
key: "inp"
value: "inp"
}
},
{
model_name: "embed64"
model_version: -1
input_map {
key: "inp"
value: "inp"
}
output_map {
key: "out"
value: "community_embed_e5"
}
}
]
}
В нём мы описываем, что именно хотим делать и каким образом: сначала идём в токенизатор, а его ответы отдаём в эмбеддер. Результат работы сервера — это community_embed_e5
.
Я и мой бро Batch-worker
Необходимость использовать Batch-worker возникла по причине того, что если в какой-то команде есть ML-модели, то всё время нужно что-то обучить или сверить результаты, возможно, с предыдущей версией. Отсюда вытекает вполне логичный вывод: каждому отделу проще создать собственный механизм для инференса.
То есть главная задача пакетного инференса – дать пользователям возможность обрабатывать таблицы данных без необходимости создания инфраструктуры с их стороны. Пользователь развёртывает модель на нашей платформе с желаемым количеством экземпляров и ресурсов. Затем он вводит входную и выходную таблицу, и запускает inference. Всё это мы решаем благодаря Open Inference Protocol, который позволяет общаться с моделью в общем виде. То есть мы просто получаем входные данные, знаем, что нужно прочитать из таблицы и что отдать.
Разумеется, все не может быть идеально:
Во-первых, весь проект реализован на Go, а это означает, что для OIP библиотек нет. Библиотеки существуют исключительно на Python, но эта технология не подходит для высоконагруженных проектов. Остаётся разрабатывать собственную реализацию вручную.
Формат позволяет обращаться по разным спецификациям: REST, gRPC, особенным бинарным на базе REST, и все они работают абсолютно по-разному.
Протокол подразумевает вложенные массивы (в будущем — тензоры), которые нужно уметь правильно обрабатывать в map-reduce хранилище под названием YTsaurus.
Пакетный инференс должен уметь работать долго. Минимум несколько дней, потому что не все модели отвечают за одну миллисекунду, а пользователи приносят терабайтные таблицы. Бывают ситуации, когда модель запускают и ждут неделями.
Есть множество внешних зависимостей, таких как: облако, хранилище map-reduce.
Нужно уметь восстанавливаться, если случился сбой. А сбои будут, потому что из-за обилия множества внешних зависимостей могут возникать странные проблемы. Например, какое-то время назад у нас отваливались транзакции: просто не работали больше получаса, и неважно, пингуешь их или нет. Ещё случалось, что развернули в облаке модель, она проработала пару дней, а потом переполнился диск и начались падения по ООМ. Часто случает и так, что модель просто начинает сыпать ошибками. Поэтому нужно уметь перезапуститься так, чтобы не потерять накопленный результат.
Наша реализация Open Inference Protocol
Общая схема работы пакетного инференса сейчас выглядит так:

Условно схему системы можно разделить на три слоя: чтение, inference и запись.
Чтение
Эта процедура работает почти безотказно, потому что зависимостей мало: прочитал небольшую строчку из таблицы → записал в канал; посмотрел, есть ли запрос на изменение размера пакета. То есть мы добавили только возможность пользователю динамически обновлять batch-worker, меняя ему параметры batching’a.

Batching — один из механизмов OIP, который позволяет укладывать несколько запросов в один. Мы сохраняем все имена и типы данных, просто в поле Data пишем сразу несколько строк и меняем shape.
После чтения нужно отправить запросы в канал, из которого они пойдут уже в модели.
Inference
Регулярно запускаем проверки работоспособности:
каждые 15 минут — ping;
каждые 2 минуты — dig;
а также вне графика, если с хоста приходит много ошибок.

Тут начинается стохастика. Не всегда понятно, что с этим делать. Ведь когда мы читаем, всё это пишется в один канал. Поэтому на каждой хост-модели у нас есть свой worker, который инферит модели. Это значит, что необходимо уметь балансировать нагрузку и возвращать ошибки. Например, если мы прочитали из канала и постучали в «мёртвую» модель, то нужно аккуратно выключить этот хост и вернуть прочитанные им сообщения в общий пул.
Кастомные каналы
Объясню, как у нас получилось реализовать возможность вернуть сообщение в канал и не получить взаимную блокировку.
У канала есть обычные методы: Send
, Close
, Closed
, Len
и Cap
.
type channel struct {
ch chan *Request
done chan struct{}
}
// Send attempts to send a Request to the channel.
func (c *channel) Send(req *Request) error {
}
// Receive returns the channel for reading Requests.
func (c *channel) Receive() <-chan *Request {
}
// Close closes the channel.
func (c *channel) Close() {
}
// Closed returns a channel that is closed when the channel is closed.
func (c *channel) Closed() <-chan struct{} {
}
// Len returns the current number of items in the channel.
func (c *channel) Len() int {
}
// Cap returns the capacity of the channel.
func (c *channel) Cap() int {
}
Есть структура Output
, которая примерно всё повторяет, в которую пишутся ответы от модели:
type Output struct {
*channel
}
// Send sends a Request to the channel, respecting context cancellation.
func (o *Output) Send(ctx context.Context, response *Request) error {
}
И есть структура Input
: входной канал, в который пишет читатель.
type Input struct {
*channel
wg sync.WaitGroup
}
// Send sends a Request to the channel, respecting context cancellation.
func (i *Input) Send(ctx context.Context, req *Request) error {
}
// Retry attempts to resend a Request to the channel asynchronously.
func (i *Input) Retry(ctx context.Context, req *Request) error {
}
// Done decrements the wait group counter.
func (i *Input) Done() {
}
// Close waits for all operations to complete and then closes the channel.
func (i *Input) Close() {
}
Может произойти следующая ситуация: представьте, что чтение сообщений идёт из общего канала, а тут хосты начинают замедляться («троттлить»), и обработанное сообщение снова возвращается в тот же самый канал. И если вдруг все n-воркеров начали писать в канал обратно, а читатель продолжает читать, то возникнет взаимная блокировка и вся система упадёт. Чтобы этого не происходило, мы добавили обычную wait-группу, которая позволяет модели или воркеру, который отвечает за модель, обработать сообщение, добавив к счетчику wait-группы единицу. Если всё успешно, то мы отправляем сообщение дальше, вызвав функцию Done()
, декрементировав счетчик wait-группы. А если нет, то возвращаем сообщение в канал , не выполняя Done()
, чтобы не закрыть канал, пока в него пытаются вернуться сообщения.
Запись

Запись тоже достаточно нетривиальная по нескольким причинам задача. Зачастую происходит так, что мы читаем какие-то небольшие входные данные. Это могут быть строки разной длины. Плохо, когда вместо строки вываливается эмбеддинг, например, размером 512 на 512 типа float32. В результате может случиться out of memory. Поэтому, ещё до запуска, нужно рассчитывать размер буфера, так как ответы от моделей могут весить мегабайты и даже больше. За этот расчёт у нас отвечает функция RequestSize()
. В конфигурации указываем размеры буферов в гигабайтах. Узнав, сколько сообщений можно хранить, не получив out of memory, уже определяем математически максимальный размер для заданного кол-ва памяти.
Но есть и другая, более обширная проблема. Данные в статических таблиц YT хранятся чанковым методом (чанк – блок памяти определенного размера), что делает запись пакета размера, меньшего чем один чанк, очень дорогой, поэтому такие полупустые чанки нужно уметь сливать в один с помощью merge-операции. Кроме того, появляется проблема блокировки: во время merge-операции все другие операции над таблицей блокируются, поэтому большая таблица может блокироваться на часы, тем самым уменьшая количество чанков. Чтобы этого избежать, мы создаем n временных таблиц, в которые пишем параллельно из нескольких потоков по принципу N:N (каждой таблице свой собственный поток), в которые пишем определенное количество пакетов, к примеру, m. После записи m батчей в каждую из таблиц, мы сливаем эти таблицы в выходную, а сами уже создаем новые m таблицы и им присваиваем новых воркеров. Таким образом, это помогло нам кратно ускорить batch-worker'а так, что теперь буфер на записи всегда простаивает, какие бы увесистые ответы мы не получали от моделей.
Результаты и планы
У слияния чанков есть проблема: всё равно копится очередь. Раньше мы писали в одну таблицу и ждали, пока данные объединятся, что могло занимать очень много времени. Сейчас пишем сразу, к примеру, в пять таблиц, и затем они объединяются. Но они могут не все заполниться одновременно, например, четыре будут ждать, пока одна ещё записывается. А хорошо бы не ждать, а каким-то образом избавиться от тех четырёх, чтобы освободить буфер. Этого можно добиться, реализовав объединение с древовидной структурой. Тогда улучшится производительность, упростится управление данными и повысится масштабируемость системы.
Вторая проблема — это балансировка нагрузки. Хотя за всё отвечает канал, всё же достаточно тяжело понять, когда модель начинает «троттлить». Если она начала немного тупить, то может отдавать, допустим, один плохой ответ из трёх в выходном канале за счет большей скорости, нежели честный инференс. И что с этим делать — непонятно. Кажется, что нужно снижать нагрузку. Но тогда снизится общая скорость обработки. И было бы неплохо уметь правильно обращаться с воркерами, добавляя и убирая их по мере необходимости.
Ещё было бы неплохо добавить немножко больше удобства для пользователей: дашборды, статистику и разные визуализации. Сейчас пользователь может только смотреть на изменения в своей таблице.
Хотим внедрить поддержку VLLM — это протокол для общения с большими языковыми моделями. OIP тоже это позволяет, но очень неудобно.
Планируем поддержать протокол gRPC и бинарный формат OIP. Скорость — это главное, и если её можно поднять на 10-30 % на больших наборах данных и моделях, то обязательно стоит это сделать.
Ещё больше про практический опыт и нетривиальные задачи, которые мы решаем, расскажем на VK Go Meetup 2025, который пройдёт уже 24 апреля. Мы запустили большой технологический проект по переводу ВКонтакте на сервисную архитектуру и построению единой платформы разработки. Регистрируйтесь на сайте и узнайте первыми о том, как мы планируем решать эту масштабную задачу.