Skip to content

Commit

Permalink
refactoring scan, and introducing transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Loo committed Nov 7, 2020
1 parent c8f7685 commit cb04f37
Show file tree
Hide file tree
Showing 19 changed files with 1,250 additions and 499 deletions.
35 changes: 8 additions & 27 deletions detect_secrets/core/plugins/util.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import inspect
import pkgutil
from abc import abstractproperty
from functools import lru_cache
from importlib import import_module
from types import ModuleType
from typing import Any
from typing import Dict
from typing import Type
from typing import TypeVar

from ... import plugins
from ...plugins.base import BasePlugin
from ...util.importlib import import_types_from_module


# Generic type variable used in annotations throughout the plugin helpers;
# the bound restricts it to BasePlugin and its subclasses.
Plugin = TypeVar('Plugin', bound=BasePlugin)
Expand All @@ -18,35 +17,17 @@
@lru_cache(maxsize=1)
def get_mapping_from_secret_type_to_class() -> Dict[str, Type[Plugin]]:
    """
    Builds a lookup from each plugin's `secret_type` to the plugin class itself.

    The classes are gathered from the `plugins` package via
    `import_types_from_module`. The result is memoized, so the package is only
    walked once per process.

    :returns: mapping of `secret_type` to plugin class
    """
    # TODO: custom_plugin_paths
    output = {}
    for plugin_class in import_types_from_module(
        plugins,
        # NOTE(review): the negation implies `filter` selects the items to
        # *exclude* -- confirm against `import_types_from_module`.
        filter=lambda x: not _is_valid_plugin(x),
    ):
        output[plugin_class.secret_type] = plugin_class

    return output


def _is_valid_plugin(module: ModuleType, name: str) -> bool:
attribute = getattr(module, name)
def _is_valid_plugin(attribute: Any) -> bool:
return (
inspect.isclass(attribute)
and issubclass(attribute, BasePlugin)
Expand Down
198 changes: 198 additions & 0 deletions detect_secrets/core/scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
from functools import lru_cache
from importlib import import_module
from typing import Generator
from typing import IO
from typing import List
from typing import Optional
from typing import Tuple

from . import plugins
from ..settings import get_settings
from ..transformers import get_transformers
from ..transformers import ParsingError
from ..types import SelfAwareCallable
from ..util.inject import get_injectable_variables
from ..util.inject import inject_variables_into_function
from .log import log
from .plugins.util import Plugin
from .potential_secret import PotentialSecret


def scan_file(filename: str) -> Generator[PotentialSecret, None, None]:
    """
    Yields the potential secrets found in the file at `filename`.

    Nothing is yielded when there are no plugins, or when a file-level filter
    matches. The file is first scanned using a non-eager transformer (falling
    back to its raw lines). Only when that pass finds nothing is the file
    re-read with an eager transformer and scanned again.

    :param filename: path of the file to scan
    """
    if not get_plugins():   # pragma: no cover
        log.warning('No plugins to scan with!')
        return

    if _filter_files(filename):
        return

    try:
        with open(filename) as handle:
            log.info(f'Checking file: {filename}')

            # Prefer transformed content; otherwise scan the file verbatim.
            content = _get_transformed_file(handle) or handle.readlines()

            found_any = False
            first_pass = _process_line_based_plugins(
                lines=list(enumerate(content, 1)),
                filename=handle.name,
            )
            for finding in first_pass:
                found_any = True
                yield finding

            if found_any:
                return

            # Only if no secrets, then use eager transformers
            handle.seek(0)
            eager_content = _get_transformed_file(handle, use_eager_transformers=True)
            if eager_content:
                yield from _process_line_based_plugins(
                    lines=list(enumerate(eager_content, 1)),
                    filename=handle.name,
                )
    except IOError:
        log.warning(f'Unable to open file: {filename}')


def scan_diff(diff: str) -> Generator[PotentialSecret, None, None]:
    """
    Yields the potential secrets found in the added lines of a diff.

    Files matched by a file-level filter are skipped entirely. For the rest,
    only lines flagged as added are scanned, paired with their line number in
    the target (new) file.

    :param diff: diff text, parsed with `unidiff.PatchSet.from_string`
    :raises: ImportError
    """
    # Local imports, so that we don't need to require unidiff for versions of
    # detect-secrets that don't use it.
    from unidiff import PatchSet

    if not get_plugins():   # pragma: no cover
        # `Logger.warn` is deprecated; use `warning`, as `scan_file` does.
        log.warning('No plugins to scan with!')
        return

    patch_set = PatchSet.from_string(diff)
    for patch_file in patch_set:
        filename = patch_file.path
        if _filter_files(filename):
            continue

        lines = [
            (line.target_line_no, line.value)
            for chunk in patch_file
            # target_lines refers to incoming (new) changes
            for line in chunk.target_lines()
            if line.is_added
        ]

        yield from _process_line_based_plugins(lines, filename=filename)


def _filter_files(filename: str) -> bool:
    """
    Returns True if any filter matches `filename`, meaning the file is skipped.

    Filters are tried in order and evaluation stops at the first match, which
    is logged together with that filter's `path`.
    """
    matching_filter = next(
        (
            candidate
            for candidate in get_filters()
            if inject_variables_into_function(candidate, filename=filename)
        ),
        None,
    )
    if matching_filter is None:
        return False

    log.info(f'Skipping "{filename}" due to "{matching_filter.path}"')
    return True


def _get_transformed_file(file: IO, use_eager_transformers: bool = False) -> Optional[List[str]]:
    """
    Returns the output of the first matching transformer that parses `file`.

    A transformer is considered only if it accepts the file's name and its
    `is_eager` flag equals `use_eager_transformers`. A transformer that raises
    `ParsingError` is passed over. The file position is rewound to the start
    after every parse attempt, whether it succeeded or not.

    :returns: the transformer's parsed result, or None if no transformer succeeded
    """
    for candidate in get_transformers():
        if (
            not candidate.should_parse_file(file.name)
            or use_eager_transformers != candidate.is_eager
        ):
            continue

        try:
            parsed = candidate.parse_file(file)
        except ParsingError:
            # Try the next transformer instead.
            continue
        finally:
            file.seek(0)

        return parsed

    return None


def _process_line_based_plugins(
    lines: List[Tuple[int, str]],
    filename: str,
) -> Generator[PotentialSecret, None, None]:
    """
    Runs every plugin against every line, yielding the secrets they report.

    :param lines: (line number, line content) pairs to scan
    :param filename: name of the file the lines belong to
    """
    # NOTE: We iterate through lines *then* plugins, because we want to quit early if any of the
    # filters return True.
    for line_number, line in lines:
        line = line.rstrip()

        # We apply line-specific filters, and see whether that allows us to quit early.
        # A generator (rather than a list) lets `any` stop at the first filter that
        # matches, instead of evaluating every filter for every line.
        if any(
            inject_variables_into_function(filter_fn, filename=filename, line=line)
            for filter_fn in get_filters()
        ):
            continue

        for plugin in get_plugins():
            yield from _scan_line(plugin, filename, line, line_number)


def _scan_line(
    plugin: Plugin,
    filename: str,
    line: str,
    line_number: int,
) -> Generator[PotentialSecret, None, None]:
    """
    Runs a single plugin on a single line, yielding the secrets that survive filtering.

    :param plugin: plugin whose `analyze_line` is invoked
    :param filename: name of the file the line belongs to
    :param line: content of the line to analyze
    :param line_number: position of the line within the file
    """
    # NOTE: We don't apply filter functions here yet, because we don't have any filters
    # that operate on (filename, line, plugin) without `secret`
    try:
        secrets = plugin.analyze_line(filename=filename, line=line, line_number=line_number)
    except AttributeError:
        # NOTE(review): presumably tolerates plugins without `analyze_line`, but this
        # also hides AttributeErrors raised inside a plugin -- confirm intent.
        return

    if not secrets:
        return

    for secret in secrets:
        # A generator (rather than a list) lets `any` stop at the first filter that
        # matches, instead of evaluating every filter for every secret.
        if any(
            inject_variables_into_function(
                filter_fn,
                filename=secret.filename,
                secret=secret.secret_value,
                plugin=plugin,
                line=line,
            )
            for filter_fn in get_filters()
        ):
            continue

        yield secret


@lru_cache(maxsize=1)
def get_plugins() -> List[Plugin]:
    """
    Returns one initialized plugin per classname listed in the settings.

    The result is memoized, so the plugins are only initialized on first call.
    """
    initialized = []
    for classname in get_settings().plugins:
        initialized.append(plugins.initialize.from_plugin_classname(classname))

    return initialized


@lru_cache(maxsize=1)
def get_filters() -> List[SelfAwareCallable]:
    """
    Resolves the filter paths listed in the settings into callables.

    Each path is a dotted `module.function` import path. Paths that cannot be
    resolved are logged and skipped. Every returned function carries two extra
    attributes: `injectable_variables` and `path`. The result is memoized.

    :returns: the successfully imported filter functions
    """
    output = []
    for path, _config in get_settings().filters.items():
        try:
            # A path without a `.` fails to unpack (ValueError); treat it like any
            # other unresolvable filter instead of crashing the scan.
            module_path, function_name = path.rsplit('.', 1)
            function = getattr(import_module(module_path), function_name)
        except (ModuleNotFoundError, AttributeError, ValueError):
            # `Logger.warn` is deprecated; use `warning`, as `scan_file` does.
            log.warning(f'Invalid filter: {path}')
            continue

        # We attach this metadata to the function itself, so that we don't need to
        # compute it everytime. This will allow for dependency injection for filters.
        function.injectable_variables = set(get_injectable_variables(function))
        output.append(function)

        # This is for better logging.
        function.path = path

    return output
Loading

0 comments on commit cb04f37

Please sign in to comment.