Как стать автором
Обновить
105

Освобождаем руки нескольким аналитикам: API Livy для автоматизации типовых банковских задач

Время на прочтение 5 мин
Количество просмотров 3K
Привет, Хабр!

Не секрет, что для оценки платежеспособности клиентов банки используют данные из различных источников (кредитное бюро, мобильные операторы и т.д.). Количество внешних партнёров может достигать нескольких десятков, а аналитиков в нашей команде наберётся лишь несколько человек. Возникает задача оптимизации работы небольшой команды и передачи рутинных задач вычислительным системам.

Как данные попадают в банк, и как команда аналитиков следит за этим процессом, разберём в данной статье.



Начнём по порядку.

Нашу распределённую систему на основе Hadoop, и все процессы, связанные с ней, мы коротко называем SmartData. SmartData получает данные по API от внешних агентов. (Причём агентами для неё являются как внешние партнёры, так и внутренние системы банка). Безусловно, было бы полезно собирать некий «актуальный профиль» по каждому клиенту, что мы и делаем. Обновлённые данные от источников попадают в Оперпрофиль. Оперпрофиль реализует идею Customer 360 и хранится в виде таблиц Hbase. Это удобно для дальнейшей работы с клиентом.

Customer 360
Customer 360 — подход реализации операционного хранилища с всевозможными атрибутами клиентских данных, используемых во всех процессах в организации, которые работают с клиентом и его данными, доступных по ключу клиента.

Работа с агентами осуществляется непрерывно, и её нужно контролировать. Для быстрой проверки качества взаимодействия и hit rate, а также простоты передачи этой информации другим командам, мы используем визуализацию, например, отчёты в Tableau.

Исходные данные поступают в Kafka, проходят предварительную обработку и помещаются в DataLake, построенный на основе HDFS. Потребовалось придумать решение, как организовать парсинг файлов с логами из HDFS, их обработку и ежедневную выгрузку в аналитические системы и системы визуализации. А ещё совместить это с любовью аналитиков к Python ноутбукам.

Закончим с внутренней кухней и перейдём к практике.

Нашим решением стало использование API Livy. Livy позволяет сабмитить код на кластер прямо из Jupyter. HTTP запрос, содержащий код, написанный на Python (или Scala), и мета-данные, отправляется в Livy. Livy инициирует запуск Spark сессии на кластере, которая управляется менеджером ресурсов Yarn. Для отправки HTTP запросов подойдёт модуль requests. Любители парсить сайты наверняка с ним уже знакомы (а если нет – вот шанс немного узнать про него).

Импортируем необходимые модули и создадим сессию. (Также сразу узнаем адрес нашей сессии, в будущем это пригодится). В параметры передаем данные для авторизации пользователя и название языка скрипта, который будет исполнять кластер.

import json, requests, schedule, time
 
host = 'http://***:8998'
data = {'kind': 'spark', 'proxyUser': 'user'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json().get('id')
print("session_id: " + str(session_id))
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)

Ждём, когда статус сессии перейдёт в idle. В случае, если время ожидания превысит установленный timeout – отправляем сообщение об ошибке.

timeout = time.time() + wait_time
sess_state = ['starting', 'success', 'idle']
 
while(True):
    time.sleep(7)
    req_st = requests.get(session_url, headers=headers).json().get('state')
    if req_st != 'idle' and time.time() > timeout:
        requests.delete(session_url, headers=headers)
        send_message("Scheduler_error", req_st)
        break
    if req_st == 'idle':
        break
    if req_st not in sess_state:
        send_message("Scheduler_error", req_st)
        break
print("Session_state: ", req_st) 

Теперь можно отправлять код в Livy.

statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
 
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
    time.sleep(15)
r = requests.get(statement_url, headers=headers).json()['output']
session_url = 'http://***:8998/sessions/' + str(session_id)

В цикле ждём окончания исполнения кода, получаем результат обработки:

r.get('data').get('text/plain')

Метод delete удалит сессию.

requests.delete(session_url, headers=headers) 

Для ежедневной выгрузки можно использовать несколько вариантов, про cron на хабре уже писали, а вот про user-friendly модуль schedule – нет. Просто добавлю его в код, объяснений он не потребует. И, для удобства, все выкладки соберу в одном месте.

Код
import json, requests, schedule, time
 
schedule.every().day.at("16:05").do(job, 300)
while True:
    schedule.run_pending()
 
def job(wait_time):
    host = 'http://***:8998'
    data = {'kind': 'spark', 'proxyUser': 'user'}
    headers = {'Content-Type': 'application/json'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
 
    session_id = r.json().get('id')
    print("session_id: " + str(session_id))
    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
 
    timeout = time.time() + wait_time
    sess_state = ['starting', 'success', 'idle']
    while(True):
        time.sleep(7)
        req_st = requests.get(session_url, headers=headers).json().get('state')
        if req_st != 'idle' and time.time() > timeout:
            requests.delete(session_url, headers=headers)
            break
        if req_st == 'idle':
            break
        if req_st not in sess_state:
            send_message("Scheduler_error", req_st)
            break
    print("Session_state: ", req_st)   
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data),headers=headers)
    statement_url = host + r.headers['location']
    r = requests.get(statement_url, headers=headers)
    while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
        time.sleep(15)
 
    r = requests.get(statement_url, headers=headers).json()['output']
    session_url = 'http://***:8998/sessions/' + str(session_id)
    print(r.get('data').get('text/plain'))
    #requests.delete(session_url, headers=headers)
 
def send_message(subject, text):
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    me = "my_email_adress"
    you = "email_adress"
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = me
    msg['To'] = you
    text = text
    part1 = MIMEText(text, 'plain')
    msg.attach(part1)
    s = smtplib.SMTP('domain.org')
    s.ehlo()
    s.starttls()
 
    s.login("user", "password")
    s.sendmail(me, you, msg.as_string())
    s.quit()


Заключение:


Быть может, это решение не претендует на лучшее, но оно прозрачно для команды аналитиков. Плюсы, которые в нём вижу я:

  • возможность использовать для автоматизации привычный Jupyter
  • наглядное взаимодействие
  • участник команды в праве сам выбрать, каким образом он будет работать с файлами (spark-зоопарк), как следствие, нет необходимости переписывать существующие скрипты

Конечно, при запуске большого количества заданий, придётся следить за освобождающимися ресурсами, настраивать коммуникацию между выгрузками. Эти вопросы решаются в индивидуальном порядке и согласовываются с коллегами.

Будет замечательно, если хотя бы одна команда возьмет это решение на заметку.

Ссылки


Документация Livy
Теги:
Хабы:
+8
Комментарии 0
Комментарии Комментировать

Публикации

Информация

Сайт
home.bank
Дата регистрации
Численность
свыше 10 000 человек
Местоположение
Россия