Как стать автором
Обновить

Коротко о работе с RabbitMQ из Python

Время на прочтение8 мин
Количество просмотров68K

КДПВ


Так повелось, что в процессе работы в МегаФоне, приходится сталкиваться с однотипными задачами при работе с RabbitMQ. Закономерно возникает вопрос: «Как упростить и автоматизировать выполнение таких задач?»


Первое решение, которое приходит в голову, использовать интерфейс HTTP, и, безусловно, из коробки RabbitMQ обладает хорошим веб-интерфейсом и HTTP API. Тем не менее, использование HTTP API не всегда удобно, а иногда и вообще невозможно (допустим у вас недостаточно прав доступа, а опубликовать сообщение ну очень хочется) в такие моменты возникает необходимость работы именно по протоколу AMQP


Не найдя подходящих мне готовых решений на просторах сети, было решено написать небольшое приложение для работы с RabbitMQ по протоколу AMQP с возможностью передавать параметры запуска через командную строку и обеспечивающее минимально необходимый набор возможностей, а именно:


  • Публикацию сообщений
  • Вычитку сообщений
  • Создание и редактирование основных элементов маршрутов

Python был выбран как наиболее простой (и на мой взгляд красивый) инструмент для реализации такой задачи. (тут можно поспорить, но что это изменит?)


На хабре представлены переводы официальных гайдов (раз, два) по RabbitMQ, однако, иногда полезным является простой пример из практики. В статье я попытаюсь на примере небольшого приложения осветить основные вопросы, возникающие при работе с «кроликами» по каналу AMQP из Python. Само приложение доступно на GitHub.


Коротко о протоколе AMQP и брокере сообщений RabbitMQ


AMQP – один из наиболее распространенных на сегодняшний день протоколов обмена сообщениями между компонентами распределённой системы. Главной отличительной особенностью данного протокола является концепция построения маршрута сообщения, содержащая два основных структурных элемента: очередь(queue) и точка обмена (exchange). Очередь накапливает в себе сообщения до момента их получения. Точка обмена представляет собой распределитель сообщений, который направляет их либо в нужную очередь, либо в другую точку обмена. Правила распределения (bindings), по которым точка обмена определяет куда именно направить сообщение, основаны на проверке ключа маршрутизации сообщения (routing key) на соответствие заданной маске. Более подробно о принципах работы протокола AMQP можно прочесть здесь.


RabbitMQ – опенсорсное приложение, в полной мере поддерживающее протокол AMQP и предлагающее ряд дополнительных возможностей. Для работы с RabbitMQ написано большое количество библиотек на самых разных языках программирования, включая Python.


Реализация на Python


Всегда можно накидать парочку скриптов для личного пользования и не знать с ними бед. Когда же дело доходит до распространения их в кругу коллег все становится сложнее. Каждому надо показать и рассказать, как и что запускать, что и где менять, где взять свежую версию, и что в ней поменялось… Невольно приходишь к мысли, что проще один раз проработать простенький интерфейс, чтоб в будущем не тратить время на рассказы. Для удобства использования было решено разделить приложение на 4 модуля:


  1. Модуль, отвечающий за публикацию сообщений
  2. Модуль, отвечающий за вычитку сообщений из очереди
  3. Модуль, предназначенный для внесения изменений в конфигурацию брокера RabbitMQ
  4. Модуль, содержащий общие для предыдущих модулей параметры и методы

Такой подход позволяет упростить набор параметров запуска. Выбрали нужный модуль, выбрали один из режимов его работы и передали нужные параметры (подробнее о режимах работы и параметрах в справке –help).


Поскольку структура «кроликов» в «МегаФоне» состоит из достаточно большого количества узлов, то для удобства пользования данные для подключения к узлам вынесены в модуль с общими параметрами и методами rmq_common_tools.py


Для работы по AMQP в Python мы будем использовать библиотеку Pika.


import pika

При использовании этой библиотеки работа с RabbitMQ будет состоять из трех основных этапов:


  1. Установка соединения
  2. Выполнение требуемых операций
  3. Закрытие соединения

Первый и последний этап одинаковы для всех модулей и реализованы в rmq_common_tools.py


Для установления соединения:


rmq_parameters = pika.URLParameters(rmq_url_connection_str)
rmq_connection = pika.BlockingConnection(rmq_parameters)
rmq_channel = rmq_connection.channel()

Библиотека Pika позволяет использовать различные варианты оформления параметров подключения к RabbitMQ. В данном случае наиболее удобным оказался вариант с передачей параметров в виде URL строки следующего формата:


‘amqp://rabbit_user:rabbit_password@host:port/vhost’

Для закрытия соединения:


rmq_connection.close()

Публикация сообщений


Публикация сообщения – это, вероятно, самая простая, но в то же время и самая востребованная операция при работе с «кроликами».


Инструменты для публикации сообщений собраны в rmq_publish.py


Для публикации сообщения используется метод


rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)

где:
exchange – имя точки обмена, в которую будет опубликовано сообщение
routing_key – ключ маршрутизации, с которым будет опубликовано сообщение
body – тело сообщения


rmq_publish.py поддерживает два режима ввода сообщения для публикации:


  1. Сообщение вводится как параметр через командную строку (from_console)
  2. Сообщение вычитывается из файла (from_file)

Второй режим, на мой взгляд, удобнее при работе с большими сообщениями или массивами сообщений. Первый же в свою очередь позволяет выполнить отправку сообщения без дополнительных файлов, что удобно при интеграции модуля в другие сценарии.


Получение сообщений


Вопрос получения сообщений уже не такая тривиальная вещь как публикация. Когда речь заходит о считывании сообщений необходимо понимать:


  • После подтверждении получения сообщения, оно будет удалено из очереди. А значит считывая сообщения из «боевой» очереди мы «отбираем» их у основного потребителя. Если мы не хотим терять поток сообщений, а просто хотим понимать, какие сообщения перемещаются в «кролике», то наиболее логичным вариантом является создание отдельной «логирующей» очереди, или как ее еще называют, «очереди-ловушки».
  • Считанные сообщения, как правило, требуют последующей обработки или анализа, а значит их нужно куда-либо сохранять, если обработка в режиме реального времени невозможна или не требуется.

Считыватель сообщений реализован в файле rmq_consume.py


Предусмотрены два режима работы:


  1. Считывание сообщений из существующей очереди
  2. Создание временной очереди и маршрута для считывания сообщений из этой очереди

Вопрос создания очереди и маршрутов будет рассмотрен ниже.


Непосредственно вычитка реализуется следующим образом:


channel.basic_consume(on_message, queue=params.queue)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
except Exception:
    channel.stop_consuming()
    rmq_tools.console_log("Ошибка:\n", traceback.format_exc())

где
on_message — процедура-обработчик сообщения
params.queue — имя очереди, из которой будет производиться вычитка


Обработчик сообщения должен выполнять некоторую операцию с прочитанным сообщением и подтверждать (или не подтверждать, если такое требуется) доставку сообщения.


def on_message(channel, method_frame, header_frame, body):
    global all_cnt, lim
    if all_cnt >= lim:
        rmq_tools.console_log('Достаточное количество информации собрано.')
        raise KeyboardInterrupt
    body_str = body.decode("utf-8")[:4000]
    rk = method_frame.routing_key
    rmq_params.file.write(rk + '\n')
    rmq_params.file.write(body_str + '\n\n')
    all_cnt = all_cnt + 1
    if (lim != 0) and (rmq_params.file == sys.stdout):
        sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r')
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)

где
all_cnt – глобальный счетчик
lim – количество сообщений которые необходимо считать


В такой реализации обработчика предусмотрена вычитка определённого количества сообщений и вывод информации о прогрессе вычитки в консоль, если запись происходит в файл.


Так же возможна реализация записи считаных сообщений в БД. В текущей реализации такая возможность не представлена, однако ее нетрудно добавить.


Запись в БД

Пример записи сообщений в базу рассмотрим для БД Oracle и библиотеки cx_oracle.


Подключаемся к БД


ora_adress = 'host:port/dbSID'
ora_creds = 'user/pass'
connection_ora = cx_Oracle.connect(ora_creds + ’@' + ora_address)
ora_cursor = connection_ora.cursor()

В обработчик on_message добавляем


global cnt, commit_int  
insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)'
ora_cursor.execute(insert_rec, text = body_str, rkey = rk)
if cnt > commit_int :
    ora_cursor.execute('commit')
    cnt = 1
cnt = cnt + 1

где
cnt – еще один счетчик
commit_int – количество вставок в базу, после которого необходимо делать «commit». Наличие такого параметра обусловлено желанием снизить нагрузку на БД. Однако, устанавливать его особо большим не стоит, т.к. в случае сбоя есть шанс потерять сообщения, считанные после последнего успешного commit.


И, как положено, по окончанию работы делаем финальный commit и закрываем соединение


ora_cursor.execute('commit')    
connection_ora.close()

Примерно так происходит считывание сообщений. Если убрать ограничение на количество считываемых сообщений, то можно сделать фоновый процесс для непрерывного считывания сообщений из «кролика».


Конфигурирование


Несмотря на то, что протокол AMQP в первую очередь предназначен для публикации и считывания сообщений, он так же позволяет выполнять простые манипуляции с конфигурацией маршрутов (речь не идет о конфигурации сетевых подключений и других настроек RabbitMQ как приложения).


Основные операции настройки это:


  1. Создание очереди (queue) или точки обмена (exchange)
  2. Создание правила пересылки (binding)
  3. Удаление очереди (queue) или точки обмена (exchange)
  4. Удаление правила пересылки (binding)
  5. Очистка очереди (queue)

Поскольку для каждой из них есть готовая процедура в библиотеке pika, то, для удобства запуска, они просто собраны в файле rmq_setup.py. Далее перечислим процедуры из библиотеки pika с небольшими комментариями по поводу параметров.


Создание очереди (queue)


rmq_channel.queue_declare(queue=params.queue, durable = params.durable)

тут все просто
queue – имя создаваемой очереди
durable – логический параметр, значение True будет означать, что при перезагрузке «кролика» очередь продолжит существовать. В случае значения False при перезагрузке очередь будет удалена. Второй вариант обычно используется для временных очередей, которые гарантированно не понадобятся в будущем.


Создание точки обмена (exchange)


rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)

здесь возникает новый параметр exchange_type — тип точки обмена. О том, какие бывают типы точек обмена читаем здесь.
exchange – имя создаваемой точки обмена


Удаление очереди (queue) или точки обмена (exchange)


rmq_channel.queue_delete(queue=params.queue)
rmq_channel.exchange_delete(exchange=params.exch)

Создание правила пересылки (binding)


rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)

exchange – имя точки обмена, из которой будет производиться пересылка
queue – имя очереди, в которую будет производится пересылка
routing_key – маска ключа маршрутизации, по которой будет производиться пересылка.


Допустимы следующие записи:


  • rk.my_key.* — в данной маске звездочка означает непустой набор символов. Иными словами, такая маска пропустит любой ключ вида rk.my_key. + что-то еще, но не пропустит ключ rk.my_key
  • rk.my_key.# — такая маска пропустит все, что и предыдущая + ключ rk.my_key

Удаление правила пересылки (binding)


rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)

тут все по аналогии с созданием правила пересылки.


Очистка очереди (queue)


rmq_channel.queue_purge(queue=params.queue)

queue – имя очереди, которую надо очистить


Об использовании интерфейса командной строки в приложениях на Python

Параметры запуска сильно облегчают жизнь. Чтоб не править код перед каждым запуском, логично предусмотреть механизм передачи параметров при запуске. Для этой целы была выбрана библиотека argparse. Подробно углубляться в тонкости ее использования не стану, гайдов по этому поводу достаточно (раз, два, три). Отмечу только, что этот инструмент помог мне значительно упростить процесс использования приложения (если его можно так назвать). Даже накидав простую последовательность команд и обернув их в подобный интерфейс, можно получить вполне полноценный и удобный в использовании инструмент.


Применение в повседневной жизни. Что пригодилось больше всего.


Ну а теперь немного впечатлений об использовании протокола AMQP в повседневной жизни.


Самой востребованной функцией явилась публикация сообщения. Права доступа конкретного пользователя не всегда позволяют использовать веб-интерфейс, хотя порой для тестирования того или иного сервиса это просто необходимо. Тут на помощь и проходит AMQP и авторизация от имени сервиса использующего этот канал.


Второй по популярности стала возможность считывания сообщений из временной очереди. Такая возможность полезна при настройке новых маршрутов и потоков сообщений, а также при предотвращении аварий.


Остальные возможности так же нашли применение в тех или иных задачах.

Теги:
Хабы:
Всего голосов 20: ↑18 и ↓2+16
Комментарии29

Публикации

Истории

Работа

Python разработчик
135 вакансий
Data Scientist
62 вакансии

Ближайшие события