Skip to content

Commit

Permalink
Merge pull request apache#1291 from jlowin/scheduler_start_date
Browse files Browse the repository at this point in the history
Don't schedule runs before the DAG's start_date
  • Loading branch information
bolkedebruin committed Apr 6, 2016
2 parents fd9388c + cb98181 commit a8234d0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
6 changes: 4 additions & 2 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,6 @@ def schedule_dag(self, dag):
# Migrating from previous version
# make the past 5 runs active
next_run_date = dag.date_range(latest_run, -5)[0]
if dag.start_date:
next_run_date = max(next_run_date, dag.start_date)
else:
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
Expand All @@ -426,6 +424,10 @@ def schedule_dag(self, dag):
elif dag.schedule_interval != '@once':
next_run_date = dag.following_schedule(last_scheduled_run)

# don't ever schedule prior to the dag's start_date
if dag.start_date:
next_run_date = max(next_run_date, dag.start_date)

# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
Expand Down
29 changes: 29 additions & 0 deletions tests/dags/test_scheduler_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
start_date=DEFAULT_DATE)
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')
40 changes: 40 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,43 @@ def test_dagrun_deadlock_ignore_depends_on_past(self):
second_task_state=State.SUCCESS,
dagrun_state=State.SUCCESS,
run_kwargs=dict(ignore_first_depends_on_past=True))

def test_scheduler_start_date(self):
"""
Test that the scheduler respects start_dates, even when DAGS have run
"""

session = settings.Session()

dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
self.assertTrue(dag.start_date > DEFAULT_DATE)

scheduler = SchedulerJob(dag_id, num_runs=2)
scheduler.run()

# zero tasks ran
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)

# previously, running this backfill would kick off the Scheduler
# because it would take the most recent run and start from there
# That behavior still exists, but now it will only do so if after the
# start date
backfill = BackfillJob(
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE)
backfill.run()

# one task ran
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

scheduler = SchedulerJob(dag_id, num_runs=2)
scheduler.run()

# still one task
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

0 comments on commit a8234d0

Please sign in to comment.