Optimize analyze_outcomes.py #8560

190 changes: 105 additions & 85 deletions tests/scripts/analyze_outcomes.py
@@ -12,9 +12,36 @@
import re
import subprocess
import os
import typing

import check_test_cases


# `ComponentOutcomes` is a named tuple which is defined as:
# ComponentOutcomes(
# successes = {
# "<suite_case>",
# ...
# },
# failures = {
# "<suite_case>",
# ...
# }
# )
# suite_case = "<suite>;<case>"
ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
[('successes', typing.Set[str]),
('failures', typing.Set[str])])

# `Outcomes` is a representation of the outcomes file,
# which is defined as:
# Outcomes = {
# "<component>": ComponentOutcomes,
# ...
# }
Outcomes = typing.Dict[str, ComponentOutcomes]
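
As an illustration, a minimal hand-built value of these types (a sketch; the
component and suite_case names below are made up):

example_outcomes: Outcomes = {
    'component_test_full': ComponentOutcomes(
        successes={'test_suite_md;MD list'},
        failures={'test_suite_md;MD SHA-256 hash'},
    ),
}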


class Results:
"""Process analysis results."""

@@ -40,35 +67,12 @@ def warning(self, fmt, *args, **kwargs):
def _print_line(fmt, *args, **kwargs):
sys.stderr.write((fmt + '\n').format(*args, **kwargs))

class TestCaseOutcomes:
"""The outcomes of one test case across many configurations."""
# pylint: disable=too-few-public-methods

def __init__(self):
# Collect a list of witnesses of the test case succeeding or failing.
# Currently we don't do anything with witnesses except count them.
# The format of a witness is determined by the read_outcome_file
# function; it's the platform and configuration joined by ';'.
self.successes = []
self.failures = []

def hits(self):
"""Return the number of times a test case has been run.

This includes passes and failures, but not skips.
"""
return len(self.successes) + len(self.failures)

def execute_reference_driver_tests(results: Results, ref_component, driver_component, \
outcome_file):
def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \
outcome_file: str) -> None:
"""Run the tests specified in ref_component and driver_component. Results
are stored in the output_file and they will be used for the following
coverage analysis"""
# If the outcome file already exists, we assume that the user wants to
# perform the comparison analysis again without repeating the tests.
if os.path.exists(outcome_file):
results.info("Outcome file ({}) already exists. Tests will be skipped.", outcome_file)
return
results.new_section("Test {} and {}", ref_component, driver_component)

shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
" " + ref_component + " " + driver_component
@@ -78,49 +82,63 @@ def execute_reference_driver_tests(results: Results, ref_component, driver_compo
if ret_val != 0:
results.error("failed to run reference/driver components")

def analyze_coverage(results, outcomes, allow_list, full_coverage):
def analyze_coverage(results: Results, outcomes: Outcomes,
allow_list: typing.List[str], full_coverage: bool) -> None:
"""Check that all available test cases are executed at least once."""
available = check_test_cases.collect_available_test_cases()
for key in available:
hits = outcomes[key].hits() if key in outcomes else 0
if hits == 0 and key not in allow_list:
for suite_case in available:
hit = any(suite_case in comp_outcomes.successes or
suite_case in comp_outcomes.failures
for comp_outcomes in outcomes.values())

if not hit and suite_case not in allow_list:
if full_coverage:
results.error('Test case not executed: {}', key)
results.error('Test case not executed: {}', suite_case)
else:
results.warning('Test case not executed: {}', key)
elif hits != 0 and key in allow_list:
results.warning('Test case not executed: {}', suite_case)
elif hit and suite_case in allow_list:
# Test case should be removed from the allow list.
if full_coverage:
results.error('Allow listed test case was executed: {}', key)
results.error('Allow listed test case was executed: {}', suite_case)
else:
results.warning('Allow listed test case was executed: {}', key)
results.warning('Allow listed test case was executed: {}', suite_case)
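
The hit test above is a plain membership check across every component's
success and failure sets; a standalone sketch with hypothetical data:

outcomes: Outcomes = {
    'component_a': ComponentOutcomes({'suite;case 1'}, set()),
    'component_b': ComponentOutcomes(set(), {'suite;case 2'}),
}
hit = any('suite;case 2' in comp.successes or 'suite;case 2' in comp.failures
          for comp in outcomes.values())
assert hit  # executed: it failed in component_b, but a failure still counts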

def name_matches_pattern(name, str_or_re):
def name_matches_pattern(name: str, str_or_re) -> bool:
"""Check if name matches a pattern, that may be a string or regex.
- If the pattern is a string, name must be equal to match.
- If the pattern is a regex, name must fully match.
"""
# The CI's python is too old for re.Pattern
#if isinstance(str_or_re, re.Pattern):
if not isinstance(str_or_re, str):
return str_or_re.fullmatch(name)
return str_or_re.fullmatch(name) is not None
else:
return str_or_re == name
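
A quick usage sketch of the two branches (the names are illustrative):

assert name_matches_pattern('test_suite_md', 'test_suite_md')
assert not name_matches_pattern('test_suite_md.generic', 'test_suite_md')
assert name_matches_pattern('test_suite_md.generic',
                            re.compile(r'test_suite_md\..*'))
assert not name_matches_pattern('xtest_suite_md', re.compile(r'test_suite_md'))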

def analyze_driver_vs_reference(results: Results, outcomes,
component_ref, component_driver,
ignored_suites, ignored_tests=None):
"""Check that all tests executed in the reference component are also
executed in the corresponding driver component.
def analyze_driver_vs_reference(results: Results, outcomes: Outcomes,
component_ref: str, component_driver: str,
ignored_suites: typing.List[str], ignored_tests=None) -> None:
"""Check that all tests passing in the reference component are also
passing in the corresponding driver component.
Skip:
- full test suites listed in ignored_suites
- individual test cases inside a suite whose description matches one of
the patterns in ignored_tests
"""
seen_reference_passing = False
for key in outcomes:
# key is like "test_suite_foo.bar;Description of test case"
(full_test_suite, test_string) = key.split(';')
ref_outcomes = outcomes.get("component_" + component_ref)
driver_outcomes = outcomes.get("component_" + component_driver)

if ref_outcomes is None or driver_outcomes is None:
results.error("required components are missing: bad outcome file?")
return

if not ref_outcomes.successes:
results.error("no passing test in reference component: bad outcome file?")
return

for suite_case in ref_outcomes.successes:
# suite_case is like "test_suite_foo.bar;Description of test case"
(full_test_suite, test_string) = suite_case.split(';')
test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name

# Immediately skip fully-ignored test suites
@@ -136,67 +154,48 @@ def analyze_driver_vs_reference(results: Results, outcomes,
if name_matches_pattern(test_string, str_or_re):
ignored = True

# Search for tests that run in reference component and not in driver component
driver_test_passed = False
reference_test_passed = False
for entry in outcomes[key].successes:
if component_driver in entry:
driver_test_passed = True
if component_ref in entry:
reference_test_passed = True
seen_reference_passing = True
if reference_test_passed and not driver_test_passed and not ignored:
results.error("PASS -> SKIP/FAIL: {}", key)
if ignored and driver_test_passed:
results.error("uselessly ignored: {}", key)

if not seen_reference_passing:
results.error("no passing test in reference component: bad outcome file?")
if not ignored and suite_case not in driver_outcomes.successes:
results.error("PASS -> SKIP/FAIL: {}", suite_case)
if ignored and suite_case in driver_outcomes.successes:
results.error("uselessly ignored: {}", suite_case)

def analyze_outcomes(results: Results, outcomes, args):
def analyze_outcomes(results: Results, outcomes: Outcomes, args) -> None:
"""Run all analyses on the given outcome collection."""
analyze_coverage(results, outcomes, args['allow_list'],
args['full_coverage'])

def read_outcome_file(outcome_file):
def read_outcome_file(outcome_file: str) -> Outcomes:
"""Parse an outcome file and return an outcome collection.

An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects.
The keys are the test suite name and the test case description, separated
by a semicolon.
"""
"""
outcomes = {}
with open(outcome_file, 'r', encoding='utf-8') as input_file:
for line in input_file:
(platform, config, suite, case, result, _cause) = line.split(';')
key = ';'.join([suite, case])
setup = ';'.join([platform, config])
if key not in outcomes:
outcomes[key] = TestCaseOutcomes()
(_platform, component, suite, case, result, _cause) = line.split(';')
# Note that `component` is not unique. If a test case passes on Linux
# and fails on FreeBSD, it'll end up in both the successes set and
# the failures set.
suite_case = ';'.join([suite, case])
if component not in outcomes:
outcomes[component] = ComponentOutcomes(set(), set())
if result == 'PASS':
outcomes[key].successes.append(setup)
outcomes[component].successes.add(suite_case)
elif result == 'FAIL':
outcomes[key].failures.append(setup)
outcomes[component].failures.add(suite_case)

return outcomes
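
For reference, a sketch of what one outcomes-file line turns into under
this parsing (the field values are made up):

line = 'Linux-x86_64;component_test_full;test_suite_md;MD list;PASS;\n'
(_platform, component, suite, case, result, _cause) = line.split(';')
assert component == 'component_test_full'
assert ';'.join([suite, case]) == 'test_suite_md;MD list'  # the suite_case key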

def do_analyze_coverage(results: Results, outcome_file, args):
def do_analyze_coverage(results: Results, outcomes: Outcomes, args) -> None:
"""Perform coverage analysis."""
results.new_section("Analyze coverage")
outcomes = read_outcome_file(outcome_file)
analyze_outcomes(results, outcomes, args)

def do_analyze_driver_vs_reference(results: Results, outcome_file, args):
def do_analyze_driver_vs_reference(results: Results, outcomes: Outcomes, args) -> None:
"""Perform driver vs reference analyze."""
results.new_section("Analyze driver {} vs reference {}",
args['component_driver'], args['component_ref'])

execute_reference_driver_tests(results, args['component_ref'], \
args['component_driver'], outcome_file)

ignored_suites = ['test_suite_' + x for x in args['ignored_suites']]

outcomes = read_outcome_file(outcome_file)

analyze_driver_vs_reference(results, outcomes,
args['component_ref'], args['component_driver'],
ignored_suites, args['ignored_tests'])
@@ -493,10 +492,31 @@ def main():

KNOWN_TASKS['analyze_coverage']['args']['full_coverage'] = options.full_coverage

# If the outcome file exists, parse it once and share the result
# among tasks to improve performance.
# Otherwise, it will be generated by execute_reference_driver_tests.
if not os.path.exists(options.outcomes):
if len(tasks_list) > 1:
sys.stderr.write("mutiple tasks found, please provide a valid outcomes file.\n")
sys.exit(2)

task_name = tasks_list[0]
task = KNOWN_TASKS[task_name]
if task['test_function'] != do_analyze_driver_vs_reference: # pylint: disable=comparison-with-callable
sys.stderr.write("please provide valid outcomes file for {}.\n".format(task_name))
sys.exit(2)

execute_reference_driver_tests(main_results,
task['args']['component_ref'],
task['args']['component_driver'],
options.outcomes)

outcomes = read_outcome_file(options.outcomes)

for task in tasks_list:
test_function = KNOWN_TASKS[task]['test_function']
test_args = KNOWN_TASKS[task]['args']
test_function(main_results, options.outcomes, test_args)
test_function(main_results, outcomes, test_args)
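
For orientation, the shape of a KNOWN_TASKS entry as inferred from the
fields read above (test_function, args); the values here are hypothetical:

KNOWN_TASKS['analyze_coverage'] = {
    'test_function': do_analyze_coverage,
    'args': {
        'allow_list': [],
        'full_coverage': False,
    },
}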

main_results.info("Overall results: {} warnings and {} errors",
main_results.warning_count, main_results.error_count)