Skip to content

Commit

Permalink
[AIRFLOW-881] Check if SubDagOperator is in DAG context manager
Browse files Browse the repository at this point in the history
When initializing a SubDagOperator, the `dag`
param should not be
required if it is within a DAG context manager. So
we check if that
is the case and use that as the parent DAG if
found (and `dag` param
is not specified).

Closes apache#2087 from dhuang/AIRFLOW-881
  • Loading branch information
dhuang authored and bolkedebruin committed Feb 19, 2017
1 parent a279be6 commit 0ed608d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
8 changes: 5 additions & 3 deletions airflow/operators/subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ def __init__(
:param dag: the parent DAG
:type subdag: airflow.DAG
"""
if 'dag' not in kwargs:
raise AirflowException("Please pass in the `dag` param")
dag = kwargs['dag']
import airflow.models
dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
if not dag:
raise AirflowException('Please pass in the `dag` param or call '
'within a DAG context manager')
session = kwargs.pop('session')
super(SubDagOperator, self).__init__(*args, **kwargs)

Expand Down
11 changes: 11 additions & 0 deletions tests/operators/subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ def test_subdag_name(self):
AirflowException,
SubDagOperator, task_id='test', dag=dag, subdag=subdag_bad3)

def test_subdag_in_context_manager(self):
"""
Creating a sub DAG within a main DAG's context manager
"""
with DAG('parent', default_args=default_args) as dag:
subdag = DAG('parent.test', default_args=default_args)
op = SubDagOperator(task_id='test', subdag=subdag)

self.assertEqual(op.dag, dag)
self.assertEqual(op.subdag, subdag)

def test_subdag_pools(self):
"""
Subdags and subdag tasks can't both have a pool with 1 slot
Expand Down

0 comments on commit 0ed608d

Please sign in to comment.