Skip to content

Commit

Permalink
[AIRFLOW-2526] dag_run.conf can override params
Browse files Browse the repository at this point in the history
Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA]
(https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    -
https://issues.apache.org/jira/browse/AIRFLOW-2526
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
params can be overridden by the dictionary passed
through `airflow backfill -c`

```
templated_command = """
    echo "text = {{ params.text }}"
"""

bash_operator = BashOperator(
    task_id='bash_task',
    bash_command=templated_command,
    dag=dag,
    params= {
        "text" : "normal processing"
    })
```

In daily processing it prints:
```
normal processing
```

In backfill processing `airflow trigger_dag -c
"{"text": "override success"}"`, it prints
```
override success
```

### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds
documentation that describes how to use it.
    - When adding new operators/hooks/sensors, the
autoclass documentation generation needs to be
added.

### Code Quality
- [x] Passes `git diff upstream/master -u --
"*.py" | flake8 --diff`

Closes apache#3422 from milton0825/params-overridden-
through-cli
  • Loading branch information
milton0825 authored and mistercrunch committed Jun 1, 2018
1 parent 8a5e513 commit 2800c8e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 1 deletion.
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ enable_xcom_pickling = True
# it has to cleanup after it is sent a SIGTERM, before it is SIGKILLED
killed_task_cleanup_time = 60

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
Expand Down
7 changes: 7 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,6 +1795,9 @@ def get_template_context(self, session=None):
if task.params:
params.update(task.params)

if configuration.getboolean('core', 'dag_run_conf_overrides_params'):
self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)

class VariableAccessor:
"""
Wrapper around Variable. This way you can get variables in templates by using
Expand Down Expand Up @@ -1862,6 +1865,10 @@ def __repr__(self):
'outlets': task.outlets,
}

def overwrite_params_with_dag_run_conf(self, params, dag_run):
if dag_run and dag_run.conf:
params.update(dag_run.conf)

def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
Expand Down
4 changes: 3 additions & 1 deletion docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ Variable Description
``{{ end_date }}`` same as ``{{ ds }}``
``{{ latest_date }}`` same as ``{{ ds }}``
``{{ ti }}`` same as ``{{ task_instance }}``
``{{ params }}`` a reference to the user-defined params dictionary
``{{ params }}`` a reference to the user-defined params dictionary which can be overridden by
the dictionary passed through ``trigger_dag -c`` if you enabled
``dag_run_conf_overrides_params` in ``airflow.cfg``
``{{ var.value.my_var }}`` global defined variables represented as a dictionary
``{{ var.json.my_var.path }}`` global defined variables represented as a dictionary
with deserialized JSON object, append the path to the
Expand Down
31 changes: 31 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
from airflow.jobs import BackfillJob
from airflow.models import DAG, TaskInstance as TI
from airflow.models import DagRun
from airflow.models import State as ST
from airflow.models import DagModel, DagStat
from airflow.models import clear_task_instances
Expand Down Expand Up @@ -2030,6 +2031,36 @@ def test_mark_success_url(self):
self.assertEqual(d['task_id'][0], 'op')
self.assertEqual(pendulum.parse(d['execution_date'][0]), now)

def test_overwrite_params_with_dag_run_conf(self):
task = DummyOperator(task_id='op')
ti = TI(task=task, execution_date=datetime.datetime.now())
dag_run = DagRun()
dag_run.conf = {"override": True}
params = {"override": False}

ti.overwrite_params_with_dag_run_conf(params, dag_run)

self.assertEqual(True, params["override"])

def test_overwrite_params_with_dag_run_none(self):
task = DummyOperator(task_id='op')
ti = TI(task=task, execution_date=datetime.datetime.now())
params = {"override": False}

ti.overwrite_params_with_dag_run_conf(params, None)

self.assertEqual(False, params["override"])

def test_overwrite_params_with_dag_run_conf_none(self):
task = DummyOperator(task_id='op')
ti = TI(task=task, execution_date=datetime.datetime.now())
params = {"override": False}
dag_run = DagRun()

ti.overwrite_params_with_dag_run_conf(params, dag_run)

self.assertEqual(False, params["override"])


class ClearTasksTest(unittest.TestCase):
def test_clear_task_instances(self):
Expand Down

0 comments on commit 2800c8e

Please sign in to comment.