Skip to content

Commit

Permalink
dynamic plugin instantiation
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Loo committed Oct 3, 2019
1 parent c6d8cac commit f30c94d
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 77 deletions.
22 changes: 3 additions & 19 deletions detect_secrets/plugins/common/initialize.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
"""Intelligent initialization of plugins."""
from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..common.util import get_mapping_from_secret_type_to_class_name
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..jwt import JwtTokenDetector # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
from ..mailchimp import MailchimpDetector # noqa: F401
from ..private_key import PrivateKeyDetector # noqa: F401
from ..slack import SlackDetector # noqa: F401
from ..stripe import StripeDetector # noqa: F401
from .util import get_mapping_from_secret_type_to_class_name
from .util import import_plugins
from detect_secrets.core.log import log
from detect_secrets.core.usage import PluginOptions

Expand Down Expand Up @@ -173,12 +162,7 @@ def from_plugin_classname(
:type should_verify_secrets: bool
"""
klass = globals()[plugin_classname]

# Make sure the instance is a BasePlugin type, before creating it.
if not issubclass(klass, BasePlugin): # pragma: no cover
raise TypeError

klass = import_plugins()[plugin_classname]
try:
instance = klass(
exclude_lines_regex=exclude_lines_regex,
Expand Down
70 changes: 47 additions & 23 deletions detect_secrets/plugins/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,55 @@
except ImportError: # pragma: no cover
from functools32 import lru_cache

# These plugins need to be imported here so that globals()
# can find them.
from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..jwt import JwtTokenDetector # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
from ..mailchimp import MailchimpDetector # noqa: F401
from ..private_key import PrivateKeyDetector # noqa: F401
from ..slack import SlackDetector # noqa: F401
from ..stripe import StripeDetector # noqa: F401
import os
from abc import abstractproperty
from importlib import import_module

from detect_secrets.plugins.base import BasePlugin
from detect_secrets.util import get_root_directory


@lru_cache(maxsize=1)
def get_mapping_from_secret_type_to_class_name():
"""Returns secret_type => plugin classname"""
mapping = {}
for key, value in globals().items():
try:
if issubclass(value, BasePlugin) and value != BasePlugin:
mapping[value.secret_type] = key
except TypeError:
pass

return mapping
return {
value.secret_type: key
for key, value in import_plugins().items()
}


@lru_cache(maxsize=1)
def import_plugins():
"""
:rtype: Dict[str, Type[TypeVar('Plugin', bound=BasePlugin)]]
"""
modules = []
for root, _, files in os.walk(
os.path.join(get_root_directory(), 'detect_secrets/plugins'),
):
for filename in files:
if not filename.startswith('_'):
modules.append(os.path.splitext(filename)[0])

# Only want to import top level files
break

plugins = {}
for module_name in modules:
module = import_module('detect_secrets.plugins.{}'.format(module_name))
for name in filter(lambda x: not x.startswith('_'), dir(module)):
plugin = getattr(module, name)
try:
if not issubclass(plugin, BasePlugin):
continue
except TypeError:
# Occurs when plugin is not a class type.
continue

# Use this as a heuristic to determine abstract classes
if isinstance(plugin.secret_type, abstractproperty):
continue

plugins[name] = plugin

return plugins
12 changes: 12 additions & 0 deletions testing/util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import re

from detect_secrets.plugins.base import RegexBasedDetector
from detect_secrets.plugins.common.util import import_plugins


# https://stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
_ansi_escape = re.compile(r'\x1b\[[0-?]*[ -/]*[@-~]')


def uncolor(text):
return _ansi_escape.sub('', text)


def get_regex_based_plugins():
return {
name: plugin
for name, plugin in import_plugins().items()
if issubclass(plugin, RegexBasedDetector)
}
17 changes: 7 additions & 10 deletions tests/core/usage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from detect_secrets.core.usage import ParserBuilder
from detect_secrets.plugins.common.util import import_plugins


class TestPluginOptions(object):
Expand All @@ -25,25 +26,21 @@ def test_consolidates_output_basic(self):
"""Everything enabled by default, with default values"""
args = self.parse_args()

assert args.plugins == {
regex_based_plugins = {
key: {}
for key in import_plugins()
}
regex_based_plugins.update({
'HexHighEntropyString': {
'hex_limit': 3,
},
'BasicAuthDetector': {},
'Base64HighEntropyString': {
'base64_limit': 4.5,
},
'KeywordDetector': {
'keyword_exclude': None,
},
'PrivateKeyDetector': {},
'AWSKeyDetector': {},
'SlackDetector': {},
'ArtifactoryDetector': {},
'StripeDetector': {},
'MailchimpDetector': {},
'JwtTokenDetector': {},
}
})
assert not hasattr(args, 'no_private_key_scan')

def test_consolidates_removes_disabled_plugins(self):
Expand Down
37 changes: 12 additions & 25 deletions tests/pre_commit_hook_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from testing.mocks import mock_git_calls
from testing.mocks import mock_log as mock_log_base
from testing.mocks import SubprocessMock
from testing.util import get_regex_based_plugins


def assert_commit_blocked(command):
Expand Down Expand Up @@ -182,43 +183,29 @@ def test_that_baseline_gets_updated(

# See that we updated the plugins and version
assert current_version == baseline_written['version']
assert baseline_written['plugins_used'] == [
{
'name': 'AWSKeyDetector',
},

regex_based_plugins = [
{
'name': 'ArtifactoryDetector',
},
'name': name,
}
for name in get_regex_based_plugins()
]
regex_based_plugins.extend([
{
'base64_limit': 4.5,
'name': 'Base64HighEntropyString',
},
{
'name': 'BasicAuthDetector',
},
{
'hex_limit': 3,
'name': 'HexHighEntropyString',
},
{
'name': 'JwtTokenDetector',
},
{
'name': 'KeywordDetector',
},
{
'name': 'MailchimpDetector',
},
{
'name': 'PrivateKeyDetector',
},
{
'name': 'SlackDetector',
},
{
'name': 'StripeDetector',
},
]
])

assert baseline_written['plugins_used'] == \
sorted(regex_based_plugins, key=lambda x: x['name'])

def test_writes_new_baseline_if_modified(self):
baseline_string = _create_baseline()
Expand Down

0 comments on commit f30c94d

Please sign in to comment.