Реализацией RPC запросов поверх брокеров сообщений никого не удивишь: очередь для запроса, очередь для ответа — ничего сложного.
Тот же RabbitMQ имеет пример в официальной документации. Других примеров там нет, поэтому создается впечатление, что отправка ответных сообщений в другую очередь — единственный возможный способ реализации RPC.
Этот сценарий отлично работает когда у нас есть непрерывный поток сообщений и непрерывный поток ответов на них. Однако, данный подход не применим в случаях, когда нам нужно отправить только одно сообщение и получить ответ именно на него. Мы сразу же попадаем в какой-то ад с фильтрацией ответов по correlation_id.
На самом деле, в RabbitMQ есть механизм и для такого сценария. Но он спрятан в недрах документации и о нем почти нет информации в интернете (особенно рабочих примеров кода).
Вот это недоразумение мы сейчас и исправим.

P.S: Здесь я не буду объяснять, кто такой этот ваш RabbitMQ и зачем он нужен: эту информацию вы можете найти в другой моей статье.
Direct Reply-TO
Работает все достаточно просто, за исключением некоторых нюансов, которые всплывают на практике.
Концепция заключается в следующем:
- мы подписываемся на специальную псевдоочередь
amqp.rabbitmq.reply-to - отправляем сообщение с указанием этой очереди в качестве
reply-toзаголовка - кролик генерирует для нас уникальный
routing_key, по которому будет должно быть опубликовано ответное сообщение вdefault exchange - сервер получает наше сообщение и отправляет ответ по этому
routing_key.
Нет нужды создавать какие-либо дополнительные очереди, нет дополнительных расходов на управление ими со стороны RMQ. Это абсолютно win-to-win механизм.
Алгоритм действий:
Со стороны клиента:
- (СНАЧАЛА) подписываемся на очередь с волшебным названием
amqp.rabbitmq.reply-toв no-ack режиме, объявлять ее не нужно - отправляем сообщение с указанием заголовка
reply-to = amqp.rabbitmq.reply-to
Со стороны сервера:
- получаем сообщение. В нем, в качестве
reply-toзаголовка будет нечто видаamqp.rabbitmq.reply-to.<uuid> - отправляем ответ в
default exchangeсreply-toзначением в качестве ключа маршрутизации
На этом, в принципе, все. Однако каждое слово в этом алгоритме важно: сначала отправили, потом подписались — провал, попытались объявить очередь — провал, подписались в режиме ack — снова провал и т.д.
Поэтому мне пришлось потратить некоторое количество времени на написание рабочего кода. Давайте перейдем к нему, чтобы разобраться подробнее?
Python Example
Пример будет приведен с использование библиотеки aio-pika так как свою реализацию я писал имеенно на ней.
Пишем сервер
Сначала напишем некий бойлерплейт для подключения к очереди:
import asyncio from functools import partial import aio_pika async def consumer( msg: aio_pika.IncomingMessage, channel: aio_pika.RobustChannelб ): ... async def main(): connection = await aio_pika.connect_robust( "amqp://guest:guest@127.0.0.1/" ) queue_name = "test" async with connection: channel = await connection.channel() queue = await channel.declare_queue(queue_name) # через partial прокидываем в наш обработчик сам канал await queue.consume(partial(consumer, channel=channel)) try: await asyncio.Future() except Exception: pass asyncio.run(main())
А теперь перейдем к нашей функции-обработчику:
async def consumer( msg: aio_pika.IncomingMessage, channel: aio_pika.RobustChannel, ): # используем контекстный менеджер для ack'а сообщения async with msg.process(): print(msg.body) # проверяем, требует ли сообщение ответа if msg.reply_to: # отправляем ответ в default exchange await channel.default_exchange.publish( message=aio_pika.Message( body=b"hi!", correlation_id=msg.correlation_id, ), routing_key=msg.reply_to, # самое важное )
Как вы видите, действительно ничего сложного.
Пишем клиент
А вот тут будет немного веселья. Наша цель сделать такой же просто интерфейс как у requests:
data = requests.get("https://my-url.com").json()
Однако, это не так просто. Помните, что сначала нужно подписаться на ответную очередь? Так мы получаем следующий код:
import asyncio import aio_pika RABBIT_REPLY = "amq.rabbitmq.reply-to" async def consume_response(msg: aio_pika.IncomingMessage): print(msg.body) async def main(): connection = await aio_pika.connect_robust( "amqp://guest:guest@127.0.0.1/" ) async with connection: channel = await connection.channel() callback_queue = await channel.get_queue(RABBIT_REPLY) # сначала подписываемся consumer_tag = await callback_queue.consume( callback=consume_response, no_ack=True, # еще один важный нюанс ) # потом публикуем await channel.default_exchange.publish( message=aio_pika.Message( body=b"hello", reply_to=RABBIT_REPLY # указываем очередь для ответа ), routing_key="test" ) asyncio.run(main())
Так мы получаем ответное сообщение в нашу функцию-обработчик. Однако, теперь его нужно как-то достать оттуда. Для этого будем использовать asyncio.Queue.
import asyncio import aio_pika RABBIT_REPLY = "amq.rabbitmq.reply-to" async def main(): connection = await aio_pika.connect_robust( "amqp://guest:guest@127.0.0.1/" ) async with connection: channel = await connection.channel() callback_queue = await channel.get_queue(RABBIT_REPLY) # создаем asyncio.Queue для ответа rq = asyncio.Queue(maxsize=1) # сначала подписываемся consumer_tag = await callback_queue.consume( callback=rq.put, # помещаем сообщение в asyncio.Queue no_ack=True, # еще один важный нюанс ) # потом публикуем await channel.default_exchange.publish( message=aio_pika.Message( body=b"hello", reply_to=RABBIT_REPLY # указываем очередь для ответа ), routing_key="test" ) # получаем ответ из asyncio.Queue response = await rq.get() print(response.body) # освобождаем RABBIT_REPLY await callback_queue.cancel(consumer_tag) asyncio.run(main())
Теперь у нас уже есть что-то похожее на синхронный запрос-ответ. Можно немного поколдовать над интерфейсами и вы получите RPC over RMQ запрос, идентичный натуральному requests.
Вместо заключения
Ну а я уже поколдовал над этими интерфейсами. И вы можете увидеть результат этого колдовства в моем фреймворке Propan.
С его использование RPC запросы будут выглядеть для вас следующим образом:
from propan import PropanApp, RabbitBroker broker = RabbitBroker("amqp://guest:guest@localhost:5672/") app = PropanApp(rabbit_broker) # server side @broker.handle("ping") async def heartbeat(): return "pong" @app.after_startup async def self_ping(): # client RPC request response = await broker.publish(queue="ping", callback=True) assert response == "pong"
И теперь вы точно знаете, что у них под капотом.
Нюансы
RabbitMQ Direct Reply-to действительно отличный механзим, однако и у него есть ограничения.
На псевдочередь amqp.rabbitmq.reply-to можно подписываться из разных сервисов неограниченное число раз одновременно, однако, если вы хотите отправить несколько разных запросов в рамках одного сервиса (одного connection, если быть точным) одновременно, у вас не получится это сделать: вы словите ошибку, что очередь уже имеет потребителя.
Поэтому в рамках одного сервиса необходимо использовать локи на отправку RPC запросов, что, к слову, также реализовано в Propan.
