Skip to content

Commit

Permalink
Only send an SlaCallbackRequest if the DAG is scheduled (apache#26089)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedcunningham authored Sep 1, 2022
1 parent 9c4ab10 commit 1e19807
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 22 deletions.
37 changes: 17 additions & 20 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,26 +417,23 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:

sla_misses = []
next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
if next_info is None:
self.log.info("Skipping SLA check for %s because task does not have scheduled date", ti)
else:
while next_info.logical_date < ts:
next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)

if next_info is None:
break
if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
break
if next_info.logical_date + task.sla < ts:

sla_miss = SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=next_info.logical_date,
timestamp=ts,
)
sla_misses.append(sla_miss)
Stats.incr('sla_missed')
while next_info and next_info.logical_date < ts:
next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)

if next_info is None:
break
if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
break
if next_info.logical_date + task.sla < ts:

sla_miss = SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=next_info.logical_date,
timestamp=ts,
)
sla_misses.append(sla_miss)
Stats.incr('sla_missed')
if sla_misses:
session.add_all(sla_misses)
session.commit()
Expand Down
4 changes: 4 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,10 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
self.log.debug("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return

if not dag.timetable.periodic:
self.log.debug("Skipping SLA check for %s because DAG is not scheduled", dag)
return

request = SlaCallbackRequest(full_filepath=dag.fileloc, dag_id=dag.dag_id)
self.executor.send_callback(request)

Expand Down
32 changes: 30 additions & 2 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2985,10 +2985,18 @@ def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker):
self.scheduler_job._send_sla_callbacks_to_processor(dag)
self.scheduler_job.executor.callback_sink.send.assert_not_called()

def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker):
@pytest.mark.parametrize(
"schedule",
[
"@daily",
"0 10 * * *",
timedelta(hours=2),
],
)
def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, schedule, dag_maker):
"""Test SLA Callbacks are sent to the DAG Processor when SLAs are defined on tasks"""
dag_id = 'test_send_sla_callbacks_to_processor_sla_with_task_slas'
with dag_maker(dag_id=dag_id, schedule='@daily') as dag:
with dag_maker(dag_id=dag_id, schedule=schedule) as dag:
EmptyOperator(task_id='task1', sla=timedelta(seconds=60))

with patch.object(settings, "CHECK_SLAS", True):
Expand All @@ -3000,6 +3008,26 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker):
expected_callback = SlaCallbackRequest(full_filepath=dag.fileloc, dag_id=dag.dag_id)
self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)

@pytest.mark.parametrize(
"schedule",
[
None,
[Dataset("foo")],
],
)
def test_send_sla_callbacks_to_processor_sla_dag_not_scheduled(self, schedule, dag_maker):
"""Test SLA Callbacks are not sent when DAG isn't scheduled"""
dag_id = 'test_send_sla_callbacks_to_processor_sla_no_task_slas'
with dag_maker(dag_id=dag_id, schedule=schedule) as dag:
EmptyOperator(task_id='task1', sla=timedelta(seconds=5))

with patch.object(settings, "CHECK_SLAS", True):
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor()

self.scheduler_job._send_sla_callbacks_to_processor(dag)
self.scheduler_job.executor.callback_sink.send.assert_not_called()

def test_create_dag_runs(self, dag_maker):
"""
Test various invariants of _create_dag_runs.
Expand Down

0 comments on commit 1e19807

Please sign in to comment.