Простые примеры на Pika python для работы со статичными (reject) очередями RabbitMQ
Ожидает приглашения
По работе приходится сталкиваться с очередями, которые по каким-то причинам не удается передать дальше. Они копятся (например что-то вроде queue.queue.reject) в очередь простоя.
Нужно такие очереди мониторить и понимать(также парсить и пр.), что там собирается. Т.е. по сути нужно эти очереди просто прочитать(не удаляя) и посмотреть, что с ними делать дальше:
Бывает нужно просто перекинуть из одной очереди в другую:
В указанном выше коде очередь при прочтении будет удалена из очереди queue_from. Т.е. тут реализован именно перенос, а не копирование. Изменений теоретически быть не должно — FIFO принцип должен в таком же порядке уложить очередь в новую.
Теоретически можно еще передать свойство exchange (channel.basic_publish(exchange=method_frame.exchange, routing_key=queue_to, body=body, properties=properties) ), но связки могут быть слишком сложными, что проще действовать напрямую без exchange.
Публикую отчет по таким очередям:
Нужно такие очереди мониторить и понимать(также парсить и пр.), что там собирается. Т.е. по сути нужно эти очереди просто прочитать(не удаляя) и посмотреть, что с ними делать дальше:
import pika
queue_from = 'queue.queue.reject'
mr = 'Юг'
# можно хранить аутетификацию таким образом.
# довольно удобно для большого количества серверов с разными паролями хостами
mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}
credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# тут получаем количество сообщений в нашей очереди
cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count
if cnt:
for count_massages in range(cnt):
messages = ''
method_frame, properties, body = channel.basic_get(queue_from)
# тут можно сохранить сообщения в переменную и затем в файл
messages += f"Cообщение №{count_massages}: \nproperties:
{str(properties)}\nbody:\n{str(body)}\n===\n"
f = open('text_of_queues.txt', 'w')
f.write(messages + '\n')
f.close()
channel.close()
connection.close()
Бывает нужно просто перекинуть из одной очереди в другую:
import pika
queue_from = 'queue.queue.reject'
queue_to = 'queue.queue'
mr = 'Юг'
mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}
credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# тут получаем количество сообщений в нашей очереди
cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count
if cnt:
for method_frame, properties, body in channel.consume(queue_from):
channel.basic_publish(exchange='', routing_key=queue_to, body=body, properties=properties)
channel.basic_ack(method_frame.delivery_tag)
if method_frame.delivery_tag == cnt:
break
requeued_messages = channel.cancel()
print(f'moved to queue "{queue_to}" from queue "{queue_from}": {cnt} messages successfully')
else:
print(f'nothing to move from queue "{queue_from}"')
channel.close()
connection.close()
В указанном выше коде очередь при прочтении будет удалена из очереди queue_from. Т.е. тут реализован именно перенос, а не копирование. Изменений теоретически быть не должно — FIFO принцип должен в таком же порядке уложить очередь в новую.
Теоретически можно еще передать свойство exchange (channel.basic_publish(exchange=method_frame.exchange, routing_key=queue_to, body=body, properties=properties) ), но связки могут быть слишком сложными, что проще действовать напрямую без exchange.
Публикую отчет по таким очередям:
import pika, os, zipfile
from xml.etree import cElementTree as ElementTree
from datetime import timedelta, datetime
import smtplib, email
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
queues_dict = {
# очереди
}
mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}
mr_queue_dict = {}
# название папки согласно дате
folder_name = datetime.now().strftime("%Y%m%d_%H")
path_to = os.getcwd() + fr'\output\{folder_name}'
if not os.path.exists(path_to):
os.makedirs(path_to)
if os.path.isfile(os.path.join(os.getcwd(), 'queues_service.zip')):
os.remove(os.path.join(os.getcwd(), 'queues_service.zip'))
class XmlListConfig(list):
def __init__(self, aList):
for element in aList:
if element:
if len(element) == 1 or element[0].tag != element[1].tag:
self.append(XmlDictConfig(element))
elif element[0].tag == element[1].tag:
self.append(XmlListConfig(element))
elif element.text:
text = element.text.strip()
if text:
self.append(text)
class XmlDictConfig(dict):
def __init__(self, parent_element):
if parent_element.items():
self.update(dict(parent_element.items()))
for element in parent_element:
if element:
if len(element) == 1 or element[0].tag != element[1].tag:
aDict = XmlDictConfig(element)
else:
aDict = {element[0].tag: XmlListConfig(element)}
if element.items():
aDict.update(dict(element.items()))
self.update({element.tag: aDict})
elif element.items():
self.update({element.tag: dict(element.items())})
else:
self.update({element.tag: element.text})
def to_csv_queue(dict_messages, csv_file_name):
# создадим оглавление , с учетом всех очередей
item_list = []
for n in dict_messages:
for i in range(len(dict_messages[n]['table']['row']['column'])):
if dict_messages[n]['table']['row']['column'][i]['name'] not in item_list:
item_list.append(dict_messages[n]['table']['row']['column'][i]['name'])
# создадим словарь, где ключом будет название поля
dict_messages_to_csv = {}
dict_messages_to_csv['MessageId'] = []
for item in item_list:
dict_messages_to_csv[item] = []
# наполняем по одному значению из одного сообщения в лист значений по полю оглавления
for n in dict_messages:
dict_messages_to_csv['MessageId'].append(n)
dict_messages_to_csv['property'].append(dict_messages[n]['property']['name'] + ' - ' + dict_messages[n]['property']['value'])
dict_messages_to_csv['table'].append(dict_messages[n]['table']['name'])
for item in item_list:
# если нет значения по кокретному полю - заполняем None
filled = 0
for i in range(len(dict_messages[n]['table']['row']['column'])):
if item in dict_messages[n]['table']['row']['column'][i]['name']:
dict_messages_to_csv[item].append(dict_messages[n]['table']['row']['column'][i]['value'])
filled = 1
if not filled:
dict_messages_to_csv[item].append('None')
# внесем еще названия полей вначало листа названия полей
item_list.insert(0,'MessageId')
text_to_csv = ''
# внесем оглавление в тект, что сохраним в csv
text_to_csv += ';'.join(item_list) + '\n'
# наполняем строками выбирая из словаря по листу оглавления
for n in range(len(dict_messages_to_csv['MessageId'])):
line_of_text = []
for item in item_list:
if ';' in str(dict_messages_to_csv[item][n]):
dict_messages_to_csv[item][n] = '"' + dict_messages_to_csv[item][n] + '"'
line_of_text.append(str(dict_messages_to_csv[item][n]))
text_to_csv += ';'.join(line_of_text) + '\n'
f4 = open(csv_file_name, 'w')
f4.write(text_to_csv + '\n')
f4.close()
def queue_move(mr, mr_queue_dict):
credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
mr_queue_dict[mr] = {}
for queue in queues_dict:
queue_from = queues_dict[queue][1]
cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count
mr_queue_dict[mr][queue_from] = cnt
if cnt:
log_file = os.path.join(path_to, mr + '.' + queue_from + '.csv')
csv_file_name = os.path.join(path_to, 'parsed.' + mr + '.' + queue_from + '.csv')
messages = ''
dict_messages = {}
for count_massages in range(cnt):
method_frame, properties, body = channel.basic_get(queue_from)
messages += f"""Cообщение №{count_massages}: \nproperties:{str(properties)}\nbody:\n{str(body)}\n===\n"""
root = ElementTree.XML(body.decode())
dict_messages[count_massages] = XmlDictConfig(root)
to_csv_queue(dict_messages, csv_file_name)
f4 = open(log_file, 'w')
f4.write(messages + '\n')
f4.close()
channel.close()
connection.close()
def send_mail(sender, destination_to, destination_cc, subject, text_message, files):
# Формирование тела письма
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = ', '.join(destination_to)
msg['cc'] = ', '.join(destination_cc)
msg['Subject'] = subject
# формирование текста письма
msg_html = MIMEText((text_message).encode('utf-8'), 'html', 'utf-8')
msg.attach(msg_html)
for filename in files:
if(os.path.exists(filename) and os.path.isfile(filename)):
filename = os.path.basename(filename)
with open(filename, 'rb') as file:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
# добавление файла в attach
msg_attach = MIMEBase(maintype, subtype)
msg_attach.set_payload(file.read())
email.encoders.encode_base64(msg_attach)
msg_attach.add_header('Content-Disposition', 'attachment', filename=filename)
msg.attach(msg_attach)
file.close()
else:
print('Файл для атача не найден: ' + filename)
try:
smtp_obj = smtplib.SMTP('mail.inpochta.ru', 25)
smtp_obj.set_debuglevel(0)
smtp_obj.ehlo()
smtp_obj.starttls()
smtp_obj.ehlo()
smtp_obj.login(msg['From'], "пароль")
smtp_obj.sendmail(msg['From'], destination_to + destination_cc, msg.as_string())
# except smtplib.socket.gaierror:
except smtplib.SMTPException as err:
print('Ошибка при отправке e-mail сообщения: ', err)
finally:
smtp_obj.quit()
for mr in mr_dict:
queue_move(mr, mr_queue_dict)
subject = 'Отчет по очередям service'
html = f"""<h4>Количество очередей на {datetime.now().strftime("%H:%M %d.%m.%Y:")} </h4>
<p>Сводная таблица по всем серверам: </p>"""
html += "<p>Проверить сетевые настройки.</p>"
html += '<p></p><br><label><i>в аттаче файлы в разобранном виде и в сыром</i></label><br>'
# соберем полученные файлы в один по нашей папке и сожмем
files = os.listdir(path_to)
queue_zip = zipfile.ZipFile(os.path.join(os.getcwd(), 'queues_service.zip'), 'w')
for file in files:
queue_zip.write(os.path.join(path_to, file), file, compress_type=zipfile.ZIP_DEFLATED)
os.remove(os.path.join(path_to, file))
queue_zip.close()
print(os.path.join(os.getcwd(), 'queues_service.zip'))
send_mail('pavel@inpochta.ru', # отправитель
['info@inpochta.ru'], # получатель
['Anton@inpochta.ru', 'svetlana@inpochta.ru'], # в копию
subject, # тема сообщения
html, # текст письма
['queues_service.zip'] # файлы
)