AIRFLOW-124 Implement create_dagrun
This adds the create_dagrun function to DAG and the static method
DagRun.find. create_dagrun creates a dagrun together with its task
instances.

Because task instances are now created when the dagrun is instantiated,
the deadlocks that were previously tested for can no longer occur.
Tests have been adjusted accordingly.

In addition, integrity has been improved by a bugfix to add_task on
BaseOperator that makes sure a DAG, if present, is always assigned to
the task.

DagRun.find is a convenience function that returns the DagRuns for a
given dag. It provides a single place for looking up dagruns.
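
For reference, a minimal usage sketch of the two new entry points (the DAG and task
names below are illustrative, and the imports assume the module layout at the time of
this commit):

    from datetime import datetime

    from airflow.models import DAG, DagRun, BaseOperator
    from airflow.utils import State

    # An illustrative DAG with a single task; any non-adhoc task will do.
    example_dag = DAG('example_dag', start_date=datetime(2016, 1, 1))
    example_dag.add_task(BaseOperator(task_id='dummy', owner='airflow'))

    # create_dagrun inserts the DagRun plus a TaskInstance per task.
    run = example_dag.create_dagrun(
        run_id='manual__' + datetime.now().isoformat(),
        execution_date=datetime.now(),
        state=State.RUNNING,
        external_trigger=True)

    # DagRun.find is the single lookup point for dagruns.
    running = DagRun.find(dag_id='example_dag', state=State.RUNNING)
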
bolkedebruin committed May 18, 2016
1 parent 72ab63e commit cb56289
Showing 5 changed files with 137 additions and 54 deletions.
41 changes: 22 additions & 19 deletions airflow/bin/cli.py
@@ -118,29 +118,32 @@ def backfill(args, dag=None):


def trigger_dag(args):
-    session = settings.Session()
-    # TODO: verify dag_id
+    dag = get_dag(args)
+
+    if not dag:
+        logging.error("Cannot find dag {}".format(args.dag_id))
+        sys.exit(1)
+
    execution_date = datetime.now()
    run_id = args.run_id or "manual__{0}".format(execution_date.isoformat())
-    dr = session.query(DagRun).filter(
-        DagRun.dag_id == args.dag_id, DagRun.run_id == run_id).first()

-    conf = {}
-    if args.conf:
-        conf = json.loads(args.conf)
+    dr = DagRun.find(dag_id=args.dag_id, run_id=run_id)
    if dr:
-        logging.error("This run_id already exists")
-    else:
-        trigger = DagRun(
-            dag_id=args.dag_id,
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.RUNNING,
-            conf=conf,
-            external_trigger=True)
-        session.add(trigger)
-        logging.info("Created {}".format(trigger))
-    session.commit()
+        logging.error("This run_id {} already exists".format(run_id))
+        raise AirflowException()
+
+    run_conf = {}
+    if args.conf:
+        run_conf = json.loads(args.conf)
+
+    trigger = dag.create_dagrun(
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.RUNNING,
+        conf=run_conf,
+        external_trigger=True
+    )
+    logging.info("Created {}".format(trigger))


def variables(args):
14 changes: 5 additions & 9 deletions airflow/jobs.py
@@ -386,12 +386,11 @@ def schedule_dag(self, dag):
        if dag.schedule_interval:
            DagRun = models.DagRun
            session = settings.Session()
-            qry = session.query(DagRun).filter(
-                DagRun.dag_id == dag.dag_id,
-                DagRun.external_trigger == False,
-                DagRun.state == State.RUNNING,
+            active_runs = DagRun.find(
+                dag_id=dag.dag_id,
+                state=State.RUNNING,
+                external_trigger=False
            )
-            active_runs = qry.all()
            if len(active_runs) >= dag.max_active_runs:
                return
            for dr in active_runs:
@@ -457,16 +456,13 @@ def schedule_dag(self, dag):
                return

            if next_run_date and schedule_end and schedule_end <= datetime.now():
-                next_run = DagRun(
-                    dag_id=dag.dag_id,
+                next_run = dag.create_dagrun(
                    run_id='scheduled__' + next_run_date.isoformat(),
                    execution_date=next_run_date,
                    start_date=datetime.now(),
                    state=State.RUNNING,
                    external_trigger=False
                )
-                session.add(next_run)
-                session.commit()
                return next_run

    def process_dag(self, dag, queue):
97 changes: 96 additions & 1 deletion airflow/models.py
@@ -1935,7 +1935,8 @@ def dag(self, dag):
"The DAG assigned to {} can not be changed.".format(self))
elif self.task_id not in dag.task_dict:
dag.add_task(self)
self._dag = dag

self._dag = dag

def has_dag(self):
"""
@@ -3073,6 +3074,56 @@ def cli(self):
        args = parser.parse_args()
        args.func(args, self)

+    @provide_session
+    def create_dagrun(self,
+                      run_id,
+                      execution_date,
+                      state,
+                      start_date=None,
+                      external_trigger=False,
+                      conf=None,
+                      session=None):
+        """
+        Creates a dag run from this dag including the tasks associated with this dag.
+        Returns the dag run.
+        :param run_id: defines the run id for this dag run
+        :type run_id: string
+        :param execution_date: the execution date of this dag run
+        :type execution_date: datetime
+        :param state: the state of the dag run
+        :type state: State
+        :param start_date: the date this dag run should be evaluated
+        :type start_date: datetime
+        :param external_trigger: whether this dag run is externally triggered
+        :type external_trigger: bool
+        :param session: database session
+        :type session: Session
+        """
+        run = DagRun(
+            dag_id=self.dag_id,
+            run_id=run_id,
+            execution_date=execution_date,
+            start_date=start_date,
+            external_trigger=external_trigger,
+            conf=conf,
+            state=state
+        )
+        session.add(run)
+
+        # create the associated task instances
+        # state is None at the moment of creation
+        for task in self.tasks:
+            if task.adhoc:
+                continue
+
+            ti = TaskInstance(task, execution_date)
+            session.add(ti)
+
+        session.commit()
+
+        run.refresh_from_db()
+        return run
+

class Chart(Base):
    __tablename__ = "chart"
@@ -3368,6 +3419,50 @@ def __repr__(self):
    def id_for_date(klass, date, prefix=ID_FORMAT_PREFIX):
        return prefix.format(date.isoformat()[:19])

+    @provide_session
+    def refresh_from_db(self, session=None):
+        """
+        Reloads the current dagrun from the database
+        :param session: database session
+        """
+        DR = DagRun
+
+        dr = session.query(DR).filter(
+            DR.dag_id == self.dag_id,
+            DR.execution_date == self.execution_date,
+            DR.run_id == self.run_id
+        ).one()
+        if dr:
+            self.id = dr.id
+            self.state = dr.state
+
+    @staticmethod
+    @provide_session
+    def find(dag_id, run_id=None, state=None, external_trigger=None, session=None):
+        """
+        Returns a set of dag runs for the given search criteria.
+        :param run_id: defines the run id for this dag run
+        :type run_id: string
+        :param state: the state of the dag run
+        :type state: State
+        :param external_trigger: whether this dag run is externally triggered
+        :type external_trigger: bool
+        :param session: database session
+        :type session: Session
+        """
+        DR = DagRun
+
+        qry = session.query(DR).filter(DR.dag_id == dag_id)
+        if run_id:
+            qry = qry.filter(DR.run_id == run_id)
+        if state:
+            qry = qry.filter(DR.state == state)
+        if external_trigger:
+            qry = qry.filter(DR.external_trigger == external_trigger)
+        dr = qry.all()
+
+        return dr
+

class Pool(Base):
    __tablename__ = "slot_pool"
33 changes: 11 additions & 22 deletions tests/core.py
@@ -116,6 +116,7 @@ def test_schedule_dag_no_previous_runs(self):
            .format(dag_run.execution_date))
        assert dag_run.state == State.RUNNING
        assert dag_run.external_trigger == False
+        dag.clear()

    def test_schedule_dag_fake_scheduled_previous(self):
        """
@@ -131,14 +132,10 @@ def test_schedule_dag_fake_scheduled_previous(self):
            owner='Also fake',
            start_date=DEFAULT_DATE))
        scheduler = jobs.SchedulerJob(test_mode=True)
-        trigger = models.DagRun(
-            dag_id=dag.dag_id,
-            run_id=models.DagRun.id_for_date(DEFAULT_DATE),
-            execution_date=DEFAULT_DATE,
-            state=State.SUCCESS,
-            external_trigger=True)
-        settings.Session().add(trigger)
-        settings.Session().commit()
+        dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE),
+                          execution_date=DEFAULT_DATE,
+                          state=State.SUCCESS,
+                          external_trigger=True)
        dag_run = scheduler.schedule_dag(dag)
        assert dag_run is not None
        assert dag_run.dag_id == dag.dag_id
@@ -166,6 +163,7 @@ def test_schedule_dag_once(self):

        assert dag_run is not None
        assert dag_run2 is None
+        dag.clear()

    def test_schedule_dag_start_end_dates(self):
        """
@@ -180,16 +178,13 @@ def test_schedule_dag_start_end_dates(self):
                  start_date=start_date,
                  end_date=end_date,
                  schedule_interval=delta)
+        dag.add_task(models.BaseOperator(task_id='faketastic',
+                                         owner='Also fake'))

        # Create and schedule the dag runs
        dag_runs = []
        scheduler = jobs.SchedulerJob(test_mode=True)
        for i in range(runs):
-            date = dag.start_date + i * delta
-            task = models.BaseOperator(task_id='faketastic__%s' % i,
-                                       owner='Also fake',
-                                       start_date=date)
-            dag.task_dict[task.task_id] = task
            dag_runs.append(scheduler.schedule_dag(dag))

        additional_dag_run = scheduler.schedule_dag(dag)
@@ -219,19 +214,12 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):
        dag = DAG(TEST_DAG_ID + 'test_schedule_dag_no_end_date_up_to_today_only',
                  start_date=start_date,
                  schedule_interval=delta)
+        dag.add_task(models.BaseOperator(task_id='faketastic',
+                                         owner='Also fake'))

        dag_runs = []
        scheduler = jobs.SchedulerJob(test_mode=True)
        for i in range(runs):
-            # Create the DagRun
-            date = dag.start_date + i * delta
-            task = models.BaseOperator(task_id='faketastic__%s' % i,
-                                       owner='Also fake',
-                                       start_date=date)
-
-            dag.task_dict[task.task_id] = task
-
-            # Schedule the DagRun
            dag_run = scheduler.schedule_dag(dag)
            dag_runs.append(dag_run)

@@ -730,6 +718,7 @@ def test_trigger_dag(self):
            cli.trigger_dag,
            self.parser.parse_args([
                'trigger_dag', 'example_bash_operator',
+                '--run_id', 'trigger_dag_xxx',
                '-c', 'NOT JSON'])
        )

6 changes: 3 additions & 3 deletions tests/jobs.py
@@ -252,9 +252,9 @@ def test_dagrun_root_fail(self):

    def test_dagrun_deadlock(self):
        """
-        Deadlocked DagRun is marked a failure
+        Do not deadlock

-        Test that a deadlocked dagrun is marked as a failure by having
+        Test that a dagrun is marked as running by having
        depends_on_past and an execution_date after the start_date
        """
        self.evaluate_dagrun(
@@ -263,7 +263,7 @@ def test_dagrun_deadlock(self):
                'test_depends_on_past': None,
                'test_depends_on_past_2': None,
            },
-            dagrun_state=State.FAILED,
+            dagrun_state=State.RUNNING,
            advance_execution_date=True)

    def test_scheduler_pooled_tasks(self):
