ETL процесс получения данных из электронной почты в Apache Airflow


    Как бы сильно не развивались технологии, за развитием всегда тянется вереница устаревших подходов. Это может быть обусловлено плавным переходом, человеческим фактором, технологическими необходимостями или чем-то другим. В области обработки данных наиболее показательными в этой части являются источники данных. Как бы мы не мечтали от этого избавиться, но пока часть данных пересылается в мессенджерах и электронных письмах, не говоря и про более архаичные форматы. Приглашаю под кат разобрать один из вариантов для Apache Airflow, иллюстрирующий, как можно забирать данные из электронных писем.


    Предыстория


    Многие данные до сих пор передаются через электронную почту, начиная с межличностных коммуникаций и заканчивая стандартами взаимодействия между компаниями. Хорошо, если удается для получения данных написать интерфейс или посадить людей в офисе, которые эту информацию будут вносить в более удобные источники, но зачастую такой возможности может просто не быть. Конкретная задача, с которой столкнулся я, — это подключение небезызвестной CRM системы к хранилищу данных, а далее — к системе OLAP. Так исторически сложилось, что для нашей компании использование этой системы было удобно в отдельно взятой области бизнеса. Поэтому всем очень хотелось иметь возможность оперировать данными и из этой сторонней системы в том числе. В первую очередь, конечно, была изучена возможность получения данных из открытого API. К сожалению, API не покрывало получение всех необходимых данных, да и, выражаясь простым языком, было во многом кривовато, а техническая поддержка не захотела или не смогла пойти навстречу для предоставления более исчерпывающего функционала. Зато данная система предоставляла возможность периодического получения недостающих данных на почту в виде ссылки для выгрузки архива.


    Нужно отметить, что это был не единственный кейс, по которому бизнес хотел собирать данные из почтовых писем или мессенджеров. Однако, в данном случае мы не могли повлиять на стороннюю компанию, которая предоставляет часть данных только таким способом.


    Apache Airflow


    Для построения ETL процессов мы чаще всего используем Apache Airflow. Для того чтобы читатель, незнакомый с этой технологией, лучше понял, как это выглядит в контексте и в целом, опишу пару вводных.


    Apache Airflow — свободная платформа, которая используется для построения, выполнения и мониторинга ETL (Extract-Transform-Loading) процессов на языке Python. Основным понятием в Airflow является ориентированный ацикличный граф, где вершины графа — конкретные процессы, а ребра графа — поток управления или информации. Процесс может просто вызывать любую Python функцию, а может иметь более сложную логику из последовательного вызова нескольких функций в контексте класса. Для наиболее частых операций уже есть множество готовых наработок, которые можно использовать в качестве процессов. К таким наработкам относятся:


    • операторы — для перегона данных из одного места в другое, например из таблицы БД в хранилище данных;
    • сенсоры — для ожидания наступления определенного события и направления потока управления в последующие вершины графа;
    • хуки — для более низкоуровневых операций, например, для получения данных из таблицы БД (используются в операторах);
    • и т.д.

    Описывать Apache Airflow подробно в этой статье будет нецелесообразно. Краткие введения можно посмотреть здесь или здесь.


    Хук для получения данных


    В первую очередь, для решения задачи нужно написать хук, с помощью которого мы могли бы:


    • подключаться к электронной почте;
    • находить нужное письмо;
    • получать данные из письма.

    from airflow.hooks.base_hook import BaseHook
    import imaplib
    import logging
    
    class IMAPHook(BaseHook):
        def __init__(self, imap_conn_id):
            """
               IMAP hook для получения данных с электронной почты
    
               :param imap_conn_id:       Идентификатор подключения к почте
               :type imap_conn_id:        string
            """
            self.connection = self.get_connection(imap_conn_id)
            self.mail = None
    
        def authenticate(self):
            """ 
                Подключаемся к почте
            """
            mail = imaplib.IMAP4_SSL(self.connection.host)
            response, detail = mail.login(user=self.connection.login, password=self.connection.password)
            if response != "OK":
                raise AirflowException("Sign in failed")
            else:
                self.mail = mail
    
        def get_last_mail(self, check_seen=True, box="INBOX", condition="(UNSEEN)"):
            """
                Метод для получения идентификатора последнего письма, 
                удовлетвораяющего условиям поиска
    
                :param check_seen:      Отмечать последнее письмо как прочитанное
                :type check_seen:       bool
                :param box:             Наименования ящика
                :type box:              string
                :param condition:       Условия поиска писем
                :type condition:        string
            """
            self.authenticate()
            self.mail.select(mailbox=box)
            response, data = self.mail.search(None, condition)
            mail_ids = data[0].split()
            logging.info("В ящике найдены следующие письма: " + str(mail_ids))
    
            if not mail_ids:
                logging.info("Не найдено новых писем")
                return None
    
            mail_id = mail_ids[0]
    
            # если таких писем несколько
            if len(mail_ids) > 1:
                # отмечаем остальные прочитанными
                for id in mail_ids:
                    self.mail.store(id, "+FLAGS", "\\Seen")
    
                # возвращаем последнее
                mail_id = mail_ids[-1]
    
            # нужно ли отметить последнее прочитанным
            if not check_seen:
                self.mail.store(mail_id, "-FLAGS", "\\Seen")
    
            return mail_id

    Логика такая: подключаемся, находим последнее самое актуальное письмо, если есть другие — игнорируем их. Используется именно такая функция, потому что более поздние письма содержат все данные ранних. Если это не так, то можно возвращать массив всех писем или обрабатывать первое, а остальные — при следующем проходе. В общем, все как всегда зависит от задачи.


    Добавляем к хуку две вспомогательные функции: для скачивания файла и для скачивания файла по ссылке из письма. К слову, их можно вынести в оператор, это зависит от частоты использования данного функционала. Что еще дописывать в хук, опять же, зависит от задачи: если в письме приходят сразу файлы, то можно скачивать приложения к письму, если данные приходят в письме, то нужно парсить письмо и т.д. В моем случае, письмо приходит с одной ссылкой на архив, который мне нужно положить в определенное место и запустить дальнейший процесс обработки.


        def download_from_url(self, url, path, chunk_size=128):
            """
                Метод для скачивания файла
    
                :param url:              Адрес загрузки
                :type url:               string
                :param path:             Куда положить файл
                :type path:              string
                :param chunk_size:       По сколько байтов писать
                :type chunk_size:        int
            """
            r = requests.get(url, stream=True)
            with open(path, "wb") as fd:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    fd.write(chunk)
    
        def download_mail_href_attachment(self, mail_id, path):
            """
                Метод для скачивания файла по ссылке из письма
    
                :param mail_id:         Идентификатор письма
                :type mail_id:          string
                :param path:            Куда положить файл
                :type path:             string
            """
            response, data = self.mail.fetch(mail_id, "(RFC822)")
            raw_email = data[0][1]
            raw_soup = raw_email.decode().replace("\r", "").replace("\n", "")
            parse_soup = BeautifulSoup(raw_soup, "html.parser")
            link_text = ""
    
            for a in parse_soup.find_all("a", href=True, text=True):
                link_text = a["href"]
    
            self.download_from_url(link_text, path)

    Код простой, поэтому вряд ли нуждается в дополнительных пояснениях. Расскажу лишь про магическую строчку imap_conn_id. Apache Airflow хранит параметры подключений (логин, пароль, адрес и другие параметры), к которым можно обращаться по строковому идентификатору. Визуально управление подключениями выглядит вот так



    Сенсор для ожидания данных


    Поскольку мы уже умеем подключаться и получать данные из почты, теперь можем написать сенсор для их ожидания. Написать сразу оператор, который будет обрабатывать данные, если они имеются, в моем случае не получилось, так как на основании полученных данных из почты работают и другие процессы, в том числе, берущие связанные данные из других источников (API, телефония, веб метрики и т.д.). Приведу пример. В CRM системе появился новый пользователь, и мы еще не знаем про его UUID. Тогда при попытке получить данные с SIP-телефонии мы получим звонки, привязанные к его UUID, но корректно сохранить и использовать их не сможем. В таких вопросах важно иметь в виду зависимость данных, особенно, если они из разных источников. Это, конечно, недостаточные меры сохранения целостности данных, но в некоторых случаях необходимые. Да и в холостую занимать ресурсы тоже нерационально.


    Таким образом, наш сенсор будет запускать последующие вершины графа, если есть свежая информация на почте, а также помечать неактуальной предыдущую информацию.


    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults
    from my_plugin.hooks.imap_hook import IMAPHook
    
    class MailSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, conn_id, check_seen=True, box="Inbox", condition="(UNSEEN)", *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.conn_id = conn_id
            self.check_seen = check_seen
            self.box = box
            self.condition = condition
    
        def poke(self, context):
            conn = IMAPHook(self.conn_id)
            mail_id = conn.get_last_mail(check_seen=self.check_seen, box=self.box, condition=self.condition)
    
            if mail_id is None:
                return False
            else:
                return True

    Получаем и используем данные


    Для получения и обработки данных можно написать отдельный оператор, можно использовать готовые. Поскольку пока логика тривиальная — забрать данные из письма, то для примера предлагаю стандартный PythonOperator


    from airflow.models import DAG
    
    from airflow.operators.python_operator import PythonOperator
    from airflow.sensors.my_plugin import MailSensor
    from my_plugin.hooks.imap_hook import IMAPHook
    
    start_date = datetime(2020, 4, 4)
    
    # Стандартное конфигурирование графа
    args = {
        "owner": "example",
        "start_date": start_date,
        "email": ["home@home.ru"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retry_delay": timedelta(minutes=15),
        "provide_context": False,
    }
    
    dag = DAG(
        dag_id="test_etl",
        default_args=args,
        schedule_interval="@hourly",
    )
    
    # Определяем сенсор
    mail_check_sensor = MailSensor(
        task_id="check_new_emails",
        poke_interval=10,
        conn_id="mail_conn_id",
        timeout=10,
        soft_fail=True,
        box="my_box",
        dag=dag,
        mode="poke",
    )
    
    # Функция для получения данных из письма
    def prepare_mail():
        imap_hook = IMAPHook("mail_conn_id")
        mail_id = imap_hook.get_last_mail(check_seen=True, box="my_box")
        if mail_id is None:
            raise AirflowException("Empty mailbox")
    
        conn.download_mail_href_attachment(mail_id, "./path.zip")
    
    prepare_mail_data = PythonOperator(task_id="prepare_mail_data", default_args=args, dag=dag, python_callable= prepare_mail)
    
    # Описание остальных вершин графа
    ...
    
    # Задаем связь на графе
    mail_check_sensor >> prepare_mail_data
    prepare_data >> ...
    # Описание остальных потоков управления

    Кстати, если ваша корпоративная почта тоже на mail.ru, то вам будет недоступен поиск писем по теме, отправителю и т.д. Они еще в далеком 2016 обещали ввести, но, видимо, передумали. Я решил эту проблему, создав под нужные письма отдельную папку и настроив в веб-интерфейсе почты фильтр на нужные письма. Таким образом, в эту папку попадают только нужные письма и условия для поиска в моем случае просто (UNSEEN).


    Резюмируя, мы имеем следующую последовательность: проверяем, есть ли новые письма, соответствующие условиям, если есть, то скачиваем архив по ссылке из последнего письма.
    Под последними многоточиями опущено, что этот архив будет распакован, данные из архива очищены и обработаны, и в итоге все это дело уйдет далее на конвейер ETL процесса, но это уже выходит за рамки темы статьи. Если получилось интересно и полезно, то с радостью продолжу описывать ETL решения и их части для Apache Airflow.

    Комментарии 3

      0
      Спасибо за статью, забрал в закладки.
        0
        Слишком сложно.
        Fetchmail+procmail, а дальше уже ссылку передаем для нужного сервиса (на загрузку?).
          0
          Безусловно, есть очень много утилит, программ и скриптов, решающих данную проблему. В то же время, не стоит забывать, что ключевым контекстом в статье является описываемый ETL процесс и данные операции рассматриваются как небольшая часть одного большого непрерывного процесса, где одни действия и данные зависят от других.

          С технической стороны, если используется конкретный фреймворк (в нашем случае Apache Airflow), то куда логичнее использовать его стандартные средства и архитектуру (сенсоры, операторы).

          Вероятно, при необходимости можно построить ETL процесс в Airflow с использованием Fetchmail+procmail оптимальнее (зависит от критерия) представленного. Но будет ли он проще, чем использование стандартной библиотеки вопрос открытый. С удовольствием посмотрел бы на такую реализацию.

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое