Skip to content

Commit

Permalink
[AIRFLOW-209] Add scheduler tests and improve lineage handling
Browse files Browse the repository at this point in the history
This patch adds schedule_dag and process_dag unittests. It also
fixes some minor bugs that were caught by these tests. Some
small changes for readability.
  • Loading branch information
bolkedebruin committed Jun 3, 2016
1 parent c2384cb commit fb5a3b3
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
comment: false
# keep default
11 changes: 6 additions & 5 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ def schedule_dag(self, dag):
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False
external_trigger=False,
session=session
)
if len(active_runs) >= dag.max_active_runs:
return
Expand All @@ -402,7 +403,7 @@ def schedule_dag(self, dag):
dr.end_date = datetime.now()
session.commit()

# this query should be replace by find dagrun
# this query should be replaced by find dagrun
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
Expand Down Expand Up @@ -434,9 +435,9 @@ def schedule_dag(self, dag):
# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
schedule_end = next_run_date
period_end = next_run_date
elif next_run_date:
schedule_end = dag.following_schedule(next_run_date)
period_end = dag.following_schedule(next_run_date)

# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
Expand All @@ -451,7 +452,7 @@ def schedule_dag(self, dag):
if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
return

if next_run_date and schedule_end and schedule_end <= datetime.now():
if next_run_date and period_end and period_end <= datetime.now():
next_run = dag.create_dagrun(
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
Expand Down
13 changes: 10 additions & 3 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3444,7 +3444,11 @@ def update_state(self, session=None):
.format(self, len(tis)))

for ti in tis:
ti.task = dag.get_task(ti.task_id)
# skip in db?
if ti.state == State.REMOVED:
tis.remove(ti)
else:
ti.task = dag.get_task(ti.task_id)

# pre-calculate
# db is faster
Expand Down Expand Up @@ -3511,8 +3515,11 @@ def verify_integrity(self, session=None):
task_ids = []
for ti in tis:
task_ids.append(ti.task_id)
if not dag.get_task(ti.task_id) and self.state not in State.unfinished():
ti.state = State.REMOVED
try:
dag.get_task(ti.task_id)
except AirflowException:
if self.state is not State.RUNNING:
ti.state = State.REMOVED

# check for missing tasks
for task in dag.tasks:
Expand Down
193 changes: 193 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,3 +491,196 @@ def test_scheduler_process_check_heartrate(self):
scheduler.process_dag(dag, queue=queue)

queue.put.assert_not_called()

def test_scheduler_do_not_schedule_removed_task(self):
dag = DAG(
dag_id='test_scheduler_do_not_schedule_removed_task',
start_date=DEFAULT_DATE)
dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)

dag = DAG(
dag_id='test_scheduler_do_not_schedule_removed_task',
start_date=DEFAULT_DATE)

queue = mock.Mock()
scheduler.process_dag(dag, queue=queue)

queue.put.assert_not_called()

def test_scheduler_do_not_schedule_too_early(self):
dag = DAG(
dag_id='test_scheduler_do_not_schedule_too_early',
start_date=datetime.datetime(2200, 1, 1))
dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNone(dr)

queue = mock.Mock()
scheduler.process_dag(dag, queue=queue)

queue.put.assert_not_called()

def test_scheduler_do_not_run_finished(self):
dag = DAG(
dag_id='test_scheduler_do_not_run_finished',
start_date=DEFAULT_DATE)
dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)

tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = State.SUCCESS

session.commit()
session.close()

queue = mock.Mock()
scheduler.process_dag(dag, queue=queue)

queue.put.assert_not_called()

def test_scheduler_add_new_task(self):
"""
Test if a task instance will be added if the dag is updated
"""
dag = DAG(
dag_id='test_scheduler_add_new_task',
start_date=DEFAULT_DATE)

dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)

tis = dr.get_task_instances()
self.assertEquals(len(tis), 1)

dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')

queue = mock.Mock()
scheduler.process_dag(dag, queue=queue)

tis = dr.get_task_instances()
self.assertEquals(len(tis), 2)

def test_scheduler_verify_max_active_runs(self):
"""
Test if a a dagrun will not be scheduled if max_dag_runs has been reached
"""
dag = DAG(
dag_id='test_scheduler_verify_max_active_runs',
start_date=DEFAULT_DATE)
dag.max_active_runs = 1

dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)

dr = scheduler.schedule_dag(dag)
self.assertIsNone(dr)

def test_scheduler_fail_dagrun_timeout(self):
"""
Test if a a dagrun wil be set failed if timeout
"""
dag = DAG(
dag_id='test_scheduler_fail_dagrun_timeout',
start_date=DEFAULT_DATE)
dag.dagrun_timeout = datetime.timedelta(seconds=60)

dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()

scheduler = SchedulerJob()
dag.clear()

dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)
print(dr.start_date)
dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
print(dr.start_date)
session.merge(dr)
session.commit()

dr2 = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr2)

dr.refresh_from_db(session=session)
self.assertEquals(dr.state, State.FAILED)

0 comments on commit fb5a3b3

Please sign in to comment.