Skip to content

Commit

Permalink
[AIRFLOW-993] Update date inference logic
Browse files Browse the repository at this point in the history
DAGs should set task start_date and end_date when
possible, making sure
they agree with the DAG’s own dates.

Closes apache#2157 from jlowin/run-bug
  • Loading branch information
jlowin authored and bolkedebruin committed May 13, 2017
1 parent 3b589a9 commit 624c133
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
15 changes: 14 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3323,8 +3323,21 @@ def add_task(self, task):
"""
if not self.start_date and not task.start_date:
raise AirflowException("Task is missing the start_date parameter")
if not task.start_date:
# if the task has no start date, assign it the same as the DAG
elif not task.start_date:
task.start_date = self.start_date
# otherwise, the task will start on the later of its own start date and
# the DAG's start date
elif self.start_date:
task.start_date = max(task.start_date, self.start_date)

# if the task has no end date, assign it the same as the dag
if not task.end_date:
task.end_date = self.end_date
# otherwise, the task will end on the earlier of its own end date and
# the DAG's end date
elif task.end_date and self.end_date:
task.end_date = min(task.end_date, self.end_date)

if task.task_id in self.task_dict:
# TODO: raise an error in Airflow 2.0
Expand Down
39 changes: 39 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,45 @@ def test_get_dag_fileloc(self):

class TaskInstanceTest(unittest.TestCase):

def test_set_task_dates(self):
"""
Test that tasks properly take start/end dates from DAGs
"""
dag = DAG('dag', start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + datetime.timedelta(days=10))

op1 = DummyOperator(task_id='op_1', owner='test')

self.assertTrue(op1.start_date is None and op1.end_date is None)

# dag should assign its dates to op1 because op1 has no dates
dag.add_task(op1)
self.assertTrue(
op1.start_date == dag.start_date and op1.end_date == dag.end_date)

op2 = DummyOperator(
task_id='op_2',
owner='test',
start_date=DEFAULT_DATE - datetime.timedelta(days=1),
end_date=DEFAULT_DATE + datetime.timedelta(days=11))

# dag should assign its dates to op2 because they are more restrictive
dag.add_task(op2)
self.assertTrue(
op2.start_date == dag.start_date and op2.end_date == dag.end_date)

op3 = DummyOperator(
task_id='op_3',
owner='test',
start_date=DEFAULT_DATE + datetime.timedelta(days=1),
end_date=DEFAULT_DATE + datetime.timedelta(days=9))
# op3 should keep its dates because they are more restrictive
dag.add_task(op3)
self.assertTrue(
op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1))
self.assertTrue(
op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9))


def test_set_dag(self):
"""
Test assigning Operators to Dags, including deferred assignment
Expand Down

0 comments on commit 624c133

Please sign in to comment.