Skip to content

Commit

Permalink
[AIRFLOW-7000] Allow passing in env var JSON dict in task_test (apach…
Browse files Browse the repository at this point in the history
…e#7639)

Co-authored-by: Haim Grosman <[email protected]>
  • Loading branch information
KevinYang21 and Haim Grosman authored Mar 18, 2020
1 parent 029c84e commit cc5376d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""Command-line interface"""

import argparse
import json
import os
import textwrap
from argparse import RawTextHelpFormatter
Expand Down Expand Up @@ -459,6 +460,10 @@ class CLIFactory:
action="store_true",
help="Open debugger on uncaught exception",
),
'env_vars': Arg(
("--env-vars", ),
help="Set env var in both parsing time and runtime for each of entry supplied in a JSON dict",
type=json.loads),
# connections
'conn_id': Arg(
('conn_id',),
Expand Down Expand Up @@ -734,7 +739,7 @@ class CLIFactory:
"dependencies or recording its state in the database"),
'args': (
'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
'task_params', 'post_mortem'),
'task_params', 'post_mortem', 'env_vars'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_states_for_dag_run'),
Expand Down
5 changes: 5 additions & 0 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ def task_test(args, dag=None):
if not already_has_stream_handler:
logging.getLogger('airflow.task').propagate = True

env_vars = {'AIRFLOW_TEST_MODE': 'True'}
if args.env_vars:
env_vars.update(args.env_vars)
os.environ.update(env_vars)

dag = dag or get_dag(args.subdir, args.dag_id)

task = dag.get_task(task_id=args.task_id)
Expand Down
19 changes: 19 additions & 0 deletions airflow/example_dags/example_passing_params_via_test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

"""Example DAG demonstrating the usage of the params arguments in templated arguments."""

import os
from datetime import timedelta

from airflow import DAG
Expand Down Expand Up @@ -70,4 +71,22 @@ def my_py_command(test_mode, params):
dag=dag,
)


def print_env_vars(test_mode):
"""
Print out the "foo" param passed in via
`airflow tasks test example_passing_params_via_test_command env_var_test_task <date>
--env-vars '{"foo":"bar"}'`
"""
if test_mode:
print("foo={}".format(os.environ.get('foo')))
print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))


env_var_test_task = PythonOperator(
task_id='env_var_test_task',
python_callable=print_env_vars,
dag=dag
)

run_this >> also_run_this
9 changes: 9 additions & 0 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ def test_cli_test_with_params(self):
'tasks', 'test', 'example_passing_params_via_test_command', 'also_run_this',
'--task-params', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))

def test_cli_test_with_env_vars(self):
with redirect_stdout(io.StringIO()) as stdout:
task_command.task_test(self.parser.parse_args([
'tasks', 'test', 'example_passing_params_via_test_command', 'env_var_test_task',
'--env-vars', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))
output = stdout.getvalue()
self.assertIn('foo=bar', output)
self.assertIn('AIRFLOW_TEST_MODE=True', output)

def test_cli_run(self):
task_command.task_run(self.parser.parse_args([
'tasks', 'run', 'example_bash_operator', 'runme_0', '--local',
Expand Down

0 comments on commit cc5376d

Please sign in to comment.