Как стать автором
Обновить

RabbitMQ как способ масштабирования ML проекта

Уровень сложностиСредний
Время на прочтение6 мин
Количество просмотров5.8K
RabbitMQ
RabbitMQ

Данная статья будет полезна тем кто хочет расширить свой учебный (или даже рабочий) ML пет-проект добавлением новой технологии.

Весь код в статье написан для OpenSource проекта по детекции синтетического текста raisontext в рамках курса ML System Design от ODS.

Небольшой комментарий для упрощения дальнейшего чтения: проект raisontext имеет серверную часть на FastAPI. Серверная часть связана с клиентской посредством веб-сокетов.


Содержание

  1. Вступление

  2. Постановка проблемы

  3. RabbitMQ

  4. Практическая часть, сервер

  5. Практическая часть, модель

  6. Общая архитектура

  7. Итоги


Вступление

В жизненном цикле успешного ML проекта наступает момент, когда необходимо масштабироваться. При этом масштабирование, в зависимости от используемых моделей и выделенного железа, может быть разным.

В случае с raisontext, было принято решение масштабироваться в разграничение сущностей модели и серверной части. А также учесть безотказность предикта модели, в случае если к ней одновременно обращается множество запросов.

Постановка проблемы

Предположим, что предикт модели занимает 5 секунд. В случае когда два юзера попробуют обратиться к модели одновременно, произойдет что-то неявное. Может зависнуть как сам сайт, так и упасть модель.

Так как нас не устраивает данное поведение, необходимо придумать некий разграничивающий туннель, который не позволит любому запросу напрямую попадать в модель, а будет вместо этого вставать в очередь.

Именно для такого поведения и существуют брокеры сообщений.

RabbitMQ

На Хабре уже достаточно учебных статей про RabbitMQ (тык и тык), поэтому подробно останавливаться не буду. Скажу только, что в случае с raisontext будут использоваться две очереди:

  • raisontext_model - для отправки запросов к модели

  • raisontext_answer - для получения ответов модели

Практическая часть, сервер

В данном проекте используется облачная версия RabbitMQ: CloudAMQP

В обычной ситуации для корректной работы с RabbitMQ необходимо создать класс Consumer либо Producer в зависимости от того, кем будет являться сущность (модель или сервер).

В нашем случае и сервер и модель оба являются одновременно и Consumer и Producer. Поэтому создадим универсальный PikaClient, который сможет как принимать, так и отправлять сообщения в очереди.

class PikaClient:
    def __init__(self, process_callable, publish_queue_name, consume_queue_name):
        self.publish_queue_name = publish_queue_name
        self.consume_queue_name = consume_queue_name

        self.connection = pika.BlockingConnection(
            pika.URLParameters(os.environ.get("RABBIT_MQ_URL"))
        )
        self.channel = self.connection.channel()
        self.publish_queue = self.channel.queue_declare(queue=publish_queue_name, durable=True)
        self.consume_queue = self.channel.queue_declare(queue=consume_queue_name, durable=True)

        self.publish_callback_queue = self.publish_queue.method.queue
        self.consume_callback_queue = self.consume_queue.method.queue

        self.response = None
        self.process_callable = process_callable

    async def consume(self, loop):
        """Setup message listener with the current running loop"""
        connection = await connect_robust(
            os.environ.get("RABBIT_MQ_URL"),
            loop=loop
        )

        channel = await connection.channel()
        queue = await channel.declare_queue(self.consume_queue_name, durable=True)
        await queue.consume(self.process_incoming_message, no_ack=False)
        return connection

    async def process_incoming_message(self, message):
        """Processing incoming message from RabbitMQ"""
        await message.ack()
        body = message.body
        print('Received message')
        if body:
            await self.process_callable(json.loads(body))

    def send_message(self, message: dict):
        """Method to publish message to RabbitMQ"""
        self.channel.basic_publish(
            exchange='',
            routing_key=self.publish_queue_name,
            properties=pika.BasicProperties(
                reply_to=self.publish_callback_queue,
                correlation_id=str(uuid.uuid4())
            ),
            body=json.dumps(message)
        )

Кратко пробежимся по основным точкам:

  • RABBIT_MQ_URL - переменная среды, в которой находится уникальный AMQP URL, что-то вроде amqps://efiosdwv:***@cow.rmq2.cloudamqp.com/efiosdwv

  • Асинхронный метод consume получает из очереди сообщение и вызывает process_incoming_message. Благодаря асинхронности consume может выполняться у нескольких юзеров одновременно

  • Асинхронный метод process_incoming_message информирует что сообщение было получено и вызывает функцию, отправленную в конструктор класса (то, что мы хотим чтобы было выполнено при получении сообщения)

  • Синхронный метод send_message отправляет сообщение в очередь

Посмотрим теперь как обращаться с этим классом с серверной стороны. Для начала создадим функцию, которая будет выполняться при получении сообщения из очереди.

async def receive_prediction(message: dict):
    text = message['answer']
    id_ = message["id"]

    print(f'Here we got incoming message from {id_}: {text}')
    websocket = id_to_socket_dict.get(id_, None)
    if websocket is not None:
        await manager.send_personal_message(text, websocket)

Так как raisontext работает на веб-сокетах, то в функции получения предсказания от модели, мы проверяем по айдишнику пользователя есть ли такой веб-сокет. И если есть, то отправляем сообщения обратно на клиентскую часть. Тут стоит упомянуть, что веб-сокет пропадает из id_to_socket_dict в случае, если юзер закрыл сайт.

После того как мы создали функцию, которая выполняется при получении ответа модели, можно инициализировать наш PikaClient:

pika_client = PikaClient(
    receive_prediction,
    publish_queue_name="raisontext_model",
    consume_queue_name="raisontext_answer"
)

Последнее что нам осталось - запустить прослушивание очереди при запуске сервера (consume) и вставить send_message в необходимый эндпоинт (produce).

Так как с send_message все довольно тривиально, давайте посмотрим как запускается прослушивание очереди:

@app.on_event("startup")
async def startup():
    loop = asyncio.get_running_loop()
    task = loop.create_task(pika_client.consume(loop))
    await task

Данный код позволяет слушать очередь асинхронно с обращением к эндпоинтам серверной части. А это именно то, чего мы хотели.

Практическая часть, модель

Теперь когда серверная часть настроена на отправку сообщений и слушание очереди ответов, давайте настроим модель таким же образом.

В отличии от сервера модель не может слушать сообщения из очереди асинхронно. А потому здесь будет немного хардкода, без выделения сущности клиента в отдельный класс.

url = os.environ.get("RABBIT_MQ_URL")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel_request = connection.channel()  # start a channel
channel_request.queue_declare(queue='raisontext_model', durable=True)

channel_answer = connection.channel()  # start a channel
channel_answer.queue_declare(queue="raisontext_answer", durable=True)

def callback(ch, method, properties, body):
    dict_message = json.loads(body)
    id_ = dict_message['id']
    text = dict_message['text']
    print(f"[x] Received {id_} with {text}")

    answer = evaluator.predict([text])
    answer_dict = {
        "id": id_,
        "answer": str(round(answer[0], 3))
    }

    channel_answer.basic_publish(
        exchange='',
        routing_key='raisontext_answer',
        body=json.dumps(answer_dict)
    )

channel_request.basic_consume('raisontext_model', callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel_request.start_consuming()
connection.close()

Во многом код говорит сам за себя. Но разберем некоторые ключевые моменты:

  • evaluator - загруженная модель, у которой есть метод предикт (может быть кастомным классом)

  • callback - функция, которая вызывается при поступлении сообщения в очередь raisontext_model. Так как предполагается, что модель работает долго и находится в рамках одного вычислительного ресурса, callback является синхронным и выполняется для каждого юзера по очереди. В конце тела функции полученный предикт кладется в очередь ответов модели raisontext_answer

Общая архитектура

Теперь когда мы построили систему сервер - брокер сообщений - модель, давайте взглянем на общее устройство проекта:

Поведенческая диаграмма raisontext
Поведенческая диаграмма raisontext

Итоги

Отлично! Теперь когда брокер сообщений настроен, можно думать о новых способах дальнейшего масштабирования проекта.

Благодаря использованию облачной версии RabbitMQ, модель и сервер разграничены и могут запускаться на различных удаленных машинах, что упрощает дальнейшее масштабирование системы.

В целом благодаря использованию брокера сообщений, наша система стала более отказоустойчива и надежна.

Полный код и требования к библиотекам можно найти в официальном репозитории проекта raisontext.

Теги:
Хабы:
Всего голосов 6: ↑4 и ↓2+4
Комментарии4

Публикации

Истории

Работа

Python разработчик
137 вакансий
Data Scientist
82 вакансии

Ближайшие события