Pull to refresh

Comments 23

Я не знаю сюда ли я пишу, но на сколько я смог понять из документации aio-pika, когда мы используем channel.set_qos(prefetch_count=<int>) то мы задаём количество операций, которое за раз может взять на себя consumer. Но я нашел у вас в коде это (propan/brokers/rabbit/rabbit_broker.py:69):

await self._channel.set_qos(prefetch_count=int(self._max_consumers))

max_consumers, это переменная которая отвечает за количество инстансов consumer'ов которые будут крутиться (судя по названию). В таком случае получается что каждый из них будет брать столько задач, сколько всего instance'ов. Просто я например ставлю это значение на 50-100, зная что у меня все задачи IO-bound, и в таком случае всё ок работает. Поправьте если я ошибаюсь.

Добрый день! В целом, вы правы: prefetch_count в aio-pika говорит о том, сколько сообщений за раз может обрабатывать consumer. Этакий MessagePool.
Просто для меня, когда я первый раз взялся за работу с RabbitMQ смысловое значение этого аргумента вызвало некоторый ступор и необходимость лезть в документацию самого Rabbit'а. Поэтому в рамках Propan я немного пересмотрел эту концепцию: в рамках приложения в качестве потребителя рассматривается 1 задача обработки одного конкретного сообщения. Поэтому max_consumers говорит о том, что у нас будет обрабатываться не более N сообщений одновременно: тот же MessagePool, вид сбоку.
Мне кажется, что в таком ключе эта концепция легче укладывается в головах у джунов, которые первый раз видят RabbitMQ. Впрочем, я готов изменить свое мнение и переименовать данный параметр.
Количество инстансов в Propan - это workers, которые мы запускаем из CLI.

Наконец! Пилить каждый раз новый велосипед — это мы могём.

Давно хотел поработать над чем-то опенсорсным, проект выглядит перспективно. Если найду свободное время - может чем-то помогу.
Сейчас посмотрел глазом на код кафки, вот эта конструкция может генерировать исключения
async for msg in handler.consumer
Я по этой причине использую getone()
Какие планы по интеграции MQTT?

Буду рад, если вы присоединитесь к работе над проектом: мне пригодилась бы помощь с той же кафкой. Одному рук катастрофически не хватает.
Реализацию MQTT я вижу пока поверх paho. С этим не должно быть много проблем, но сейчас это не приоритетная задача. Пока приоритет следующий: реализация SQS, добить Kafka, NatsJS, Redis Streams, затем - документация AsyncAPI. Задач еще много...

А как быть, если сообщения не в формате json?

Оно и не обязано быть json: можно послать все примитивные типы python, а сериализовать их при приеме с помощью pydantic. На крайний случай - можно принимать сырые bytes и сериализовать их самостотельно: например, в Depends, или прямо в теле функции.

Ну, если речь о protobuf, то нет, не умеет. В перспективе - будет. Но вопрос был о json. Нет, можно посылать не только json. И нет, protobuf не умеем. Хотя, технически, вы можете легко его интегрировать в свой проект самостоятельно способом, который я описал выше.

А как еще могут сериализоваться примитивные типы с помощью pydantic, кроме как через json?

Из строки, например. Сообщение же может быть просто строкой/числом/bool, etc.

Допустим, в сообщении число. Как оно будет сериализовано?

Отправится как строка, на входе будет преобразовано в int в соответствии с аннотацией функции - потребителя

Тем, что это уже не является форматом JSON (согласно его спецификации)

Если честно, не вижу смысла развивать этот диалог в рамках статьи

А как по-вашему выглядит число 123 в формате JSON, согласно спецификации?

Хорошо, правда за вами: RFC 8259 утверждает, что просто значения тоже могут быть JSON, хотя и ссылается на то, что "в предыдущих стандартах JSON опеределялся только либо как object, либо как array"
Видимо, я почему-то руководствовался всю жизнь однажды прочитанным старым стандартом 2013 года.

Привет, проект выглядит очень заманчиво и перспективно. Хотелось бы предложить идею, так как искали на своем проекте, но подобного не нашли - swagger-подобная документация, чтобы можно было в автоматическом режиме смотреть какие очереди с какими параметрами и сообщениями обрабатываются

Есть такой проект https://www.asyncapi.com/
Основная проблема в подобных проектах заключается в том, что, в отличие от HTTP, в RMQ все взаимодействие происходит "кто во что горазд". Поэтому сложно найти вещи типа swagger.

Собственно, сейчас я и работаю над генерацией схем в соответствии с этой спецификацией. Правда, процесс очень не быстрый, так как нет никаких инструментов для python и придется лепить практически все с 0: от генерации схемы до ее html представления.

Pydantic умеет генерировать json-sсhema, а entity там как раз они (вроде как)
А вот все остальное ручками. Это да.
Или я не прав?

Это правда. Только это наименьшая проблема: нужно еще перелопатить эти json схемы в нормальный AsyncAPI документ с учетом специфики каждого брокера, потом сгенерировать из этого документа html файл (сейчас есть только консольная утилита для npm для этого), а потом захостить это документ на том же генераторе на ноде (либо в контейнере). Поэтому придется писать свой генератор html для питона + хостинг этой схемы самому писать, чтобы можно было оттуда запросы тестировать, т.к. такого функционала нигде нет. А это еще и фронт на реакте пилить...
Ну и генерацию проекта из предоставленной схемы тоже пилить придется. В общем, процесс не быстрый.

Sign up to leave a comment.

Articles