Skip to content

Latest commit

 

History

History
105 lines (89 loc) · 2.71 KB

4- Notofication.md

File metadata and controls

105 lines (89 loc) · 2.71 KB

Airflow has some call backs:

![[Pasted image 20240511070709.png]]

Send with python request lib

1- Create Channel in Microsoft Team 2- Create include file:

# include/notifications.py
from airflow.models import Variable

def notify_teams(context):
    print("Sending Teams notification")
    import requests
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "title": "Airflow Task Failed",
        "summary": f"Task {context['task_instance_key_str']} failed",
        "themeColor": "0078D7",
        "sections": [
            {
                "activityTitle": f"Task {context['task_instance_key_str']} failed",
                "activitySubtitle": f"DAG: {context['dag'].dag_id}",
                "facts": [
                    {
                        "name": "Logical Date",
                        "value": context['ds']
                    },
                    {
                        "name": "Log URL",
                        "value": context['task_instance'].log_url
                    }
                ]
            }
        ],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "View Logs",
            "targets": [{
                "os": "default",
                "uri": context['task_instance'].log_url
            }]
        }]
    }
    
    headers = {"content-type": "application/json"}
    requests.post(Variable.get('teams_webhook_secret'), json=payload, headers=headers)
    print("Teams notification sent")

3- Write pipeline

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.exceptions import AirflowFailException
from include.notifications import notify_teams

@dag(schedule=None, catchup=False, on_failure_callback=notify_teams)
def my_dag():
    
    @task
    def a():
        print('good')
        
    @task
    def b():
        print('bad')
        raise AirflowFailException()
    
    chain(a(), b())
    
my_dag()

Send with Airflow Webhooks

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.exceptions import AirflowFailException
from airflow.providers.apprise.notifications.apprise import send_apprise_notification
from apprise import NotifyType

@dag(schedule=None, catchup=False, on_failure_callback=send_apprise_notification(
    title='Airflow Task Failed',
    body='Task {{ task_instance_key_str }} failed',
    notify_type=NotifyType.FAILURE,
    apprise_conn_id='notifier',
    tag='alerts'
))
def my_dag():
    
    @task
    def a():
        print('good')
        
    @task
    def b():
        print('bad')
        raise AirflowFailException()
    
    chain(a(), b())
    
my_dag()