
Данная статья будет полезна тем кто хочет расширить свой учебный (или даже рабочий) ML пет-проект добавлением новой технологии.
Весь код в статье написан для OpenSource проекта по детекции синтетического текста raisontext в рамках курса ML System Design от ODS.
Небольшой комментарий для упрощения дальнейшего чтения: проект raisontext имеет серверную часть на FastAPI. Серверная часть связана с клиентской посредством веб-сокетов.
Содержание
Вступление
В жизненном цикле успешного ML проекта наступает момент, когда необходимо масштабироваться. При этом масштабирование, в зависимости от используемых моделей и выделенного железа, может быть разным.
В случае с raisontext, было принято решение масштабироваться в разграничение сущностей модели и серверной части. А также учесть безотказность предикта модели, в случае если к ней одновременно обращается множество запросов.
Постановка проблемы
Предположим, что предикт модели занимает 5 секунд. В случае когда два юзера попробуют обратиться к модели одновременно, произойдет что-то неявное. Может зависнуть как сам сайт, так и упасть модель.
Так как нас не устраивает данное поведение, необходимо придумать некий разграничивающий туннель, который не позволит любому запросу напрямую попадать в модель, а будет вместо этого вставать в очередь.
Именно для такого поведения и существуют брокеры сообщений.
RabbitMQ
На Хабре уже достаточно учебных статей про RabbitMQ (тык и тык), поэтому подробно останавливаться не буду. Скажу только, что в случае с raisontext будут использоваться две очереди:
raisontext_model - для отправки запросов к модели
raisontext_answer - для получения ответов модели
Практическая часть, сервер
В данном проекте используется облачная версия RabbitMQ: CloudAMQP
В обычной ситуации для корректной работы с RabbitMQ необходимо создать класс Consumer либо Producer в зависимости от того, кем будет являться сущность (модель или сервер).
В нашем случае и сервер и модель оба являются одновременно и Consumer и Producer. Поэтому создадим универсальный PikaClient, который сможет как принимать, так и отправлять сообщения в очереди.
class PikaClient: def __init__(self, process_callable, publish_queue_name, consume_queue_name): self.publish_queue_name = publish_queue_name self.consume_queue_name = consume_queue_name self.connection = pika.BlockingConnection( pika.URLParameters(os.environ.get("RABBIT_MQ_URL")) ) self.channel = self.connection.channel() self.publish_queue = self.channel.queue_declare(queue=publish_queue_name, durable=True) self.consume_queue = self.channel.queue_declare(queue=consume_queue_name, durable=True) self.publish_callback_queue = self.publish_queue.method.queue self.consume_callback_queue = self.consume_queue.method.queue self.response = None self.process_callable = process_callable async def consume(self, loop): """Setup message listener with the current running loop""" connection = await connect_robust( os.environ.get("RABBIT_MQ_URL"), loop=loop ) channel = await connection.channel() queue = await channel.declare_queue(self.consume_queue_name, durable=True) await queue.consume(self.process_incoming_message, no_ack=False) return connection async def process_incoming_message(self, message): """Processing incoming message from RabbitMQ""" await message.ack() body = message.body print('Received message') if body: await self.process_callable(json.loads(body)) def send_message(self, message: dict): """Method to publish message to RabbitMQ""" self.channel.basic_publish( exchange='', routing_key=self.publish_queue_name, properties=pika.BasicProperties( reply_to=self.publish_callback_queue, correlation_id=str(uuid.uuid4()) ), body=json.dumps(message) )
Кратко пробежимся по основным точкам:
RABBIT_MQ_URL - переменная среды, в которой находится уникальный AMQP URL, что-то вроде amqps://efiosdwv:***@cow.rmq2.cloudamqp.com/efiosdwv
Асинхронный метод consume получает из очереди сообщение и вызывает process_incoming_message. Благодаря асинхронности consume может выполняться у нескольких юзеров одновременно
Асинхронный метод process_incoming_message информирует что сообщение было получено и вызывает функцию, отправленную в конструктор класса (то, что мы хотим чтобы было выполнено при получении сообщения)
Синхронный метод send_message отправляет сообщение в очередь
Посмотрим теперь как обращаться с этим классом с серверной стороны. Для начала создадим функцию, которая будет выполняться при получении сообщения из очереди.
async def receive_prediction(message: dict): text = message['answer'] id_ = message["id"] print(f'Here we got incoming message from {id_}: {text}') websocket = id_to_socket_dict.get(id_, None) if websocket is not None: await manager.send_personal_message(text, websocket)
Так как raisontext работает на веб-сокетах, то в функции получения предсказания от модели, мы проверяем по айдишнику пользователя есть ли такой веб-сокет. И если есть, то отправляем сообщения обратно на клиентскую часть. Тут стоит упомянуть, что веб-сокет пропадает из id_to_socket_dict в случае, если юзер закрыл сайт.
После того как мы создали функцию, которая выполняется при получении ответа модели, можно инициализировать наш PikaClient:
pika_client = PikaClient( receive_prediction, publish_queue_name="raisontext_model", consume_queue_name="raisontext_answer" )
Последнее что нам осталось - запустить прослушивание очереди при запуске сервера (consume) и вставить send_message в необходимый эндпоинт (produce).
Так как с send_message все довольно тривиально, давайте посмотрим как запускается прослушивание очереди:
@app.on_event("startup") async def startup(): loop = asyncio.get_running_loop() task = loop.create_task(pika_client.consume(loop)) await task
Данный код позволяет слушать очередь асинхронно с обращением к эндпоинтам серверной части. А это именно то, чего мы хотели.
Практическая часть, модель
Теперь когда серверная часть настроена на отправку сообщений и слушание очереди ответов, давайте настроим модель таким же образом.
В отличии от сервера модель не может слушать сообщения из очереди асинхронно. А потому здесь будет немного хардкода, без выделения сущности клиента в отдельный класс.
url = os.environ.get("RABBIT_MQ_URL") params = pika.URLParameters(url) connection = pika.BlockingConnection(params) channel_request = connection.channel() # start a channel channel_request.queue_declare(queue='raisontext_model', durable=True) channel_answer = connection.channel() # start a channel channel_answer.queue_declare(queue="raisontext_answer", durable=True) def callback(ch, method, properties, body): dict_message = json.loads(body) id_ = dict_message['id'] text = dict_message['text'] print(f"[x] Received {id_} with {text}") answer = evaluator.predict([text]) answer_dict = { "id": id_, "answer": str(round(answer[0], 3)) } channel_answer.basic_publish( exchange='', routing_key='raisontext_answer', body=json.dumps(answer_dict) ) channel_request.basic_consume('raisontext_model', callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel_request.start_consuming() connection.close()
Во многом код говорит сам за себя. Но разберем некоторые ключевые моменты:
evaluator - загруженная модель, у которой есть метод предикт (может быть кастомным классом)
callback - функция, которая вызывается при поступлении сообщения в очередь raisontext_model. Так как предполагается, что модель работает долго и находится в рамках одного вычислительного ресурса, callback является синхронным и выполняется для каждого юзера по очереди. В конце тела функции полученный предикт кладется в очередь ответов модели raisontext_answer
Общая архитектура
Теперь когда мы построили систему сервер - брокер сообщений - модель, давайте взглянем на общее устройство проекта:

Итоги
Отлично! Теперь когда брокер сообщений настроен, можно думать о новых способах дальнейшего масштабирования проекта.
Благодаря использованию облачной версии RabbitMQ, модель и сервер разграничены и могут запускаться на различных удаленных машинах, что упрощает дальнейшее масштабирование системы.
В целом благодаря использованию брокера сообщений, наша система стала более отказоустойчива и надежна.
Полный код и требования к библиотекам можно найти в официальном репозитории проекта raisontext.
