Dust-n-Rust by Spiritofdarkness
Команда разработки Cloud Big Data от VK Cloud Solution перевела статью с советами, которые касаются общих понятий работы с пайплайнами. Неважно, какую систему управления рабочими процессами вы используете, эти идеи можно применять везде. Сам автор пользуется Apache Airflow и приводит примеры кода на ее основе.
Эта статья будет полезна не только дата-инженерам, но и дата-сайентистам, так как хороший дата-сайентист тоже понимает принципы работы пайплайнов данных.
Подберите подходящие триггеры для разных заданий
В традиционной работе с пайплайнами есть базовые задания ETL/ELT — для их запуска всегда нужны триггеры, которые бывают двух типов: на основе расписания или алгоритма.
Триггер на основе расписания. Планировщики есть в любой системе управления рабочими процессами. С их помощью можно запускать задания регулярно: раз в минуту, в час, в день или в неделю. Такие триггеры полезны для обязательных заданий — тех, которые нужно запустить независимо от условий. Обычно это задания, которые не сбоят ни при каких обстоятельствах.
Триггеры на основе алгоритмов. Обычно такие триггеры зависят от других задач и запускают задание при определенных условиях. Например:
- файл импортирован в папку;
- строка добавлена в таблицу;
- завершено несколько зависимых процессов.
Триггеры на основе алгоритмов полезны для заданий, выполнение которых зависит от соблюдения конкретных условий.
Тип триггера выбирают в зависимости от характера задания. Спросите себя:
- Нужно ли выполнять задание каждый день?
- Тратится ли на задание больше ресурсов, чем необходимо?
- Как часто нужно выполнять задание?
- Зависит ли успешное выполнение задания от нескольких условий?
При этом помните: мы живем не в идеальном мире, где все задания выполняются безупречно. Используйте возникающие ошибки и сбои для обучения и улучшения ваших пайплайнов.
Выберите подходящую систему оповещений
Пайплайны ломаются, но это еще не конец света. В момент поломки нам нужно получить уведомление об ошибке, и базовых оповещений для этого недостаточно. Понадобятся еще настраиваемые оповещения.
Электронная почта — базовый минимум. Письмо приходит на почту всем членам команды и информирует об ошибке при выполнении задания.
Это не самое эффективное решение, поскольку письмо легко пропустить — не все из нас регулярно проверяют почту.
Вот так выглядит базовая форма оповещения по электронной почте в Airflow:
from airflow.utils.email import send_email
email_to = 'receivers@email.com'
email_subject = '(FAILED) ' + jobname + date
email_content = 'Your job has failed.'
send_email(email_to, email_subject, email_content)
Slack. Если вы работаете в технологической компании, то наверняка используете для общения Slack. Можно создать отдельный канал для оповещений, в который ваши пайплайны будут отправлять сообщения об ошибках. Ваша команда сразу узнает, что что-то пошло не так.
Вот так выглядит простая форма оповещения в Slack на базе его API.
from slack import WebClient
client = WebClient(token = 'your token here')
response = client.chat_postMessage(
channel = slack_cannel,
text = message
)
Настраивайте логирование
В программировании «логировать» значит подробно регистрировать все, что вы делаете. В работе с пайплайнами есть два типа логов: задач и заданий.
Журналы задач. В них хранится вся важная информация, которая появляется в ходе работы над задачей. Конечно, переносить каждую строчку выполненного кода не хочется, но подробные записи очень важны. Вот пример журнала:
Import PostgresHook
#Extraction Job
def ExtractFromSource(query):
query_to_run = query
logging.info("Query : %" query_to_run)
cursor = PostgresHook(connection).get_conn().cursor()
logging.info("Connecting to Postgres Connection %" connection)
cursor.execute(query_to_run)
result = cursor.fetchall()
Логи выполняют роль контрольной точки для отладки. В случае сбоя в них можно найти источник ошибки.
Например, в приведенном выше задании по извлечению данных мы логируем запрос и подключение. Лог выполняет две функции:
- сообщает, что задача выполняется нормально вплоть до каждой точки;
- показывает передаваемую переменную запроса и соединения.
Если что-то пойдет не так, мы сможем точно определить, на каком этапе произошла ошибка. Плюс можно проверить, нет ли ошибки в переменной запроса или подключения.
Логи заданий — записи запущенных заданий. В них регистрируют время начала и завершения задания и его статус — это набор-минимум. Даже если в вашей системе управления рабочими процессами не настроено логирование, эта информация наверняка где-то у вас хранится, например в хранилище данных.
Вот пример лога заданий:
Чтобы журнал был полезнее, создайте таблицу аудита в хранилище данных. Добавляйте в эту таблицу запись на каждое задание с параметрами, которые мы привели выше в примере. Дата-сайентистам и аналитикам будет нетрудно пройтись по этой таблице и проверить, какое задание выполнили последним и было ли оно успешным.
Настройте проверку дубликатов
Если вы извлекаете или преобразуете данные, то наверняка сталкивались с дублями. У их появления несколько причин, в том числе:
- ошибки в запросе, например в join-операции;
- задание было запущено дважды;
- некорректные данные.
Чтобы исправить ситуацию, в пайплайн нужно добавить этап «проверки на дубли». Для этого можно проверить количество записей по первичным ключам.
Select max(count) from (
Select primary_key, count(*) from table group by primary_key)
Если значение больше «1», значит, есть дубликаты.
Если у ваших данных нет первичных ключей, их можно присвоить:
alter table TABLENAME add primary key (ID,NAME,PMID)
Этих четырех хитростей достаточно, чтобы усовершенствовать ваши пайплайны по обработке данных. А закончить позвольте цитатой Даниэля Тункеланга: «Наша работа как дата-сайентистов заключается в том, чтобы извлечь сигнал из шума».
Команда VK Cloud Solutions развивает экосистему для построения Big-Data-решений и Machine Learning. Будет здорово, если вы протестируете наши решения и дадите обратную связь. Для тестирования пользователям при регистрации начисляем 3000 бонусных рублей.
Что почитать по теме: