Skip to content

Commit

Permalink
Merge pull request apache#875 from r39132/master
Browse files Browse the repository at this point in the history
SLA Miss Alert Callbacks : Allow DAGs to specify a callback function for SLA miss handling
  • Loading branch information
mistercrunch committed Jan 27, 2016
2 parents ba8e4d2 + 8f86afb commit a4d109f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
19 changes: 17 additions & 2 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def manage_slas(self, dag, session=None):
slas = (
session
.query(SlaMiss)
.filter(SlaMiss.email_sent == False)
.filter(SlaMiss.email_sent == False or SlaMiss.notification_sent == False)
.filter(SlaMiss.dag_id == dag.dag_id)
.all()
)
Expand Down Expand Up @@ -309,6 +309,15 @@ def manage_slas(self, dag, session=None):
blocking_task_list = "\n".join([
ti.task_id + ' on ' + ti.execution_date.isoformat()
for ti in blocking_tis])
# Track whether email or any alert notification sent
# We consider email or the alert callback as notifications
email_sent = False
notification_sent = False
if dag.sla_miss_callback:
# Execute the alert callback
self.logger.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
from airflow import ascii
email_content = """\
Here's a list of tasks thas missed their SLAs:
Expand All @@ -331,8 +340,14 @@ def manage_slas(self, dag, session=None):
emails,
"[airflow] SLA miss on DAG=" + dag.dag_id,
email_content)
email_sent = True
notification_sent = True
# If we sent any notification, update the sla_miss table
if notification_sent:
for sla in slas:
sla.email_sent = True
if email_sent:
sla.email_sent = True
sla.notification_sent = True
session.merge(sla)
session.commit()
session.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add notification_sent column to sla_miss
Revision ID: bbc73705a13e
Revises: 4446e08588
Create Date: 2016-01-14 18:05:54.871682
"""

# revision identifiers, used by Alembic.
revision = 'bbc73705a13e'
down_revision = '4446e08588'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean,default=False))


def downgrade():
op.drop_column('sla_miss', 'notification_sent')
6 changes: 6 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,9 @@ class DAG(LoggingMixin):
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created
:type dagrun_timeout: datetime.timedelta
:param sla_miss_callback: specify a function to call when reporting SLA
timeouts.
:type sla_miss_callback: types.FunctionType
"""

def __init__(
Expand All @@ -2017,6 +2020,7 @@ def __init__(
max_active_runs=configuration.getint(
'core', 'max_active_runs_per_dag'),
dagrun_timeout=None,
sla_miss_callback=None,
params=None):

self.user_defined_macros = user_defined_macros
Expand Down Expand Up @@ -2044,6 +2048,7 @@ def __init__(
self.concurrency = concurrency
self.max_active_runs = max_active_runs
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback

self._comps = {
'dag_id',
Expand Down Expand Up @@ -2850,6 +2855,7 @@ class SlaMiss(Base):
email_sent = Column(Boolean, default=False)
timestamp = Column(DateTime)
description = Column(Text)
notification_sent = Column(Boolean, default=False)

def __repr__(self):
return str((
Expand Down

0 comments on commit a4d109f

Please sign in to comment.