Skip to content

Commit

Permalink
[AIRFLOW-2517] backfill support passing key values through CLI
Browse files Browse the repository at this point in the history
### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-2517
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
In backfill, we can provide key-value pairs
through the CLI, and those pairs can then be
accessed in templates through `dag_run.conf`.
This works just like `trigger_dag -c` [1].

Let's walk through an example.

In the airflow CLI we specify a key-value pair.
```
airflow backfill hello_world -s 2018-02-01 -e 2018-02-08 -c '{"text": "some text"}'
```

In the DAG file, I have a `BashOperator` with a
templated command, and I want
`{{ dag_run.conf.text }}` to resolve to the text I
passed in through the CLI.
```python
templated_command = """
    echo "ds = {{ ds }}"
    echo "prev_ds = {{
macros.datetime.strftime(prev_execution_date,
"%Y-%m-%d") }}"
    echo "next_ds = {{
macros.datetime.strftime(next_execution_date,
"%Y-%m-%d") }}"
    echo "text_through_conf = {{ dag_run.conf.text }}"
"""

bash_operator = BashOperator(
    task_id='bash_task',
    bash_command=templated_command,
    dag=dag
    )
```
Rendered Bash command in Airflow UI.
<img width="1246" alt="screen shot 2018-05-22 at 4 33 59 pm" src="https://user-images.githubusercontent.com/6065051/40395666-04c41574-5dde-11e8-9ec2-c0312b7203e6.png">

[1]
https://airflow.apache.org/cli.html#trigger_dag

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
    - `test_backfill_conf` in `tests/jobs.py`

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds
documentation that describes how to use it.
    - When adding new operators/hooks/sensors, the
autoclass documentation generation needs to be
added.

### Code Quality
- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

Closes apache#3406 from milton0825/backfill-support-conf
  • Loading branch information
milton0825 authored and mistercrunch committed May 30, 2018
1 parent 7945854 commit 3ed25a9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
7 changes: 6 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ def backfill(args, dag=None):
task_regex=args.task_regex,
include_upstream=not args.ignore_dependencies)

run_conf = None
if args.conf:
run_conf = json.loads(args.conf)

if args.dry_run:
print("Dry run of DAG {0} on {1}".format(args.dag_id,
args.start_date))
Expand All @@ -200,6 +204,7 @@ def backfill(args, dag=None):
pool=args.pool,
delay_on_limit_secs=args.delay_on_limit,
verbose=args.verbose,
conf=run_conf,
)


Expand Down Expand Up @@ -1678,7 +1683,7 @@ class CLIFactory(object):
'dag_id', 'task_regex', 'start_date', 'end_date',
'mark_success', 'local', 'donot_pickle',
'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf'
)
}, {
'func': list_tasks,
Expand Down
5 changes: 4 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,7 @@ def __init__(
pool=None,
delay_on_limit_secs=1.0,
verbose=False,
conf=None,
*args, **kwargs):
self.dag = dag
self.dag_id = dag.dag_id
Expand All @@ -1968,6 +1969,7 @@ def __init__(
self.pool = pool
self.delay_on_limit_secs = delay_on_limit_secs
self.verbose = verbose
self.conf = conf
super(BackfillJob, self).__init__(*args, **kwargs)

def _update_counters(self, ti_status):
Expand Down Expand Up @@ -2090,7 +2092,8 @@ def _get_dag_run(self, run_date, session=None):
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False,
session=session
session=session,
conf=self.conf,
)

# set required transient field
Expand Down
4 changes: 4 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3973,6 +3973,7 @@ def run(
pool=None,
delay_on_limit_secs=1.0,
verbose=False,
conf=None,
):
"""
Runs the DAG.
Expand Down Expand Up @@ -4001,6 +4002,8 @@ def run(
:type delay_on_limit_secs: float
:param verbose: Make logging output more verbose
:type verbose: boolean
:param conf: user defined dictionary passed from CLI
:type conf: dict
"""
from airflow.jobs import BackfillJob
if not executor and local:
Expand All @@ -4019,6 +4022,7 @@ def run(
pool=pool,
delay_on_limit_secs=delay_on_limit_secs,
verbose=verbose,
conf=conf,
)
job.run()

Expand Down
28 changes: 28 additions & 0 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from __future__ import unicode_literals

import datetime
import json
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -186,6 +187,33 @@ def test_backfill_examples(self):
ignore_first_depends_on_past=True)
job.run()

def test_backfill_conf(self):
    """Check that the conf dict given to BackfillJob is stored on the DagRun.

    Builds a one-task daily DAG, backfills it over a three-day window with
    a user-supplied conf, and compares that conf against the conf of the
    first DagRun found for the DAG.
    """
    dag_id = 'test_backfill_conf'
    dag = DAG(
        dag_id=dag_id,
        start_date=DEFAULT_DATE,
        schedule_interval='@daily')
    with dag:
        DummyOperator(task_id='op', dag=dag)

    # Start from a clean slate so earlier runs do not affect the lookup.
    dag.clear()

    expected_conf = json.loads('{"key": "value"}')
    last_day = DEFAULT_DATE + datetime.timedelta(days=2)

    backfill = BackfillJob(
        dag=dag,
        executor=TestExecutor(do_update=True),
        start_date=DEFAULT_DATE,
        end_date=last_day,
        conf=expected_conf,
    )
    backfill.run()

    dag_runs = DagRun.find(dag_id=dag_id)
    self.assertEqual(expected_conf, dag_runs[0].conf)

def test_backfill_ordered_concurrent_execute(self):
dag = DAG(
dag_id='test_backfill_ordered_concurrent_execute',
Expand Down

0 comments on commit 3ed25a9

Please sign in to comment.