Skip to content

Commit

Permalink
[AIRFLOW-2537] Add reset-dagrun option to backfill command
Browse files Browse the repository at this point in the history
Closes apache#3444 from feng-tao/add_reset_dagrun_for_backfill
  • Loading branch information
Tao feng authored and mistercrunch committed Jun 1, 2018
1 parent 2800c8e commit 4c6f1fd
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 17 deletions.
19 changes: 18 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ def backfill(args, dag=None):
ti = TaskInstance(task, args.start_date)
ti.dry_run()
else:
if args.reset_dagruns:
DAG.clear_dags(
[dag],
start_date=args.start_date,
end_date=args.end_date,
confirm_prompt=True,
include_subdags=False,
only_backfill_dagruns=True,
)

dag.run(
start_date=args.start_date,
end_date=args.end_date,
Expand Down Expand Up @@ -1364,6 +1374,12 @@ class CLIFactory(object):
"again."),
type=float,
default=1.0),
'reset_dag_run': Arg(
("--reset_dagruns",),
("if set, the backfill will delete existing "
"backfill-related DAG runs and start "
"anew with fresh, running DAG runs"),
"store_true"),
# list_tasks
'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
# list_dags
Expand Down Expand Up @@ -1683,7 +1699,8 @@ class CLIFactory(object):
'dag_id', 'task_regex', 'start_date', 'end_date',
'mark_success', 'local', 'donot_pickle',
'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf'
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
'reset_dag_run'
)
}, {
'func': list_tasks,
Expand Down
62 changes: 49 additions & 13 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,21 @@ def get_fernet():
_CONTEXT_MANAGER_DAG = None


def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
def clear_task_instances(tis,
session,
activate_dag_runs=True,
dag=None,
only_backfill_dagruns=False,
):
"""
Clears a set of task instances, but makes sure the running ones
get killed.
get killed. Reset backfill dag run state to removed if only_backfill_dagruns is set
:param tis: a list of task instances
:param session: current session
:param activate_dag_runs: flag to check for active dag run
:param dag: DAG object
:param only_backfill_dagruns: flag for setting backfill state
"""
job_ids = []
for ti in tis:
Expand Down Expand Up @@ -165,8 +176,13 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
DagRun.execution_date.in_({ti.execution_date for ti in tis}),
).all()
for dr in drs:
dr.state = State.RUNNING
dr.start_date = timezone.utcnow()
if only_backfill_dagruns and dr.is_backfill:
# If the flag is set, we reset backfill dag run for retry.
# Don't reset the start date.
dr.state = State.REMOVED
else:
dr.state = State.RUNNING
dr.start_date = timezone.utcnow()


class DagBag(BaseDagBag, LoggingMixin):
Expand Down Expand Up @@ -3678,12 +3694,21 @@ def topological_sort(self):

@provide_session
def set_dag_runs_state(
self, state=State.RUNNING, session=None):
drs = session.query(DagModel).filter_by(dag_id=self.dag_id).all()
self,
state=State.RUNNING,
session=None,
only_backfill_dagruns=False):
drs = session.query(DagRun).filter_by(dag_id=self.dag_id).all()
dirty_ids = []
for dr in drs:
dr.state = state
dirty_ids.append(dr.dag_id)
if only_backfill_dagruns:
if dr.is_backfill:
dr.state = state
dirty_ids.append(dr.dag_id)
else:
if not dr.is_backfill:
dr.state = state
dirty_ids.append(dr.dag_id)
DagStat.update(dirty_ids, session=session)

@provide_session
Expand All @@ -3695,7 +3720,9 @@ def clear(
include_subdags=True,
reset_dag_runs=True,
dry_run=False,
session=None):
session=None,
only_backfill_dagruns=False,
):
"""
Clears a set of task instances associated with the current dag for
a specified date range.
Expand Down Expand Up @@ -3742,9 +3769,14 @@ def clear(
do_it = utils.helpers.ask_yesno(question)

if do_it:
clear_task_instances(tis.all(), session, dag=self)
clear_task_instances(tis.all(),
session,
dag=self,
only_backfill_dagruns=only_backfill_dagruns,
)
if reset_dag_runs:
self.set_dag_runs_state(session=session)
self.set_dag_runs_state(session=session,
only_backfill_dagruns=only_backfill_dagruns)
else:
count = 0
print("Bail. Nothing was cleared.")
Expand All @@ -3762,7 +3794,9 @@ def clear_dags(
confirm_prompt=False,
include_subdags=True,
reset_dag_runs=True,
dry_run=False):
dry_run=False,
only_backfill_dagruns=False,
):
all_tis = []
for dag in dags:
tis = dag.clear(
Expand Down Expand Up @@ -3801,7 +3835,9 @@ def clear_dags(
confirm_prompt=False,
include_subdags=include_subdags,
reset_dag_runs=reset_dag_runs,
dry_run=False)
dry_run=False,
only_backfill_dagruns=only_backfill_dagruns,
)
else:
count = 0
print("Bail. Nothing was cleared.")
Expand Down
38 changes: 35 additions & 3 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from airflow.models import DAG, TaskInstance as TI
from airflow.models import DagRun
from airflow.models import State as ST
from airflow.models import DagModel, DagStat
from airflow.models import DagModel, DagRun, DagStat
from airflow.models import clear_task_instances
from airflow.models import XCom
from airflow.models import Connection
Expand Down Expand Up @@ -596,12 +596,21 @@ def test_dagstats_crud(self):

class DagRunTest(unittest.TestCase):

def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
def create_dag_run(self, dag,
state=State.RUNNING,
task_states=None,
execution_date=None,
is_backfill=False,
):
now = timezone.utcnow()
if execution_date is None:
execution_date = now
if is_backfill:
run_id = BackfillJob.ID_PREFIX + now.isoformat()
else:
run_id = 'manual__' + now.isoformat()
dag_run = dag.create_dagrun(
run_id='manual__' + now.isoformat(),
run_id=run_id,
execution_date=execution_date,
start_date=now,
state=state,
Expand All @@ -617,6 +626,28 @@ def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_d

return dag_run

def test_clear_task_instances_for_backfill_dagrun(self):
    """
    Clearing task instances with ``only_backfill_dagruns=True`` must
    mark the matching backfill dag run as ``State.REMOVED``.
    """
    now = timezone.utcnow()
    session = settings.Session()
    dag_id = 'test_clear_task_instances_for_backfill_dagrun'
    dag = DAG(dag_id=dag_id, start_date=now)
    # Register a dag run flagged as a backfill run for this execution date.
    self.create_dag_run(dag, execution_date=now, is_backfill=True)

    task0 = DummyOperator(task_id='backfill_task_0', owner='test', dag=dag)
    ti0 = TI(task=task0, execution_date=now)
    ti0.run()

    # Clear every task instance of this dag with the backfill flag set.
    qry = session.query(TI).filter(
        TI.dag_id == dag.dag_id).all()
    clear_task_instances(qry, session, only_backfill_dagruns=True)
    session.commit()
    ti0.refresh_from_db()
    dr0 = session.query(DagRun).filter(
        DagRun.dag_id == dag_id,
        DagRun.execution_date == now
    ).first()
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(dr0.state, State.REMOVED)

def test_id_for_date(self):
run_id = models.DagRun.id_for_date(
timezone.datetime(2015, 1, 2, 3, 4, 5, 6))
Expand Down Expand Up @@ -2063,6 +2094,7 @@ def test_overwrite_params_with_dag_run_conf_none(self):


class ClearTasksTest(unittest.TestCase):

def test_clear_task_instances(self):
dag = DAG('test_clear_task_instances', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
Expand Down

0 comments on commit 4c6f1fd

Please sign in to comment.