Коллеги, здарова! Вряд ли вы попали на эту статью случайно, по этому не будет никаких лирических отступлений и переходим сразу к делу.
Notifier
В Apache Airflow существует абстрактный класс BaseNotifier , который предоставляет базовую структуру для отправки уведомлений в Airflow с использованием различных callback методов.
В этом приложении уже реализованы некоторые Нотификаторы, которые позволяют отправлять уведомления во внешние системы:
JiraNotifier
DiscordNotifier
SlackNotofier
SmtpNotifier
По названию понятно в какую систему уходят уведомления, более подробно ознакомится можно на странице с документацией - https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html
Но вышло так, что нет нотификатора, который отправляет уведомления в Telegram, по этому мы сейчас напишем максимально простой нотификатор, который будет выполнять эту задачу.
Для того чтобы создать свой нотификатор, необходимо унаследовать класс BaseNotifier, для того чтобы получать контекст информацию о выполнении DAG`ов. Для отправки сообщений в Telegram будем использовать пакет python-telegram-bot.
pip install python-telegram-bot==20.4
Далее реализовываем саму функцию для отправки сообщений в телеграм. И переопределяем метод notify, в котором будет происходить отправка уведомлений
from airflow.notifications.basenotifier import BaseNotifier from telegram import Bot import asyncio import os class TelegramNotification(BaseNotifier): @staticmethod async def send_tg_notifier(notifier_message): telegram_bot_token = os.environ.get("TELEGRAM_NOTIFIER_BOT_TOKEN") telegram_chat_id = os.environ.get("TELEGRAM_NOTIFIER_CHAT_ID") bot = Bot(token=telegram_bot_token) await bot.sendMessage(chat_id=telegram_chat_id, text=notifier_message, parse_mode="Markdown") def notify(self, context): task_id = context["ti"].task_id task_state = context["ti"].state task_log_url = context["ti"].log_url dag_name = context["ti"].dag_id message_template = (f"Dag name: `{dag_name}` \n" f"Task id: `{task_id}` \n" f"Task State: `🔴{task_state}🔴` \n" f"Task Log URL: `{task_log_url}` \n" ) asyncio.run(self.send_tg_notifier(notifier_message=message_template))
Как вы видите для получения значений с токеном бота и id чата в Telegram используются переменные окружения.
Я составляю шаблон сообщения, используя параметры:
dag_name
task_id
task_state
task_log_url
Полный список элементов, полученных от TaskInstance вы можете найти в документации - https://airflow.apache.org/docs/apache-airflow/2.0.1/_api/airflow/models/taskinstance/index.html
Обратите внимание, что я хочу получать сообщение в канал, только если DAG завершается неудачно, по этому я сразу в коде хардкожу красные кружки (🔴)
Для того чтобы Apache Airflow мог сам вызывать этот класс и уведомлять нас об ошибках, то нужноо модифицировать DAG`и.
для начала нужно импортировать пакет с Нотификатором:
from Notifier.Notifier import TelegramNotification
Далее при объявлении дефолтных значений, необходимо вызвать этот метод для on_failure_callback
from airflow.providers.mysql.hooks.mysql import MySqlHook from airflow.operators.empty import EmptyOperator from airflow.decorators import dag, task from datetime import datetime from Notifier.Notifier import TelegramNotification import pendulum @dag( default_args={'owner': 'airflow', 'on_failure_callback': TelegramNotification()}, schedule_interval="0 6 * * *", start_date=pendulum.datetime(2023, 10, 10, tz="Europe/Moscow"), catchup=False, description='Этот DAG делает что-то явно сложное' )
Apache Airflow поддерживает еще несколько разных типов callback:
on_success_callback - Вызывается при успешном выполнении задачи
on_failure_callback - Вызывает при ошибки задачи
on_execute_callback - Вызывается перед выполнением задачи
on_retry_callback - Вызывается, в случае если задача завершилась неудачно и готова выполнится повторно
sla_miss_callback - Вызывается, когда задача когда задача не успевает выполнится в свой выделенный промежуток времени
Для того, чтобы Нотификатор смог отправлять уведомления, нужно добавить ему значения chat id и bot token, которые можно получить при создании бота и канала. Эти значения я передаю через переменные окружения TELEGRAM_NOTIFIER_BOT_TOKEN и TELEGRAM_NOTIFIER_CHAT_ID
Так как Apache Airflow я запускаю в K8S и использую community chart, то описываю переменные окружения в чарте helm.
existingSecret: &basic airflow-secret extraEnv: - name: TELEGRAM_NOTIFIER_BOT_TOKEN valueFrom: secretKeyRef: name: *basic key: TELEGRAM_NOTIFIER_BOT_TOKEN - name: TELEGRAM_NOTIFIER_CHAT_ID valueFrom: secretKeyRef: name: *basic key: TELEGRAM_NOTIFIER_CHAT_ID
Сами же значения этих переменных хранятся в secrets.yaml.
Результат
В случае завершения DAG с ошибкой в Telegram канал приходит уведомление:

Обратите внимание на поле Task Log URL. Мы можем получить ссылку именно на лог об ошибке. В целом распарсить его и отправить прямо в Telegram не составит труда, но часто бывает, что логи огромные и в мессенджере это может быть не читаемо.
Итог
В данной микро-статье я хотел предложить вам готовый кусок кода, который реализует отправку сообщений об ошибке выполнения DAG Apache Airflow в ваш канал Telegram, а так же мельком рассказал о разных параметрах Airflow, такие как callback и Notifier, которые вы можете использовать в будущем.
Я использую этот мини-уведомитель, но вы можете легко увеличить его функционал и использовать для других частных случаев.
Бойся своих желаний, они имеют свойство сбываться
