В предыдущей статье вы могли узнать что такое очередь в целом и как работает FIFO-очередь asyncio.Queue. Давайте продолжим и посмотрим на примере библиотеки aiohttp как работают очереди с приоритетом asyncio.PriorityQueue.
readme
Автор предыдущей статьи про asyncio.Queue довольно подробно описал тему про FIFO очередь, но дальше, про остальные типы очередей, пишет довольно шаблонно, примеры в статье мне тоже не понравились и я решил продолжить тему частичным переводом другой статьи.
Если вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!
Паттерн producer-consumer
Повторим в чем смысл паттерна producer-consumer(производитель-потребитель). Представьте себе два типа задач, разделяющих очередь. Задача A производит данные и помещает их в очередь, а задача B извлекает данные из очереди для обработки. Это и есть модель "производитель-потребитель", где задача A - производитель, а задача B - потребитель. По аналогии с супермаркетом, покупатели являются производителями, кассиры - потребителями, а очередь покупателей представляет собой очередь.
Зачем использовать паттерн производитель-потребитель
В высококонкурентных программах производители часто генерируют данные быстро, а потребители обрабатывают их медленно. Таким образом, производители должны дождаться окончания обработки данных потребителями, прежде чем продолжать генерировать данные.
Иногда потребители обрабатывают данные быстро, а производители — медленно. Это приводит к тому, что потребители ждут, пока производители сгенерируют данные, прежде чем продолжить работу. Для баланса между производителями и потребителями необходима очередь, в которой хранятся данные, произведенные производителем. Очередь выполняет роль буфера и разделяет производителей и потребителей.

Очередь с приоритетом asyncio.PriorityQueue
В этой статье мы обсудили как работает очередь типа FIFO в asyncio. Теперь давайте посмотрим как работают очереди с приоритетом asyncio.PriorityQueue.
Очередь с приоритетом (англ. priority queue) — абстрактный тип данных в программировании, поддерживающий две обязательные операции — добавить элемент и извлечь максимум[1] (минимум). Предполагается, что для каждого элемента можно вычислить его приоритет — действительное число или в общем случае элемент линейно упорядоченного множества.
Зачем использовать asyncio.PriorityQueue?
Предположим, имеется очередь, в которой стоят задачи, каждая из которых требует длительного времени обработки. Журнал ошибок или VIP-доступ пользователя - это высокоприоритетная задача, требующая немедленного внимания. Что же делать? Именно здесь на помощь приходит asyncio.PriorityQueue.
Кратко опишем реализацию asyncio.PriorityQueue
В отличие от очередей FIFO, основанных на списках, asyncio.PriorityQueue основана на кучах. Она построена с использованием структуры бинарного дерева.
Вы возможно знакомы с двоичными деревьями поиска, которые гарантируют, что самый младший узел всегда является крайним левым узлом. Однако двоичное дерево в asyncio.PriorityQueue гарантирует, что самый младший узел всегда находится наверху, поэтому узел с наивысшим приоритетом постоянно удаляется первым.

Методы для очередей с приоритетом asyncio.PriorityQueue такие же, как для asyncio.Queue. Подробнее с методами вы можете ознакомиться в этой статье
Методы класса asyncio.PriorityQueue
await queue.put(item) - для помещения элемента в очередь. Если очередь переполнена, этот метод будет ждать, пока не освободится свободный слот.
await queue.get() - для получения элемента из очереди. Если очередь пуста, этот метод будет ждать, пока элемент не станет доступен.
queue.task_done() - для индикации того, что ранее полученный элемент был обработан. Этот метод должен вызываться потребительскими корутинами после завершения работы с элементом.
await queue.join() - для блокировки обработки всех элементов в очереди. Этот метод должен вызываться корутинами-производителями после того, как они завершат занесение элементов в очередь.
Можно также использовать некоторые "некорутинные" методы очереди, например:
queue.put_nowait(item) - чтобы поместить элемент в очередь без блокировки. Если очередь переполнена, этот метод вызовет исключение QueueFull.
queue.get_nowait() - для получения элемента из очереди без блокировки. Если очередь пуста, то этот метод вызовет исключение QueueEmpty.
queue.qsize() - для получения количества элементов в очереди.
queue.empty() - для проверки, пуста ли очередь.
queue.full() - для проверки заполнения очереди.
Пример использования asyncio.PriorityQueue в реальном мире
Проиллюстрируем использование asyncio.PriorityQueue на примере реального сценария. Представьте, что у нас есть API сервиса заказов. API требует времени на обработку каждого заказа, но мы не можем заставлять пользователей ждать слишком долго. Поэтому, когда пользователь размещает заказ, API сначала помещает его в очередь, позволяя фоновой задаче обработать его асинхронно и сразу же вернуть сообщение пользователю.
Данный API принимает заказы от двух типов пользователей: обычных и VIP. При этом необходимо следить за тем, чтобы заказы VIP-пользователей обрабатывались с наивысшим приоритетом.

Давай рассмотрим реализацию программы с приоритетной очередью на примере aiohttp:
import asyncio from asyncio import PriorityQueue, Task from dataclasses import dataclass, field from enum import IntEnum from random import randrange from aiohttp import web from aiohttp.web_app import Application from aiohttp.web_request import Request from aiohttp.web_response import Response app = Application() routers = web.RouteTableDef() QUEUE_KEY = "QUEUE_KEY" TASK_KEY = "TASK_KEY" class UserType(IntEnum): POWER_USER = 1 NORMAL_USER = 2 @dataclass(order=True) class WorkItem: user_type: UserType order_delay: int = field(compare=False)
Сначала мы определяем Enum(перечисление), обозначающее две категории: обычные пользователи и VIP-пользователи.
Затем с помощью dataclass определяем заказ пользователя, который содержит тип пользователя и продолжительность обработки заказа. Продолжительность обработки заказа не учитывается при сортировке приоритетов.
Дальше мы определяем метод-потребитель process_order_worker, который получает заказы из очереди и имитирует их обработку. Не забудьте использовать queue.task_done(), чтобы сообщить очереди, что мы закончили обработку заказа.
async def process_order_worker(worker_id: int, queue: PriorityQueue): while True: work_item: WorkItem = await queue.get() print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}") await asyncio.sleep(work_item.order_delay) print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}") queue.task_done()
После этого мы реализуем API заказа, используя aiohttp. Этот API отвечает на запросы пользователей, генерирует объект заказа и помещает его в asyncio.PriorityQueue. Затем он немедленно возвращает ответ пользователю, что позволяет избежать времени ожидания.
@routers.post("/order") async def order(request: Request) -> Response: queue: PriorityQueue = app[QUEUE_KEY] body = await request.json() user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER work_item = WorkItem(user_type, randrange(5)) await queue.put(work_item) return Response(body="order placed!")
При запуске программы мы используем команду create_order_queue для инициализации очереди и упорядочивания задач потребления.
async def create_order_queue(app: Application): print("create_order_queue: Begin to initialize queue and tasks.") queue: PriorityQueue = PriorityQueue(10) tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)] app[QUEUE_KEY] = queue app[TASK_KEY] = tasks print("create_order_queue: Initialize queue and tasks success..")
При завершении работы программы мы используем команду destroy_order_queue, чтобы убедиться, что все заказы в очереди обработаны и фоновые задачи корректно закрыты.
Функция queue.join() будет ожидать обработки всех данных в очереди. asyncio.wait_for устанавливает таймаут в 20 секунд, по истечении которого она больше не будет ожидать завершения работы queue.join().
async def destroy_order_queue(app: Application): queue: PriorityQueue = app[QUEUE_KEY] tasks: list[Task] = app[TASK_KEY] try: print("destroy_order_queue: Wait for 20 sec to let all work done.") await asyncio.wait_for(queue.join(), timeout=20.0) except Exception as e: print("destroy_order_queue: Cancel all tasks.") [task.cancel() for task in tasks] app.add_routes(routers) app.on_startup.append(create_order_queue) app.on_shutdown.append(destroy_order_queue) web.run_app(app)
Полный листинг кода программы
import asyncio from asyncio import PriorityQueue, Task from dataclasses import dataclass, field from enum import IntEnum from random import randrange from aiohttp import web from aiohttp.web_app import Application from aiohttp.web_request import Request from aiohttp.web_response import Response app = Application() routers = web.RouteTableDef() QUEUE_KEY = "QUEUE_KEY" TASK_KEY = "TASK_KEY" class UserType(IntEnum): POWER_USER = 1 NORMAL_USER = 2 @dataclass(order=True) class WorkItem: user_type: UserType order_delay: int = field(compare=False) async def process_order_worker(worker_id: int, queue: PriorityQueue): while True: work_item: WorkItem = await queue.get() print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}") await asyncio.sleep(work_item.order_delay) print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}") queue.task_done() @routers.post("/order") async def order(request: Request) -> Response: queue: PriorityQueue = app[QUEUE_KEY] body = await request.json() user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER work_item = WorkItem(user_type, randrange(5)) await queue.put(work_item) return Response(body="order placed!") async def create_order_queue(app: Application): print("create_order_queue: Begin to initialize queue and tasks.") queue: PriorityQueue = PriorityQueue(10) tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)] app[QUEUE_KEY] = queue app[TASK_KEY] = tasks print("create_order_queue: Initialize queue and tasks success..") async def destroy_order_queue(app: Application): queue: PriorityQueue = app[QUEUE_KEY] tasks: list[Task] = app[TASK_KEY] try: print("destroy_order_queue: Wait for 20 sec to let all work done.") await asyncio.wait_for(queue.join(), timeout=20.0) except Exception as e: print("destroy_order_queue: Cancel all tasks.") [task.cancel() for task in tasks] app.add_routes(routers) app.on_startup.append(create_order_queue) app.on_shutdown.append(destroy_order_queue) web.run_app(app)
Мы можем протестировать эту реализацию, используя HTTP-запрос PyCharm (только в Pro версии):
HTTP-запросы
POST http://localhost:8080/order Content-Type: application/json {"power_user": "True"} ### POST http://localhost:8080/order Content-Type: application/json {"power_user": "False"} ### POST http://localhost:8080/order Content-Type: application/json {"power_user": "False"} ### POST http://localhost:8080/order Content-Type: application/json {"power_user": "True"}
Или, например, так, если у вас community версия:
import requests from random import randint url = 'http://localhost:8080/order' myobj = [{"power_user": "True"}, {"power_user": "False"}] for i in range(5): requests.post(url, json=myobj[randint(0,1)])

Как видите, две высокоприоритетные задачи обрабатываются, как и ожидалось. Отлично!
Выводы
В этой статье мы повторили для чего нужен паттерн производитель-потребитель:
Балансировка между производителями и потребителями, максимизация использования ресурсов.
Развязывание системы, позволяющее производителям и потребителям масштабироваться независимо друг от друга.
Также, на реальном примере увидели, как использовать asyncio.PriorityQueue для решения ситуаций, когда задачам требуется расстановка приоритетов.
Асинхронное программирование на Python - мощный инструмент, а паттерн производитель-потребитель с asyncio.Queue - это универсальный подход к обработке параллелизма и расстановке приоритетов в ваших приложениях.
Контакты автора статьи
Помогайте другим там, где вы это можете делать. (с) Хабраэтикет
