В своей прошлой статье "Как подружить Celery и SQLAlchemy 2.0 с асинхронным Python" я разбирал возможность запускать асинхронные задачи "из-под Celery" и в комментариях мне сообщили о существовании ещё одной библиотеки под названием aio_pika. И признаться, о ней я раньше никогда не слышал. Оно и не удивительно, библиотека имеет всего в районе 1К звёзд на GitHub (по сравнению с 20К+ у Celery). Я рассмотрел абсолютно все популярные (500+ звёзд) решения и остановился именно на этом из-за активной (на текущий момент) разработке и относительной популярности.
Стек, который вы увидите в статье: FastAPI, RabbitMQ, aio_pika и docker. Статья будет полезна тем кто использует Celery в своих проектах, а так же тем, кто только слышал о том, что такое очереди и RabbitMQ.
Навигация:
Предисловие
Библиотека позиционирует себя "обёрткой aiormq для asyncio для людей". Моей целью стало заменить Celery, используемый в проекте на неё. Решил я это сделать из-за того, что его интерфейс не предполагает разбиение приложения и worker'ов в отдельные сервисы, чего очень хотелось бы. Второстепенными причинами стали: отсутствие асинхронности, запах legacy (я про атрибут self, который необходимо писать первым аргументом функций) и отсутствие type-хинтов. Celery в проекте использовался для IO-Bound и Delay задач, поэтому интеграция асинхронности была очень кстати.
Конфигурация RabbitMQ
Я обновил свой RabbitMQ добавив плагин "RabbitMQ Delayed Message Plugin". Он нужен был для того, чтобы делать "отложенные" задачи (выполняются по истечении определённого времени). Celery с этим справлялся, т.к. у него была нативная поддержка данной фичи, но aio-pika
такого не имеет. Этот плагин позволяет добавить этот функционал в сам RabbitMQ. Мой docker-compose конфиг стал выглядеть следующим образом:
docker-compose.yaml
rabbit:
image: rabbitmq:3-management
hostname: rabbit
env_file:
- .env
volumes:
- ./services/rabbit/delayed_message.ez:/opt/rabbitmq/plugins/delayed_message.ez
- ./services/rabbit/enabled:/etc/rabbitmq/enabled_plugins
ports:
- "15672:15672"
Через volumes я подключил скачанный плагин, а так же добавил его в список активированных по умолчанию. Мой enabled_plugins файл выглядел следующим образом:
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].
*Точка в конце обязательна
Task router для consumer'a
Следующим этапом я написал Router для моего worker'а, который был бы для меня удобен. На этом моменте я немного заморочился:
router.py
class Router:
_routes: dict[str, list[str]] = {}
def __init__(self):
modules = list(filter(
lambda x: x != '__init__',
map(lambda y: y.split('.')[0], os.listdir('tasks'))
))
for module in modules:
imported = import_module(f'tasks.{module}')
if not hasattr(imported, '__all__'):
continue
self._routes[module] = imported.__all__
del imported
def get_method(self, action: str) -> Optional[Callable]:
module = action.split(':')[0] # Название файла
method = action.split(':')[1] # Название функции
if self._exists(module, method):
return getattr(import_module(f'tasks.{module}'), method)
Переменная _router заполняется задачами, которые расположены в папке tasks, в которой лежат сами функции (задачи). Так же они указаны в переменной all для экспорта. Для наглядности задачи выглядела примерно так:
async def test(is_test: bool):
print(f'Hello world! Value is: {is_test}')
__all__ = ['test']
Следующей задачей предстояло решить проблему с тем, что эти функции имеют произвольное количество аргументов. Я написал ещё один метод для роутера, который мог бы учесть и это:
router.py
def check_args(func: Callable, data: dict) -> bool:
hints = get_type_hints(func)
for arg, arg_type in hints.items():
if arg not in data:
return False
if not isinstance(data[arg], arg_type):
return False
return True
Мы передаем в данный метод функцию, которую импортировали из файла, а так же данные, которые пытаемся ей подсунуть. Мы так же проверяем типы указанные в аргументах функции. Если всё ок - то возвращаем True
Таким образом я регулировал количество доступных задач созданием \ удалением файлов из папки tasks. Это оказалось очень удобным и гибким решением.
Написание consumer'a
consumer.py
async def process_message(message: AbstractIncomingMessage):
async with message.process():
message = MessageSchema.parse_obj(json.loads(message.body.decode()))
method = router.get_method(message.action) # Импортируем функцию и записываем в переменную
if method:
if not router.check_args(method, message.body): # Проверяем атрибуты, которые собираемся передавать
print('Invalid args')
return
if inspect.iscoroutinefunction(method): # Проверяем является ли функция async или нет
await method(**message.body)
else:
method(**message.body)
async def main() -> None:
queue_key = rabbit_config.RABBITMQ_QUEUE
connection = await aio_pika.connect_robust(rabbit_config.url)
# Для корректной работы с RabbitMQ указываем publisher_confirms=False
channel = await connection.channel(publisher_confirms=False)
# Кол-во задач, которые consumer может выполнять в момент времени. В моём случае 100
await channel.set_qos(prefetch_count=100)
queue = await channel.declare_queue(queue_key)
exchange = await channel.declare_exchange(
# Объявляем exchange с именем main и типом, который поддерживает отложенные задачи
# Важно чтобы это имя (main) совпадало с именем на стороне publisher
'main', ExchangeType.X_DELAYED_MESSAGE,
arguments={
'x-delayed-type': 'direct'
}
)
await queue.bind(exchange, queue_key)
await queue.consume(process_message)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
В целом на этом сторона consumer'a закончена и можно приступить к интеграции всего этого добра в основное приложение (publisher).
Интеграция в основное приложение
На помощь снова приходит ООП и я написал класс для работы с aio-pika, который полностью закрыл мои нужды. Его инициализация происходила в новеньком lifespan (который скоро полностью вытолкнет старые способы):
@asynccontextmanager
async def lifespan(_: FastAPI):
await rabbit_connection.connect()
yield
await rabbit_connection.disconnect()
app = FastAPI(lifespan=lifespan)
Далее идет реализация этого класса:
rabbit_connection.py
class RabbitConnection:
_connection: AbstractRobustConnection | None = None
_channel: AbstractRobustChannel | None = None
_exchange: AbstractRobustExchange | None = None
async def disconnect(self) -> None:
if self._channel and not self._channel.is_closed:
await self._channel.close()
if self._connection and not self._connection.is_closed:
await self._connection.close()
self._connection = None
self._channel = None
async def connect(self) -> None:
try:
self._connection = await connect_robust(rabbit_config.url)
self._channel = await self._connection.channel(publisher_confirms=False)
self._exchange = await self._channel.declare_exchange(
# Повторяем из consumer'a. Важно указать одинакое
# имя exchange'ов. В моём случае `main`
'main', ExchangeType.X_DELAYED_MESSAGE,
arguments={
'x-delayed-type': 'direct'
}
)
except Exception as e:
await self.disconnect()
async def send_messages(
self,
messages: list[MessageSchema],
*,
routing_key: str = rabbit_config.RABBITMQ_QUEUE,
delay: int = None # Задержка, через которое нужно выполнить задачу (в секундах)
) -> None:
async with self._channel.transaction():
headers = None
if delay:
headers = {
'x-delay': f'{delay * 1000}' # Это тоже из документации плагина для RabbitMQ
}
for message in messages:
message = Message(
body=json.dumps(message.dict()).encode(),
headers=headers
)
await self._exchange.publish(
message,
routing_key=routing_key,
mandatory=False if delay else True # Чтобы в логах был порядок ;)
)
rabbit_connection = RabbitConnection()
В итоге для того, чтобы отправить работки worker'у достаточно было сделать следующее:
main.py
@router.get('/test')
async def test():
message = MessageSchema(
action='images:delete',
body={'path': 'assets/temp/temp.png'}
)
await rabbit_connection.send_messages(
[message for _ in range(150)],
delay=20
)
return {'status': 'published'}
Подводя итоги хочется сказать что worker теперь чувствует себя намного увереннее и может выполнять намного больше и быстрее.