Привет! Меня зовут Олег Бугримов, я руковожу разработкой в команде Data Science SWAT в Авито. Мы занимаемся инженерией для машинного обучения. Одно из направлений - это оптимизация продового инференса. Наша задача чтобы модельки работали быстро и не потребляли безумное количество ресурсов. Так вот, мы дооптимизировались до того, что реализовали инструмент который позволяет сэкономить 30% железа. Ниже представлен реальный график нагрузки GPU-процессора:
Как видно, после переезда на наше решение, нагрузка на GPU-процессор уменьшилась на треть, при этом, конечно же, сам трафик на сервис не поменялся. Дальше я расскажу как мы этого добились.
Почему python, а не go
Хотя основной язык разработки у нас в Авито это golang, но наши МЛ сервисы мы пишем на python. Конечно, у нас в компании есть МЛ-решения на go, но на питоне получается быстрее и дешевле выкатить модель, сразу после того как только она сошла с конвейера обучения. Дальше, при условии что работа модели приносит пользу, можно переводить ее на go, но с нашим акведуком острой потребности в этом нет.
Теоретическая часть
Постановка проблемы
Сервисы на питоне мы пишем с применением asyncio. Были реализации с flask на тредах но, после ряда исследований, мы пришли к выводу, что asyncio все таки лучше. При работе модели в сервисе с использованием asyncio есть одна сложность, если мл-моделька работает значительное время, скажем десятки миллисекунд и более, то ioloop начинает блокироваться и сервис начинает работать заметно медленнее. Можно решить только эту проблему, но мы решили что можно еще повысить пропускную способность моделей. На анимации можно увидеть как происходит блокировка, объект ждет пока модель обработает предыдущий объект, освободится и начнет обрабатывать следующий. Блокировку ioloop изобразить сложнее, поэтому остановимся на том что объект просто ждет пока его возьмут в работу.
Конечно, есть самое простое и очевидное решение, чтобы избавиться от такого ожидания - добавить больше серверов и раскатить на них больше таких моделей, и тогда запросы будут попадать на свободные мощности и не ждать. Но, мл-модели даже при простое занимают как минимум оперативную память, а в максимуме еще и процессор и GPU-память. Поэтому это решение хоть и заманчивое, но в него стоит идти после попыток как то соптимизировать потребление. И только потом, уже с чистой совестью и ощущением, что с точки зрения оптимизации сделано уже максимум, можно покупать больше серверов.
Вынос модели в подпроцесс
Проблему блокировки ioloop мы решаем тем, что выносим работу модели в дочерний подпроцесс. Таким образом, основной процесс занят тем, что принимает запросы, совершает io-bound операции, такие как скачивание картинки, и передает эту картинку в дочерний процесс, где модель производит инференс на этих данных и возвращает результат обратно в основной процесс.
Как это сделать с помощью акведука можно посмотреть ниже в разделе практики.
Разделение вычислений на этапы
Чтобы двигаться дальше, раскрою немного подробностей про то как работает МЛ-модель. Это набор алгоритмов которые получают даные на входе, производят манипуляции с этими данными или вычисления на их основе, и возвращают результат работы. Как правило, работу модели можно разбить на несколько этапов. Классический случай это три этапа: этап подготовки данных, этап расчета на подготовленных данных и этап постобработки полученных результатов. Например, если на входе подается изображение, то его необходимо сначала привести к нужному размеру, к нужной цветовой палитре и преобразовать в фортмат numpy, чтобы модель могла работать с этими данными. Далее, с этими преобразованными данными работает, например нейросеть, она производит какойто вывод на основе данных, это второтой этап. На третьем этапе например, исходная картнинка изменяется на основе данных полученных от нейросети, скажем устанавливает плашку для сокрытия автомобильного номера.
Если вы можете разбить работу вашей модели на этапы, значит вы можете оптимизировать инференс с помощью акведука. Для начала, просто выносим каждый этап в отдельный подпроцесс. Таким образом, в тот момент пока работает этап с моделью, на этап подготовки данных уже поступили новые данные и он их уже обрабатывает. При этом, на этапе постобработки, после модели, идет работа с данными которые до этого обрабатывались моделью. В итоге, ваш сервис может одновременно обрабатывать уже три запроса, а не один.
Но, думаю очевидно, что этапы длятся разное время время, а еще, очень часто встречаются модели которые сопоставимо быстро обрабатывают сразу несколько объектов, вместо одного, это так называемое батчирование или обработка пачки объектов.
Батчирование
Батчирование, то есть обработка сразу нескольких объектов, это распространенная инженерная практика, которая очень часто позволяет получить ускорение. Особенно это заметно в МЛ-инференсе, ускорение происходит за счет уменьшения числа I/O операций и операций по работе с памятью. Еще более значительное ускорение при батчировании получается если модель работает на ГПУ. Например, один объект ваша модель будет обрабатывать 10мс, а 10 таких объектов одновременно она обработает за 15мс. Понятно что лучше обработать их пачкой, чем тратить суммарно 100мс на последовательную обработку каждого такого объекта по очереди. Все что остается сделать, это реализовать механику, которая будет копить объекты в пачки и пересылать сразу пачку в модель. В акведуке из коробки поддерживается два режима батчирования. Первый режим - с засечкой по времени ожидания, он же таймаут и есть еще режим без таймаута, так называемое динамическое батчирование.
Увеличение числа процессов
Ну вот мы научились копить объекты в пачку и затем быстро их обрабатывать в модели. Но это в самой модели, а перед моделью, как вы помните, у нас этап предобработки, например преобразование картинки numpy. Проблема в том, что на этом этапе батчирование не будет работать, так как нет еще таких библиотек или механизмов которые пачку картинок преобразуют за более быстрое время. Можно, конечно, тут использовать треды, при условии что ваша логика отпускает GIL. Но из-за GIL не всё в питоне будет быстрее, и даже если будет ускорение, все равно вы ограничены скоростью работы одного ядра, так как интерпретатор запускается только на одном ядре, вы не сможете запускать параллельных операций больше чем позволит вам ваше ядро. На помощь нам тут приходит мультипроцессинг. В акведуке каждый этап - это отдельный подпроцесс. Небольшим движением руки вы можете увеличить число процессов на любом этапе. Мы не рекомендуем увеличивать число процессов для этапа с моделью, это уже считается как горизонтальное масштабирование модели под нагрузку, и делается другими способами - например у нас это кубернетес. А вот для этапов до модели и после - пожалуйста. Например, модель обрабатывает 10 объектов за 15мс, данные на предварительном этапе готовятся 20мс, получается, чтобы собрать батч из 10 объектов вы будете ждать 200мс. Общее время потока получается 215мс. Если на подготовку сделать 10 одновременных процессов, то за 20мс будет подготовлено сразу 10 объектов и общее время потока становится 35мс. Не забудьте, что на этапах, после работы модели, тоже можно увеличить число процессов. Вот тут то, наконец, и раскрывается вся мощь акведука.
Разделяемая память
Мы все время думаем как еще можно ускорить инференс в проде. Нас смутил следующий момент: в качестве способа передачи данных между процессами в акведуке используются межпроцессные очереди, они основаны на пайпах и достаточно быстро работают. Но, если вы передаете большие объемы данных, а точнее большие объекты, то это начинает отнимать, пусть заметное время. Например, если это большая картинка, ее надо сначала сериализовать, то есть перевести в байты готовые к передаче, потом передать через буферы ввода/вывода процессов, потом обратно десериализовать в объект готовый к работе. Для больших объектов, порядка мегабайт, на это может уходить единицы, а иногда и десятки миллисекунд.
Ускориться здесь можно за счет использования разделяемой (шаренной) памяти: данные помещаются в область памяти доступную для всех процессов, а через очереди передаются только идентификаторы. Отдельное искусство, это обеспечить надежность такого механизма, но эта тема будет раскрыта в отдельной статье.
Метрики
Собственно, выше вы узнали теорию как работает акведук. Подавляющее большинство моделей в компании мы запускаем через акведук, это позволяет нам экономить колоссальное число серверов. Но, когда ваша модель работает в проде, очень важно следить все ли с ней в порядке, какое время ответа, сколько запросов скопилось в очереди. Также, для лучшей оптимизации очень важно знать сколько времени работает каждый этап и сколько времени уходит на пересылку данных между этапами. Для этого в акведуке прямо из коробки есть поддержка учета жизненно важных метрик.
Время работы на каждом этапе, позволяет понять какие этапы медленные, стоит ли добавлять число процессов:
Метрика размера очередей, тоже позволяет понять, не накинуть ли еще процессов на этап, перед которым копится большая очередь. Или может добавить батчирование, если это ГПУ-модель:
Общее количество задач, сколько успешных и сколько из них завершается с ошибкой таймаута, например из-за того, что не успевает обработаться за выделенное время:
Практическая часть
Теперь немного реального кода. Чтобы не усложнять давайте ориентироваться, что весь код находится в одном файле.
Создаем модель и хендлер
Давайте представим что у вас есть МЛ-модель, которая блокирует ioloop, это легко представить в виде обычного синхронного sleep() в python.
# импортируем сразу компоненты, которые понадобятся далее
from aqueduct import BaseTask, BaseTaskHandler, Flow, FlowStep
class MyModel:
def __init__(self):
"""При старте обычно необходимо загрузить
и проинициализировать модель в памяти,
мы этот этап опустим, но для наглядности
саму конструкцию я оставил
"""
self.model = None
def inference(self, param):
sleep(1) # обещанный sleep на 1 секунду
Для общения между собой воркеры и основной процесс пересылают задачи, это просто класс со свойствами в которые можно сохранять что угодно:
class Task(BaseTask):
"""Контейнер задания, для пересылки параметров между этапами"""
def __init__(self, param):
super().__init__()
self.param = param
self.sum = 0 # в это поле будет сохранять результат
Сами воркеры это просто хендлеры, с двумя методами. Метод для инициализации модели и метод который вызывается при получении задания:
class SumHandler(BaseTaskHandler):
"""Инициализируем инстанс, модель пока загружать не надо,
так как этот процесс вызывается еще в родительском процессе.
Это сделано для удобного пробрасывания параметров"""
def __init__(self):
self._model = None
def on_start(self):
"""А вот тут уже загружаем модельку в память,
этот метод вызывается уже в дочернем процессе"""
self._model = MyModel()
def handle(self, *tasks: Task):
"""Обработка заданий. Получаем список задач и обрабатываем"""
for task in tasks:
task.sum = self._model.process(task.param)
Создание флоу с одним воркером
Теперь нам нужно, чтобы акведук принимал залачу в работу и передавал ее по нашему флоу. Определение флоу сделаем в виде метода, чтобы было удобно его использовать там где надо.
def get_flow() -> Flow:
return Flow(
FlowStep(AvitonetPreprocessingHandler()),
)
Так выглядит простейший флоу состоящий из единственого этапа - работы модели. Если мы хотим добавить еще этапы перед или после работы модели, то достаточно просто указать их в нужном порядке, например так:
def get_flow() -> Flow:
return Flow(
FlowStep(PreprocessingHandler()), # сначала выполнится PreprocessingHandler
FlowStep(SumHandler()), # потом отработает SumHandler
FlowStep(PostprocessingHandler()), # и в самом конце будет выполнен PostprocessingHandler
)
Но и это не всё, допустим наша модель может обрабатывать сразу пачку объектов одним махом, тогда мы указываем batch_size чтобы акведук собирал объекты в пачки и отправлял в модель. А в предобработке сделаем параллельно работающие процессы, чтобы данные одновременно в нескольких процессах:
def get_flow() -> Flow:
return Flow(
# 12 процессов готовят одновременно каждый по одному объекту
FlowStep(PreprocessingHandler(), nproc=12),
FlowStep(SumHandler(), batch_size=12), # теперь будет приходить не более 12 объектов
FlowStep(PostprocessingHandler()),
)
Таким образом, модель получает сразу пачку объектов которые были подготовленны параллельно работающими процессами на предыдущем этапе и одним махом их обрабатывает. За счет этого и получается экономия времени. Обратите внимание, что мы указали batch_size=12 и теперь в хендлер будет приходить не один объект, а несколько, не более 12, именно для этого в сигнатуре хендлера SumHandler.handle мы учитываем возможность приема как одного так и нескольких объектов.
Интеграция в http-сервер
Акведук можно использовать в любом питоновском http-фреймворке. Рассмотрим на примере модного и молодежного FastAPI:
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
class MyFlow:
def __init__(self):
self.my_flow = get_flow()
self.my_flow.start()
@app.get("/score/")
async def read_items(mymodel: Annotated[MyFlow, Depends(MyFlow)]):
task = Task(param="my_param")
await flow.my_flow(task)
return {"result": task.sum}
Вот так вы получаете ответ от модели работающей с помощью акведука.
Общая разделяемая память
Чтобы передавать данные через разделяемую память надо просто включить поле в разделяемую память:
@app.get("/score/")
async def read_items(mymodel: Annotated[MyFlow, Depends(MyFlow)]):
task = Task(param="my_param")
task.share_value("param") # теперь поле param хранится в общей памяти
await flow.my_flow(task)
return {"result": task.sum}
Другие решения
Было бы неправильно не описать альтернативы.
Airflow/celery/kubeflow
aiflow и подобные прекрасно решают проблему организации пайплайнов для батчевых моделей, когда у вас много-много данных и надо их обработать с помощью моделей. Акведук про другое, акведук нужен когда вам надо получить ответ прямо здесь и сейчас, как можно быстрее. В реал-тайм инференсе при использованеии аирфлоу вы замедлите свои вычисления, тут нужен акведук. И наоборот, в батчевом инференсе лучше использовать аирфлоу. Так что, это даже не вопрос альтернативы, это вообще решения разных проблем.
Nvidia Triton
Близкое по духу решение для инференса от nvidia. Позволяет экономить ГПУ-память за счет переиспользования одной модели на одном сервере. Смело пробуйте и сравнивайте с акведуком. Можно даже использовать их вместе, акведук для построения основного флоу вычисления, а тритон для организации работы шага с моделью.
Mpipe
https://vmlaker.github.io/mpipe/
Технически очень похожее решение на акведук. Местами даже очень быстрое, потому что вместо очередей там пайпы между процессами. Если туда добавить асинхронности, быстрое получение результата и еще ряд фичей то вполне получился бы акведук. На этапе содания акведука мы про эту либу еще не знали. Если бы знали, то может быть в акведуке было написано import mpipe и часть функционала мы бы переиспользовали. Но много-много все равно пришлось бы дописывать, поэтому сути это не меняет.
Итого
Акведук мы использум почти везде где есть реал-тайм МЛ-инференс. За счет этого мы получаем существенную экономию железа. Вы также можете попробовать использовать акведук у себя.
Как вы уже поняли сам акведук разположен на гитхабе, можно его скачать, смотреть и ковырять. Будет рады мердж-реквестам: https://github.com/avito-tech/aqueduct