Коллеги, здарова! Вряд ли вы попали на эту статью случайно, по этому не будет никаких лирических отступлений и переходим сразу к делу.
Notifier
В Apache Airflow существует абстрактный класс BaseNotifier
, который предоставляет базовую структуру для отправки уведомлений в Airflow с использованием различных callback методов.
В этом приложении уже реализованы некоторые Нотификаторы, которые позволяют отправлять уведомления во внешние системы:
JiraNotifier
DiscordNotifier
SlackNotofier
SmtpNotifier
По названию понятно в какую систему уходят уведомления, более подробно ознакомится можно на странице с документацией - https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html
Но вышло так, что нет нотификатора, который отправляет уведомления в Telegram, по этому мы сейчас напишем максимально простой нотификатор, который будет выполнять эту задачу.
Для того чтобы создать свой нотификатор, необходимо унаследовать класс BaseNotifier, для того чтобы получать контекст информацию о выполнении DAG`ов. Для отправки сообщений в Telegram будем использовать пакет python-telegram-bot.
pip install python-telegram-bot==20.4
Далее реализовываем саму функцию для отправки сообщений в телеграм. И переопределяем метод notify, в котором будет происходить отправка уведомлений
from airflow.notifications.basenotifier import BaseNotifier
from telegram import Bot
import asyncio
import os
class TelegramNotification(BaseNotifier):
@staticmethod
async def send_tg_notifier(notifier_message):
telegram_bot_token = os.environ.get("TELEGRAM_NOTIFIER_BOT_TOKEN")
telegram_chat_id = os.environ.get("TELEGRAM_NOTIFIER_CHAT_ID")
bot = Bot(token=telegram_bot_token)
await bot.sendMessage(chat_id=telegram_chat_id, text=notifier_message, parse_mode="Markdown")
def notify(self, context):
task_id = context["ti"].task_id
task_state = context["ti"].state
task_log_url = context["ti"].log_url
dag_name = context["ti"].dag_id
message_template = (f"Dag name: `{dag_name}` \n"
f"Task id: `{task_id}` \n"
f"Task State: `🔴{task_state}🔴` \n"
f"Task Log URL: `{task_log_url}` \n"
)
asyncio.run(self.send_tg_notifier(notifier_message=message_template))
Как вы видите для получения значений с токеном бота и id чата в Telegram используются переменные окружения.
Я составляю шаблон сообщения, используя параметры:
dag_name
task_id
task_state
task_log_url
Полный список элементов, полученных от TaskInstance вы можете найти в документации - https://airflow.apache.org/docs/apache-airflow/2.0.1/_api/airflow/models/taskinstance/index.html
Обратите внимание, что я хочу получать сообщение в канал, только если DAG завершается неудачно, по этому я сразу в коде хардкожу красные кружки (🔴)
Для того чтобы Apache Airflow мог сам вызывать этот класс и уведомлять нас об ошибках, то нужноо модифицировать DAG`и.
для начала нужно импортировать пакет с Нотификатором:
from Notifier.Notifier import TelegramNotification
Далее при объявлении дефолтных значений, необходимо вызвать этот метод для on_failure_callback
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.empty import EmptyOperator
from airflow.decorators import dag, task
from datetime import datetime
from Notifier.Notifier import TelegramNotification
import pendulum
@dag(
default_args={'owner': 'airflow', 'on_failure_callback': TelegramNotification()},
schedule_interval="0 6 * * *",
start_date=pendulum.datetime(2023, 10, 10, tz="Europe/Moscow"),
catchup=False,
description='Этот DAG делает что-то явно сложное'
)
Apache Airflow поддерживает еще несколько разных типов callback:
on_success_callback - Вызывается при успешном выполнении задачи
on_failure_callback - Вызывает при ошибки задачи
on_execute_callback - Вызывается перед выполнением задачи
on_retry_callback - Вызывается, в случае если задача завершилась неудачно и готова выполнится повторно
sla_miss_callback - Вызывается, когда задача когда задача не успевает выполнится в свой выделенный промежуток времени
Для того, чтобы Нотификатор смог отправлять уведомления, нужно добавить ему значения chat id и bot token, которые можно получить при создании бота и канала. Эти значения я передаю через переменные окружения TELEGRAM_NOTIFIER_BOT_TOKEN
и TELEGRAM_NOTIFIER_CHAT_ID
Так как Apache Airflow я запускаю в K8S и использую community chart, то описываю переменные окружения в чарте helm.
existingSecret: &basic airflow-secret
extraEnv:
- name: TELEGRAM_NOTIFIER_BOT_TOKEN
valueFrom:
secretKeyRef:
name: *basic
key: TELEGRAM_NOTIFIER_BOT_TOKEN
- name: TELEGRAM_NOTIFIER_CHAT_ID
valueFrom:
secretKeyRef:
name: *basic
key: TELEGRAM_NOTIFIER_CHAT_ID
Сами же значения этих переменных хранятся в secrets.yaml.
Результат
В случае завершения DAG с ошибкой в Telegram канал приходит уведомление:

Обратите внимание на поле Task Log URL. Мы можем получить ссылку именно на лог об ошибке. В целом распарсить его и отправить прямо в Telegram не составит труда, но часто бывает, что логи огромные и в мессенджере это может быть не читаемо.
Итог
В данной микро-статье я хотел предложить вам готовый кусок кода, который реализует отправку сообщений об ошибке выполнения DAG Apache Airflow в ваш канал Telegram, а так же мельком рассказал о разных параметрах Airflow, такие как callback и Notifier, которые вы можете использовать в будущем.
Я использую этот мини-уведомитель, но вы можете легко увеличить его функционал и использовать для других частных случаев.
Бойся своих желаний, они имеют свойство сбываться