Простые примеры на Pika python для работы со статичными (reject) очередями RabbitMQ

По работе приходится сталкиваться с очередями, которые по каким-то причинам не удается передать дальше. Они копятся (например что-то вроде queue.queue.reject) в очередь простоя.

Нужно такие очереди мониторить и понимать(также парсить и пр.), что там собирается. Т.е. по сути нужно эти очереди просто прочитать(не удаляя) и посмотреть, что с ними делать дальше:

import pika
queue_from = 'queue.queue.reject'
mr = 'Юг'
# можно хранить аутетификацию таким образом. 
# довольно удобно для большого количества серверов с разными паролями хостами
mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}

credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# тут получаем количество сообщений в нашей очереди
cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count

if cnt:
  for count_massages in range(cnt):
    messages = ''
    method_frame, properties, body = channel.basic_get(queue_from)
    # тут можно сохранить сообщения в переменную и затем в файл
    messages += f"Cообщение №{count_massages}: \nproperties:
              {str(properties)}\nbody:\n{str(body)}\n===\n"
f = open('text_of_queues.txt', 'w')
f.write(messages + '\n')
f.close()
channel.close()
connection.close()

Бывает нужно просто перекинуть из одной очереди в другую:

import pika
queue_from = 'queue.queue.reject'
queue_to = 'queue.queue'

mr = 'Юг'
mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}

credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# тут получаем количество сообщений в нашей очереди
cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count

if cnt:
  for method_frame, properties, body in channel.consume(queue_from):
    channel.basic_publish(exchange='', routing_key=queue_to, body=body, properties=properties)
    channel.basic_ack(method_frame.delivery_tag)
    if method_frame.delivery_tag == cnt:
      break
  requeued_messages = channel.cancel()
  print(f'moved to queue "{queue_to}" from queue "{queue_from}": {cnt} messages successfully')
else:
  print(f'nothing to move from queue "{queue_from}"')
  
channel.close()
connection.close()

В указанном выше коде очередь при прочтении будет удалена из очереди queue_from. Т.е. тут реализован именно перенос, а не копирование. Изменений теоретически быть не должно — FIFO принцип должен в таком же порядке уложить очередь в новую.

Теоретически можно еще передать свойство exchange (channel.basic_publish(exchange=method_frame.exchange, routing_key=queue_to, body=body, properties=properties) ), но связки могут быть слишком сложными, что проще действовать напрямую без exchange.

Публикую отчет по таким очередям:

import pika, os, zipfile
from xml.etree import cElementTree as ElementTree
from datetime import timedelta, datetime
import smtplib, email
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText

queues_dict = {
# очереди
}

mr_dict = {'Юг': {'host': '192.168.1.240', 'auth': {'username': 'pavel', 'password': 'passw'}}
}
mr_queue_dict = {}

# название папки согласно дате
folder_name = datetime.now().strftime("%Y%m%d_%H")
path_to = os.getcwd() + fr'\output\{folder_name}' 
if not os.path.exists(path_to):
  os.makedirs(path_to)
if os.path.isfile(os.path.join(os.getcwd(), 'queues_service.zip')):
  os.remove(os.path.join(os.getcwd(), 'queues_service.zip'))

class XmlListConfig(list):
  def __init__(self, aList):
    for element in aList:
      if element:
        if len(element) == 1 or element[0].tag != element[1].tag:
          self.append(XmlDictConfig(element))
        elif element[0].tag == element[1].tag:
          self.append(XmlListConfig(element))
      elif element.text:
        text = element.text.strip()
        if text:
          self.append(text)

class XmlDictConfig(dict):
  def __init__(self, parent_element):
    if parent_element.items():
      self.update(dict(parent_element.items()))
    for element in parent_element:
      if element:
        if len(element) == 1 or element[0].tag != element[1].tag:
          aDict = XmlDictConfig(element)
        else:
          aDict = {element[0].tag: XmlListConfig(element)}
        if element.items():
          aDict.update(dict(element.items()))
        self.update({element.tag: aDict})
      elif element.items():
        self.update({element.tag: dict(element.items())})
      else:
        self.update({element.tag: element.text})

def to_csv_queue(dict_messages, csv_file_name):
  # создадим оглавление , с учетом всех очередей
  item_list = []
  for n in dict_messages:
    for i in range(len(dict_messages[n]['table']['row']['column'])):
      if dict_messages[n]['table']['row']['column'][i]['name'] not in item_list:
        item_list.append(dict_messages[n]['table']['row']['column'][i]['name'])
  
  # создадим словарь, где ключом будет название поля
  dict_messages_to_csv = {}
  dict_messages_to_csv['MessageId'] = []
  for item in item_list:
    dict_messages_to_csv[item] = []
    
  # наполняем по одному значению из одного сообщения в лист значений по полю оглавления
  for n in dict_messages:
    dict_messages_to_csv['MessageId'].append(n)
    dict_messages_to_csv['property'].append(dict_messages[n]['property']['name'] + ' - ' + dict_messages[n]['property']['value'])
    dict_messages_to_csv['table'].append(dict_messages[n]['table']['name'])
    for item in item_list:
      # если нет значения по кокретному полю - заполняем None
      filled = 0
      for i in range(len(dict_messages[n]['table']['row']['column'])):
        if item in dict_messages[n]['table']['row']['column'][i]['name']:
          dict_messages_to_csv[item].append(dict_messages[n]['table']['row']['column'][i]['value'])
          filled = 1
      if not filled:
        dict_messages_to_csv[item].append('None')
  
  # внесем еще названия полей вначало листа названия полей
  item_list.insert(0,'MessageId')
  
  text_to_csv = ''
  # внесем оглавление в тект, что сохраним в csv
  text_to_csv += ';'.join(item_list) + '\n'

  # наполняем строками выбирая из словаря по листу оглавления
  for n in range(len(dict_messages_to_csv['MessageId'])):
    line_of_text = []
    for item in item_list:
      if ';' in str(dict_messages_to_csv[item][n]):
        dict_messages_to_csv[item][n] = '"' + dict_messages_to_csv[item][n] + '"'
      line_of_text.append(str(dict_messages_to_csv[item][n]))
    text_to_csv += ';'.join(line_of_text) + '\n'
  
  f4 = open(csv_file_name, 'w')
  f4.write(text_to_csv + '\n')
  f4.close()
  
def queue_move(mr, mr_queue_dict):
  credentials = pika.PlainCredentials(**mr_dict[mr]['auth'])
  parameters = pika.ConnectionParameters(host=mr_dict[mr]['host'], port='5672', credentials=credentials)
  connection = pika.BlockingConnection(parameters)
  channel = connection.channel()
  mr_queue_dict[mr] = {}
  
  for queue in queues_dict:
    queue_from = queues_dict[queue][1]
    cnt = channel.queue_declare(queue=queue_from, passive=True).method.message_count 
    mr_queue_dict[mr][queue_from] = cnt
    
    if cnt:
      log_file = os.path.join(path_to, mr + '.' + queue_from + '.csv')
      csv_file_name = os.path.join(path_to, 'parsed.' + mr + '.' + queue_from + '.csv')
      messages = ''
      dict_messages = {}
      for count_massages in range(cnt):
        method_frame, properties, body = channel.basic_get(queue_from)
        messages += f"""Cообщение №{count_massages}: \nproperties:{str(properties)}\nbody:\n{str(body)}\n===\n"""
        root = ElementTree.XML(body.decode())
        dict_messages[count_massages] = XmlDictConfig(root)

        to_csv_queue(dict_messages, csv_file_name)

      f4 = open(log_file, 'w')
      f4.write(messages + '\n')
      f4.close()

  channel.close()
  connection.close()

def send_mail(sender, destination_to, destination_cc, subject, text_message, files):
  # Формирование тела письма
  msg = MIMEMultipart()
  msg['From'] = sender
  msg['To'] = ', '.join(destination_to)
  msg['cc'] = ', '.join(destination_cc)
  msg['Subject'] = subject

  # формирование текста письма
  msg_html = MIMEText((text_message).encode('utf-8'), 'html', 'utf-8')
  msg.attach(msg_html)

  for filename in files:
    if(os.path.exists(filename) and os.path.isfile(filename)):
      filename = os.path.basename(filename)
      with open(filename, 'rb') as file:
        ctype = 'application/octet-stream'
        maintype, subtype = ctype.split('/', 1)
      
        # добавление файла в attach 
        msg_attach = MIMEBase(maintype, subtype)
        msg_attach.set_payload(file.read())
        email.encoders.encode_base64(msg_attach)
        msg_attach.add_header('Content-Disposition', 'attachment', filename=filename)
        msg.attach(msg_attach)
        file.close()
    else:
      print('Файл для атача не найден: ' + filename)

  try:
    smtp_obj = smtplib.SMTP('mail.inpochta.ru', 25)
    smtp_obj.set_debuglevel(0)
    smtp_obj.ehlo()
    smtp_obj.starttls()
    smtp_obj.ehlo()
    smtp_obj.login(msg['From'], "пароль")
    smtp_obj.sendmail(msg['From'], destination_to + destination_cc, msg.as_string())
  # except smtplib.socket.gaierror:
  except smtplib.SMTPException as err:
    print('Ошибка при отправке e-mail сообщения: ', err)
  finally:
    smtp_obj.quit()

for mr in mr_dict:
  queue_move(mr, mr_queue_dict)
    
subject = 'Отчет по очередям service'

html = f"""<h4>Количество очередей на {datetime.now().strftime("%H:%M %d.%m.%Y:")} </h4>
  <p>Сводная таблица по всем серверам: </p>"""
  
html += "<p>Проверить сетевые настройки.</p>"
html += '<p></p><br><label><i>в аттаче файлы в разобранном виде и в сыром</i></label><br>'

# соберем полученные файлы в один по нашей папке и сожмем
files = os.listdir(path_to)
queue_zip = zipfile.ZipFile(os.path.join(os.getcwd(), 'queues_service.zip'), 'w')

for file in files:
  queue_zip.write(os.path.join(path_to, file), file, compress_type=zipfile.ZIP_DEFLATED)
  os.remove(os.path.join(path_to, file))
queue_zip.close() 
print(os.path.join(os.getcwd(), 'queues_service.zip'))

send_mail('pavel@inpochta.ru', # отправитель
['info@inpochta.ru'], # получатель
['Anton@inpochta.ru', 'svetlana@inpochta.ru'], # в копию 
subject, # тема сообщения 
html, # текст письма
['queues_service.zip'] # файлы
)

Теги:
python3 pika rabbitmq

Данная статья не подлежит комментированию, поскольку её автор ещё не является полноправным участником сообщества. Вы сможете связаться с автором только после того, как он получит приглашение от кого-либо из участников сообщества. До этого момента его username будет скрыт псевдонимом.