Как стать автором
Обновить

Почтовый бот

Время на прочтение12 мин
Количество просмотров18K
Электронная почта один из самых используемых инструментов для обмена информацией, постановки и выполнения задач. Зачастую поступающие письма носят повторяющийся характер предоставления какой либо однотипной информации меняющейся с течением времени. К таким задачам можно отнести:

  1. Различного рода агрегацию информации из нескольких писем в единый формат данных, отправка к определенной дате или напоминание об отправке.
  2. Запросы, связанные с необходимостью проверить информацию в какой либо системе/базе данных и ответным письмо направить ее запрашивающему.
  3. На основании письма внести информацию в какую-либо систему.
  4. Произвести какие либо вычисления и отправить данные.
  5. С какой либо периодичность отправлять отчеты, файлы и много другое.

Поэтому пробуем, не используя готовых решений, создать цифрового помощника, для решения наиболее повторяющихся и возможных к автоматизации задач. Ниже представлена базисная структура такого цифрового помощника с использованием базовых знаний языка Python3. Кому-то, возможно эта структура будет полезна, а кто уже ее использует, посмотрев статью, вспомнит как это работает. Дополняя базовую конструкцию новыми типами задач, можно получить достаточно многофункциональное решение, что в свою очередь должно привести к существенному снижению собственных трудозатрат, если конечно цель оправдывает время затраченное на разработку.

Алгоритм работы:

  • Периодическая проверка полученной почты (чем большей действий за цикл, тем больше интервал).
  • При соответствии писем критериям, обработка и занесение информации в БД.
  • Периодическая отправка писем на основании данных из БД.

За работу алгоритма отвечает несколько модулей (скриптов) взаимосвязанных между собой, условно разделенных на уровни:

Первый уровень:

  • telegraf запускается как процесс и последовательно с заданным интервалом выполняет две команды по обработке входящей почты и по отправке исходящих писем.

Втрой уровень:
  • recive_parser_email за выборку нужных для обработки писем и занесения данных в БД
  • send_parse_email за выборку нужной информации из БД и отправку писем

Третий уровень:
  • _email2 за базовое взаимодействие с почтовым сервером, приема и отправки писем
  • _sqlite за базовое взаимодействие с БД SQLite, внесение удаление и извлечение информации
  • _data_parser за обработку содержимого тела письма и преобразования в нужный вид.

Алгоритм работы схематично:


telegraf


В качестве процесса используется скрипт, который запускается как процесс. Вместо него также могут быть различные утилиты такие как Telegraf (широко используемый с работой influxDB) или crontab например. Данный скрипт можно запустить как процесс в фоновом режиме:

(vnev) [linux@test] python3.8 /path_to_script/telegraf.py >/dev/null 2>&1 < /dev/null &

Исходный код может выглядеть так:

telegraf.py
import subprocess
import path
import time
import _email2
import sys

count=0
while True:
    try:
        subprocess.check_call(['python3.8 '+path.path+'recive_parse_email.py'],shell=True, timeout=14)
        subprocess.check_call(['python3.8 '+path.path+'send_parse_email.py'],shell=True, timeout=14)
    except subprocess.TimeoutExpired:
        _email2.send_email('Ошибка', 'Exception timeout subprocess, module telegraf, line 11 '+str(count), 'test1@gmail.com')
        count+=1
        if count > 5:
            sys.exit(1)
    except Exception as e:
        _email2.send_email('Ошибка', 'Exception in module telegraf, line 11' + str(e), 'test1@gmail.com')
        sys.exit(1)
    time.sleep(30)


Алгоритм выполняет последовательно два скрипта. Интервал повторения в данном случае 30 секунд подбирается индивидуально, исходя из времени выполняемых действий и ограничений почтового сервера. При этом на каждую команду выделяется по 14 секунд, если по каким-то причинам происходит подвисание процесса, то он сбрасывается по времени. Если таких подвисаний более 5 то скрипт прекращает работу. При этом если что-то идет не так отправляется письмо администратору системы с кодом ошибки. В более сложных проектах возможно лучше будет дополнить логи по каждому действую бота, для этого есть неплохая библиотека logging. Библиотека Path, файл содержащий одну строку (возможно не самое удобное решение, но для миграции между разными системами и удаленного запуска через оболочки работает без необходимости глобального исправления всех путей)

path.py
path="/absolute_path_to_project_on_your system/"


recive_parser_email


Модуль используется для обработки полученных писем в почтовом ящике. При соответствии заголовка определенному критерию, происходит обработка содержимого письма и занесение необходимых данных в БД.

recive_parser_email.py
import re
import _sqlite
import _email2
import datetime
from datetime import datetime
import _data_parser
#
def write_data_to_sql(id_, from_,  subject, content, content_type):
	d={}
	#   0                1                  2                3                     4                         5
	d["ID"] = " "; d["Что напомнить"] = " "; d["статус"] = ' '; d["дата"] = ' '; d['Когда напомнить'] = ' '; d['адрес почты'] = ' '
	if re.findall(r'Что ты можешь', subject) != [] or re.findall(r'что ты можешь', subject) != []:
		status = "well_to_know"
		_sqlite.insert_data_sql("test_db", "'"+d["ID"]+"', '"+d["Что напомнить"]+"', '"+status+"', '"\
		+str(datetime.now().strftime('%d.%m.%y  %H:%M:%S'))+"', '"+d['Когда напомнить']+"', '"+from_+"'")
	if re.findall(r'Напомни_', subject) != [] or re.findall(r'напомни_', subject) != []:
		d=_data_parser.table_parser2(content)
		status='remind'
		_sqlite.insert_data_sql("test_db", "'" + d["ID"] + "', '" + d["Что напомнить"] + "', '" + status + "', '" \
		+ str(datetime.now().strftime('%d.%m.%y  %H:%M:%S')) + "', '" + d['Когда напомнить'] + "', '" + from_ + "'")
	# Далее может быть конструкнуция по другим совпадениям и действиям с ними
#
# Проверяем почту через модуль _email2, записываем данные в k
k=_email2.check_email_box()
#
# обрабатыаем входящую почту и формируем перечень сообщений для удаления
sublist_to_delete=[]; list_to_delete=[]
for i in k:
	#Обработанные данные на основании загловка сообщения, записываем в БД
	write_data_to_sql(i[0], i[1],  i[2], i[3], i[4])
	#Формируем список сообщений для удаления
	if re.findall(r'Что ты можешь', str(i[2])) != [] or re.findall(r'что ты можешь', str(i[2])) != [] or re.findall(r'Напомни_', str(i[2])) != []:
		sublist_to_delete.append(i[0]);	sublist_to_delete.append(i[2])
		list_to_delete.append(sublist_to_delete)
#Удаляем обработанные сообщения по средством модуля _email2
_email2.delete_several_email(list_to_delete)


Состоит из одной функции write_data_to_sql, результатом выполнения которой является запись в базу данных. Каждый новый тип задачи формируется на основании ключевых слов в заголовке письма (новый тип задачи — новый if statement). Для примера показаны два простых типа задач на основании писем в почтовом ящике:

  1. Инструкция пользования ботом, бот отправляет типы задач которые может обрабатывать в ответ на письмо с темой «Что ты можешь».
  2. Устанавливает напоминание, в данному случае обрабатывает письма с темой содержащей слово «Напомни_».

Также во втором примере добавлен скрипт обработчика таблиц тела сообщения _data_parser.table_parser2, код обработчика для примера (ни к чему не привязан, just in case):

_data_parser.py
import re
from html_table_parser import HTMLTableParser

def table_parser2(data):
	d={}
	p = HTMLTableParser()
	p.feed(data)
	for i in p.tables[0]:
		if re.findall("ID", str(i)) != [] and re.findall("ID", str(d)) == []: d["ID"] = i[(i.index("ID")+1)].replace(" ","")
		if re.findall("Что напомнить", str(i)) != [] and re.findall("Что напомнить", str(d)) == []: d["Что напомнить"] = i[(i.index("Что напомнить")+1)]
		if re.findall("Когда напомнить", str(i)) != [] and re.findall("Когда напомнить", str(d)) == []: d["Когда напомнить"] = i[(i.index("Когда напомнить") + 1)]
	if re.findall("ID", str(d)) == []: d["ID"] = 'нет данных'
	if re.findall("Что напомнить", str(d)) == []: d["Что напомнить"] = 'нет данных'
	if re.findall("Когда напомнить", str(d)) == []: d["Когда напомнить"] = 'нет данных'
	return d


Для записи в базу данных формируется словарь состоящий из 6 значений (для более сложных проектах может быть и больше значений и таблиц в SQL для каждой задачи). Занесение данных происходит посредством модуля _sqlite.py. Обработка почты происходит на основании сформированного list посредством модуля _email2. После занесения данных происходит формирование писем и удаление с помощью того же модуля _email2.

send_parse_email


Модуль предназначен для проверки данных в БД. При наличии информации к отправке, модуль производит подготовку и обработку данных в нужный вид и дальнейшую отправку и корректировку информации в БД. В данном модуле логика добавления нового типа выполняемых задач идентична модулю recive_parser_email, каждый новый тип задач добавляется в функции check_sql_requests_action_by_one (новый тип задачи — новый if statement).

send_parse_email.py
import re
import _sqlite
import _email2
import datetime
from datetime import timedelta
from datetime import datetime
import path

def adresses_from(list_):
	x2=re.findall('<(.*?)>', list_)
	if x2!=[]:
		x1=re.findall('<(.*?)>', list_)[0]
	else:
		x1=list_
	return x1

def check_sql_requests_action_by_one():
	check_exist = _sqlite.sql_row_names("test_db")
	for i in check_exist:
		if i[2]=='well_to_know':
			name='Bot'
			f = open(path.path+'template_what_you_can.html', 'r')
			html=f.read()
			html = html.replace('{_data_}', name)
			_email2.send_email('что я могу', html, adresses_from (i[5]))
			_sqlite.delete_data_sql("test_db", 'data_last_modify', i[3])
		if i[2]=='remind':
			try:
				time_o = datetime.strptime(i[4], '%d.%m.%y %H:%M')
				time_error = ' '
			except:
				time_o = datetime.strptime('01.01.30 00:00', '%d.%m.%y %H:%M')
				time_error = 'error_time_format'
			if time_o <= datetime.now() and i[2] == 'remind' or time_error == 'error_time_format':
				_email2.send_email('Напоминание ' + i[0]+' '+time_error, i[1], adresses_from(i[5]))
				_sqlite.delete_data_sql("test_db", 'data_last_modify', i[3])
		if i[2] == 'YOUR PARSER':
			#Различные другие совпадени и действия с ними
			pass

check_sql_requests_action_by_one()



Для примера данный модуль выполняет те же два типа задач описанные в предыдущем модуле на основании данных из БД. При этом для формирования тела письма используется HTML шаблон, а также для примера показано возможность замены некоторых слов в шаблоне. Пример шаблона:

template_what_you_can.html
<!DOCTYPE html>
<html>
<head>
    <title>MF</title>
</head>
<body>
    <p>Добрый день, я {_data_},</p>

    <p>Вот что я умею:</p>

    <p>
        1. Могу сделайть ЗАДАЧУ1 ..., для этого отправь письмао с заголовком ..., в теле письма укажи ....
	</p>
    <p>
        2. Могу сделать ЗАДАЧУ2. 
    </p>
<p>
	Хорошего дня! 

    </p>
</body>
</html>


_email2


Базовый набор функций для приема и передачи почты, очень много подобного когда на просторах Интернета

_email2.py
import smtplib
from email.mime.text import MIMEText
from email.header import decode_header
from email.mime.multipart import MIMEMultipart
import mailparser
import email
import imaplib

# connect to Google's servers
smtp_ssl_host = 'smtp.gmail.com'
smtp_ssl_port = 465
EMAIL=username=from_addr = 'your_email@gmail.com'
PASSWORD=password = 'your_pass'
SERVER = 'imap.gmail.com'

#Список на какие адреса разрешается отправка почты
legal_list=['test1@gmail.com', 'test2@gmail.com']

#функция для проверки разрешенного списка адресатов почты
def legal_check(list_adrrs):
	k=0
	#ststus=''
	global legal_list
	if list_adrrs==list(list_adrrs):
		for i in list_adrrs:
			if i in legal_list:
				k+=1
	else:
		if list_adrrs in legal_list:
			k+=1
	if k >0:
		status='allow'
	else:
		status='deny'
	return status

#Функция проверки почтовго ящика
def check_email_box():
	# возвращает значения [[id, from, subject, content],[...]]
	global EMAIL; global PASSWORD;	global SERVER
	mail = imaplib.IMAP4_SSL(SERVER)
	mail.login(EMAIL, PASSWORD)
	mail.select('inbox')
	# it will return with its status and a list of ids
	status, data = mail.search(None, 'ALL')
	# by white spaces on this format: [b'1 2 3', b'4 5 6']
	mail_ids = []
	for block in data:
		# b'1 2 3'.split() => [b'1', b'2', b'3']
		mail_ids += block.split()
	# now for every id we'll fetch the email
	# to extract its content
	email_list = []
	for i in mail_ids:
		email_=[]
		email_.append(i)
		# the fetch function fetch the email given its id
		# and format that you want the message to be
		status, data = mail.fetch(i, '(RFC822)')
		# the content data at the '(RFC822)' format comes on
		# a list with a tuple with header, content, and the closing
		# byte b')'
		for response_part in data:
			# so if its a tuple...
			if isinstance(response_part, tuple):
				mail_p = mailparser.parse_from_bytes(response_part[1])
				if len(mail_p.from_[0])>1:
					email_.append(mail_p.from_[0][1])
				else:
					email_.append(mail_p.from_[0][0])
				email_.append(mail_p.subject)
				email_.append(mail_p.body)
				email_.append('html')
		email_list.append(email_)
	return email_list

#Удаление одного письма
def delete_one_email(id_, subject):
	# возвращает значения [[id=b'1', from, subject, content],[...]]
	global EMAIL
	global PASSWORD
	global SERVER
	mail = imaplib.IMAP4_SSL(SERVER)
	mail.login(EMAIL, PASSWORD)
	mail.select('inbox')
	status, data = mail.search(None, 'ALL')
	mail_ids = []
	mail_list = []
	for block in data:
		mail_ids += block.split()
	for i in mail_ids:
		mail_ = []
		mail_.append(i)
		status, data = mail.fetch(i, '(RFC822)')
		for response_part in data:
			if isinstance(response_part, tuple):
				message = email.message_from_bytes(response_part[1])
				mail_from = message['from']
				mail_subject = message['subject']
				mail_.append(mail_from)
				try:
					bytes, encoding = decode_header(mail_subject)[0]
					mail_subject = bytes.decode(encoding)
					mail_.append(mail_subject)
				except:
					pass
					mail_.append(mail_subject)
		mail_list.append(mail_)
	k=0
	for i in mail_list:
		if id_== i[0] and subject == i[2]:
			k+=1
		else:
			pass
	if k > 0:
		pass
		mail.store(id_, '+FLAGS', '(\Deleted)')
		mail.expunge()

#Удаление нескольких писем
def delete_several_email(id_subject):
	# возвращает значения [[id=b'1', from, subject, content],[...]]
	global EMAIL; global PASSWORD; global SERVER
	mail = imaplib.IMAP4_SSL(SERVER)
	mail.login(EMAIL, PASSWORD)
	mail.select('inbox')
	status, data = mail.search(None, 'ALL')
	mail_ids = []
	mail_list = []
	for block in data:
		mail_ids += block.split()
	for i in mail_ids:
		mail_ = []
		mail_.append(i)
		status, data = mail.fetch(i, '(RFC822)')
		for response_part in data:
			if isinstance(response_part, tuple):
				message = email.message_from_bytes(response_part[1])
				mail_from = message['from']
				mail_subject = message['subject']
				mail_.append(mail_from)
				try:
					bytes, encoding = decode_header(mail_subject)[0]
					mail_subject = bytes.decode(encoding)
					mail_.append(mail_subject)
				except:
					pass
					mail_.append(mail_subject)
		mail_list.append(mail_)
	for i in mail_list:
		for y in id_subject:
			if y[0] == i[0] and y[1] == i[2]:
				mail.store(y[0], '+FLAGS', '(\Deleted)') 
				mail.expunge()
			else:
				pass

#Отправка почты
def send_email(subject_, message_, to_addrs, format='html'):
	global smtp_ssl_host; global smtp_ssl_port;	global username
	global password; global from_addr
	if legal_check(to_addrs)=="deny":
		subject_ ="email is not legal "+str(to_addrs)
		to_addrs=username
	message = MIMEMultipart("alternative")
	if format=='html':
		message = MIMEMultipart("alternative", None, [MIMEText(message_,'html')])
	if format=='text':
		message = MIMEMultipart("alternative", None, [MIMEText(message_)])
	message['subject'] = subject_
	message['from'] = from_addr
	if to_addrs==list(to_addrs):
		message['to'] = ', '.join(to_addrs)
	else:
		message['to'] = (to_addrs)
	server = smtplib.SMTP_SSL(smtp_ssl_host, smtp_ssl_port)
	server.login(username, password)
	server.sendmail(from_addr, to_addrs, message.as_string())
	server.quit()


Для примера взят почтовый сервис gmail, в целом может быть использован практически любой другой (соответственно нужно будет поменять порты на которых работает сервер). Данный модуль это набор базовых функций, единственное стоит отметить функцию проверки белого списка legal_check, которая в свою очередь проверяет возможность отправки писем адресатам сравнивая с разрешенным списком адресатов legal_list.

_sqlite


Базовый модуль для взаимодействия с базой данных sqlite, представляет собой набор функций для взаимодействия с базой данных и выполнения стандартных операций.

_sqlite.py
import sqlite3
import path
 
db_name = path.path+"test_db.db"

def insert_data_sql(table, data):
    global db_name
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute("INSERT INTO "+table+" VALUES ("+data+")")
    conn.commit(); conn.close()

def delete_data_sql(table, Column, data):
    global db_name
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute("DELETE FROM "+table+" WHERE "+Column+" = '"+data+"'")
    conn.commit(); conn.close()

def update_data_sql(table, data):
    global db_name
    global db_table_name
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute("UPDATE "+table+" SET "+data)
    conn.commit(); conn.close()

def sql_get_posts():
    global db_name
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    with conn:
        cursor.execute("SELECT * FROM "+db_table_name)
        print(cursor.fetchall())

def sql_row_names(table):
        global db_name
        conn = sqlite3.connect(db_name)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM "+table)
        names = [description[0] for description in cursor.description]
        names = cursor.fetchall()
        return names

def sql_select_data(db_table_name, column_, name_):
        global db_name
        conn = sqlite3.connect(db_name)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM "+db_table_name)
        names = [description[0] for description in cursor.description]
        cursor.execute("SELECT * FROM "+db_table_name+" WHERE "+column_+"='"+name_+"';")
        rows = cursor.fetchall()
        for row in rows:
            names.append(row)
        conn.commit(); conn.close()
        return names


Для проверки данных в SQLite полезным будет набор базовых команд из консоли:

- sqlite3 /your_path/test_db.db - зайти в базу из консоли
- .mode column - для удобства представления по колонкам
- .header on - для отображения названия заголовков
- SELECT * FROM test_db; - вывод данных таблицы
- DROP TABLE test_db - удаление таблицы
- .quit - выход

Перед началом использования базы нужно создать таблицу, это также можно сделать через скрипт:

_sqlite_new_table.py
import sqlite3
import path

conn = sqlite3.connect(path.path+"test_db.db")
cursor = conn.cursor()
 
# Создание таблицы
cursor.execute("""CREATE TABLE test_db (ID text, Service text, status text, data_last_modify text, remind_time text, from_email text)""")



Весь код также выложен на GitHub по ссылке.
Спасибо за Ваше время.
Теги:
Хабы:
Всего голосов 6: ↑4 и ↓2+4
Комментарии3

Публикации

Истории

Работа

Data Scientist
41 вакансия

Ближайшие события