Содержание:
Вступление
Для задач аналитики Zeppelin - это чуть ли не находка. Он может в одной книжке исполнять код на любом языке (был бы интерпретатор), выводить его в виде красивых табличек, графиков и в любом другом виде, который удобен. И на Хабре есть много статей, посвященных плюсом данного решения для задач аналитики.
В результате в аналитический отдел данный инструмент проник достаточно глубоко: на нем писались аналитические отчеты для разных отделов, расчетные таблицы, которые затем отсылались в дашборды, также данными книжками могли пользоваться все, кто знал какие-либо языки программирования.
И мы попали в зависимость от него. Панацея, которая должна была стать легким решением для (почти) любой задачи превратилась в наше проклятие. Из-за проблем с данной системой на нас приходило по 5-7 тикеров в неделю, а также потерей доверия к нашим сервисам.
Надеюсь, что данная статья будет интересна аналитикам и инженерам, которые работают с различными отчетами и предоставляют их
Почему Zeppelin - не панацея аналитического отдела
В связи с этим, когда впервые встала задача рассылать сообщения пользователям с какой-либо информацией (об офертах, размещениях, сделках и других) казалось, что Zeppelin может справиться и с этой задачей, т. к. внутри он может запускать книжки по расписанию. И действительно, это работало. Рассылки шли, пользователи были довольны. Однако очень скоро стало ясно, что данный подход имеет ряд недостатков:
Если книжка не могла отправить необходимое сообщение, то узнать об этом можно было только после того, как данное сообщение не было отправлено (использовать try ... except выражения не воспрещалось, однако это требовало достаточно много времени для разработки разного рода fallback-ов);
Если книжка отваливалась из-за ошибок интерпретатора, то рассылка также не приходила (тут уже и fallback не поможет, код просто не запускался);
Очень тормознутое и медленное Web UI (к сожалению, здесь трудно что-то добавить, пользоваться им на больших книжках становиться практически невозможно);
Необходимость делать commit при каждом изменении. Т. к. вся информация шла с корпоративного GitLab, то при перезагрузке сервиса он подкачивал все данные оттуда. Если работа прерывалась в процессе редактирования кода (что случалось не раз) или кто-то успешно забывал нажать кнопку коммита, то изменения просто не сохранялись. Приходилось всегда следить за историей коммитов (т. к. иногда Web UI мог не отправить коммит из-за связанности Zeppelin и GitLab и даже не сообщить об этом);
Следующий минус вытекает напрямую из предыдущего: репа Zeppelin раздувается из-за того, что там хранится множество данных помимо кода. Прочитать код где-то кроме самого Zeppelin почти невозможно;
Сложное управление паролями во вкладке Credentials (во всяком случае, так кажется), хранение их и переиспользование не является интуитивно понятным. Для управления паролями мы использовали такой код:
creds = z.getInterpreterContext().getAuthenticationInfo().getUserCredentials().getUsernamePassword("clickhouse")
creds_arr = str(creds).split(', ')
username = creds_arr[0].split('=')[-1].replace("'", '')
password = creds_arr[-1].split('=')[-1].replace("'", '').replace('}', '')
ch_user = username
ch_pass = password
Этот код хорошо работает, не позволяет вытащить пароли простым print из переменной книжки, тем не менее выглядит громозко и неприятно.
Кстати, на генерацию этого кода у меня ушел целый рабочий день. Конечно, может и я кривой, но я искал решение на различных сервисах, пытаясь реверс-инженирить сам класс z, который встроен в Zeppelin. В итоге сделал солянку из решения на StackOverflow и своих ухищрений
Особенной проблемой стала проблема 2: на DevOps-а стало ложиться множество задач по поддер��ке этого инструмента. В связи с множеством обращений от пользователей о некорректности расчетов и возросшем количестве обращений к DevOps было принято решение, что с нас хватит и мы решили перейти на другие сервисы.
У нас было 2 варианта сценариев, с которыми мы работали в рамках книжек в Zeppelin:
Формы, используемые пользователем (на вход принимаются параметры и выдаются результаты в виде графиков/таблиц);
Расчеты для дашбордов и рассылки (стоят на расписании, параметры статичны и задаются заранее).
Таким образом появился следующий роудмап перехода с данного решения:
Все существующие формы переехали в портал нашего отдела на Streamlit:

Все существующие рассылки перешли на Airflow:

В данной статье хотелось бы рассказать про второй пункт, а именно:
Про то, как был реализован переход;
Как организован отказоустойчивая и гарантированная доставка сообщений пользователям;
Как организована работа с базой данных и исполнение запросов в каждом DAG.
Особенности реализации
Особенности перехода
Типичная книжка Zeppelin для рассылок в нашем случае - это последовательность исполняемых SQL запросов, которые в конце заканчивались блоком на Python, который осуществлял отправку данной информации пользователям. Для легкости перехода на Airflow мы решили, что оптимально будет сохранить данную структуру. Типичная папка с DAG-ом рассылки у нас выглядит следующим образом:

Вся логика DAG-а помещается в основном файле, который соответствует названию самого DAG-а (в данном случае это ren_cash_position.py). Там описывается DAG, который описан выше.
В каждом DAG-е есть последовательность SQL скриптов, которые запускаются для выгрузки данных. Примерно так выглядит одна таска DAG-а для получения данных:
@task(task_id='get_last_record',
on_failure_callback=CommonNotifier('Task failed'))
def get_last_record():
query = load_clickhouse_query(sync_query, DAG_FOLDER_PATH)
return pd.DataFrame(execute_clickhouse_query_raw(
query=query,
clickhouse_conn_id=CLICKHOUSE_CONN_ID,
)['results'])
Разбор того, что находится в
on_failure_callbackбудет произведен в следующей главе
Сама последовательность формируется в DAG-е, а выполняется при помощи скрипта, который общий для всех остальных DAG-ов. Это позволяет переиспользовать код и не плодить одинаковые функции для обращения к данным скриптам. Также это помогает при переносе с тестового окружения. Достаточно лишь выполнить:
git checkout $"путь_до_DAG-а"И сразу перенести с окружения на окружение без потерь. Различаются только коннекторы к базам данных и расписание самих DAG-ов.
Сам mailing также является общей для всех DAG-ов функцией, который совершает отправку сообщений до пользователей через библиотеку SMTP. Пример данной функции:
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import pandas as pd
import logging
def send_dataframe_email(dataframe, recipient_list, subject, from_address=ME_ADDRESS):
"""
Универсальная функция для отправки DataFrame по email
Args:
dataframe (pd.DataFrame): DataFrame для отправки
recipient_list (list): список email адресов для отправки
subject (str): тема письма
from_address (str): адрес отправителя
"""
logger.info('Start building message')
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = from_address
text = "Для более детальной информации по денежным потокам см. дэшборд: [ссылка](ссылка)"
html = dataframe.to_html()
part1 = MIMEText(text, 'plain')
part2 = MIMEText(html, 'html')
msg.attach(part1)
msg.attach(part2)
logger.info('Start sending emails')
s = smtplib.SMTP(Variable.get('SMTP_server'))
for recipient in recipient_list:
msg['To'] = recipient
s.sendmail(from_address, recipient, msg.as_string())
s.quit()
logger.info('Email sent successfully')
Таким образом переход рассылок не стал вызовом для нас. Весь код был обернут в то, что знакомо с Zeppelin, изменилась только среда, которая его вызывает. Вся логика инкапсулирована в отдельных файлах и является заменяемой для конкретного DAG-а без изменения его общей структуры.
Использование fallback
Конечно, несмотря на то, что DAG-и выполняются на двух окружениях хотелось бы получать информацию о том, выполнились ли они на самом деле. Для этого мы используем встроенный в Airflow механизм fallback, который позволяет совершить некоторые действия в случае, если DAG упал. В настоящее время fallback выглядит следующим образом:
from airflow.notifications.basenotifier import BaseNotifier
from airflow.models import Variable
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class CommonNotifier(BaseNotifier):
template_fields = ("message",)
def __init__(self, message):
self.message = message
self.smpt_server = Variable.get('SMTP_server')
self.me_address = Variable.get('Me')
self.recipient_list = ['электронная_почта']
def send_message(self, title, message):
msg = MIMEMultipart('alternative')
msg['Subject'] = title
msg['From'] = self.me_address
text = message
part1 = MIMEText(text, 'plain')
msg.attach(part1)
s = smtplib.SMTP(self.smpt_server)
for recipient in self.recipient_list:
msg['To'] = recipient
s.sendmail(self.me_address, recipient, msg.as_string())
s.quit()
def notify(self, context):
# Send notification here, below is an example
title = f"Task {context['task_instance'].task_id} failed"
self.send_message(title, self.message)
Таким образом при провале DAG-а мы знаем об этом с помощью сообщения, которое направляется на электронные почты всех, кто перечислен в self.recipient_list. Это позволяет узнать о провале запуска DAG-а для того, чтобы предпринимать меры. Сообщение на почте выглядит следующим образом:

В этот момент мы залили это все на прод, обрадовались тому, что рассылки приходят, попили чай и пошли домой. Радовались мы где-то неделю, пока не получили fallback сообщение и лежащий airflow:

Ошибка не зависела от нас, это понятно. Но из-за того, что airflow вместе с sheduler-ом успешно лег рассылка не пришла, а потребитель отчета стучится в почту и говорит, что отправить его нужно было час назад. Неприятно.
Пришлось думать дальше...
Обеспечение отказоустойчивости
Одной из проблем Zeppelin в качестве реализации для рассылок было то, что если книжка не запускалась по независящим от нас обстоятельствам (проблема с интерпретатором, проблема с cron под капотом и прочие проблемы), то рассылка просто не отправлялась. Т. к. Airflow тоже может "лечь" (о чем мы уже знаем) хотелось избавится от этой проблемы на корню.
Тут на помощь пришел тот факт, что инстанта Airlow у нас 2: один продовый Airflow, который заполняет продовую базу данных, а также тестовый, который используется для заполнения тестовой базы данных. Было принято решение использовать эту архитектуру для того, чтобы обезопасить себя от подобных проблем.
А настоящий момент любой DAG с рассылками выглядит примерно следующим образом (красным выделены общие таски для всех подобных DAG-ов):

Также для синхронизации используется таблица в ClickHouse со следующими столбцами:

Пройдемся по основным таскам и опишем их смысл:
get_last_record - данная таска выбирает все данные из таблицы airflow_sync с соответствующим DAG-у именем с флагом OK. Полученные данные передаются в следующую таску;
check_airflow_status - данная таска анализирует данные, которые были получены от get_last_record. У нее есть информация относительно того, какое расписание стоит на DAG-е (т. к. это есть в глобальной переменной для Airflow) и она может сказать, когда в последний раз запускался DAG (благодаря данным от предыдущей таски). Таким образом:
Если скрипт уже запускался в пределах shedule этого DAG-а, то мы можем гарантировано сказать, что DAG был запущен и расчет произошел. Тогда переходим к таске skip_processing;
Если скрипт не запускался в пределах shedule этого DAG-а, то мы можем гарантировано сказать, что DAG не был запущен. В таком случае начинается основное тело DAG-а.
skip_processing - эта таска является заглушкой для того, чтобы завершить выполнение DAG-а успешно;
final_logging - на данном этапе мы записываем в нашу таблицу airflow_sync информацию о том, что DAG успешно завершен, записывая имя DAG-а, время выполнения и статус выполнения (успешно).
Данная схема позволила нам синхронизировать 2 наших Airflow для работы в связке. Таким образом получилось, что:
Если DAG выполнился на продовом окружении (что является стандартным сценарием), то тогда тестовый Airflow читает информацию об этом запуске и видит, что запуск производился и ложится по skip_processing
Если по каким-либо причинам DAG не выполнился, то он выполнится на тестовом окружении.
Использование данного алгоритма позволило решить 90% проблем с рассылками и доступностью их для пользователей.
Итоги
Таким образом, у нас получилось построить систему, которая соответствует нашим задачам:
Сообщение рассылается лишь один раз и при недоступности одного из узлов не теряется, а рассылается на другом узле;
Информация об ошибках сразу поступает разработчикам, что позволяет быстрее приняться за работу над ее устранением;
Переход был осуществлен достаточно быстро за счет того, что структура проекта почти не изменилась. Если объективно оценивать наше новое решение, то в нем есть как свои плюсы, так и минусы:
Что получили от реализации | Что потеряли от реализации |
|---|---|
Предсказуемость в доставке необходимых отчетов до конечного пользователя | Сложную архитектуру, на стыке аналитики и ETL процессов |
Разделение логики отправки отчета и его формирования | Потерю "простоты" Zeppelin, который позволял запустить отчет одним cron-ом |
Отказоустойчивость даже при падении одного из узлов | Не каноничное использование тестового Airflow |
Конечно, данное решение не идеально, и на то есть много причин:
Все завязано на том, что ClickHouse жив и он ответит на запросы. Гарантировать этого, увы, нельзя;
Вставка данных в ClickHouse об отработке Airflow может быть излишней. В конечном итоге, для этого сгодился бы и S3, да и ответил он бы, наверное, быстрее;
При росте количества DAG-ов таблица синхронизации может стать узким местом, требующим дополнительной оптимизации;
Ну и, конечно, тестовый Airflow теперь не такой уж и тестовый: теперь он обрабатывает также и продовые скрипты (хоть и замаскированные под тест). Это усложняет логику и, возможно, не до конца правильно архитектурно
У нас не самое большое количество рассылок: в настоящий момент их всего 6 штук, при этом отсылаются они раз в день. В результате внедрения данного решения у нас получилось исключить падения рассылок почти до нуля (всегда есть вероятность, что данные не дойдут и она придет неполной, но это другая история).
Ну и конечно, DevOps нам очень благодарен (во всяком случае с этим вопросом мы к нему теперь не бегаем).
Однако рассылки - это не все, что жило у нас в Zeppelin. Также там жили и формы, о которых я рассказывал выше. Как мы переносили их я расскажу уже в следующей части (вдруг кто-то все еще только присматривается к Streamlit и моя статья поможет сделать выбор). Как и в этом переносе мы постарались сделать минимальное количество нового кода)
Надеюсь, данная статья будет кому-то полезна, спасибо за прочтение!
