From cb04f3786e6763948a4353d098ef807e811529da Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Sat, 7 Nov 2020 10:36:29 -0800 Subject: [PATCH] refactoring scan, and introducing transformers --- detect_secrets/core/plugins/util.py | 35 +-- detect_secrets/core/scan.py | 198 ++++++++++++++ detect_secrets/core/secrets_collection.py | 182 +------------ detect_secrets/plugins/common/filetype.py | 48 ---- .../plugins/common/ini_file_parser.py | 156 +++++------ .../plugins/common/yaml_file_parser.py | 159 ----------- detect_secrets/settings.py | 4 +- detect_secrets/transformers/__init__.py | 32 +++ detect_secrets/transformers/base.py | 38 +++ detect_secrets/transformers/config.py | 215 +++++++++++++++ detect_secrets/transformers/exceptions.py | 3 + detect_secrets/transformers/yaml.py | 252 ++++++++++++++++++ detect_secrets/util/filetype.py | 37 +++ detect_secrets/util/importlib.py | 36 +++ detect_secrets/util/inject.py | 37 +++ tests/core/secrets_collection_test.py | 8 +- tests/transformers/config_transformer_test.py | 147 ++++++++++ tests/transformers/import_test.py | 13 + tests/transformers/yaml_transformer_test.py | 149 +++++++++++ 19 files changed, 1250 insertions(+), 499 deletions(-) create mode 100644 detect_secrets/core/scan.py delete mode 100644 detect_secrets/plugins/common/filetype.py delete mode 100644 detect_secrets/plugins/common/yaml_file_parser.py create mode 100644 detect_secrets/transformers/__init__.py create mode 100644 detect_secrets/transformers/base.py create mode 100644 detect_secrets/transformers/config.py create mode 100644 detect_secrets/transformers/exceptions.py create mode 100644 detect_secrets/transformers/yaml.py create mode 100644 detect_secrets/util/filetype.py create mode 100644 detect_secrets/util/importlib.py create mode 100644 detect_secrets/util/inject.py create mode 100644 tests/transformers/config_transformer_test.py create mode 100644 tests/transformers/import_test.py create mode 100644 tests/transformers/yaml_transformer_test.py diff --git a/detect_secrets/core/plugins/util.py b/detect_secrets/core/plugins/util.py index 61321a4af..5ea3ac401 100644 --- a/detect_secrets/core/plugins/util.py +++ b/detect_secrets/core/plugins/util.py @@ -1,15 +1,14 @@ import inspect -import pkgutil from abc import abstractproperty from functools import lru_cache -from importlib import import_module -from types import ModuleType +from typing import Any from typing import Dict from typing import Type from typing import TypeVar from ... import plugins from ...plugins.base import BasePlugin +from ...util.importlib import import_types_from_module Plugin = TypeVar('Plugin', bound=BasePlugin) @@ -18,35 +17,17 @@ @lru_cache(maxsize=1) def get_mapping_from_secret_type_to_class() -> Dict[str, Type[Plugin]]: # TODO: custom_plugin_paths - modules = [ - module - for _, module, is_package in pkgutil.walk_packages( - plugins.__path__, prefix=f'{plugins.__name__}.', # type: ignore # mypy issue #1422 - ) - if not is_package - ] - output = {} - - for module_path in modules: - module = import_module(module_path) - attributes = [ - getattr(module, attribute) - for attribute in dir(module) - if ( - not attribute.startswith('_') - and _is_valid_plugin(module, attribute) - ) - ] - - for attribute in attributes: - output[attribute.secret_type] = attribute + for plugin_class in import_types_from_module( + plugins, + filter=lambda x: not _is_valid_plugin(x), + ): + output[plugin_class.secret_type] = plugin_class return output -def _is_valid_plugin(module: ModuleType, name: str) -> bool: - attribute = getattr(module, name) +def _is_valid_plugin(attribute: Any) -> bool: return ( inspect.isclass(attribute) and issubclass(attribute, BasePlugin) diff --git a/detect_secrets/core/scan.py b/detect_secrets/core/scan.py new file mode 100644 index 000000000..668b27a56 --- /dev/null +++ b/detect_secrets/core/scan.py @@ -0,0 +1,198 @@ +from functools import lru_cache +from importlib import import_module +from typing import Generator +from typing import IO +from typing import List +from typing import Optional +from typing import Tuple + +from . import plugins +from ..settings import get_settings +from ..transformers import get_transformers +from ..transformers import ParsingError +from ..types import SelfAwareCallable +from ..util.inject import get_injectable_variables +from ..util.inject import inject_variables_into_function +from .log import log +from .plugins.util import Plugin +from .potential_secret import PotentialSecret + + +def scan_file(filename: str) -> Generator[PotentialSecret, None, None]: + if not get_plugins(): # pragma: no cover + log.warning('No plugins to scan with!') + return + + if _filter_files(filename): + return + + try: + with open(filename) as f: + log.info(f'Checking file: {filename}') + + lines = _get_transformed_file(f) + if not lines: + lines = f.readlines() + + has_secret = False + for secret in _process_line_based_plugins( + lines=list(enumerate(lines, 1)), + filename=f.name, + ): + has_secret = True + yield secret + + if has_secret: + return + + # Only if no secrets, then use eager transformers + f.seek(0) + lines = _get_transformed_file(f, use_eager_transformers=True) + if not lines: + return + + yield from _process_line_based_plugins( + lines=list(enumerate(lines, 1)), + filename=f.name, + ) + except IOError: + log.warning(f'Unable to open file: {filename}') + + +def scan_diff(diff: str) -> Generator[PotentialSecret, None, None]: + """ + :raises: ImportError + """ + # Local imports, so that we don't need to require unidiff for versions of + # detect-secrets that don't use it. + from unidiff import PatchSet + + if not get_plugins(): # pragma: no cover + log.warn('No plugins to scan with!') + return + + patch_set = PatchSet.from_string(diff) + for patch_file in patch_set: + filename = patch_file.path + if _filter_files(filename): + continue + + lines = [ + (line.target_line_no, line.value) + for chunk in patch_file + # target_lines refers to incoming (new) changes + for line in chunk.target_lines() + if line.is_added + ] + + yield from _process_line_based_plugins(lines, filename=filename) + + +def _filter_files(filename: str) -> bool: + """Returns True if successfully filtered.""" + for filter_fn in get_filters(): + if inject_variables_into_function(filter_fn, filename=filename): + log.info(f'Skipping "{filename}" due to "{filter_fn.path}"') + return True + + return False + + +def _get_transformed_file(file: IO, use_eager_transformers: bool = False) -> Optional[List[str]]: + for transformer in get_transformers(): + if not transformer.should_parse_file(file.name): + continue + + if use_eager_transformers != transformer.is_eager: + continue + + try: + return transformer.parse_file(file) + except ParsingError: + pass + finally: + file.seek(0) + + return None + + +def _process_line_based_plugins( + lines: List[Tuple[int, str]], + filename: str, +) -> Generator[PotentialSecret, None, None]: + # NOTE: We iterate through lines *then* plugins, because we want to quit early if any of the + # filters return True. + for line_number, line in lines: + line = line.rstrip() + + # We apply line-specific filters, and see whether that allows us to quit early. + if any([ + inject_variables_into_function(filter_fn, filename=filename, line=line) + for filter_fn in get_filters() + ]): + continue + + for plugin in get_plugins(): + yield from _scan_line(plugin, filename, line, line_number) + + +def _scan_line( + plugin: Plugin, + filename: str, + line: str, + line_number: int, +) -> Generator[PotentialSecret, None, None]: + # NOTE: We don't apply filter functions here yet, because we don't have any filters + # that operate on (filename, line, plugin) without `secret` + try: + secrets = plugin.analyze_line(filename=filename, line=line, line_number=line_number) + except AttributeError: + return + + if not secrets: + return + + for secret in secrets: + if any([ + inject_variables_into_function( + filter_fn, + filename=secret.filename, + secret=secret.secret_value, + plugin=plugin, + line=line, + ) + for filter_fn in get_filters() + ]): + continue + + yield secret + + +@lru_cache(maxsize=1) +def get_plugins() -> List[Plugin]: + return [ + plugins.initialize.from_plugin_classname(classname) + for classname in get_settings().plugins + ] + + +@lru_cache(maxsize=1) +def get_filters() -> List[SelfAwareCallable]: + output = [] + for path, config in get_settings().filters.items(): + module_path, function_name = path.rsplit('.', 1) + try: + function = getattr(import_module(module_path), function_name) + except (ModuleNotFoundError, AttributeError): + log.warn(f'Invalid filter: {path}') + continue + + # We attach this metadata to the function itself, so that we don't need to + # compute it everytime. This will allow for dependency injection for filters. + function.injectable_variables = set(get_injectable_variables(function)) + output.append(function) + + # This is for better logging. + function.path = path + + return output diff --git a/detect_secrets/core/secrets_collection.py b/detect_secrets/core/secrets_collection.py index 591da6e58..d82e4d335 100644 --- a/detect_secrets/core/secrets_collection.py +++ b/detect_secrets/core/secrets_collection.py @@ -1,21 +1,13 @@ from collections import defaultdict -from functools import lru_cache -from importlib import import_module from typing import Any -from typing import Callable from typing import Dict from typing import Generator -from typing import IO from typing import List from typing import Optional from typing import Set from typing import Tuple -from . import plugins -from ..settings import get_settings -from ..types import SelfAwareCallable -from .log import log -from .plugins.util import Plugin +from . import scan from .potential_secret import PotentialSecret @@ -46,55 +38,22 @@ def files(self) -> Set[str]: return set(self.data.keys()) def scan_file(self, filename: str) -> None: - if not get_plugins(): # pragma: no cover - log.warning('No plugins to scan with!') - return - - # First, we filter on filename, so that we can skip whole files if we've filtered - # them out. - for filter_fn in get_filters(): - if _inject_variables(filter_fn, filename=filename): - log.info(f'Skipping "{filename}" due to "{filter_fn.path}"') - return - - try: - with open(filename) as f: - for secret in _iterate_through_secrets_in_file(f): - self[filename].add(secret) - except IOError: - log.warning(f'Unable to open file: {filename}') + for secret in scan.scan_file(filename): + self[secret.filename].add(secret) def scan_diff(self, diff: str) -> None: """ :raises: UnidiffParseError """ - if not get_plugins(): # pragma: no cover - log.warning('No plugins to scan with!') - return - - # Local imports, so that we don't need to require unidiff for versions of - # detect-secrets that don't use it. try: - from unidiff import PatchSet + for secret in scan.scan_diff(diff): + self[secret.filename].add(secret) except ImportError: # pragma: no cover raise NotImplementedError( 'SecretsCollection.scan_diff requires `unidiff` to work. Try pip ' 'installing that package, and try again.', ) - filters = get_filters() - patch_set = PatchSet.from_string(diff) - for patch_file in patch_set: - filename = patch_file.path - - for filter_fn in filters: - if _inject_variables(filter_fn, filename=filename): - log.info(f'Skipping "{filename}" due to "{filter_fn.path}"') - break - else: - for secret in _iterate_through_secrets_in_patch_file(patch_file): - self[filename].add(secret) - def trim( self, scanned_results: 'SecretsCollection', @@ -238,134 +197,3 @@ def __sub__(self, other: Any) -> 'SecretsCollection': output[filename] = self[filename] - other[filename] return output - - -@lru_cache(maxsize=1) -def get_plugins() -> List[Plugin]: - return [ - plugins.initialize.from_plugin_classname(classname) - for classname in get_settings().plugins - ] - - -@lru_cache(maxsize=1) -def get_filters() -> List[SelfAwareCallable]: - output = [] - for path, config in get_settings().filters.items(): - module_path, function_name = path.rsplit('.', 1) - try: - function = getattr(import_module(module_path), function_name) - except (ModuleNotFoundError, AttributeError): - log.warn(f'Invalid filter: {path}') - continue - - # We attach this metadata to the function itself, so that we don't need to - # compute it everytime. This will allow for dependency injection for filters. - function.injectable_variables = set(_get_injectable_variables(function)) - output.append(function) - - # This is for better logging. - function.path = path - - return output - - -def _get_injectable_variables(func: Callable) -> Tuple[str, ...]: - """ - The easiest way to understand this is to see it as an example: - >>> def func(a, b=1, *args, c, d=2, **kwargs): - ... e = 5 - >>> - >>> print(func.__code__.co_varnames) - ('a', 'b', 'c', 'd', 'args', 'kwargs', 'e') - >>> print(func.__code__.co_argcount) # `a` and `b` - 2 - >>> print(func.__code__.co_kwonlyargcount) # `c` and `d` - 2 - """ - variable_names = func.__code__.co_varnames - arg_count = func.__code__.co_argcount + func.__code__.co_kwonlyargcount - - return variable_names[:arg_count] - - -def _inject_variables(func: SelfAwareCallable, **kwargs: Any) -> Any: - variables_to_inject = set(kwargs.keys()) - values = { - key: kwargs[key] - for key in (variables_to_inject & func.injectable_variables) - } - - if set(values.keys()) != func.injectable_variables: - return - - return func(**values) - - -def _iterate_through_secrets_in_file(file: IO) -> Generator[PotentialSecret, None, None]: - log.info(f'Checking file: {file.name}') - - for secret in _process_line_based_plugins(file.readlines(), filename=file.name): - yield secret - - file.seek(0) - - for secret in _process_file_based_plugins(file): - yield secret - - -def _iterate_through_secrets_in_patch_file( - patch_file: PatchedFile, -) -> Generator[PotentialSecret, None, None]: - for secret in _process_line_based_plugins( - [ - line.value - for chunk in patch_file - # target_lines refers to incoming (new) changes - for line in chunk.target_lines() - if line.is_added - ], - filename=patch_file.path, - ): - yield secret - - -def _process_line_based_plugins( - lines: List[str], - filename: str, -) -> Generator[PotentialSecret, None, None]: - for index, line in enumerate(lines): - line = line.rstrip() - - # Next, we apply line-specific filters, and see whether that allows us to quit early. - if any([ - _inject_variables(filter_fn, filename=filename, line=line) - for filter_fn in get_filters() - ]): - continue - - for plugin in get_plugins(): - secrets = plugin.analyze_line(line, line_num=index + 1, filename=filename) - if not secrets: - continue - - for secret in secrets: - # Lastly, we apply (filename, line, secret) filters, and see if we should consider - # the result an actual secret. - if any([ - _inject_variables( - filter_fn, - filename=filename, - line=line, - secret=secret.secret_value, - ) - for filter_fn in get_filters() - ]): - continue - - yield secret - - -def _process_file_based_plugins(file: IO) -> Generator[PotentialSecret, None, None]: - # TODO - return [] diff --git a/detect_secrets/plugins/common/filetype.py b/detect_secrets/plugins/common/filetype.py deleted file mode 100644 index 687805b56..000000000 --- a/detect_secrets/plugins/common/filetype.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -from enum import Enum - - -class FileType(Enum): - CLS = 0 - EXAMPLE = 1 - GO = 2 - JAVA = 3 - JAVASCRIPT = 4 - PHP = 5 - OBJECTIVE_C = 6 - PYTHON = 7 - SWIFT = 8 - TERRAFORM = 9 - YAML = 10 - OTHER = 11 - - -EXTENSION_TO_FILETYPE = { - '.cls': FileType.CLS, - '.example': FileType.EXAMPLE, - '.eyaml': FileType.YAML, - '.go': FileType.GO, - '.java': FileType.JAVA, - '.js': FileType.JAVASCRIPT, - '.m': FileType.OBJECTIVE_C, - '.php': FileType.PHP, - '.py': FileType.PYTHON, - '.pyi': FileType.PYTHON, - '.swift': FileType.SWIFT, - '.tf': FileType.TERRAFORM, - '.yaml': FileType.YAML, - '.yml': FileType.YAML, -} - - -def determine_file_type(filename): - """ - :param filename: str - - :rtype: FileType - """ - _, file_extension = os.path.splitext(filename) - return EXTENSION_TO_FILETYPE.get( - file_extension, - FileType.OTHER, - ) diff --git a/detect_secrets/plugins/common/ini_file_parser.py b/detect_secrets/plugins/common/ini_file_parser.py index 0b87a3fd5..dc9fc64e2 100644 --- a/detect_secrets/plugins/common/ini_file_parser.py +++ b/detect_secrets/plugins/common/ini_file_parser.py @@ -1,14 +1,18 @@ import configparser import re +from typing import Generator +from typing import IO +from typing import List +from typing import Tuple class EfficientParsingError(configparser.ParsingError): - def append(self, lineno, line): + def append(self, lineno: int, line: str): """ Rather than inefficiently add all the lines in the file - to the error message like the CPython code from 1998. - We just `return` because we will catch and `pass` + to the error message like the CPython code from 1998, + we just `return` because we will catch and `pass` the exception in `high_entropy_strings.py` anyway. """ return @@ -21,21 +25,13 @@ class IniFileParser: _comment_regex = re.compile(r'\s*[;#]') - def __init__(self, file, add_header=False, exclude_lines_regex=None): + def __init__(self, file: IO, add_header: bool = False) -> None: """ - :type file: file object - - :type add_header: bool :param add_header: whether or not to add a top-level [global] header. - - :type exclude_lines_regex: regex object - :param exclude_lines_regex: optional regex for ignored lines. """ self.parser = configparser.ConfigParser() self.parser.optionxform = str - self.exclude_lines_regex = exclude_lines_regex - content = file.read() if add_header: # This supports environment variables, or other files that look @@ -49,122 +45,116 @@ def __init__(self, file, add_header=False, exclude_lines_regex=None): self.lines = [line.strip() for line in file.readlines()] self.line_offset = 0 - def iterator(self): + def __iter__(self) -> Generator[Tuple[str, str, int], None, None]: if not self.parser.sections(): # To prevent cases where it's not an ini file, but the parser # helpfully attempts to parse everything to a DEFAULT section, # when not explicitly provided. raise configparser.Error - for section_name, _ in self.parser.items(): + for section_name in self.parser: for key, values in self.parser.items(section_name): - for value, offset in self._get_value_and_line_offset( - key, - values, - ): + for value, offset in self._get_value_and_line_offset(key, values): + if not value: + continue + yield key, value, offset - def _get_value_and_line_offset(self, key, values): + def _get_value_and_line_offset(self, key: str, values: str) -> List[Tuple[str, int]]: """Returns the index of the location of key, value pair in lines. - :type key: str :param key: key, in config file. - - :type values: str :param values: values for key, in config file. This is plural, because you can have multiple values per key. e.g. >>> key = ... value1 ... value2 - - :type lines: list - :param lines: a collection of lines-so-far in file - - :rtype: list(tuple) """ - values_list = self._construct_values_list(values) + values_list = _construct_values_list(values) if not values_list: return [] current_value_list_index = 0 output = [] - lines_modified = False - for index, line in enumerate(self.lines): + for line_offset, line in enumerate(self.lines): # Check ignored lines before checking values, because # you can write comments *after* the value. - if not line.strip() or self._comment_regex.match(line): - continue - - if ( - self.exclude_lines_regex and - self.exclude_lines_regex.search(line) - ): + if not line or self._comment_regex.match(line): continue + # The first line is special because it's the only one with the variable name. + # As such, we should handle it differently. if current_value_list_index == 0: + # In situations where the first line does not have an associated value, + # it will be an empty string. However, this regex still does its job because + # it's not necessarily the case where the first line is a non-empty one. + # + # Therefore, we *only* advance the current_value_list_index when we identify + # the key used. first_line_regex = re.compile( - r'^\s*{}[ :=]+{}'.format( - re.escape(key), - re.escape(values_list[current_value_list_index]), + r'^\s*{key}[ :=]+{value}'.format( + key=re.escape(key), + value=re.escape(values_list[current_value_list_index]), ), ) if first_line_regex.match(line): output.append(( values_list[current_value_list_index], - self.line_offset + index + 1, + self.line_offset + line_offset + 1, )) current_value_list_index += 1 + continue + # There's no more values to iterate over. if current_value_list_index == len(values_list): - if index == 0: - index = 1 # Don't want to count the same line again - self.line_offset += index - self.lines = self.lines[index:] - lines_modified = True - break - else: - output.append(( - values_list[current_value_list_index], - self.line_offset + index + 1, - )) + if line_offset == 0: + line_offset = 1 # Don't want to count the same line again + + self.line_offset += line_offset + self.lines = self.lines[line_offset:] - current_value_list_index += 1 + break - if not lines_modified: - # No more lines left, if loop was not explicitly left. + # This handles all other cases, when it isn't an empty or blank line. + output.append(( + values_list[current_value_list_index], + self.line_offset + line_offset + 1, + )) + current_value_list_index += 1 + else: self.lines = [] return output - @staticmethod - def _construct_values_list(values): - """ - This values_list is a strange construction, because of ini format. - We need to extract the values with the following supported format: - - >>> key = value0 - ... value1 - ... - ... # Comment line here - ... value2 - - given that normally, either value0 is supplied, or (value1, value2), - but still allowing for all three at once. - Furthermore, with the configparser, we will get a list of values, - and intermediate blank lines, but no comments. This means that we can't - merely use the count of values' items to heuristically "skip ahead" lines, - because we still have to manually parse through this. - - Therefore, we construct the values_list in the following fashion: - 1. Keep the first value (in the example, this is `value0`) - 2. For all other values, ignore blank lines. - Then, we can parse through, and look for values only. - """ - lines = values.splitlines() - values_list = lines[:1] - values_list.extend(filter(None, lines[1:])) - return values_list +def _construct_values_list(values: str): + """ + This values_list is a strange construction, because of ini format. + We need to extract the values with the following supported format: + + >>> key = value0 + ... value1 + ... + ... # Comment line here + ... value2 + + given that normally, either value0 is supplied, or (value1, value2), + but still allowing for all three at once. + + Furthermore, with the configparser, we will get a list of values, + and intermediate blank lines, but no comments. This means that we can't + merely use the count of values' items to heuristically "skip ahead" lines, + because we still have to manually parse through this. + + Therefore, we construct the values_list in the following fashion: + 1. Keep the first value (in the example, this is `value0`) + 2. For all other values, ignore blank lines. + Then, we can parse through, and look for values only. + """ + lines = values.splitlines() + values_list = lines[:1] + values_list.extend(filter(None, lines[1:])) + return values_list diff --git a/detect_secrets/plugins/common/yaml_file_parser.py b/detect_secrets/plugins/common/yaml_file_parser.py deleted file mode 100644 index 7b17fbae0..000000000 --- a/detect_secrets/plugins/common/yaml_file_parser.py +++ /dev/null @@ -1,159 +0,0 @@ -import yaml - -from detect_secrets.plugins.common.constants import ALLOWLIST_REGEX - - -class YamlFileParser: - """ - Yaml config files are interesting, because they don't necessarily conform - to our basic regex for detecting HighEntropyStrings as strings don't - need to be quoted. - - This causes interesting issues, because our regex won't catch non-quoted - strings, and if we ignore the quoting requirement, then we increase our - false positive rate, because any long string would have high entropy. - - Therefore, we take a different approach: intercept the parsing of the yaml - file to identify string values. This assumes: - - 1. Secrets are strings or binaries - 2. Secrets are not keys - - Then, we calculate the entropy of those string values. - - The difficulty comes from determining the line number which these values - come from. To do this, we transform the string into a dictionary of - meta-tags, in the following format: - - >>> { - 'key': { - '__value__': value, - '__line__': , - } - } - - This way, we can quickly identify the line number for auditing at a later - stage. - - This parsing method is inspired by https://stackoverflow.com/a/13319530. - """ - - def __init__(self, file, exclude_lines_regex=None): - """ - :type file: file object - - :type exclude_lines_regex: regex object - :param exclude_lines_regex: optional regex for ignored lines. - """ - self.content = file.read() - self.exclude_lines_regex = exclude_lines_regex - - self.loader = yaml.SafeLoader(self.content) - self.loader.compose_node = self._compose_node_shim - - def json(self): - return self.loader.get_single_data() - - def _compose_node_shim(self, parent, index): - line = self.loader.line - - node = yaml.composer.Composer.compose_node(self.loader, parent, index) - node.__line__ = line + 1 - - if node.tag.endswith(':map'): - return self._tag_dict_values(node) - - # TODO: Not sure if need to do :seq - - return node - - def _tag_dict_values(self, map_node): - """ - :type map_node: yaml.nodes.MappingNode - :param map_node: It looks like map_node.value contains a list of - pair tuples, corresponding to key,value pairs. - """ - new_values = [] - for key, value in map_node.value: - if not ( - value.tag.endswith(':str') or - value.tag.endswith(':binary') - ): - new_values.append((key, value)) - continue - - augmented_string = yaml.nodes.MappingNode( - tag=map_node.tag, - value=[ - self._create_key_value_pair_for_mapping_node_value( - key='__value__', - value=value.value, - tag=value.tag, - ), - self._create_key_value_pair_for_mapping_node_value( - key='__line__', - value=str(value.__line__), - tag='tag:yaml.org,2002:int', - ), - self._create_key_value_pair_for_mapping_node_value( - key='__is_binary__', - value=str(value.tag.endswith(':binary')), - tag='tag:yaml.org,2002:bool', - ), - self._create_key_value_pair_for_mapping_node_value( - key='__original_key__', - value=key.value, - tag='tag:yaml.org,2002:str', - ), - ], - ) - - new_values.append((key, augmented_string)) - - output = yaml.nodes.MappingNode( - tag=map_node.tag, - value=new_values, - start_mark=map_node.start_mark, - end_mark=map_node.end_mark, - flow_style=map_node.flow_style, - ) - return output - - @staticmethod - def _create_key_value_pair_for_mapping_node_value(key, value, tag): - return ( - yaml.nodes.ScalarNode( - tag='tag:yaml.org,2002:str', - value=key, - ), - yaml.nodes.ScalarNode( - tag=tag, - value=value, - ), - ) - - def get_ignored_lines(self): - """ - Return a set of integers that refer to line numbers that were - allowlisted by the user and should be ignored. - - We need to parse the file separately from PyYAML parsing because - the parser drops the comments (at least up to version 3.13): - https://github.com/yaml/pyyaml/blob/a2d481b8dbd2b352cb001f07091ccf669227290f/lib3/yaml/scanner.py#L749 - - :return: set - """ - ignored_lines = set() - - for line_number, line in enumerate(self.content.split('\n'), 1): - if ( - ALLOWLIST_REGEX['yaml'].search(line) - - or ( - self.exclude_lines_regex and - self.exclude_lines_regex.search(line) - ) - ): - ignored_lines.add(line_number) - - return ignored_lines diff --git a/detect_secrets/settings.py b/detect_secrets/settings.py index 5a72bf394..5844fabc8 100644 --- a/detect_secrets/settings.py +++ b/detect_secrets/settings.py @@ -49,8 +49,8 @@ def transient_settings(config: Dict[str, Any]) -> Generator['Settings', None, No def cache_bust() -> None: - from detect_secrets.core.secrets_collection import get_filters - from detect_secrets.core.secrets_collection import get_plugins + from detect_secrets.core.scan import get_filters + from detect_secrets.core.scan import get_plugins get_settings.cache_clear() get_filters.cache_clear() diff --git a/detect_secrets/transformers/__init__.py b/detect_secrets/transformers/__init__.py new file mode 100644 index 000000000..8188e703e --- /dev/null +++ b/detect_secrets/transformers/__init__.py @@ -0,0 +1,32 @@ +import inspect +import sys +from functools import lru_cache +from typing import Any +from typing import Iterable +from typing import TypeVar + +from ..util.importlib import import_types_from_module +from .base import BaseTransformer +from .exceptions import ParsingError # noqa: F401 + + +Transformer = TypeVar('Transformer', bound=BaseTransformer) + + +@lru_cache(maxsize=1) +def get_transformers() -> Iterable[Transformer]: + return [ + item() + for item in import_types_from_module( + sys.modules[__name__], + filter=lambda x: not _is_valid_transformer(x), + ) + ] + + +def _is_valid_transformer(attribute: Any) -> bool: + return ( + inspect.isclass(attribute) + and issubclass(attribute, BaseTransformer) + and attribute.__name__ != 'BaseTransformer' + ) diff --git a/detect_secrets/transformers/base.py b/detect_secrets/transformers/base.py new file mode 100644 index 000000000..0c6f88177 --- /dev/null +++ b/detect_secrets/transformers/base.py @@ -0,0 +1,38 @@ +from abc import ABCMeta +from abc import abstractmethod +from typing import IO +from typing import List + + +class BaseTransformer(metaclass=ABCMeta): + """ + There are special filetypes (e.g. YAML) that work better with our line-based secrets parsing + if we parse the file differently. In these cases, transformers can take the file, and parse + it to meet the needs of the secret detector. + + While the transformation may not be an original copy, it just needs to proxy the original + file so that we can obtain: + 1. The secret value + 2. The specific line that it's found on (for auditing purposes) + """ + @property + def is_eager(self) -> bool: + """ + Eager transformers tend to be over-aggressive, and cause performance issues / false + positives. We can make a transformer less eager through stricter validation checks + on `should_parse_file`, however, in the cases where we are unable to do so, this flag + informs the scanner to only use this transformer if all other methods fail to obtain + secrets. + """ + return False + + @abstractmethod + def should_parse_file(self, filename: str) -> bool: + raise NotImplementedError + + @abstractmethod + def parse_file(self, file: IO) -> List[str]: + """ + :raises: ParsingError + """ + raise NotImplementedError diff --git a/detect_secrets/transformers/config.py b/detect_secrets/transformers/config.py new file mode 100644 index 000000000..7ab64bfff --- /dev/null +++ b/detect_secrets/transformers/config.py @@ -0,0 +1,215 @@ +""" +This handles `.ini` files, or more generally known as `config` files. +""" +import configparser +import re +from typing import Generator +from typing import IO +from typing import List +from typing import Tuple + +from ..util.filetype import determine_file_type +from ..util.filetype import FileType +from .base import BaseTransformer +from .exceptions import ParsingError + + +class ConfigFileTransformer(BaseTransformer): + def should_parse_file(self, filename: str) -> bool: + return True + + def parse_file(self, file: IO) -> List[str]: + try: + return _parse_file(file) + except configparser.Error: + raise ParsingError + + +class EagerConfigFileTransformer(BaseTransformer): + # NOTE: Currently eager, since `determine_file_type` is minimalistic right now. + is_eager = True + + def should_parse_file(self, filename: str) -> bool: + return determine_file_type(filename) == FileType.OTHER + + def parse_file(self, file: IO) -> List[str]: + try: + return _parse_file(file, add_header=True) + except configparser.Error: + raise ParsingError + + +def _parse_file(file: IO, add_header: bool = False) -> List[str]: + """ + :raises: configparser.Error + """ + lines: List[str] = [] + for key, value, line_number in IniFileParser(file, add_header=add_header): + while len(lines) < line_number - 1: + lines.append('') + + # We artificially add quotes here because we know they are strings + # (because it's a config file), HighEntropyString will benefit from this, + # and all other plugins don't care. + if value[0] in {"'", '"'} and value[-1] == value[0]: + # Strip out quotes, because we'll add our own. + value = value[1:-1] + + value = value.replace('"', '\\"') + lines.append(f'{key} = "{value}"') + + return lines + + +class EfficientParsingError(configparser.ParsingError): + + def append(self, lineno: int, line: str): + """ + Rather than inefficiently add all the lines in the file + to the error message like the CPython code from 1998, + we just `return` because we will catch and `pass` + the exception in `high_entropy_strings.py` anyway. + """ + return + + +configparser.ParsingError = EfficientParsingError # type: ignore + + +class IniFileParser: + + _comment_regex = re.compile(r'\s*[;#]') + + def __init__(self, file: IO, add_header: bool = False) -> None: + """ + :param add_header: whether or not to add a top-level [global] header. + """ + self.parser = configparser.ConfigParser() + self.parser.optionxform = str # type: ignore + + content = file.read() + if add_header: + # This supports environment variables, or other files that look + # like config files, without a section header. + content = '[global]\n' + content + + self.parser.read_string(content) + + # Hacky way to keep track of line location + file.seek(0) + self.lines = [line.strip() for line in file.readlines()] + self.line_offset = 0 + + def __iter__(self) -> Generator[Tuple[str, str, int], None, None]: + if not self.parser.sections(): + # To prevent cases where it's not an ini file, but the parser + # helpfully attempts to parse everything to a DEFAULT section, + # when not explicitly provided. + raise configparser.Error + + for section_name in self.parser: + for key, values in self.parser.items(section_name): + for value, offset in self._get_value_and_line_offset(key, values): + if not value: + continue + + yield key, value, offset + + def _get_value_and_line_offset(self, key: str, values: str) -> List[Tuple[str, int]]: + """Returns the index of the location of key, value pair in lines. + + :param key: key, in config file. + :param values: values for key, in config file. This is plural, + because you can have multiple values per key. e.g. + + >>> key = + ... value1 + ... value2 + """ + values_list = _construct_values_list(values) + if not values_list: + return [] + + current_value_list_index = 0 + output = [] + + for line_offset, line in enumerate(self.lines): + # Check ignored lines before checking values, because + # you can write comments *after* the value. + if not line or self._comment_regex.match(line): + continue + + # The first line is special because it's the only one with the variable name. + # As such, we should handle it differently. + if current_value_list_index == 0: + # In situations where the first line does not have an associated value, + # it will be an empty string. However, this regex still does its job because + # it's not necessarily the case where the first line is a non-empty one. + # + # Therefore, we *only* advance the current_value_list_index when we identify + # the key used. + first_line_regex = re.compile( + r'^\s*{key}[ :=]+{value}'.format( + key=re.escape(key), + value=re.escape(values_list[current_value_list_index]), + ), + ) + if first_line_regex.match(line): + output.append(( + values_list[current_value_list_index], + self.line_offset + line_offset + 1, + )) + current_value_list_index += 1 + + continue + + # There's no more values to iterate over. + if current_value_list_index == len(values_list): + if line_offset == 0: + line_offset = 1 # Don't want to count the same line again + + self.line_offset += line_offset + self.lines = self.lines[line_offset:] + + break + + # This handles all other cases, when it isn't an empty or blank line. + output.append(( + values_list[current_value_list_index], + self.line_offset + line_offset + 1, + )) + current_value_list_index += 1 + else: + self.lines = [] + + return output + + +def _construct_values_list(values: str): + """ + This values_list is a strange construction, because of ini format. + We need to extract the values with the following supported format: + + >>> key = value0 + ... value1 + ... + ... # Comment line here + ... value2 + + given that normally, either value0 is supplied, or (value1, value2), + but still allowing for all three at once. + + Furthermore, with the configparser, we will get a list of values, + and intermediate blank lines, but no comments. This means that we can't + merely use the count of values' items to heuristically "skip ahead" lines, + because we still have to manually parse through this. + + Therefore, we construct the values_list in the following fashion: + 1. Keep the first value (in the example, this is `value0`) + 2. For all other values, ignore blank lines. + Then, we can parse through, and look for values only. + """ + lines = values.splitlines() + values_list = lines[:1] + values_list.extend(filter(None, lines[1:])) + return values_list diff --git a/detect_secrets/transformers/exceptions.py b/detect_secrets/transformers/exceptions.py new file mode 100644 index 000000000..06b7cb3ff --- /dev/null +++ b/detect_secrets/transformers/exceptions.py @@ -0,0 +1,3 @@ +class ParsingError(ValueError): + """Generalized parsing error raised by transformers.""" + pass diff --git a/detect_secrets/transformers/yaml.py b/detect_secrets/transformers/yaml.py new file mode 100644 index 000000000..0b2fc25d1 --- /dev/null +++ b/detect_secrets/transformers/yaml.py @@ -0,0 +1,252 @@ +import re +from collections import deque +from functools import lru_cache +from typing import Any +from typing import Dict +from typing import Generator +from typing import IO +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Pattern +from typing import Tuple +from typing import Union + +import yaml + +from ..core.log import log +from ..util.filetype import determine_file_type +from ..util.filetype import FileType +from .base import BaseTransformer + + +class YAMLTransformer(BaseTransformer): + def should_parse_file(self, filename: str) -> bool: + return determine_file_type(filename) == FileType.YAML + + def parse_file(self, file: IO) -> List[str]: + lines: List[str] = [] + for item in sorted(YAMLFileParser(file), key=lambda x: x.line_number): + while len(lines) < item.line_number - 1: + lines.append('') + + value = item.value + if isinstance(value, bytes): + # binary strings in YAML are base64 encoded. https://yaml.org/type/binary.html + # However, the YAML parser already decodes it for us. + # All we need to do is change it to a string. + try: + value = value.decode() + except UnicodeDecodeError: + log.error(f'Unable to process binary string: "{value}"') + continue + + line = item.line.strip() + # TODO: parse the difference between block_scalar styles, and handle appropriately. + # See test cases for more details. + + comment = '' + matches = _yaml_comment_regex().search(line) + if matches: + comment = matches.group(0) + + lines.append(f'{item.key}: {value}{comment}') + + return lines + + +@lru_cache(maxsize=1) +def _yaml_comment_regex() -> Pattern: + """ + From https://yaml-multiline.info/, it states that `#` cannot appear *after* a space + or a newline, otherwise it will be a syntax error (for multiline strings that don't + use a block scalar). This applies to single lines as well: for example, `a#b` will be + treated as a single value, but `a #b` will only capture `a`, leaving `#b` as a comment. + + For lines that *do* use a block scalar, the YAML parser will throw a syntax error if + there is additional text on the same line as the block scalar. Comments however, are fine. + e.g. + key: | # this is ok + blah + + key: | but this is not + blah + + Given that we've made it to this stage, we can assume the YAML file is syntactically + correct. Therefore, if we add whitespace before the comment character, we can know that + everything else *after* the comment character is a comment for a given line. + """ + return re.compile(r'(\s+#[\S ]*)') + + +class YAMLValue(NamedTuple): + key: str + value: Union[str, bytes] + line_number: int + line: str + + +class YAMLFileParser: + """ + Yaml config files are interesting, because they don't necessarily conform + to our basic regex for detecting HighEntropyStrings as strings don't + need to be quoted. + + This causes interesting issues, because our regex won't catch non-quoted + strings, and if we ignore the quoting requirement, then we increase our + false positive rate, because any long string would have high entropy. + + Therefore, we take a different approach: intercept the parsing of the yaml + file to identify string values. This assumes: + + 1. Secrets are strings or binaries + 2. Secrets are not keys + + Then, we calculate the entropy of those string values. + + The difficulty comes from determining the line number which these values + come from. To do this, we transform the string into a dictionary of + meta-tags, in the following format: + + >>> { + 'key': { + '__value__': value, + '__line__': , + } + } + + This way, we can quickly identify the line number for auditing at a later + stage. + + This parsing method is inspired by https://stackoverflow.com/a/13319530. + """ + + def __init__(self, file: IO): + self.content = file.read() + + self.loader = yaml.SafeLoader(self.content) + self.loader.compose_node = self._compose_node_shim # type: ignore + + def json(self) -> Dict[str, Any]: + return self.loader.get_single_data() + + def __iter__(self) -> Generator[YAMLValue, None, None]: + """ + :returns: (value, line_number) + """ + # Used to obtain inline comments + lines = self.content.splitlines() + + to_search = deque([self.json()]) + while to_search: + item = to_search.pop() + + if not item: + # mainly for base case (e.g. if file is all comments) + continue + + # If it doesn't have our meta-tags, it's not a value worth scanning. + if '__line__' not in item: + # However, we need to recursively search in the dictionary for other such values + # that we may care about. + for value in item.values(): + # TODO: We don't support arrays right now. + if not isinstance(value, dict): + continue + + to_search.append(value) + + continue + + yield YAMLValue( + key=item['__original_key__'], + value=item['__value__'], + line_number=item['__line__'], + + # We extract this separately because the parser drops the comments + # (at least up to version 3.13). + # https://github.com/yaml/pyyaml/blob/a2d481b8dbd2b352cb001f07091ccf669227290f/lib3/yaml/scanner.py#L749 + # The line value feeds into the filters, and helps us tune false positives. + line=lines[item['__line__'] - 1], + ) + + def _compose_node_shim( + self, + parent: Optional[yaml.nodes.Node], + index: Optional[yaml.nodes.Node], + ) -> yaml.nodes.Node: + line = self.loader.line + + node = yaml.composer.Composer.compose_node(self.loader, parent, index) + node.__line__ = line + 1 + + if node.tag.endswith(':map'): + return _tag_dict_values(node) + + # TODO: Not sure if need to do :seq + + return node + + +def _tag_dict_values(map_node: yaml.nodes.MappingNode) -> yaml.nodes.MappingNode: + """ + :param map_node: It looks like map_node.value contains a list of + pair tuples, corresponding to key,value pairs. + """ + new_values = [] + for key, value in map_node.value: + if not ( + value.tag.endswith(':str') or + value.tag.endswith(':binary') + ): + new_values.append((key, value)) + continue + + augmented_string = yaml.nodes.MappingNode( + tag=map_node.tag, + value=[ + _create_key_value_pair_for_mapping_node_value( + key='__value__', + value=value.value, + tag=value.tag, + ), + _create_key_value_pair_for_mapping_node_value( + key='__line__', + value=str(value.__line__), + tag='tag:yaml.org,2002:int', + ), + _create_key_value_pair_for_mapping_node_value( + key='__original_key__', + value=key.value, + tag='tag:yaml.org,2002:str', + ), + ], + ) + + new_values.append((key, augmented_string)) + + output = yaml.nodes.MappingNode( + tag=map_node.tag, + value=new_values, + start_mark=map_node.start_mark, + end_mark=map_node.end_mark, + flow_style=map_node.flow_style, + ) + return output + + +def _create_key_value_pair_for_mapping_node_value( + key: str, + value: Any, + tag: str, +) -> Tuple[yaml.nodes.ScalarNode, yaml.nodes.ScalarNode]: + return ( + yaml.nodes.ScalarNode( + tag='tag:yaml.org,2002:str', + value=key, + ), + yaml.nodes.ScalarNode( + tag=tag, + value=value, + ), + ) diff --git a/detect_secrets/util/filetype.py b/detect_secrets/util/filetype.py new file mode 100644 index 000000000..5f4eea205 --- /dev/null +++ b/detect_secrets/util/filetype.py @@ -0,0 +1,37 @@ +import os +from enum import Enum + + +class FileType(Enum): + CLS = 0 + EXAMPLE = 1 + GO = 2 + JAVA = 3 + JAVASCRIPT = 4 + PHP = 5 + OBJECTIVE_C = 6 + PYTHON = 7 + SWIFT = 8 + TERRAFORM = 9 + YAML = 10 + OTHER = 11 + + +def determine_file_type(filename: str) -> FileType: + _, file_extension = os.path.splitext(filename) + return { + '.cls': FileType.CLS, + '.example': FileType.EXAMPLE, + '.eyaml': FileType.YAML, + '.go': FileType.GO, + '.java': FileType.JAVA, + '.js': FileType.JAVASCRIPT, + '.m': FileType.OBJECTIVE_C, + '.php': FileType.PHP, + '.py': FileType.PYTHON, + '.pyi': FileType.PYTHON, + '.swift': FileType.SWIFT, + '.tf': FileType.TERRAFORM, + '.yaml': FileType.YAML, + '.yml': FileType.YAML, + }.get(file_extension, FileType.OTHER) diff --git a/detect_secrets/util/importlib.py b/detect_secrets/util/importlib.py new file mode 100644 index 000000000..c938613b1 --- /dev/null +++ b/detect_secrets/util/importlib.py @@ -0,0 +1,36 @@ +import pkgutil +from importlib import import_module +from types import ModuleType +from typing import Any +from typing import Callable +from typing import Iterable +from typing import Type + + +def import_types_from_module( + root: ModuleType, + filter: Callable[[Any], bool], +) -> Iterable[Type]: + output = [] + + modules = [ + module + for _, module, is_package in pkgutil.walk_packages( + root.__path__, prefix=f'{root.__name__}.', # type: ignore # mypy issue #1422 + ) + if not is_package + ] + + for module_path in modules: + module = import_module(module_path) + for name in dir(module): + if name.startswith('_'): + continue + + attribute = getattr(module, name) + if filter(attribute): + continue + + output.append(attribute) + + return output diff --git a/detect_secrets/util/inject.py b/detect_secrets/util/inject.py new file mode 100644 index 000000000..7c656aebb --- /dev/null +++ b/detect_secrets/util/inject.py @@ -0,0 +1,37 @@ +from typing import Any +from typing import Callable +from typing import Tuple + +from ..types import SelfAwareCallable + + +def inject_variables_into_function(func: SelfAwareCallable, **kwargs: Any) -> Any: + variables_to_inject = set(kwargs.keys()) + values = { + key: kwargs[key] + for key in (variables_to_inject & func.injectable_variables) + } + + if set(values.keys()) != func.injectable_variables: + return + + return func(**values) + + +def get_injectable_variables(func: Callable) -> Tuple[str, ...]: + """ + The easiest way to understand this is to see it as an example: + >>> def func(a, b=1, *args, c, d=2, **kwargs): + ... e = 5 + >>> + >>> print(func.__code__.co_varnames) + ('a', 'b', 'c', 'd', 'args', 'kwargs', 'e') + >>> print(func.__code__.co_argcount) # `a` and `b` + 2 + >>> print(func.__code__.co_kwonlyargcount) # `c` and `d` + 2 + """ + variable_names = func.__code__.co_varnames + arg_count = func.__code__.co_argcount + func.__code__.co_kwonlyargcount + + return variable_names[:arg_count] diff --git a/tests/core/secrets_collection_test.py b/tests/core/secrets_collection_test.py index ac2cb631b..a0fda08cf 100644 --- a/tests/core/secrets_collection_test.py +++ b/tests/core/secrets_collection_test.py @@ -37,7 +37,7 @@ def test_filename_filters_are_invoked_first(mock_log): @staticmethod def test_error_reading_file(mock_log): with mock.patch( - 'detect_secrets.core.secrets_collection.open', + 'detect_secrets.core.scan.open', side_effect=IOError, ): SecretsCollection().scan_file('test_data/config.env') @@ -68,9 +68,11 @@ def test_line_based_success(): assert len(secrets['test_data/each_secret.py']) == 1 @staticmethod - @pytest.mark.skip(reason='TODO') def test_file_based_success(): - pass + secrets = SecretsCollection() + secrets.scan_file('test_data/config.env') + + assert bool(secrets) class TestScanDiff: diff --git a/tests/transformers/config_transformer_test.py b/tests/transformers/config_transformer_test.py new file mode 100644 index 000000000..88b2349fe --- /dev/null +++ b/tests/transformers/config_transformer_test.py @@ -0,0 +1,147 @@ +import configparser +import textwrap + +import pytest + +from detect_secrets.transformers.config import ConfigFileTransformer +from detect_secrets.transformers.config import EagerConfigFileTransformer +from detect_secrets.transformers.config import IniFileParser +from testing.mocks import mock_file_object + + +@pytest.mark.parametrize( + 'transformer', + ( + ConfigFileTransformer, + EagerConfigFileTransformer, + ), +) +def test_transformer(transformer): + file = mock_file_object( + textwrap.dedent(""" + [section] + keyA = value + + keyB = "double" + keyC = 'single' + + keyD = o'brian + keyE = "chai" tea + """)[1:-1], + ) + + assert transformer().parse_file(file) == [ + '', + 'keyA = "value"', + '', + 'keyB = "double"', + 'keyC = "single"', + '', + 'keyD = "o\'brian"', + 'keyE = "\\\"chai\\\" tea"', + ] + + +def test_basic(): + file = mock_file_object( + textwrap.dedent(""" + [section] + key = value + rice = fried + + # comment + tea = chai + + [other] + + water = unflavored + """)[1:-1], + ) + + assert list(IniFileParser(file)) == [ + ('key', 'value', 2), + ('rice', 'fried', 3), + ('tea', 'chai', 6), + ('water', 'unflavored', 10), + ] + + +@pytest.mark.parametrize( + 'content', + ( + textwrap.dedent(""" + key = value + + # comment + tea = chai + """)[1:-1], + + # This case needs `add_header=True` to work. + textwrap.dedent(""" + key = value + + [other] + water = unflavored + """)[1:-1], + ), +) +def test_not_ini_file(content): + file = mock_file_object(content) + + with pytest.raises(configparser.Error): + list(IniFileParser(file)) + + +def test_add_header(): + file = mock_file_object( + textwrap.dedent(""" + key = value + + # comment + tea = chai + """)[1:-1], + ) + + assert list(IniFileParser(file, add_header=True)) == [ + ('key', 'value', 1), + ('tea', 'chai', 4), + ] + + +class TestMultipleValues: + @staticmethod + def test_all(): + file = mock_file_object( + textwrap.dedent(""" + [section] + key = value0 + value1 + + # comment + value2 + """)[1:-1], + ) + + assert list(IniFileParser(file)) == [ + ('key', 'value0', 2), + ('key', 'value1', 3), + ('key', 'value2', 6), + ] + + @staticmethod + def test_not_first(): + file = mock_file_object( + textwrap.dedent(""" + [section] + key = + value1 + + # comment + value2 + """)[1:-1], + ) + + assert list(IniFileParser(file)) == [ + ('key', 'value1', 3), + ('key', 'value2', 6), + ] diff --git a/tests/transformers/import_test.py b/tests/transformers/import_test.py new file mode 100644 index 000000000..ccd221d11 --- /dev/null +++ b/tests/transformers/import_test.py @@ -0,0 +1,13 @@ +from detect_secrets.transformers import get_transformers + + +def test_success(): + transformers = get_transformers() + assert { + transformer.__class__.__name__ + for transformer in transformers + } == { + 'ConfigFileTransformer', + 'EagerConfigFileTransformer', + 'YAMLTransformer', + } diff --git a/tests/transformers/yaml_transformer_test.py b/tests/transformers/yaml_transformer_test.py new file mode 100644 index 000000000..04a767373 --- /dev/null +++ b/tests/transformers/yaml_transformer_test.py @@ -0,0 +1,149 @@ +import textwrap +from unittest import mock + +import pytest + +from detect_secrets.transformers.yaml import YAMLFileParser +from detect_secrets.transformers.yaml import YAMLTransformer +from testing.mocks import mock_file_object + + +class TestYAMLTransformer: + @staticmethod + def test_basic(): + file = mock_file_object( + textwrap.dedent(""" + keyA: string + keyB: string # with comments + + keyC: !!binary YWJjZGVm + keyD: !!binary YWJjZGVm # with comments + keyE: !!binary invalidBinar + + dict: + keyD: nested string + + num: 1 # don't care + """)[1:-1], + ) + + assert YAMLTransformer().parse_file(file) == [ + 'keyA: string', + 'keyB: string # with comments', + '', + 'keyC: abcdef', + 'keyD: abcdef # with comments', + '', + '', + '', + 'keyD: nested string', + ] + + @staticmethod + @pytest.mark.xfail(reason='TODO') + @pytest.mark.parametrize( + 'block_chomping', + ('', '-', '+'), + ) + def test_multiline_block_scalar_folded_style(block_chomping): + # NOTE(2020-11-07|domanchi): For YAML parsing, we don't really care about "literal" style + # (the one with `|`) since that will keep new lines, and our assumption is that secrets + # won't have new lines. + # + # However, "folded" style may be used to keep a certain line limit with very long secrets, + # so we should probably handle that. + file = mock_file_object( + textwrap.dedent(f""" + multiline: |{block_chomping} # example + this is + a basic multiline string + """)[1:-1], + ) + + assert YAMLTransformer().parse_file(file) == [ + 'multiline: this is a basic multiline string # example', + ] + + @staticmethod + @pytest.mark.xfail(reason='TODO') + @pytest.mark.parametrize( + 'block_chomping', + ('', '-', '+'), + ) + def test_multiline_block_scalar_literal_style(block_chomping): + file = mock_file_object( + textwrap.dedent(f""" + multiline: > + this will be skipped + """)[1:-1], + ) + + assert YAMLTransformer().parse_file(file) == [''] + + +class TestYAMLFileParser: + @staticmethod + def test_basic(): + file = mock_file_object( + textwrap.dedent(""" + keyA: string + dict: + keyB: 123 + """)[1:-1], + ) + + assert YAMLFileParser(file).json() == { + 'keyA': { + '__value__': 'string', + '__line__': 1, + '__original_key__': 'keyA', + }, + + # Ignores non-string or non-binary + 'dict': { + 'keyB': 123, + }, + } + + @staticmethod + @pytest.mark.parametrize( + 'block_scalar_style', + ('>', '|'), + ) + @pytest.mark.parametrize( + 'block_chomping', + ('', '-', '+'), + ) + def test_multi_line(block_scalar_style, block_chomping): + # NOTE: Referenced https://yaml-multiline.info/ for the many ways to do multi line strings + file = mock_file_object( + textwrap.dedent(f""" + key: {block_scalar_style}{block_chomping} # comment + multi + #line + string + """)[1:-1], + ) + + assert [item.line for item in YAMLFileParser(file)] == [ + f'key: {block_scalar_style}{block_chomping} # comment', + ] + + @staticmethod + @pytest.mark.parametrize( + ['yaml_value', 'expected_value'], + [ + ('string_value', 'string_value'), + ('!!binary YWJjZGVm', b'abcdef'), + ], + ) + def test_possible_secret_format(yaml_value, expected_value): + content = 'key: {yaml_value}'.format(yaml_value=yaml_value) + f = mock_file_object(content) + + result = YAMLFileParser(f).json() + assert result['key'] == { + '__value__': expected_value, + '__line__': mock.ANY, + '__original_key__': mock.ANY, + }