Skip to content

Commit

Permalink
Improve test coverage of task_command.py FIXES: apache#15524 (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vikramcse authored May 14, 2021
1 parent 37d549b commit 51e54cb
Showing 1 changed file with 61 additions and 0 deletions.
61 changes: 61 additions & 0 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import logging
import os
import re
import unittest
from contextlib import redirect_stdout
from datetime import datetime, timedelta
Expand Down Expand Up @@ -226,6 +227,47 @@ def test_cli_run_mutually_exclusive(self):
)
)

def test_task_render(self):
"""
tasks render should render and displays templated fields for a given task
"""
with redirect_stdout(io.StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(['tasks', 'render', 'tutorial', 'templated', DEFAULT_DATE.isoformat()])
)

output = stdout.getvalue()

assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output
assert 'echo "Parameter I passed in"' in output

def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
"""
tasks run should return an AirflowException when invalid pickle_id is passed
"""
pickle_id = 'pickle_id'

with pytest.raises(
AirflowException,
match=re.escape("You cannot use the --pickle option when using DAG.cli() method."),
):
dag = self.dagbag.get_dag('test_run_ignores_all_dependencies')
task_command.task_run(
self.parser.parse_args(
[
'tasks',
'run',
'example_bash_operator',
'runme_0',
DEFAULT_DATE.isoformat(),
'--pickle',
pickle_id,
]
),
dag,
)

def test_task_state(self):
task_command.task_state(
self.parser.parse_args(
Expand Down Expand Up @@ -271,6 +313,25 @@ def test_task_states_for_dag_run(self):
'end_date': ti_end.isoformat(),
}

def test_task_states_for_dag_run_when_dag_run_not_exists(self):
"""
task_states_for_dag_run should return an AirflowException when invalid dag id is passed
"""
with pytest.raises(AirflowException, match="DagRun does not exist."):
default_date2 = timezone.make_aware(datetime(2016, 1, 9))
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
'tasks',
'states-for-dag-run',
'not_exists_dag',
default_date2.isoformat(),
'--output',
"json",
]
)
)

def test_subdag_clear(self):
args = self.parser.parse_args(['tasks', 'clear', 'example_subdag_operator', '--yes'])
task_command.task_clear(args)
Expand Down

0 comments on commit 51e54cb

Please sign in to comment.