company_banner

Освобождаем руки нескольким аналитикам: API Livy для автоматизации типовых банковских задач

    Привет, Хабр!

    Не секрет, что для оценки платежеспособности клиентов банки используют данные из различных источников (кредитное бюро, мобильные операторы и т.д.). Количество внешних партнёров может достигать нескольких десятков, а аналитиков в нашей команде наберётся лишь несколько человек. Возникает задача оптимизации работы небольшой команды и передачи рутинных задач вычислительным системам.

    Как данные попадают в банк, и как команда аналитиков следит за этим процессом, разберём в данной статье.



    Начнём по порядку.

    Нашу распределённую систему на основе Hadoop, и все процессы, связанные с ней, мы коротко называем SmartData. SmartData получает данные по API от внешних агентов. (Причём агентами для неё являются как внешние партнёры, так и внутренние системы банка). Безусловно, было бы полезно собирать некий «актуальный профиль» по каждому клиенту, что мы и делаем. Обновлённые данные от источников попадают в Оперпрофиль. Оперпрофиль реализует идею Customer 360 и хранится в виде таблиц Hbase. Это удобно для дальнейшей работы с клиентом.

    Customer 360
    Customer 360 — подход реализации операционного хранилища с всевозможными атрибутами клиентских данных, используемых во всех процессах в организации, которые работают с клиентом и его данными, доступных по ключу клиента.

    Работа с агентами осуществляется непрерывно, и её нужно контролировать. Для быстрой проверки качества взаимодействия и hit rate, а также простоты передачи этой информации другим командам, мы используем визуализацию, например, отчёты в Tableau.

    Исходные данные поступают в Kafka, проходят предварительную обработку и помещаются в DataLake, построенный на основе HDFS. Потребовалось придумать решение, как организовать парсинг файлов с логами из HDFS, их обработку и ежедневную выгрузку в аналитические системы и системы визуализации. А ещё совместить это с любовью аналитиков к Python ноутбукам.

    Закончим с внутренней кухней и перейдём к практике.

    Нашим решением стало использование API Livy. Livy позволяет сабмитить код на кластер прямо из Jupyter. HTTP запрос, содержащий код, написанный на Python (или Scala), и мета-данные, отправляется в Livy. Livy инициирует запуск Spark сессии на кластере, которая управляется менеджером ресурсов Yarn. Для отправки HTTP запросов подойдёт модуль requests. Любители парсить сайты наверняка с ним уже знакомы (а если нет – вот шанс немного узнать про него).

    Импортируем необходимые модули и создадим сессию. (Также сразу узнаем адрес нашей сессии, в будущем это пригодится). В параметры передаем данные для авторизации пользователя и название языка скрипта, который будет исполнять кластер.

    import json, requests, schedule, time
     
    host = 'http://***:8998'
    data = {'kind': 'spark', 'proxyUser': 'user'}
    headers = {'Content-Type': 'application/json'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    session_id = r.json().get('id')
    print("session_id: " + str(session_id))
    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
    

    Ждём, когда статус сессии перейдёт в idle. В случае, если время ожидания превысит установленный timeout – отправляем сообщение об ошибке.

    timeout = time.time() + wait_time
    sess_state = ['starting', 'success', 'idle']
     
    while(True):
        time.sleep(7)
        req_st = requests.get(session_url, headers=headers).json().get('state')
        if req_st != 'idle' and time.time() > timeout:
            requests.delete(session_url, headers=headers)
            send_message("Scheduler_error", req_st)
            break
        if req_st == 'idle':
            break
        if req_st not in sess_state:
            send_message("Scheduler_error", req_st)
            break
    print("Session_state: ", req_st) 
    

    Теперь можно отправлять код в Livy.

    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
     
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    r = requests.get(statement_url, headers=headers)
    while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
        time.sleep(15)
    r = requests.get(statement_url, headers=headers).json()['output']
    session_url = 'http://***:8998/sessions/' + str(session_id)
    

    В цикле ждём окончания исполнения кода, получаем результат обработки:

    r.get('data').get('text/plain')

    Метод delete удалит сессию.

    requests.delete(session_url, headers=headers) 

    Для ежедневной выгрузки можно использовать несколько вариантов, про cron на хабре уже писали, а вот про user-friendly модуль schedule – нет. Просто добавлю его в код, объяснений он не потребует. И, для удобства, все выкладки соберу в одном месте.

    Код
    import json, requests, schedule, time
     
    schedule.every().day.at("16:05").do(job, 300)
    while True:
        schedule.run_pending()
     
    def job(wait_time):
        host = 'http://***:8998'
        data = {'kind': 'spark', 'proxyUser': 'user'}
        headers = {'Content-Type': 'application/json'}
        r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
     
        session_id = r.json().get('id')
        print("session_id: " + str(session_id))
        session_url = host + r.headers['location']
        r = requests.get(session_url, headers=headers)
     
        timeout = time.time() + wait_time
        sess_state = ['starting', 'success', 'idle']
        while(True):
            time.sleep(7)
            req_st = requests.get(session_url, headers=headers).json().get('state')
            if req_st != 'idle' and time.time() > timeout:
                requests.delete(session_url, headers=headers)
                break
            if req_st == 'idle':
                break
            if req_st not in sess_state:
                send_message("Scheduler_error", req_st)
                break
        print("Session_state: ", req_st)   
        statements_url = session_url + '/statements'
        data = {'code': '1 + 1'}
        r = requests.post(statements_url, data=json.dumps(data),headers=headers)
        statement_url = host + r.headers['location']
        r = requests.get(statement_url, headers=headers)
        while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
            time.sleep(15)
     
        r = requests.get(statement_url, headers=headers).json()['output']
        session_url = 'http://***:8998/sessions/' + str(session_id)
        print(r.get('data').get('text/plain'))
        #requests.delete(session_url, headers=headers)
     
    def send_message(subject, text):
        import smtplib
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        me = "my_email_adress"
        you = "email_adress"
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = me
        msg['To'] = you
        text = text
        part1 = MIMEText(text, 'plain')
        msg.attach(part1)
        s = smtplib.SMTP('domain.org')
        s.ehlo()
        s.starttls()
     
        s.login("user", "password")
        s.sendmail(me, you, msg.as_string())
        s.quit()
    


    Заключение:


    Быть может, это решение не претендует на лучшее, но оно прозрачно для команды аналитиков. Плюсы, которые в нём вижу я:

    • возможность использовать для автоматизации привычный Jupyter
    • наглядное взаимодействие
    • участник команды в праве сам выбрать, каким образом он будет работать с файлами (spark-зоопарк), как следствие, нет необходимости переписывать существующие скрипты

    Конечно, при запуске большого количества заданий, придётся следить за освобождающимися ресурсами, настраивать коммуникацию между выгрузками. Эти вопросы решаются в индивидуальном порядке и согласовываются с коллегами.

    Будет замечательно, если хотя бы одна команда возьмет это решение на заметку.

    Ссылки


    Документация Livy
    ООО «Хоум Кредит Энд Финанс Банк»
    43,12
    Компания
    Поделиться публикацией

    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое