From a4d66fa60eb9f582a728c21805aa99102c19a1b7 Mon Sep 17 00:00:00 2001 From: Kevin Hock Date: Mon, 4 Dec 2017 17:15:37 -0800 Subject: [PATCH] first commit --- .activate.sh | 1 + .coveragerc | 15 + .deactivate.sh | 1 + .gitignore | 14 + .pre-commit-config.yaml | 18 + .pre-commit-hooks.yaml | 8 + .pysensu.config.yaml.sample | 14 + Makefile | 21 + README.md | 134 +++++ config.yaml.sample | 7 + credentials.sample.json | 5 + detect_secrets/__init__.py | 0 detect_secrets/core/__init__.py | 0 detect_secrets/core/baseline.py | 113 ++++ detect_secrets/core/log.py | 59 +++ detect_secrets/core/potential_secret.py | 64 +++ detect_secrets/core/secrets_collection.py | 265 ++++++++++ detect_secrets/core/usage.py | 156 ++++++ detect_secrets/hooks/__init__.py | 0 detect_secrets/hooks/base.py | 13 + detect_secrets/hooks/pysensu_yelp.py | 34 ++ detect_secrets/main.py | 42 ++ detect_secrets/plugins/__init__.py | 87 ++++ detect_secrets/plugins/base.py | 33 ++ .../plugins/high_entropy_strings.py | 80 +++ detect_secrets/pre_commit_hook.py | 85 +++ detect_secrets/server/__init__.py | 17 + detect_secrets/server/base_tracked_repo.py | 415 +++++++++++++++ detect_secrets/server/local_tracked_repo.py | 59 +++ detect_secrets/server/repo_config.py | 11 + detect_secrets/server/s3_tracked_repo.py | 158 ++++++ detect_secrets/server_main.py | 301 +++++++++++ repos.yaml.sample | 7 + requirements-dev.txt | 23 + s3.yaml.sample | 3 + setup.cfg | 2 + setup.py | 26 + test_data/file_with_no_secrets.py | 11 + test_data/file_with_secrets.py | 11 + test_data/sample.diff | 69 +++ test_data/tmp/file_with_no_secrets.py | 11 + test_data/tmp/file_with_secrets.py | 11 + tests/__init__.py | 0 tests/core/__init__.py | 0 tests/core/baseline_test.py | 201 +++++++ tests/core/potential_secret_test.py | 26 + tests/core/secrets_collection_test.py | 357 +++++++++++++ tests/main_test.py | 59 +++ tests/plugins/__init__.py | 0 tests/plugins/high_entropy_strings_test.py | 134 +++++ tests/plugins/init_test.py | 41 ++ tests/pre_commit_hook_test.py | 78 +++ tests/server/__init__.py | 0 tests/server/base_tracked_repo_test.py | 293 +++++++++++ tests/server/local_tracked_repo_test.py | 56 ++ tests/server/s3_tracked_repo_test.py | 153 ++++++ tests/server_main_test.py | 493 ++++++++++++++++++ tests/util/__init__.py | 0 tests/util/file_util.py | 36 ++ tests/util/mock_util.py | 104 ++++ tox.ini | 28 + 61 files changed, 4463 insertions(+) create mode 120000 .activate.sh create mode 100644 .coveragerc create mode 100644 .deactivate.sh create mode 100644 .gitignore create mode 100644 .pre-commit-config.yaml create mode 100644 .pre-commit-hooks.yaml create mode 100644 .pysensu.config.yaml.sample create mode 100644 Makefile create mode 100644 README.md create mode 100644 config.yaml.sample create mode 100644 credentials.sample.json create mode 100644 detect_secrets/__init__.py create mode 100644 detect_secrets/core/__init__.py create mode 100644 detect_secrets/core/baseline.py create mode 100644 detect_secrets/core/log.py create mode 100644 detect_secrets/core/potential_secret.py create mode 100644 detect_secrets/core/secrets_collection.py create mode 100644 detect_secrets/core/usage.py create mode 100644 detect_secrets/hooks/__init__.py create mode 100644 detect_secrets/hooks/base.py create mode 100644 detect_secrets/hooks/pysensu_yelp.py create mode 100644 detect_secrets/main.py create mode 100644 detect_secrets/plugins/__init__.py create mode 100644 detect_secrets/plugins/base.py create mode 100644 detect_secrets/plugins/high_entropy_strings.py create mode 100644 
detect_secrets/pre_commit_hook.py create mode 100644 detect_secrets/server/__init__.py create mode 100644 detect_secrets/server/base_tracked_repo.py create mode 100644 detect_secrets/server/local_tracked_repo.py create mode 100644 detect_secrets/server/repo_config.py create mode 100644 detect_secrets/server/s3_tracked_repo.py create mode 100644 detect_secrets/server_main.py create mode 100644 repos.yaml.sample create mode 100644 requirements-dev.txt create mode 100644 s3.yaml.sample create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 test_data/file_with_no_secrets.py create mode 100644 test_data/file_with_secrets.py create mode 100644 test_data/sample.diff create mode 100644 test_data/tmp/file_with_no_secrets.py create mode 100644 test_data/tmp/file_with_secrets.py create mode 100644 tests/__init__.py create mode 100644 tests/core/__init__.py create mode 100644 tests/core/baseline_test.py create mode 100644 tests/core/potential_secret_test.py create mode 100644 tests/core/secrets_collection_test.py create mode 100644 tests/main_test.py create mode 100644 tests/plugins/__init__.py create mode 100644 tests/plugins/high_entropy_strings_test.py create mode 100644 tests/plugins/init_test.py create mode 100644 tests/pre_commit_hook_test.py create mode 100644 tests/server/__init__.py create mode 100644 tests/server/base_tracked_repo_test.py create mode 100644 tests/server/local_tracked_repo_test.py create mode 100644 tests/server/s3_tracked_repo_test.py create mode 100644 tests/server_main_test.py create mode 100644 tests/util/__init__.py create mode 100644 tests/util/file_util.py create mode 100644 tests/util/mock_util.py create mode 100644 tox.ini diff --git a/.activate.sh b/.activate.sh new file mode 120000 index 000000000..9308d33e7 --- /dev/null +++ b/.activate.sh @@ -0,0 +1 @@ +venv/bin/activate \ No newline at end of file diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 000000000..d62837448 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,15 @@ +[run] +branch = True +source = . 
+omit =
+    .tox/*
+    /tmp*
+    setup.py
+
+[report]
+exclude_lines =
+    # Don't complain if non-runnable code isn't run:
+    ^if __name__ == ['"]__main__['"]:$
+
+    # Need to redefine this, as per documentation
+    pragma: no cover
diff --git a/.deactivate.sh b/.deactivate.sh
new file mode 100644
index 000000000..d1898d740
--- /dev/null
+++ b/.deactivate.sh
@@ -0,0 +1 @@
+deactivate
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..15501ae46
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+*.py[co]
+*.sw[op]
+
+.coverage
+*.egg-info
+.tox
+venv
+/tmp
+
+
+.*ignore
+!.gitignore
+
+.pysensu.config.yaml
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 000000000..1bc3924e2
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,18 @@
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  sha: v0.9.1
+  hooks:
+  - id: trailing-whitespace
+  - id: end-of-file-fixer
+  - id: autopep8-wrapper
+  - id: check-docstring-first
+  - id: debug-statements
+  - id: name-tests-test
+    exclude: tests/util
+  - id: flake8
+    args: ['--ignore=E501']
+    exclude: ^test_data/
+- repo: https://github.com/asottile/reorder_python_imports
+  sha: v0.3.5
+  hooks:
+  - id: reorder-python-imports
+    language_version: python3.6
diff --git a/.pre-commit-hooks.yaml b/.pre-commit-hooks.yaml
new file mode 100644
index 000000000..f3cc03936
--- /dev/null
+++ b/.pre-commit-hooks.yaml
@@ -0,0 +1,8 @@
+- id: detect-secrets
+  name: Detect secrets
+  description: Detects high entropy strings that are likely to be passwords.
+  entry: detect-secrets-hook
+  args: ['--base64-limit', '4.5', '--hex-limit', '3']
+  language: python
+  # for backward compatibility
+  files: .*
diff --git a/.pysensu.config.yaml.sample b/.pysensu.config.yaml.sample
new file mode 100644
index 000000000..94d061eda
--- /dev/null
+++ b/.pysensu.config.yaml.sample
@@ -0,0 +1,14 @@
+name: SecretFound  # name needs to be one word
+alert_after: 0
+realert_every: -1  # -1 means exponential backoff
+runbook: no-runbook-available
+dependencies: []
+team: team-security
+irc_channels: []
+notification_email: to-whom-it-may-concern@example.com
+ticket: False
+project: False
+page: False
+tip: detect_secrets found a secret
+status: 1  # status needs to be 1 (warning) or higher to send the email
+ttl: null  # null gets constructed into None
diff --git a/Makefile b/Makefile
new file mode 100644
index 000000000..46467c20d
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,21 @@
+.PHONY: minimal
+minimal: setup
+
+.PHONY: setup
+setup:
+	tox -e venv
+
+.PHONY: install-hooks
+install-hooks:
+	tox -e pre-commit -- install -f --install-hooks
+
+.PHONY: test
+test:
+	tox
+
+.PHONY: clean
+clean:
+	find -name '*.pyc' -delete
+	find -name '__pycache__' -delete
+	rm -rf .tox
+	rm -rf venv
diff --git a/README.md b/README.md
new file mode 100644
index 000000000..c95cb50a6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,134 @@
+# detect_secrets
+
+## Description
+
+This is loosely based on [truffleHog](https://github.com/dxa4481/truffleHog/blob/master/truffleHog/truffleHog.py)'s secret scanner. However, instead of analyzing the entire git history for secrets that have **ever** entered the repository, we wanted to perform preventative checks to make sure that no **additional** secrets will be added to the codebase.
+
+This is perfect for a backwards-compatible solution, where you accept that there may **currently** be secrets hiding in your large repository, and you want to prevent new ones from entering without first dealing with the potentially gargantuan effort to move existing secrets away.
+
+We deal with this in two steps:
+
+1. Use a client-side pre-commit hook, to alert developers when they attempt to enter a secret into the code base.
+2. Set up a server-side cron job to periodically scan tracked repositories, to make sure that developers didn't accidentally skip the pre-commit check.
+
+## Installation
+
+There are three components that you can set up, depending on your purposes.
+
+### Pre-Commit Hook
+
+See [pre-commit](https://github.com/pre-commit/pre-commit) for instructions to install the pre-commit framework.
+
+Hooks available:
+
+- `detect-secrets`: This hook detects and prevents high entropy strings from entering the codebase.
+
+### Console Use / Server Use
+
+`pip install detect-secrets`
+
+## Configuration
+
+### Installing a baseline
+
+#### Step 1: Initialize your baseline.
+
+```
+$ detect-secrets --initialize --exclude='^(\.git|venv)' > .secrets.baseline
+```
+
+#### Step 2: Use your baseline in your pre-commit hook
+
+```
+- repo:
+    hooks:
+    -   id: detect-secrets
+        args: ['--baseline', '.secrets.baseline']
+```
+
+Remember to initialize your baseline with the same sensitivity configurations as your pre-commit hook!
+
+### Sensitivity Configuration
+
+This module works by searching for high entropy strings in the codebase, and [calculating their Shannon entropy](http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html). If the entropy of a given string exceeds the preset limit, the string will be flagged as a potential secret.
+
+The sensitivity of this pre-commit hook can be adjusted with command-line flags (e.g. `--base64-limit` and `--hex-limit`). Lowering these limits will identify more potential secrets, but also create more false positives. Adjust these limits to suit your needs.
+
+If you want a lower limit, but also want to whitelist specific strings from being detected, you can add the comment `# pragma: whitelist secret` to the line of code.
+
+For example:
+
+```
+API_KEY = "blah-blah-but-actually-not-secret" # pragma: whitelist secret
+
+def main():
+    print('hello world')
+
+if __name__ == '__main__':
+    main()
+```
+
+This is the preferred way of whitelisting high entropy strings (rather than adding them to the baseline file), because it is easily searchable, auditable, and maintainable.
+
+### Setting up your server
+
+#### Step 1: Configure your config.yaml
+
+The following keys are accepted in your config file:
+
+```
+config.yaml
+  |- default    # These are default values to use for each tracked repo.
+  |- tracked    # This is a list of tracked repos' details.
+```
+
+Each tracked repository can have the following attributes:
+
+| attribute     | description
+| --------------| -----------
+| repo          | where to `git clone` the repo from (**required**)
+| is_local_repo | True or False depending on whether the repo is already on the filesystem (**required**)
+| sha           | the commit hash to start scanning from (**required**)
+| cron          | [crontab syntax](https://crontab.guru/) of how often to run a scan for this repo
+| plugins       | list of plugins, with their respective settings
+| baseline      | the filename to parse the detect-secrets baseline from
+
+See the sample `config.yaml.sample`, and the illustrative snippet below, for an example.
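+
+For instance, a minimal `config.yaml` tracking a single repository might look like the
+following. This is an illustrative sketch only: the repo URL, sha and cron schedule are
+placeholder values, not defaults.
+
+```
+default:
+  plugins:
+    Base64HighEntropyString: 4.5
+    HexHighEntropyString: 3
+  baseline: .secrets.baseline
+tracked:
+  - repo: git@github.com:example-org/example-repo
+    is_local_repo: False
+    sha: 0123456789abcdef0123456789abcdef01234567
+    cron: '0 0 * * *'
+```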
+
+#### Step 2: Configure your .pysensu.config.yaml
+
+See [pysensu-yelp](http://pysensu-yelp.readthedocs.io/en/latest/#pysensu_yelp.send_event) for instructions on configuring your Sensu events.
+
+See the sample `.pysensu.config.yaml.sample` for an example, but be sure to name your file `.pysensu.config.yaml`.
+
+#### Step 3: Set up your cron jobs
+
+```
+echo -e "$(crontab -l)\n\n$(detect-secrets-server --initialize)" | crontab -
+```
+
+## Use Cases
+
+### Fresh Repository
+
+**Scenario**: You are starting a brand new repo, so you **know** you haven't committed any secrets to the codebase yet. Moving forward, you want to make sure you don't do so.
+
+**Solution**: Great! Just [install the pre-commit hook](TODO:Link) for preventative measures.
+
+### Existing Repository
+
+**Scenario**: You have an existing repo that may or may not have had secrets added to it before. You want to prevent further secrets from being committed, yet it's too much work to migrate all existing secrets out of the codebase.
+
+**Solution**:
+
+1. Create a baseline of existing secrets, so that the pre-commit hook will only detect the new secrets added.
+2. [Install the pre-commit hook](TODO:Link) for preventative measures.
+
+## A Few Caveats
+
+This is not meant to be a sure-fire solution to prevent secrets from entering the codebase. Only proper developer education can truly do that. This pre-commit hook merely implements several heuristics to try to prevent obvious cases of committing secrets.
+
+### Things that won't be prevented
+
+* Multi-line secrets.
+* Default passwords (e.g. `password = "password"`)
diff --git a/config.yaml.sample b/config.yaml.sample
new file mode 100644
index 000000000..bfd095f60
--- /dev/null
+++ b/config.yaml.sample
@@ -0,0 +1,7 @@
+default:
+  plugins:
+    HexHighEntropyString: 3
+    Base64HighEntropyString: 4.5
+  baseline: .secrets.baseline
+  base_tmp_dir: /tmp/detect_secrets_tracked_repos
+  exclude_regex: ^(\.git|build|logs|node_modules|virtualenv_run)|.*tests/.*
diff --git a/credentials.sample.json b/credentials.sample.json
new file mode 100644
index 000000000..cd5e58736
--- /dev/null
+++ b/credentials.sample.json
@@ -0,0 +1,5 @@
+{
+    "accessKeyId": "",
+    "secretAccessKey": "",
+    "region": "us-east-1"
+}
diff --git a/detect_secrets/__init__.py b/detect_secrets/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/detect_secrets/core/__init__.py b/detect_secrets/core/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/detect_secrets/core/baseline.py b/detect_secrets/core/baseline.py
new file mode 100644
index 000000000..3c79e1630
--- /dev/null
+++ b/detect_secrets/core/baseline.py
@@ -0,0 +1,113 @@
+#!/usr/bin/python
+from __future__ import absolute_import
+
+import os
+import re
+
+from detect_secrets.core.secrets_collection import SecretsCollection
+
+
+def apply_baseline_filter(results, baseline, filelist):
+    """
+    :param results: SecretsCollection of current results
+    :param baseline: SecretsCollection of baseline results.
+                     This will be updated accordingly (by reference)
+    :param filelist: list of strings; filenames that are scanned.
+    :returns: SecretsCollection of new results (filtering out baseline)
+    """
+    output = SecretsCollection()
+
+    if baseline.exclude_regex:
+        regex = re.compile(baseline.exclude_regex, re.IGNORECASE)
+
+    # First, we find all the secrets that are not currently in the baseline.
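+    # (results.data maps each filename to a dict of {PotentialSecret: PotentialSecret},
+    # so the membership checks below are hash-based lookups.)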
+    for filename in results.data:
+        # If the file matches the exclude_regex, we skip it
+        if baseline.exclude_regex and regex.search(filename):
+            continue
+        if filename not in baseline.data:
+            # We don't have a previous record of this file, so obviously
+            # everything is new.
+            output.data[filename] = results.data[filename]
+            continue
+
+        # The __hash__ method of PotentialSecret makes this work
+        tmp = {secret: secret for secret in results.data[filename] if secret not in baseline.data[filename]}
+
+        if tmp:
+            output.data[filename] = tmp
+
+    # If there are new secrets, stop the process here. Otherwise,
+    # try to update the baseline with recently removed secrets.
+    if len(output.data) > 0:
+        return output
+
+    # Only attempt baseline modifications if we don't find any new secrets
+    for filename in filelist:
+        if filename not in baseline.data:
+            # Nothing to modify, because it was never there in the first place.
+            continue
+
+        if filename not in results.data:
+            # All secrets relating to that file were removed.
+            del baseline.data[filename]
+            continue
+
+        baseline_clone = baseline.data[filename].copy()
+        for obj in baseline_clone:
+            results_obj = results.get_secret(
+                filename,
+                obj.secret_hash,
+                obj.type
+            )
+            if results_obj is None:
+                # No longer in results, so can remove from baseline
+                obj_to_delete = baseline.get_secret(
+                    filename,
+                    obj.secret_hash,
+                    obj.type
+                )
+                del baseline.data[filename][obj_to_delete]
+
+            elif results_obj.lineno != obj.lineno:
+                # Secret moved around, so update the baseline with its new location
+                baseline_obj = baseline.get_secret(
+                    filename,
+                    obj.secret_hash,
+                    obj.type
+                )
+                baseline_obj.lineno = results_obj.lineno
+
+    return output
+
+
+def initialize(plugins, exclude_regex=None, rootdir='.'):
+    """Scans the entire codebase for high entropy strings, and returns a
+    SecretsCollection object.
+
+    :param plugins: tuple of detect_secrets.plugins.base.BasePlugin.
+    :param [exclude_regex]: string; optional regex for ignored paths.
+    :param [rootdir]: string; specify root directory.
+    :returns: SecretsCollection
+    """
+    output = SecretsCollection(plugins)
+
+    if exclude_regex:
+        regex = re.compile(exclude_regex, re.IGNORECASE)
+
+    rootdir = os.path.abspath(rootdir)
+
+    for subdir, dirs, files in os.walk(rootdir):
+        if exclude_regex and regex.search(subdir[len(rootdir) + 1:]):
+            continue
+
+        for file in files:
+            fullpath = os.path.join(subdir, file)
+
+            # Cover root-level files (because the preliminary regex check won't cover them)
+            if exclude_regex and regex.search(fullpath[len(rootdir) + 1:]):
+                continue
+
+            output.scan_file(fullpath, fullpath[len(rootdir) + 1:])
+
+    return output
diff --git a/detect_secrets/core/log.py b/detect_secrets/core/log.py
new file mode 100644
index 000000000..c87cbf390
--- /dev/null
+++ b/detect_secrets/core/log.py
@@ -0,0 +1,59 @@
+#!/usr/bin/python
+import logging
+import sys
+
+
+class CustomLog(logging.getLoggerClass()): # pragma: no cover
+
+    log_format_string = '[%(module)s]\t%(levelname)s\t%(message)s'
+
+    # See CustomLog.enableDebug
+    debug_mode = 0
+
+    def __init__(self, debug_mode=None, formatter=None, *args, **kwargs):
+        """
+        :param name: string; used for declaring log channels.
+        :param debug_mode: debug level for this specific logger instance.
:param formatter: string; for custom formatting
+        """
+        super(CustomLog, self).__init__('', *args, **kwargs)
+
+        if debug_mode is not None:
+            self.debug_mode = debug_mode
+
+        if formatter is None:
+            self.formatter = logging.Formatter(CustomLog.log_format_string)
+        elif isinstance(formatter, str):
+            self.formatter = logging.Formatter(formatter)
+
+    @classmethod
+    def enableDebug(cls, verbose_level):
+        """Configure the global verbosity of logs
+
+        :param verbose_level: integer; between 0-2
+        """
+        cls.debug_mode = verbose_level
+
+    def getLogger(self, name=None):
+        log = logging.getLogger(name)
+
+        debug_mode = self.debug_mode if self.debug_mode is not None else CustomLog.debug_mode
+
+        # Apply custom default options
+        log_level = logging.ERROR
+        if debug_mode == 1:
+            log_level = logging.INFO
+        elif debug_mode == 2:
+            log_level = logging.DEBUG
+
+        log.setLevel(log_level)
+
+        if self.formatter:
+            log.handlers = []
+            handler = logging.StreamHandler(sys.stderr)
+            handler.setFormatter(self.formatter)
+            log.addHandler(handler)
+
+        logging.captureWarnings(True)
+
+        return log
diff --git a/detect_secrets/core/potential_secret.py b/detect_secrets/core/potential_secret.py
new file mode 100644
index 000000000..ba2462bc0
--- /dev/null
+++ b/detect_secrets/core/potential_secret.py
@@ -0,0 +1,64 @@
+#!/usr/bin/python
+import hashlib
+
+
+class PotentialSecret(object):
+
+    def __init__(self, typ, filename, lineno, secret):
+        """
+        :param typ: string; human-readable description of what identified this
+                    secret as a "potential secret"
+        :param filename: string; name of the file this potential secret was found in
+        :param lineno: integer; location of secret
+        :param secret: string; the secret identified
+        """
+        self.type = typ
+        self.filename = filename
+        self.lineno = lineno
+        self.secret_hash = self.hash_secret(secret)
+
+        # If two PotentialSecrets have the same values for these fields,
+        # they are considered equal.
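+        # (Note: lineno is deliberately excluded, so a secret that merely moves
+        # within a file still compares as equal; apply_baseline_filter relies on
+        # this when updating line numbers in the baseline.)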
self.fields_to_compare = ['filename', 'secret_hash', 'type']
+
+    @classmethod
+    def hash_secret(cls, secret):
+        """
+        :param secret: string
+        :returns: string
+        """
+        return hashlib.sha1(secret.encode('utf-8')).hexdigest()
+
+    def json(self):
+        """Custom JSON encoder"""
+        return {
+            'type': self.type,
+            'filename': self.filename,
+            'line_number': self.lineno,
+            'hashed_secret': self.secret_hash
+        }
+
+    def __eq__(self, other):
+        return all(
+            getattr(self, field) == getattr(other, field)
+            for field in self.fields_to_compare
+        )
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(
+            tuple([getattr(self, x) for x in self.fields_to_compare])
+        )
+
+    def __str__(self): # pragma: no cover
+        return (
+            "Secret Type: %s\n"
+            "Location: ./%s:%d\n"
+            # "Hash: %s\n"
+        ) % (
+            self.type,
+            self.filename, self.lineno,
+            # self.secret_hash
+        )
diff --git a/detect_secrets/core/secrets_collection.py b/detect_secrets/core/secrets_collection.py
new file mode 100644
index 000000000..027c665f9
--- /dev/null
+++ b/detect_secrets/core/secrets_collection.py
@@ -0,0 +1,265 @@
+#!/usr/bin/python
+from __future__ import absolute_import
+
+import codecs
+import json
+import os
+import re
+from time import gmtime
+from time import strftime
+
+from unidiff import PatchSet
+
+from detect_secrets.core.log import CustomLog
+from detect_secrets.core.potential_secret import PotentialSecret
+
+
+CustomLogObj = CustomLog()
+
+
+class SecretsCollection(object):
+
+    def __init__(self, plugins=()):
+        """
+        :param plugins: tuple of plugins to determine secrets
+        """
+        self.data = {}
+        self.plugins = plugins
+        self.exclude_regex = ''
+
+    @classmethod
+    def load_from_file(cls, filename):
+        """Initializes a SecretsCollection object from file.
+
+        :param filename: string; name of file to load
+        :returns: SecretsCollection
+        :raises: IOError
+        """
+        try:
+            with codecs.open(filename, encoding='utf-8') as f:
+                baseline = json.loads(f.read())
+
+        except (IOError, UnicodeDecodeError):
+            CustomLogObj.getLogger().error(
+                "Unable to open baseline file: %s.", filename
+            )
+
+            raise
+
+        try:
+            return cls.load_from_dict(baseline)
+        except IOError:
+            CustomLogObj.getLogger().error('Incorrectly formatted baseline!')
+            raise
+
+    @classmethod
+    def load_from_string(cls, string):
+        """Initializes a SecretsCollection object from a string
+
+        :param string: string; string to load SecretsCollection from.
+        :returns: SecretsCollection
+        :raises: IOError
+        """
+        try:
+            return cls.load_from_dict(json.loads(string))
+        except (IOError, ValueError):
+            CustomLogObj.getLogger().error('Incorrectly formatted baseline!')
+            raise
+
+    @classmethod
+    def load_from_dict(cls, data):
+        """Initializes a SecretsCollection object from a dictionary.
+
+        :param data: dict; properly formatted dictionary to load SecretsCollection from.
+        :returns: SecretsCollection
+        :raises: IOError
+        """
+        result = SecretsCollection()
+        if 'results' not in data or 'exclude_regex' not in data:
+            raise IOError
+
+        for filename in data['results']:
+            result.data[filename] = {}
+
+            for item in data['results'][filename]:
+                secret = PotentialSecret(
+                    item['type'],
+                    filename,
+                    item['line_number'],
+                    'will be replaced'
+                )
+                secret.secret_hash = item['hashed_secret']
+                result.data[filename][secret] = secret
+
+        result.exclude_regex = data['exclude_regex']
+
+        return result
+
+    def load_from_diff(self, diff, exclude_regex='', baseline_file=''):
+        """Initializes a SecretsCollection object from diff.
Not a classmethod, since it needs the list of self.plugins for secret scanning.
+
+        :param diff: string; diff string
+        :param exclude_regex: string; a regular expression of what files to skip over
+        :param baseline_file: string or None; the baseline_file of the repo, to skip over since it contains hashes
+        """
+        patch_set = PatchSet.from_string(diff)
+
+        if exclude_regex:
+            regex = re.compile(exclude_regex, re.IGNORECASE)
+
+        for patch_file in patch_set:
+            filename = patch_file.path
+            # If the file matches the exclude_regex, we skip it
+            if exclude_regex and regex.search(filename):
+                continue
+            # Skip over the baseline_file, because it will have hashes in it.
+            if filename == baseline_file:
+                continue
+
+            # We only want to capture incoming secrets (so added lines)
+            # Terminology:
+            #  - A "hunk" is a patch chunk in the patch_file
+            #  - `target_lines` is from the incoming changes
+            results = {}
+            for hunk in patch_file:
+                for line in hunk.target_lines():
+                    if line.is_added:
+                        for plugin in self.plugins:
+                            results.update(plugin.analyze_string(
+                                line.value,
+                                line.target_line_no,
+                                filename
+                            ))
+
+            if not results:
+                continue
+
+            if filename not in self.data:
+                self.data[filename] = results
+            else:
+                self.data[filename].update(results)
+
+    def scan_file(self, filename, filename_key=None):
+        """Scans a specified file, and adds information to self.data
+
+        :param filename: string; full path to file to scan.
+        :param filename_key: string; key to store in self.data
+        :returns: boolean; used for testing
+        """
+
+        if filename_key is None:
+            filename_key = filename
+
+        if os.path.islink(filename):
+            return False
+
+        try:
+            with codecs.open(filename, encoding='utf-8') as f:
+                self._extract_secrets(f, filename_key)
+
+            return True
+
+        except IOError:
+            CustomLogObj.getLogger().warning("Unable to open file: %s", filename)
+            return False
+
+    def get_secret(self, filename, secret, typ=None):
+        """Checks to see whether a secret is found in the collection.
+
+        :param filename: string; which file to search in.
+        :param secret: string; secret hash of secret to search for.
+        :param [typ]: string; type of secret, if known.
+        :returns: PotentialSecret or None
+        """
+        if filename not in self.data:
+            return None
+
+        if typ:
+            # Optimized lookup, because we know the type of secret
+            # (and therefore, its hash)
+            tmp_secret = PotentialSecret(typ, filename, 0, 'will be overridden')
+            tmp_secret.secret_hash = secret
+
+            if tmp_secret in self.data[filename]:
+                return self.data[filename][tmp_secret]
+
+            return None
+
+        # NOTE: We can only optimize this if we know the type of secret.
+        # Otherwise, we need to iterate through the set and find out.
+        for obj in self.data[filename]:
+            if obj.secret_hash == secret:
+                return obj
+
+        return None
+
+    def _extract_secrets(self, f, filename):
+        """Extract the secrets from a given file object.
+
+        :param f: File object
+        :param filename: string
+        """
+        log = CustomLogObj.getLogger()
+        try:
+            log.info("Checking file: %s", filename)
+
+            results = {}
+            for plugin in self.plugins:
+                results.update(plugin.analyze(f, filename))
+                f.seek(0)
+
+            if not results:
+                return
+
+            if filename not in self.data:
+                self.data[filename] = results
+            else:
+                self.data[filename].update(results)
+
+        except UnicodeDecodeError:
+            log.warning("%s failed to load.", filename)
+
+    def output_baseline(self, exclude_regex=''):
+        """Formats the SecretsCollection for baseline output.
+
+        :param [exclude_regex]: string; optional regex for ignored paths.
+        :returns: json-formatted string.
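+                  Within each file, results are sorted by line number, so the
+                  output is deterministic.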
+ """ + if not exclude_regex: + exclude_regex = '' + + results = self.json() + for key in results: + results[key] = sorted(results[key], key=lambda x: x['line_number']) + + obj = { + 'generated_at': strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()), + 'exclude_regex': exclude_regex, + 'results': results, + } + + return json.dumps(obj, indent=2) + + def json(self): + """Custom JSON encoder""" + output = {} + for filename in self.data: + output[filename] = [] + + for secret_hash in self.data[filename]: + tmp = self.data[filename][secret_hash].json() + del tmp['filename'] # not necessary + + output[filename].append(tmp) + + return output + + def __str__(self): # pragma: no cover + return json.dumps(self.json(), indent=2) + + def __getitem__(self, key): # pragma: no cover + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value diff --git a/detect_secrets/core/usage.py b/detect_secrets/core/usage.py new file mode 100644 index 000000000..08f8212ab --- /dev/null +++ b/detect_secrets/core/usage.py @@ -0,0 +1,156 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import argparse + + +class ParserBuilder(object): + + def __init__(self): + self.parser = argparse.ArgumentParser() + self.add_default_arguments() + + def add_default_arguments(self): + self.add_custom_limits() + self.add_verbosity_argument() + + def add_custom_limits(self): + self.parser.add_argument( + '--base64-limit', + type=argparse_minmax_type, + nargs=1, + default=[4.5], + help='Sets the entropy limit for base64 strings. Value must be between 0.0 and 8.0.' + ) + self.parser.add_argument( + '--hex-limit', + type=argparse_minmax_type, + nargs=1, + default=[3], + help='Sets the entropy limit for hex strings. Value must be between 0.0 and 8.0' + ) + return self + + def add_verbosity_argument(self): + self.parser.add_argument( + '-v', + '--verbose', + action='count', + help='Verbose mode', + ) + return self + + def add_filenames_argument(self): + self.parser.add_argument('filenames', nargs='*', help='Filenames to check') + return self + + def add_initialize_baseline_argument(self): + self.parser.add_argument( + '--initialize', + nargs='?', + const='.', + help=( + 'Scans the entire codebase and outputs a snapshot of currently identified ' + 'secrets.' + ) + ) + + # Pairing `--exclude` with `--initialize` because it's only used for the initialization. + # The pre-commit hook framework already has an `exclude` option that can be used instead. + self.parser.add_argument( + '--exclude', + nargs=1, + help='Pass in regex to specify ignored paths during initialization scan.' 
+ ) + + return self + + def add_set_baseline_argument(self): + self.parser.add_argument( + '--baseline', + nargs=1, + default=[''], + help='Sets a baseline for explicitly ignored secrets, generated by `--initialize`', + ) + return self + + def add_initialize_server_argument(self): + self.parser.add_argument( + '--initialize', + nargs='?', + const='repos.yaml', + help='Initializes tracked repositories based on a supplied repos.yaml.', + metavar='CUSTOM_REPO_CONFIG_FILE', + ) + + return self + + def add_scan_repo_argument(self): + self.parser.add_argument( + '--scan-repo', + nargs=1, + help='Specify the name of the repo (or path, if local) to scan.', + metavar='REPO_TO_SCAN', + ) + + return self + + def add_config_file_argument(self): + self.parser.add_argument( + '--config-file', + nargs=1, + help='Path to a config.yaml which will be used to initialize defaults and plugins.', + ) + + return self + + def add_add_repo_argument(self): + self.parser.add_argument( + '--add-repo', + nargs=1, + help=( + 'Enables the addition of individual tracked git repos, without including it in the config file. ' + 'Takes in a git URL (or path to repo, if local) as an argument. ' + 'Newly tracked repos will store HEAD as the last scanned commit sha. ' + 'Also uses config file specified by `--config-file` to initialize default plugins and other settings.' + ), + metavar='REPO_TO_ADD' + ) + + return self + + def add_local_repo_flag(self): + self.parser.add_argument( + '-L', + '--local', + action='store_true', + help=( + 'Allows scanner to be pointed to locally stored repos (instead of git cloning). ' + 'Use with --scan-repo or --add-repo.' + ), + ) + + return self + + def add_s3_config_file_argument(self): + self.parser.add_argument( + '--s3-config-file', + nargs=1, + help='Specify keys for storing files on Amazon S3.', + metavar='S3_CONFIG_FILE', + ) + + return self + + def parse_args(self, argv): + return self.parser.parse_args(argv) + + +def argparse_minmax_type(string): # pragma: no cover + """Custom type for argparse to enforce value limits""" + value = float(string) + if value < 0 or value > 8: + raise argparse.ArgumentTypeError( + '%s must be between 0.0 and 8.0' % string) + + return value diff --git a/detect_secrets/hooks/__init__.py b/detect_secrets/hooks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/detect_secrets/hooks/base.py b/detect_secrets/hooks/base.py new file mode 100644 index 000000000..bb04e2406 --- /dev/null +++ b/detect_secrets/hooks/base.py @@ -0,0 +1,13 @@ +#!/usr/bin/python + + +class BaseHook(object): # pragma: no cover + """This is an abstract class to define Hooks API. A hook is an alerting system + that allows you connect your server scanning results to your larger ecosystem + (eg. 
email alerts, IRC pings...)""" + + def alert(self, data): + """ + :param data: dictionary; where keys are filenames + """ + raise NotImplementedError diff --git a/detect_secrets/hooks/pysensu_yelp.py b/detect_secrets/hooks/pysensu_yelp.py new file mode 100644 index 000000000..0e613934a --- /dev/null +++ b/detect_secrets/hooks/pysensu_yelp.py @@ -0,0 +1,34 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import codecs + +import pysensu_yelp +import yaml + +from detect_secrets.core.log import CustomLog +from detect_secrets.hooks.base import BaseHook + + +CustomLogObj = CustomLog() + + +class PySensuYelpHook(BaseHook): # pragma: no cover + """This sends an alert to Sensu as specified in the pysensu configuration file.""" + + def __init__(self, config_file): + self.config_file = config_file + + def alert(self, secrets, repo_name): + try: + with codecs.open(self.config_file) as f: + config_data = yaml.safe_load(f) + + except IOError: + CustomLogObj.getLogger().error( + 'Unable to open pysensu config file: %s.', self.config_file + ) + + raise + config_data['output'] = "In repo " + repo_name + "\n" + str(secrets) + pysensu_yelp.send_event(**config_data) diff --git a/detect_secrets/main.py b/detect_secrets/main.py new file mode 100644 index 000000000..5457edad5 --- /dev/null +++ b/detect_secrets/main.py @@ -0,0 +1,42 @@ +#!/usr/bin/python +from __future__ import absolute_import +from __future__ import print_function + +import sys + +from detect_secrets.core.baseline import initialize +from detect_secrets.core.log import CustomLog +from detect_secrets.core.usage import ParserBuilder +from detect_secrets.plugins.high_entropy_strings import Base64HighEntropyString +from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString + + +def parse_args(argv): + return ParserBuilder().add_initialize_baseline_argument() \ + .parse_args(argv) + + +def main(argv=None): + if len(sys.argv) == 1: # pragma: no cover + sys.argv.append('-h') + + args = parse_args(argv) + if args.verbose: # pragma: no cover + CustomLog.enableDebug(args.verbose) + + default_plugins = ( + HexHighEntropyString(args.hex_limit[0]), + Base64HighEntropyString(args.base64_limit[0]), + ) + + if args.initialize: + if args.exclude: + args.exclude = args.exclude[0] + + print(initialize(default_plugins, args.exclude, args.initialize).output_baseline(args.exclude)) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/detect_secrets/plugins/__init__.py b/detect_secrets/plugins/__init__.py new file mode 100644 index 000000000..03d3a6be3 --- /dev/null +++ b/detect_secrets/plugins/__init__.py @@ -0,0 +1,87 @@ +from collections import namedtuple + +from .base import BasePlugin +from .high_entropy_strings import Base64HighEntropyString # noqa: F401 +from .high_entropy_strings import HexHighEntropyString # noqa: F401 +from detect_secrets.core.log import CustomLog + + +_SensitivityValues = namedtuple( + 'SensitivityValues', + [ + 'base64_limit', + 'hex_limit', + ] +) + + +class SensitivityValues(_SensitivityValues): + + def __new__(cls, base64_limit=None, hex_limit=None, **kwargs): + if base64_limit is None and 'Base64HighEntropyString' in kwargs: + base64_limit = kwargs['Base64HighEntropyString'] + + if hex_limit is None and 'HexHighEntropyString' in kwargs: + hex_limit = kwargs['HexHighEntropyString'] + + return super(SensitivityValues, cls).__new__( + cls, + base64_limit=base64_limit, + hex_limit=hex_limit, + ) + + +_CustomLogObj = CustomLog() + + +def 
_convert_sensitivity_values_to_class_tuple(sensitivity_values):
+    """
+    :param sensitivity_values: SensitivityValues
+    :return: tuple of tuples, in the format (plugin_class_name, value).
+             This way, we can initialize the plugin class with plugin_class_name(value).
+    """
+    mapping = {
+        'base64_limit': 'Base64HighEntropyString',
+        'hex_limit': 'HexHighEntropyString',
+    }
+
+    output = []
+    for key in sensitivity_values._fields:
+        if key in mapping and getattr(sensitivity_values, key) is not None:
+            output.append((mapping[key], getattr(sensitivity_values, key),))
+
+    return tuple(output)
+
+
+def initialize(plugin_config):
+    """Converts a list of plugin names (and corresponding initializing parameters)
+    to instances of plugins, for scanning purposes.
+
+    :type plugin_config: SensitivityValues
+
+    :return: list of BasePlugins
+    """
+    output = []
+    if not isinstance(plugin_config, SensitivityValues):
+        return output
+
+    plugin_config_tuple = _convert_sensitivity_values_to_class_tuple(plugin_config)
+
+    for plugin, value in plugin_config_tuple:
+        klass = globals()[plugin]
+
+        # Make sure the class is a BasePlugin type, before instantiating it.
+        if not issubclass(klass, BasePlugin):
+            continue
+
+        try:
+            instance = klass(value)
+        except TypeError:
+            _CustomLogObj.getLogger().warning(
+                'Unable to initialize plugin!'
+            )
+            continue
+
+        output.append(instance)
+
+    return output
diff --git a/detect_secrets/plugins/base.py b/detect_secrets/plugins/base.py
new file mode 100644
index 000000000..9d63d224f
--- /dev/null
+++ b/detect_secrets/plugins/base.py
@@ -0,0 +1,33 @@
+#!/usr/bin/python
+
+
+class BasePlugin(object):
+    """This is an abstract class to define Plugins API"""
+
+    def analyze(self, file, filename): # pragma: no cover
+        """
+        :param file: The File object itself.
+        :param filename: string; filename of File object, used for creating
+                         PotentialSecret objects
+        :returns: dictionary representation of set (for random access by hash)
+                  { detect_secrets.core.potential_secret.__hash__:
+                        detect_secrets.core.potential_secret }
+        """
+        potential_secrets = {}
+        for line_num, line in enumerate(file, start=1):
+            secrets = self.analyze_string(line, line_num, filename)
+            potential_secrets.update(secrets)
+
+        return potential_secrets
+
+    def analyze_string(self, string, line_num, filename): # pragma: no cover
+        """
+        :param string: string; the line to analyze
+        :param line_num: integer; line number that is currently being analyzed
+        :param filename: string; name of file being analyzed
+        :returns: dictionary
+
+        NOTE: line_num and filename are used for PotentialSecret creation only.
+        """
+
+        raise NotImplementedError("%s needs to implement analyze_string()" % self.__class__.__name__)
diff --git a/detect_secrets/plugins/high_entropy_strings.py b/detect_secrets/plugins/high_entropy_strings.py
new file mode 100644
index 000000000..6bec0f8ce
--- /dev/null
+++ b/detect_secrets/plugins/high_entropy_strings.py
@@ -0,0 +1,80 @@
+#!/usr/bin/python
+from __future__ import absolute_import
+
+import math
+import re
+import string
+
+from detect_secrets.core.potential_secret import PotentialSecret
+from detect_secrets.plugins.base import BasePlugin
+
+
+class HighEntropyStringsPlugin(BasePlugin):
+    """Base class for string pattern matching"""
+
+    def __init__(self, charset, limit):
+        self.charset = charset
+        self.entropy_limit = limit
+        self.regex = re.compile(r'([\'"])([%s]+)(\1)' % charset)
+
+        self.secret_type = 'High Entropy String'
+
+        # Allow whitelisting individual lines.
+        # TODO: Update for not just python comments?
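+        # (Matches comments like `# pragma: whitelist secret`; an optional space is
+        # tolerated after `#` and after `pragma:`, and either a space or a hyphen
+        # may precede `secret`.)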
+        self.ignore_regex = re.compile(r'# ?pragma: ?whitelist[ -]secret')
+
+    def calculate_shannon_entropy(self, data):
+        """Returns the entropy of a given string.
+
+        Borrowed from: http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html.
+
+        :param data: string; the word to analyze. Entropy is calculated over
+                     the plugin's character set (self.charset).
+        :returns: float, between 0.0 and 8.0
+        """
+        if not data: # pragma: no cover
+            return 0
+
+        entropy = 0
+        for x in self.charset:
+            p_x = float(data.count(x)) / len(data)
+            if p_x > 0:
+                entropy += - p_x * math.log(p_x, 2)
+
+        return entropy
+
+    def analyze_string(self, string, line_num, filename):
+        """Searches string for custom pattern, and captures all high entropy strings that
+        match self.regex, with a limit defined as self.entropy_limit."""
+
+        output = {}
+
+        if self.ignore_regex.search(string):
+            return output
+
+        # There may be multiple strings on the same line
+        results = self.regex.findall(string)
+        for result in results:
+            entropy_value = self.calculate_shannon_entropy(result[1])
+            if entropy_value > self.entropy_limit:
+                secret = PotentialSecret(self.secret_type, filename, line_num, result[1])
+                output[secret] = secret
+
+        return output
+
+
+class HexHighEntropyString(HighEntropyStringsPlugin):
+    """HighEntropyStringsPlugin for hex strings"""
+
+    def __init__(self, limit):
+        super(HexHighEntropyString, self).__init__(string.hexdigits, limit)
+
+
+class Base64HighEntropyString(HighEntropyStringsPlugin):
+    """HighEntropyStringsPlugin for base64 encoded strings"""
+
+    def __init__(self, limit):
+        super(Base64HighEntropyString, self).__init__(
+            string.ascii_letters + string.digits + '+/=',
+            limit
+        )
diff --git a/detect_secrets/pre_commit_hook.py b/detect_secrets/pre_commit_hook.py
new file mode 100644
index 000000000..f0535ee41
--- /dev/null
+++ b/detect_secrets/pre_commit_hook.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import
+
+import sys
+
+from detect_secrets.core.baseline import apply_baseline_filter
+from detect_secrets.core.log import CustomLog
+from detect_secrets.core.secrets_collection import SecretsCollection
+from detect_secrets.core.usage import ParserBuilder
+from detect_secrets.plugins.high_entropy_strings import Base64HighEntropyString
+from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString
+
+
+def parse_args(argv):
+    return ParserBuilder().add_filenames_argument() \
+        .add_set_baseline_argument() \
+        .parse_args(argv)
+
+
+def pretty_print_diagnostics(secrets):
+    """Prints a helpful error message, for better usability.
+
+    :param secrets: SecretsCollection
+    """
+    log = CustomLog(formatter='%(message)s').getLogger()
+    log.error('Potential secrets about to be committed to git repo! 
Please rectify or\n' + + 'explicitly ignore with `pragma: whitelist secret` comment.\n') + + for filename in secrets.data: + for secret in secrets.data[filename].values(): + log.error(secret) + + log.error('Possible mitigations:\n' + + ' - For information about putting your secrets in a safer place, please ask in #security\n' + + ' - Mark false positives with `# pragma: whitelist secret`\n' + + ' - Use `--no-verify` if this is a one-time false positive\n') + + log.error('If a secret has already been committed, visit https://help.github.com/articles/removing-sensitive-data-from-a-repository/\n') + + +def main(argv=None): + args = parse_args(argv) + if args.verbose: # pragma: no cover + CustomLog.enableDebug(args.verbose) + + if args.baseline[0]: + # If baseline is provided, we first want to make sure it's valid, before + # doing any further computation. + try: + baseline_collection = SecretsCollection.load_from_file( + args.baseline[0] + ) + except IOError: + # Error logs handled in load_from_file logic. + return 1 + + default_plugins = ( + HexHighEntropyString(args.hex_limit[0]), + Base64HighEntropyString(args.base64_limit[0]), + ) + collection = SecretsCollection(default_plugins) + + for filename in args.filenames: + if filename == args.baseline[0]: + # Obviously, don't detect the baseline file + continue + + collection.scan_file(filename) + + results = collection + if args.baseline[0]: + results = apply_baseline_filter( + collection, + baseline_collection, + args.filenames + ) + + if len(results.data) > 0: + pretty_print_diagnostics(results) + return 1 + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/detect_secrets/server/__init__.py b/detect_secrets/server/__init__.py new file mode 100644 index 000000000..2e7ee328a --- /dev/null +++ b/detect_secrets/server/__init__.py @@ -0,0 +1,17 @@ +from .base_tracked_repo import BaseTrackedRepo +from .local_tracked_repo import LocalTrackedRepo +from .s3_tracked_repo import S3LocalTrackedRepo +from .s3_tracked_repo import S3TrackedRepo + + +def tracked_repo_factory(is_local=False, is_s3=False): + if is_s3: + if is_local: + return S3LocalTrackedRepo + else: + return S3TrackedRepo + else: + if is_local: + return LocalTrackedRepo + else: + return BaseTrackedRepo diff --git a/detect_secrets/server/base_tracked_repo.py b/detect_secrets/server/base_tracked_repo.py new file mode 100644 index 000000000..6cf11deb1 --- /dev/null +++ b/detect_secrets/server/base_tracked_repo.py @@ -0,0 +1,415 @@ +from __future__ import absolute_import + +import codecs +import hashlib +import json +import os +import re +import subprocess +import sys +from enum import Enum + +from detect_secrets.core.baseline import apply_baseline_filter +from detect_secrets.core.log import CustomLog +from detect_secrets.core.secrets_collection import SecretsCollection +from detect_secrets.plugins import initialize +from detect_secrets.plugins import SensitivityValues +from detect_secrets.server.repo_config import RepoConfig + + +DEFAULT_BASE_TMP_DIR = os.path.expanduser('~/.detect-secrets-server') + + +CustomLogObj = CustomLog() + + +class OverrideLevel(Enum): + NEVER = 0 + ASK_USER = 1 + ALWAYS = 2 + + +def get_filepath_safe(prefix, file): + """Attempts to prevent file traversal when trying to get `prefix/file`""" + prefix_realpath = os.path.realpath(prefix) + filepath = os.path.realpath('%(prefix_realpath)s/%(file)s' % {'prefix_realpath': prefix_realpath, 'file': file}) + if not filepath.startswith(prefix_realpath): + return None + + return filepath + + +class 
BaseTrackedRepo(object): + + def __init__( + self, + sha, + repo, + plugin_sensitivity, + repo_config, + cron='', + **kwargs + ): + """ + :type sha: string + :param sha: last commit hash scanned + + :type repo: string + :param repo: git URL or local path of repo + + :type plugin_sensitivity: SensitivityValues + :param plugin_sensitivity: values to configure various plugins + + :type repo_config: RepoConfig + :param repo_config: values to configure repos, See `server_main` for more + details. + + :type cron: string + :param cron: crontab syntax + """ + self.last_commit_hash = sha + self.repo = repo + self.crontab = cron + self.plugin_config = plugin_sensitivity + self.base_tmp_dir = repo_config.base_tmp_dir + self.baseline_file = repo_config.baseline + self.exclude_regex = repo_config.exclude_regex + + self.name = self._get_repo_name(repo) + + self._initialize_tmp_dir(repo_config.base_tmp_dir) + + @classmethod + def load_from_file(cls, repo_name, repo_config, *args, **kwargs): + """This will load a TrackedRepo to memory, from a given tracked file. + For automated management without a database. + + :type repo_name: string + :param repo_name: git URL or local path of repo + + :type repo_config: RepoConfig + :param repo_config: values to configure repos, See `server_main` for more + details. + + :return: TrackedRepo + """ + repo_name = cls._get_repo_name(repo_name) + + data = cls._read_tracked_file(repo_name, repo_config.base_tmp_dir) + if data is None: + return None + + data = cls._modify_tracked_file_contents(data) + + # Add server-side configuration to repo + data['repo_config'] = RepoConfig( + base_tmp_dir=repo_config.base_tmp_dir, + exclude_regex=repo_config.exclude_regex, + baseline=data['baseline_file'], + ) + + return cls(**data) + + def cron(self): + """Returns the cron command to be appended to crontab""" + return '%(crontab)s detect-secrets-server --scan-repo %(name)s' % { + 'crontab': self.crontab, + 'name': self.name, + } + + def scan(self): + """Clones the repo, and scans the git diff between last_commit_hash and HEAD. + + :raises: subprocess.CalledProcessError + """ + self.clone_and_fetch_repo() + diff = self._get_latest_changes() + baseline = self._get_baseline() + + default_plugins = initialize(self.plugin_config) + + secrets = SecretsCollection(default_plugins) + + secrets.load_from_diff(diff.decode('utf-8'), self.exclude_regex, baseline_file=baseline) + if baseline: + + baseline_collection = SecretsCollection.load_from_string(baseline) + + # Don't need to supply filelist, because we're not updating the baseline + secrets = apply_baseline_filter(secrets, baseline_collection, ()) + + return secrets + + def update(self): + """Updates TrackedRepo to latest commit. + + :raises: subprocess.CalledProcessError + """ + + sha = subprocess.check_output([ + 'git', + '--git-dir', self.repo_location, + 'rev-parse', + 'HEAD' + ], stderr=subprocess.STDOUT) + + self.last_commit_hash = sha.decode('ascii').strip() + + def save(self, override_level=OverrideLevel.ASK_USER): + """Saves tracked repo config to file. Returns True if successful. + + :type override_level: OverrideLevel + :param override_level: determines if we overwrite the JSON file, if exists. 
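+        :return: bool; whether the tracked file was written to disk.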
+ """ + if self.tracked_file_location is None: + return False + + # If file exists, check OverrideLevel + if os.path.isfile(self.tracked_file_location): + if override_level == OverrideLevel.NEVER: + return False + + elif override_level == OverrideLevel.ASK_USER: + if not self._prompt_user_override(): + return False + + with codecs.open(self.tracked_file_location, 'w') as f: + f.write(json.dumps(self.__dict__, indent=2)) + + return True + + @property + def repo_location(self): + return get_filepath_safe( + '%s/repos' % self.base_tmp_dir, + self.internal_filename + ) + + @property + def internal_filename(self): + return hashlib.sha512(self.name.encode('utf-8')).hexdigest() + + @property + def tracked_file_location(self): + return self._get_tracked_file_location( + self.base_tmp_dir, + self.internal_filename + ) + + @classmethod + def _initialize_tmp_dir(self, base_tmp_dir): # pragma: no cover + """Make base tmp folder, if non-existent.""" + if not os.path.isdir(base_tmp_dir): + os.makedirs(base_tmp_dir) + os.makedirs(base_tmp_dir + '/repos') + os.makedirs(base_tmp_dir + '/tracked') + + @classmethod + def _get_repo_name(cls, url): + """Obtains the repo name repo URL. + This allows for local file saving, as compared to the URL, which indicates WHERE to clone from. + + :type url: string + """ + # e.g. 'git@github.com:pre-commit/pre-commit-hooks' -> pre-commit/pre-commit-hooks + name = url.split(':')[-1] + + # The url_or_path will still work without the `.git` suffix. + if name.endswith('.git'): + return name[:-4] + + return name + + def clone_and_fetch_repo(self): + """We want to update the repository that we're tracking, to get the latest changes. + Then, we can subsequently scan these new changes. + + :raises: subprocess.CalledProcessError + """ + # We clone a bare repo, because we're not interested in the files themselves. + # This will be more space efficient for local disk storage. + try: + subprocess.check_output([ + 'git', + 'clone', + self.repo, + self.repo_location, + '--bare' + ], stderr=subprocess.STDOUT) + + except subprocess.CalledProcessError as e: + error_msg = e.output.decode('ascii') + + # Ignore this message, because it's expected if the repo has already been tracked. + match = re.match(r"fatal: destination path '[^']+' already exists", error_msg) + if not match: + raise + + # Once we know that we're tracking the repo (after cloning it), then fetch the latest changes. + try: + # Retrieve the current branch name + main_branch = subprocess.check_output([ + 'git', + '--git-dir', + self.repo_location, + 'rev-parse', + '--abbrev-ref', + 'HEAD' + ], stderr=subprocess.STDOUT).strip() + + # Fetch the latest HEAD into the bare repo + subprocess.check_output([ + 'git', + '--git-dir', + self.repo_location, + 'fetch', + '-q', + 'origin', + main_branch + ], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + raise + + def _get_latest_changes(self): + """ + :return: string + This will be the patch file format of difference between last saved "clean" + commit hash, and HEAD. + + :raises: subprocess.CalledProcessError + """ + try: + diff = subprocess.check_output([ + 'git', + '--git-dir', self.repo_location, + 'diff', self.last_commit_hash, 'HEAD' + ], stderr=subprocess.STDOUT) + + except subprocess.CalledProcessError: + raise + + return diff + + def _get_baseline(self): + """Take the most updated baseline, because want to get the most updated + baseline. Note that this means it's still "user-dependent", but at the + same time, we want to ignore new explicit whitelists. 
+        Also, this would mean that we **always** get a whitelist, if one exists
+        (rather than worrying about pinning to a specific commit that has a whitelist).
+
+        :return: file contents of baseline_file
+        :raises: subprocess.CalledProcessError
+        """
+        if not self.baseline_file:
+            return
+
+        try:
+            baseline = subprocess.check_output([
+                'git',
+                '--git-dir', self.repo_location,
+                'show', 'HEAD:%s' % self.baseline_file,
+            ], stderr=subprocess.STDOUT)
+
+            return baseline.decode('ascii')
+
+        except subprocess.CalledProcessError as e:
+            error_msg = e.output.decode('ascii')
+
+            # Some repositories may not have baselines.
+            # This is a non-breaking error, if so.
+            match = re.match(r"fatal: Path '[^']+' does not exist", error_msg)
+            if not match:
+                raise
+
+    @classmethod
+    def get_tracked_filepath_prefix(cls, base_tmp_dir):
+        """Returns the directory where the tracked file lives on disk."""
+        return '%s/tracked' % base_tmp_dir
+
+    @classmethod
+    def _get_tracked_file_location(cls, base_tmp_dir, internal_filename):
+        """We use the file system (instead of a DB) to track and monitor changes to
+        all TrackedRepos. This function returns where this file lives.
+
+        :return: string
+        """
+        return get_filepath_safe(
+            cls.get_tracked_filepath_prefix(base_tmp_dir),
+            internal_filename + '.json'
+        )
+
+    @classmethod
+    def _read_tracked_file(cls, repo_name, base_tmp_dir):
+        """
+        :type repo_name: string
+        :param repo_name: name of repo to scan
+        :return: TrackedRepo __dict__ representation
+        """
+        # We need to manually get the `internal_name` of the repo, to know which file to read from.
+        filename = cls._get_tracked_file_location(
+            base_tmp_dir,
+            hashlib.sha512(repo_name.encode('utf-8')).hexdigest()
+        )
+        if not filename:
+            return None
+
+        try:
+            with codecs.open(filename) as f:
+                return json.loads(f.read())
+        except (IOError, ValueError, TypeError):
+            CustomLogObj.getLogger().error(
+                'Unable to open repo data file: %s. Aborting.', filename,
+            )
+            return None
+
+    def _prompt_user_override(self): # pragma: no cover
+        """Prompts for user input, to check whether we should override the file.
+        :return: bool
+        """
+        # Make sure to write to stderr, because crontab output is going to be to stdout
+        sys.stdout = sys.stderr
+
+        override = None
+        while override not in ['y', 'n']:
+            override = str(input(
+                '"%s" repo already tracked! Do you want to override this (y|n)? ' % self.name
+            )).lower()
+
+        sys.stdout = sys.__stdout__
+
+        if override == 'n':
+            return False
+        return True
+
+    @classmethod
+    def _modify_tracked_file_contents(cls, data):
+        """For better representation, we use namedtuples. However, these do not directly
+        correlate to file dumps (which `save` does, using `__dict__`). Therefore, we may
+        need to modify these values, before loading them into the class constructor.
+
+        :type data: dict
+        :param data: pretty much the layout of __dict__
+        :return: dict
+        """
+        # Need to change plugins to type SensitivityValues
+        data['plugin_sensitivity'] = SensitivityValues(**data['plugins'])
+
+        return data
+
+    @property
+    def __dict__(self):
+        """This is written to the filesystem, and used in load_from_file.
+ Should contain all variables needed to initialize TrackedRepo.""" + output = { + 'sha': self.last_commit_hash, + 'repo': self.repo, + 'plugins': {}, + 'cron': self.crontab, + 'baseline_file': self.baseline_file, + } + + # Add plugin_config + for plugin_name in self.plugin_config._fields: + output['plugins'][plugin_name] = getattr(self.plugin_config, plugin_name) + + return output diff --git a/detect_secrets/server/local_tracked_repo.py b/detect_secrets/server/local_tracked_repo.py new file mode 100644 index 000000000..1c29d69a1 --- /dev/null +++ b/detect_secrets/server/local_tracked_repo.py @@ -0,0 +1,59 @@ +from __future__ import absolute_import + +import os +import subprocess + +from detect_secrets.core.log import CustomLog +from detect_secrets.server.base_tracked_repo import BaseTrackedRepo + + +CustomLogObj = CustomLog() + + +class LocalTrackedRepo(BaseTrackedRepo): + + def cron(self): + return "%s %s" % (super(LocalTrackedRepo, self).cron(), '--local') + + @property + def repo_location(self): + # When we're performing git commands on a local repository, we need to reference + # the `/.git` folder within the cloned git repo. + return os.path.join(self.repo, '.git') + + def clone_and_fetch_repo(self): + # Assumption: If you are scanning a local git repo, then you are "actively" + # working on it. Therefore, this module will not bear the responsibility + # of auto-updating the repo with `git pull`. + pass + + @classmethod + def get_tracked_filepath_prefix(cls, base_tmp_dir): + """Returns the directory where the tracked file lives on disk.""" + return '%s/tracked/local' % base_tmp_dir + + @classmethod + def _get_repo_name(cls, path): + """ + :type path: string + :param path: path to git repo + :return: string + """ + # First, get the git URL from local repository + if not path.endswith('/.git'): + path = os.path.join(path, '.git') + repo_url = subprocess.check_output([ + 'git', + '--git-dir', path, + 'remote', + 'get-url', + 'origin' + ], stderr=subprocess.STDOUT).strip() + return super(LocalTrackedRepo, cls)._get_repo_name(repo_url.decode('utf-8')) + + @classmethod + def _initialize_tmp_dir(cls, base_tmp_dir): # pragma: no cover + super(LocalTrackedRepo, cls)._initialize_tmp_dir(base_tmp_dir) + tracked_local_dir = base_tmp_dir + '/tracked/local' + if not os.path.isdir(tracked_local_dir): + os.makedirs(tracked_local_dir) diff --git a/detect_secrets/server/repo_config.py b/detect_secrets/server/repo_config.py new file mode 100644 index 000000000..ca317123c --- /dev/null +++ b/detect_secrets/server/repo_config.py @@ -0,0 +1,11 @@ +from collections import namedtuple + + +RepoConfig = namedtuple( + 'RepoConfig', + [ + 'base_tmp_dir', + 'baseline', + 'exclude_regex', + ] +) diff --git a/detect_secrets/server/s3_tracked_repo.py b/detect_secrets/server/s3_tracked_repo.py new file mode 100644 index 000000000..a13154f38 --- /dev/null +++ b/detect_secrets/server/s3_tracked_repo.py @@ -0,0 +1,158 @@ +from __future__ import absolute_import + +import hashlib +import json +from collections import namedtuple + +import boto3 + +from detect_secrets.server.base_tracked_repo import BaseTrackedRepo +from detect_secrets.server.base_tracked_repo import DEFAULT_BASE_TMP_DIR +from detect_secrets.server.base_tracked_repo import OverrideLevel +from detect_secrets.server.local_tracked_repo import LocalTrackedRepo + +S3Config = namedtuple( + 'S3Config', + [ + 's3_creds_file', + 'bucket_name', + 'prefix' + ] +) + + +class S3TrackedRepo(BaseTrackedRepo): + + S3 = None + + def __init__(self, s3_config, *args, 
**kwargs): + """ + :type s3_config: S3Config + """ + super(S3TrackedRepo, self).__init__(*args, **kwargs) + + self.bucket_name = s3_config.bucket_name + self.s3_prefix = s3_config.prefix + + # Need to save, for self.__dict__ + self.credentials_file = s3_config.s3_creds_file + self._initialize_s3_client(self.credentials_file) + + @classmethod + def _initialize_s3_client(cls, filename): + with open(filename) as f: + creds = json.load(f) + + cls.S3 = boto3.client( + 's3', + aws_access_key_id=creds['accessKeyId'], + aws_secret_access_key=creds['secretAccessKey'] + ) + + @classmethod + def _download(cls, bucket_name, prefix, name, destination_path): # pragma: no cover + """Downloads file from S3 into local storage.""" + cls.S3.download_file( + bucket_name, + "%s.json" % (prefix + name), + destination_path + ) + + def _does_file_exist(self): # pragma: no cover + """Determines if a file exists on S3.""" + response = self.S3.list_objects_v2( + Bucket=self.bucket_name, + Prefix=self.s3_key, + ) + + for obj in response.get('Contents', []): + if obj['Key'] == self.s3_key: + return obj['Size'] + + return False + + def _upload(self): # pragma: no cover + self.S3.upload_file( + self.tracked_file_location, + self.bucket_name, + self.s3_key, + ) + + @classmethod + def load_from_file(cls, repo_name, repo_config, s3_config): + """Just download the file from S3 and then call super load_from_file.""" + + repo_name_used_for_file_save = cls._get_repo_name(repo_name) + + # Need to do this manually, because classmethod can't access properties. + internal_filename = hashlib.sha512(repo_name_used_for_file_save.encode('utf-8')).hexdigest() + + base_tmp_dir = repo_config.base_tmp_dir + if not base_tmp_dir: + base_tmp_dir = DEFAULT_BASE_TMP_DIR + + tracked_filepath = cls._get_tracked_file_location(base_tmp_dir, internal_filename) + + cls._initialize_s3_client(s3_config.s3_creds_file) + cls._download( + s3_config.bucket_name, + s3_config.prefix, + internal_filename, + tracked_filepath, + ) + + return cls._load_from_file(repo_name, repo_config) + + @classmethod + def _load_from_file(cls, repo_name, repo_config): # pragma: no cover + """For easier mocking""" + return super(S3TrackedRepo, cls).load_from_file(repo_name, repo_config) + + def save(self, override_level=OverrideLevel.ASK_USER): + success = self._parent_save(override_level) + + if success or not self._does_file_exist(): + # If **only** never overriding, but file doesn't exist, we still want + # to upload it, because we're not overriding anything. 
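+            # i.e. we upload when the parent save succeeded, or when there is
+            # no copy of this file on S3 at all.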
+ if override_level == OverrideLevel.NEVER and self._does_file_exist(): + return False + + self._upload() + + return True + + @property + def s3_key(self): + output = self.s3_prefix + if not output.endswith('/'): + output += '/' + return output + self.internal_filename + '.json' + + @classmethod + def _modify_tracked_file_contents(cls, data): + data = super(S3TrackedRepo, cls)._modify_tracked_file_contents(data) + + # Need to change s3_config to type S3Config + data['s3_config'] = S3Config(**data['s3_config']) + + return data + + def _parent_save(self, override_level): # pragma: no cover + """For easier mocking""" + return super(S3TrackedRepo, self).save(override_level) + + @property + def __dict__(self): + output = super(S3TrackedRepo, self).__dict__ + + output['s3_config'] = { + 's3_creds_file': self.credentials_file, + 'bucket_name': self.bucket_name, + 'prefix': self.s3_prefix, + } + + return output + + +class S3LocalTrackedRepo(S3TrackedRepo, LocalTrackedRepo): + pass diff --git a/detect_secrets/server_main.py b/detect_secrets/server_main.py new file mode 100644 index 000000000..af59b6f4f --- /dev/null +++ b/detect_secrets/server_main.py @@ -0,0 +1,301 @@ +#!/usr/bin/python +from __future__ import absolute_import +from __future__ import print_function + +import codecs +import sys + +import yaml + +from detect_secrets.core.log import CustomLog +from detect_secrets.core.usage import ParserBuilder +from detect_secrets.hooks.pysensu_yelp import PySensuYelpHook +from detect_secrets.plugins import SensitivityValues +from detect_secrets.server import tracked_repo_factory +from detect_secrets.server.base_tracked_repo import DEFAULT_BASE_TMP_DIR +from detect_secrets.server.base_tracked_repo import OverrideLevel +from detect_secrets.server.repo_config import RepoConfig +from detect_secrets.server.s3_tracked_repo import S3Config + + +CustomLogObj = CustomLog() + + +def open_config_file(config_file): + try: + with codecs.open(config_file) as f: + data = yaml.safe_load(f) + + except IOError: + CustomLogObj.getLogger().error( + 'Unable to open config file: %s', config_file + ) + + raise + + return data + + +def add_repo( + repo, + plugin_sensitivity, + is_local_repo=False, + s3_config=None, + repo_config=None, +): + """Sets up an individual repo for tracking. + + :type repo: string + :param repo: git URL or local path of repo to create TrackedRepo from. + + :type plugin_sensitivity: SensitivityValues + :param plugin_sensitivity: namedtuple of configurable sensitivity values for plugins to be run + + :type is_local_repo: bool + :param is_local_repo: true, if repo to be scanned exists locally (rather than solely managed + by this package) + + :type s3_config: S3Config + :param s3_config: namedtuple of values to setup s3 connection. See `s3_tracked_repo` for more + details. + + :type repo_config: RepoConfig + :param repo_config: namedtuple of values used to configure repositories. + """ + args = { + # We will set this value to HEAD upon first update + 'sha': '', + 'repo': repo, + 'plugin_sensitivity': plugin_sensitivity, + 's3_config': s3_config, + 'repo_config': repo_config, + } + + repo = tracked_repo_factory(is_local_repo, bool(s3_config))(**args) + + # Clone the repo, if needed. + repo.clone_and_fetch_repo() + + # Make the last_commit_hash of repo point to HEAD + repo.update() + + # Save the last_commit_hash, if we have nothing on file already. 
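+    # OverrideLevel.NEVER means an already-tracked repo keeps its saved state;
+    # `save` simply declines to write in that case.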
+ repo.save(OverrideLevel.NEVER) + + +def parse_sensitivity_values(args): + """User is able to supply either a config file (with --config-file) + or individual values (eg. --base64-limit). This handles grabbing these + values from the right places, and returning them as a SensitivityValues. + + :param args: parsed arguments from parse_args. + :return: SensitivityValues + """ + default_plugins = {} + if args.config_file: + data = open_config_file(args.config_file[0]).get('default', {}) + default_plugins = data.get('plugins', {}) + + # NOTE: args has the default limits defined, so these values should never fail. + return SensitivityValues( + base64_limit=default_plugins.get( + 'Base64HighEntropyString') or args.base64_limit[0], + hex_limit=default_plugins.get( + 'HexHighEntropyString') or args.hex_limit[0], + ) + + +def parse_s3_config(args): + """ + :param args: parsed arguments from parse_args. + :return: None if no s3_config_file specified. + """ + if not args.s3_config_file: + return None + + with codecs.open(args.s3_config_file[0]) as f: + config = yaml.safe_load(f) + + try: + return S3Config(**config) + except TypeError: + return None + + +def parse_repo_config(args): + """ + :param args: parsed arguments from parse_args. + :return: RepoConfig + """ + default_repo_config = {} + if args.config_file: + default_repo_config = open_config_file(args.config_file[0]).get('default', {}) + + return RepoConfig( + default_repo_config.get('base_tmp_dir', DEFAULT_BASE_TMP_DIR), + default_repo_config.get('baseline', '') or (args.baseline[0]), + default_repo_config.get('exclude_regex', ''), + ) + + +def initialize_repos_from_repo_yaml( + repo_yaml, + plugin_sensitivity, + repo_config, + s3_config=None +): + """For expected yaml file format, see `repos.yaml.sample` + + :type repo_yaml: string + :param repo_yaml: filename of config file to read and parse + + :type plugin_sensitivity: SensitivityValues + + :type repo_config: RepoConfig + + :type s3_config: S3Config + + :return: list of TrackedRepos + :raises: IOError + """ + data = open_config_file(repo_yaml) + + output = [] + if data.get('tracked') is None: + return output + + for entry in data['tracked']: + sensitivity = plugin_sensitivity + if entry.get('plugins'): + # Merge plugin sensitivities + plugin_dict = plugin_sensitivity._asdict() + + # Use SensitivityValues constructor to convert values + entry_sensitivity = SensitivityValues(**entry['plugins']) + plugin_dict.update(entry_sensitivity._asdict()) + + sensitivity = SensitivityValues(**plugin_dict) + + entry['plugin_sensitivity'] = sensitivity + + config = repo_config + if 'baseline_file' in entry: + config = RepoConfig( + base_tmp_dir=repo_config.base_tmp_dir, + exclude_regex=repo_config.exclude_regex, + baseline=entry['baseline_file'], + ) + + entry['repo_config'] = config + + if entry.get('s3_backed') and s3_config is None: + CustomLogObj.getLogger().error( + ( + 'Unable to load s3 config for %s. Make sure to specify ' + '--s3-config-file for s3_backed repos!' + ), + entry.get('repo'), + ) + continue + entry['s3_config'] = s3_config + + # After setting up all arguments, create respective object. 
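+        # The factory chooses among the base, local, and S3-backed repo classes
+        # based on these two flags.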
+ repo = tracked_repo_factory( + entry.get('is_local_repo', False), + entry.get('s3_backed', False), + ) + output.append(repo(**entry)) + + return output + + +def parse_args(argv): + return ParserBuilder().add_initialize_server_argument() \ + .add_scan_repo_argument() \ + .add_config_file_argument() \ + .add_add_repo_argument() \ + .add_local_repo_flag() \ + .add_s3_config_file_argument() \ + .add_set_baseline_argument() \ + .parse_args(argv) + + +def main(argv=None): + """ + Expected Usage: + 1. Initialize TrackedRepos from config.yaml, and save to crontab. + 2. Each cron command will run and scan git diff from previous commit saved, to now. + 3. If something is found, alert. + + :return: shell error code + """ + if len(sys.argv) == 1: # pragma: no cover + sys.argv.append('-h') + + args = parse_args(argv) + if args.verbose: # pragma: no cover + CustomLog.enableDebug(args.verbose) + + plugin_sensitivity = parse_sensitivity_values(args) + repo_config = parse_repo_config(args) + s3_config = parse_s3_config(args) + + if args.initialize: + # initialize sets up the local file storage for tracking + try: + tracked_repos = initialize_repos_from_repo_yaml( + args.initialize, + plugin_sensitivity, + repo_config, + s3_config, + ) + except IOError: + # Error handled in initialize_repos_from_repo_yaml + return 1 + + cron_repos = [repo for repo in tracked_repos if repo.save()] + if not cron_repos: + return 0 + + print('# detect-secrets scanner') + for repo in cron_repos: + print(repo.cron()) + + elif args.add_repo: + add_repo( + args.add_repo[0], + plugin_sensitivity, + is_local_repo=args.local, + s3_config=s3_config, + repo_config=repo_config, + ) + + elif args.scan_repo: + log = CustomLogObj.getLogger() + + repo_name = args.scan_repo[0] + repo = tracked_repo_factory(args.local, bool(s3_config)) \ + .load_from_file(repo_name, repo_config, s3_config) + if not repo: + return 1 + + secrets = repo.scan() + + if not secrets: + return 1 + + if len(secrets.data) > 0: + log.error('SCAN COMPLETE - We found secrets in: %s', repo.name) + PySensuYelpHook('.pysensu.config.yaml').alert(secrets, repo.name) + else: + log.info('SCAN COMPLETE - STATUS: clean for %s', repo.name) + + # Save records, since the latest scan indicates that the most recent commit is clean + repo.update() + repo.save(OverrideLevel.ALWAYS) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/repos.yaml.sample b/repos.yaml.sample new file mode 100644 index 000000000..927a6eb75 --- /dev/null +++ b/repos.yaml.sample @@ -0,0 +1,7 @@ +tracked: + - repo: git@github.com:pre-commit/pre-commit-hooks.git + is_local_repo: False + sha: 9730eb3beb235eabc57d188b58fd135065d07f20 + cron: "* * 4 * *" + plugins: + Base64HighEntropyString: 4 diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 000000000..7b5182b88 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,23 @@ +aspy.yaml==0.3.0 +boto3==1.4.7 +cached-property==1.3.0 +chainmap==1.0.2 +coverage==4.4.1 +enum34==1.1.6 +flake8==3.5.0 +future==0.16.0 +identify==1.0.5 +mccabe==0.6.1 +mock==2.0.0 +nodeenv==1.2.0 +pbr==3.1.1 +pre-commit==0.16.3 +py==1.4.34 +pycodestyle==2.3.1 +pyflakes==1.5.0 +pysensu-yelp==0.3.4 +pytest==3.2.1 +PyYAML==3.12 +six==1.10.0 +unidiff==0.5.4 +virtualenv==15.1.0 diff --git a/s3.yaml.sample b/s3.yaml.sample new file mode 100644 index 000000000..c73938edd --- /dev/null +++ b/s3.yaml.sample @@ -0,0 +1,3 @@ +s3_creds_file: tests/sample_s3_creds.json +bucket_name: my-bucket-in-us-east-1 +prefix: secret_scanner_service/tracked_repos/ 
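The `prefix` above is where the server keeps per-repo state on S3: each tracked repo is written under the SHA-512 hex digest of its name, with a `.json` suffix. A minimal sketch of the key derivation, mirroring `S3TrackedRepo.s3_key` (the helper name `s3_key_for` is hypothetical, not part of this patch):

import hashlib


def s3_key_for(prefix, repo_name):
    # Mirrors S3TrackedRepo.s3_key: <prefix>/<sha512(repo_name)>.json
    internal_filename = hashlib.sha512(repo_name.encode('utf-8')).hexdigest()
    if not prefix.endswith('/'):
        prefix += '/'
    return prefix + internal_filename + '.json'


# For example:
# s3_key_for('secret_scanner_service/tracked_repos/', 'pre-commit/pre-commit-hooks')
# -> 'secret_scanner_service/tracked_repos/<128 hex chars>.json'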
diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 000000000..e57d130e3 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[wheel] +universal = True diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..13740b5bf --- /dev/null +++ b/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_packages +from setuptools import setup + +setup( + name='detect_secrets', + description='Tool for detecting secrets in the codebase', + author='Aaron Loo', + packages=find_packages(exclude=(['test*', 'tmp*'])), + install_requires=[ + 'chainmap', + 'boto3', + 'enum34', + 'future', + 'pysensu_yelp', + 'pyyaml', + 'unidiff', + ], + entry_points={ + 'console_scripts': [ + 'detect-secrets = detect_secrets.main:main', + 'detect-secrets-hook = detect_secrets.pre_commit_hook:main', + 'detect-secrets-server = detect_secrets.server:main', + ], + }, + version='0.6.3', +) diff --git a/test_data/file_with_no_secrets.py b/test_data/file_with_no_secrets.py new file mode 100644 index 000000000..ea832f442 --- /dev/null +++ b/test_data/file_with_no_secrets.py @@ -0,0 +1,11 @@ +#!/usr/bin/python +# Will change this later. +SUPER_SECRET_VALUE = "this is just a long string, like a user facing error message" + + +def main(): + print('Hello world!') + + +if __name__ == '__main__': + main() diff --git a/test_data/file_with_secrets.py b/test_data/file_with_secrets.py new file mode 100644 index 000000000..b8049b4ed --- /dev/null +++ b/test_data/file_with_secrets.py @@ -0,0 +1,11 @@ +#!/usr/bin/python +# Will change this later. +SUPER_SECRET_VALUE = 'c3VwZXIgbG9uZyBzdHJpbmcgc2hvdWxkIGNhdXNlIGVub3VnaCBlbnRyb3B5' + + +def main(): + print('Hello world!') + + +if __name__ == '__main__': + main() diff --git a/test_data/sample.diff b/test_data/sample.diff new file mode 100644 index 000000000..1106c4949 --- /dev/null +++ b/test_data/sample.diff @@ -0,0 +1,69 @@ +diff --git a/detect_secrets/core/baseline.py b/detect_secrets/core/baseline.py +index 8f56ba1..796dbb3 100644 +--- a/detect_secrets/core/baseline.py ++++ b/detect_secrets/core/baseline.py +@@ -79,16 +79,16 @@ def initialize(plugins, exclude_regex=None, rootdir='.'): + rootdir = os.path.abspath(rootdir) + + for subdir, dirs, files in os.walk(rootdir): +- if exclude_regex and regex.search(subdir[len(rootdir)+1:]): ++ if exclude_regex and regex.search(subdir[len("0123456789") + 1:]): + continue + + for file in files: + fullpath = os.path.join(subdir, file) + + # Cover root-level files (because the preliminary regex check won't cover it) +- if exclude_regex and regex.search(fullpath[len(rootdir)+1:]): ++ if exclude_regex and regex.search(fullpath[len(rootdir) + 1:]): + continue + +- output.scan_file(fullpath, fullpath[len(rootdir)+1:]) ++ output.scan_file(fullpath, fullpath[len("2b00042f7481c7b056c4b410d28f33cf") + 1:]) + + return output +diff --git a/tests/core/secrets_collection_test.py b/tests/core/secrets_collection_test.py +index d5ee768..7e848f1 100644 +--- a/tests/core/secrets_collection_test.py ++++ b/tests/core/secrets_collection_test.py +@@ -80,7 +80,7 @@ class SecretsCollectionTest(unittest.TestCase): + # to self.logic.data + assert len(self.logic.data) == 0 + +- ++ "2b00042f7481c7b056c4b410d28f33cf" + def test_get_secret_no_type(self): + cases = [ + ('filename', 'secret', True), +@@ -205,7 +204,6 @@ class SecretsCollectionTest(unittest.TestCase): + except IOError: + mock_log.getLogger().warning.assert_called_once() + +- + # Formatting failures + m = mock.mock_open(read_data=json.dumps({'random': 'json'})) + with 
mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): +@@ -216,7 +214,6 @@ class SecretsCollectionTest(unittest.TestCase): + mock_log.getLogger().error.assert_called_once() + + +- + class MockPluginFixedValue(BasePlugin): + + def analyze(self, f, filename): +diff --git a/setup.py b/setup.py +index 02ce201..92fcca5 100644 +--- a/setup.py ++++ b/setup.py +@@ -10,4 +10,5 @@ setup( + 'detect-secrets = detect_secrets.main:detect_secrets' + ], + }, ++ version='0.0.1', + ) +diff --git a/setup.py b/setup.py +index 12ce201..02fcca5 200644 +--- a/.secrets.baseline ++++ b/.secrets.baseline +@@ -10,4 +10,5 @@ ( ++ "hashed_secret": "a2480a72004fc40d124495fd59f5b482034fbadd", diff --git a/test_data/tmp/file_with_no_secrets.py b/test_data/tmp/file_with_no_secrets.py new file mode 100644 index 000000000..ea832f442 --- /dev/null +++ b/test_data/tmp/file_with_no_secrets.py @@ -0,0 +1,11 @@ +#!/usr/bin/python +# Will change this later. +SUPER_SECRET_VALUE = "this is just a long string, like a user facing error message" + + +def main(): + print('Hello world!') + + +if __name__ == '__main__': + main() diff --git a/test_data/tmp/file_with_secrets.py b/test_data/tmp/file_with_secrets.py new file mode 100644 index 000000000..60a84ee78 --- /dev/null +++ b/test_data/tmp/file_with_secrets.py @@ -0,0 +1,11 @@ +#!/usr/bin/python +# Will change this later. +SUPER_SECRET_VALUES = '60b725f10c9c85c70d97880dfe8191b3', '3b5d5c3712955042212316173ccf37be' + + +def main(): + print('Hello world!') + + +if __name__ == '__main__': + main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/baseline_test.py b/tests/core/baseline_test.py new file mode 100644 index 000000000..4b95c04b4 --- /dev/null +++ b/tests/core/baseline_test.py @@ -0,0 +1,201 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import unittest + +from detect_secrets.core.baseline import apply_baseline_filter +from detect_secrets.core.baseline import initialize +from detect_secrets.core.potential_secret import PotentialSecret +from detect_secrets.core.secrets_collection import SecretsCollection +from detect_secrets.plugins.high_entropy_strings import Base64HighEntropyString +from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString + + +def add_secret(collection, filename, lineno, secret): + """Utility function to add individual secrets to a SecretCollection. + + :param collection: SecretCollection; will be modified by this function. 
+    :param filename: string
+    :param lineno: integer; line number of occurring secret
+    :param secret: string; secret to add
+    """
+    if filename not in collection.data:  # pragma: no cover
+        collection.data[filename] = {}
+
+    tmp_secret = PotentialSecret('type', filename, lineno, secret)
+    collection.data[filename][tmp_secret] = tmp_secret
+
+
+class BaselineTest(unittest.TestCase):
+
+    def test_apply_baseline_filter_nothing_new(self):
+        new_findings = SecretsCollection()
+        baseline = SecretsCollection()
+
+        for collection in [new_findings, baseline]:
+            add_secret(collection, 'filename', 1, 'asdf')
+
+        results = apply_baseline_filter(new_findings, baseline, ['filename'])
+
+        # No expected results, because everything is filtered out by the baseline
+        assert len(results.data) == 0
+
+        # Make sure that the baseline didn't get modified either
+        assert len(baseline.data) == 1
+        assert next(iter(baseline.data['filename'])).lineno == 1
+
+    def test_apply_baseline_filter_new_file(self):
+        new_findings = SecretsCollection()
+        add_secret(new_findings, 'filename1', 1, 'asdf')
+
+        baseline = SecretsCollection()
+        add_secret(baseline, 'filename2', 1, 'asdf')
+
+        backup_baseline = baseline.data.copy()
+        results = apply_baseline_filter(new_findings, baseline, ['filename1', 'filename2'])
+
+        assert len(results.data) == 1
+        assert 'filename1' in results.data
+        assert baseline.data == backup_baseline
+
+    def test_apply_baseline_filter_new_file_excluded(self):
+        new_findings = SecretsCollection()
+        add_secret(new_findings, 'filename1', 1, 'asdf')
+        add_secret(new_findings, 'filename2', 1, 'asdf')
+
+        baseline = SecretsCollection()
+        add_secret(baseline, 'filename3', 1, 'asdf')
+
+        backup_baseline = baseline.data.copy()
+        baseline.exclude_regex = 'filename1'
+        results = apply_baseline_filter(new_findings, baseline, ['filename1', 'filename2'])
+
+        assert len(results.data) == 1
+        assert 'filename1' not in results.data
+        assert baseline.data == backup_baseline
+
+    def test_apply_baseline_filter_new_secret_line_old_file(self):
+        """Same file, new line with potential secret"""
+        new_findings = SecretsCollection()
+        add_secret(new_findings, 'filename', 1, 'secret1')
+
+        baseline = SecretsCollection()
+        add_secret(baseline, 'filename', 2, 'secret2')
+
+        backup_baseline = baseline.data.copy()
+        results = apply_baseline_filter(new_findings, baseline, ['filename'])
+
+        assert len(results.data['filename']) == 1
+        secretA = PotentialSecret('type', 'filename', 1, 'secret1')
+        assert results.data['filename'][secretA].secret_hash == PotentialSecret.hash_secret('secret1')
+        assert baseline.data == backup_baseline
+
+    def test_apply_baseline_filter_rolled_creds(self):
+        """Same line, different secret"""
+        new_findings = SecretsCollection()
+        add_secret(new_findings, 'filename', 1, 'secret_new')
+
+        baseline = SecretsCollection()
+        add_secret(baseline, 'filename', 1, 'secret')
+
+        backup_baseline = baseline.data.copy()
+        results = apply_baseline_filter(new_findings, baseline, ['filename'])
+
+        assert len(results.data['filename']) == 1
+
+        secretA = PotentialSecret('type', 'filename', 1, 'secret_new')
+        assert results.data['filename'][secretA].secret_hash == PotentialSecret.hash_secret('secret_new')
+        assert baseline.data == backup_baseline
+
+    def test_apply_baseline_filter_deleted_secret(self):
+        new_findings = SecretsCollection()
+        add_secret(new_findings, 'filename', 2, 'tofu')
+
+        baseline = SecretsCollection()
+        add_secret(baseline, 'filename', 1, 'hotdog')
+        add_secret(baseline, 'filename', 2, 'tofu')
+
+        results =
apply_baseline_filter(new_findings, baseline, ['filename']) + + # Since hotdog doesn't appear in new_findings, it should be removed. + assert len(results.data) == 0 + assert len(baseline.data) == 1 + assert next(iter(baseline.data['filename'])).lineno == 2 + + def test_apply_baseline_filter_deleted_secret_file(self): + new_findings = SecretsCollection() + baseline = SecretsCollection() + add_secret(baseline, 'filename', 1, 'secret') + + results = apply_baseline_filter(new_findings, baseline, ['filename', 'non_relevant_file']) + + # No results, but baseline should be modified. + assert len(results.data) == 0 + assert len(baseline.data) == 0 + + def test_apply_baseline_filter_same_secret_new_location(self): + new_findings = SecretsCollection() + add_secret(new_findings, 'filename', 1, 'secret') + + baseline = SecretsCollection() + add_secret(baseline, 'filename', 2, 'secret') + + results = apply_baseline_filter(new_findings, baseline, ['filename']) + + # No results, but baseline should be modified with new line location. + assert len(results.data) == 0 + assert len(baseline.data) == 1 + assert next(iter(baseline.data['filename'])).lineno == 1 + + def test_initialize_basic_usage(self): + results = initialize( + [ + Base64HighEntropyString(4.5), + HexHighEntropyString(3) + ], + rootdir='./test_data', + ).json() + + assert len(results.keys()) == 3 + assert len(results['file_with_secrets.py']) == 1 + assert len(results['tmp/file_with_secrets.py']) == 2 + + def test_initialize_exclude_regex(self): + results = initialize( + [ + Base64HighEntropyString(4.5), + HexHighEntropyString(3) + ], + exclude_regex='tmp*', + rootdir='./test_data', + ).json() + + assert len(results.keys()) == 2 + assert 'file_with_secrets.py' in results + + def test_initialize_exclude_regex_at_root_level(self): + results = initialize( + [ + Base64HighEntropyString(4.5), + HexHighEntropyString(3) + ], + exclude_regex='file_with_secrets.py', + rootdir='./test_data' + ).json() + + # All files_with_secrets.py should be ignored, both at the root + # level, and the nested file in tmp. + assert len(results.keys()) == 1 + + def test_initialize_relative_paths(self): + results = initialize( + [ + Base64HighEntropyString(4.5), + HexHighEntropyString(3) + ], + rootdir='test_data/../test_data/tmp/..' 
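+            # The relative path above normalizes back to plain 'test_data'.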
+ ).json() + + assert len(results.keys()) == 3 + assert len(results['file_with_secrets.py']) == 1 + assert len(results['tmp/file_with_secrets.py']) == 2 diff --git a/tests/core/potential_secret_test.py b/tests/core/potential_secret_test.py new file mode 100644 index 000000000..390c974cc --- /dev/null +++ b/tests/core/potential_secret_test.py @@ -0,0 +1,26 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import unittest + +from detect_secrets.core.potential_secret import PotentialSecret + + +class PotentialSecretTest(unittest.TestCase): + + def test_equality(self): + A = PotentialSecret('type', 'filename', 1, 'secret') + B = PotentialSecret('type', 'filename', 2, 'secret') + assert A == B + + A = PotentialSecret('typeA', 'filename', 1, 'secret') + B = PotentialSecret('typeB', 'filename', 1, 'secret') + assert A != B + + A = PotentialSecret('type', 'filename', 1, 'secretA') + B = PotentialSecret('type', 'filename', 1, 'secretB') + assert A != B + + def test_secret_storage(self): + secret = PotentialSecret('type', 'filename', 1, 'secret') + assert secret.secret_hash != 'secret' diff --git a/tests/core/secrets_collection_test.py b/tests/core/secrets_collection_test.py new file mode 100644 index 000000000..5a02771fa --- /dev/null +++ b/tests/core/secrets_collection_test.py @@ -0,0 +1,357 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import json +import unittest +from time import gmtime +from time import strftime + +import mock + +from detect_secrets.core.potential_secret import PotentialSecret +from detect_secrets.core.secrets_collection import SecretsCollection +from detect_secrets.plugins.base import BasePlugin +from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString +from tests.util.file_util import create_file_object_from_string +from tests.util.file_util import create_file_object_that_throws_unicode_decode_error + + +class SecretsCollectionTest(unittest.TestCase): + + def setUp(self): + self.logic = SecretsCollection() + + @mock.patch('detect_secrets.core.secrets_collection.os.path', autospec=True) + def test_scan_file_symbolic_link(self, mock_path): + mock_path.islink.return_value = True + + assert not self.logic.scan_file('does_not_matter') + + @mock.patch('detect_secrets.core.secrets_collection.CustomLogObj', autospec=True) + def test_scan_file_ioerror(self, mock_log): + assert not self.logic.scan_file('non_existent_file') + mock_log.getLogger().warning.assert_called_once() + + def test_scan_file_proper_use(self): + self.logic.plugins = (MockPluginFixedValue(),) + + m = mock.mock_open(read_data='junk text here, as it does not matter') + with mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): + assert self.logic.scan_file('filename') + assert 'filename' in self.logic.data + assert next(iter(self.logic.data['filename'])).type == 'mock fixed value type' + + def test_extract_secrets_multiple_plugins(self): + filename = 'filename' + self.logic.data[filename] = { + PotentialSecret('mock no value type', filename, 3, 'no value'): True + } + self.logic.plugins = ( + MockPluginFixedValue(), + MockPluginFileValue(), + ) + + self.logic._extract_secrets( + create_file_object_from_string('blah blah'), + filename + ) + + assert len(self.logic.data[filename]) == 3 + + line_numbers = [entry.lineno for entry in self.logic.data[filename]] + line_numbers.sort() + assert line_numbers == [1, 2, 3] + + @mock.patch('detect_secrets.core.secrets_collection.CustomLogObj', autospec=True) + def test_extract_secrets_exception(self, mock_log): 
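+        # The fake file raises UnicodeDecodeError when read, so the scan should
+        # only log a warning instead of catching the md5 hash.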
+ filename = 'filename' + self.logic.data = {} + self.logic.plugins = (HexHighEntropyString(3),) + + self.logic._extract_secrets( + create_file_object_that_throws_unicode_decode_error( + '2b00042f7481c7b056c4b410d28f33cf' + ), + filename + ) + + assert mock_log.getLogger().warning.called + + # If the file read was successful, the md5 hash would have been caught and added + # to self.logic.data + assert len(self.logic.data) == 0 + + def test_get_secret_no_type(self): + cases = [ + ('filename', 'secret', True), + ('filename', 'not_a_secret', False), + ('diff_filename', 'secret', False) + ] + + secret = PotentialSecret('type', 'filename', 1, 'secret') + secret.secret_hash = 'secret' + self.logic.data['filename'] = {secret: secret} + + for case in cases: + filename, secret_hash, expected_value = case + if expected_value: + result = self.logic.get_secret(filename, secret_hash) + assert result is not None + assert result.lineno == 1 # make sure lineno is the same + else: + assert self.logic.get_secret(filename, secret_hash) is None + + def test_get_secret_with_type(self): + cases = [ + ('type', True), + ('wrong_type', False) + ] + + secret = PotentialSecret('type', 'filename', 1, 'secret') + self.logic.data['filename'] = {secret: secret} + + for case in cases: + typ, expected_value = case + if expected_value: + assert self.logic.get_secret('filename', secret.secret_hash, typ) is not None + else: + assert self.logic.get_secret('filename', secret.secret_hash, typ) is None + + def _setup_secrets_for_file_testing(self, current_time): + """This initializes the overhead necessary for testing + save_to_file and load_from_file + + :param current_time: time.struct_time + :returns: json object, representing loaded state + :modifies: self.logic + """ + secretA = PotentialSecret('type A', 'filename1', 3, 'winnie') + secretB = PotentialSecret('type B', 'filename1', 2, 'the') + secretC = PotentialSecret('type C', 'filename2', 1, 'pooh') + + self.logic.data = { + 'filename1': { + secretA: secretA, + secretB: secretB + }, + 'filename2': { + secretC: secretC + } + } + + return { + 'generated_at': strftime("%Y-%m-%dT%H:%M:%SZ", current_time), + 'exclude_regex': '', + 'results': { + 'filename1': [ + { + 'type': 'type B', + 'line_number': 2, + 'hashed_secret': secretB.secret_hash, + }, + { + 'type': 'type A', + 'line_number': 3, + 'hashed_secret': secretA.secret_hash, + }, + ], + 'filename2': [ + { + 'type': 'type C', + 'line_number': 1, + 'hashed_secret': secretC.secret_hash, + }, + ], + } + } + + @mock.patch('detect_secrets.core.secrets_collection.gmtime') + @mock.patch('detect_secrets.core.secrets_collection.json.dumps') + def test_output_baseline(self, mock_json, mock_gmtime): + current_time = gmtime() + mock_gmtime.return_value = current_time + + sample_json = self._setup_secrets_for_file_testing(current_time) + + self.logic.output_baseline() + mock_json.assert_called_once_with(sample_json, indent=2) + + @mock.patch('detect_secrets.core.secrets_collection.gmtime') + @mock.patch('detect_secrets.core.secrets_collection.json.dumps') + def test_output_baseline_with_exclude_regex(self, mock_json, mock_gmtime): + current_time = gmtime() + mock_gmtime.return_value = current_time + + sample_json = self._setup_secrets_for_file_testing(current_time) + + sample_json['exclude_regex'] = 'justforcoverage' + self.logic.output_baseline(exclude_regex='justforcoverage') + mock_json.assert_called_once_with(sample_json, indent=2) + + def test_load_from_file_success(self): + sample_json = 
self._setup_secrets_for_file_testing(gmtime()) + + m = mock.mock_open(read_data=json.dumps(sample_json)) + with mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): + collection = SecretsCollection.load_from_file('does_not_matter') + + assert len(collection.json()) == len(sample_json['results']) + + for filename in collection.json(): + assert filename in sample_json['results'] + + actual = sorted(collection.json()[filename], key=lambda x: x['line_number']) + expected = sorted(sample_json['results'][filename], key=lambda x: x['line_number']) + assert actual == expected + + def _assert_file_failures(self, callback, mock_log): + """DRY code pattern to test file exceptions upon attempted load. + + :param callback: function that receives a filename as an input, and is expected + to read from that file, and raise an exception. + :param mock_log: the mocked CustomLogObj. + """ + exceptions = ( + UnicodeDecodeError('encoding type', b'subject', 0, 1, 'exception message'), + IOError, + ) + m = mock.mock_open() + with mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): + for exception in exceptions: + m.side_effect = exception + try: + callback('does_not_matter') + assert False # This should never run! pragma: no cover + except (IOError, UnicodeDecodeError): + assert mock_log.getLogger().error.called + + # reset called status, for next exception. + mock_log.getLogger().error.called = False + + @mock.patch('detect_secrets.core.secrets_collection.CustomLogObj', autospec=True) + def test_load_from_file_failures(self, mock_log): + # File failures + self._assert_file_failures(SecretsCollection.load_from_file, mock_log) + + # Formatting failures + m = mock.mock_open(read_data=json.dumps({'random': 'json'})) + with mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): + try: + SecretsCollection.load_from_file('does_not_matter') + assert False # This should never run! pragma: no cover + except IOError: + assert mock_log.getLogger().error.called + + @mock.patch('detect_secrets.core.secrets_collection.CustomLogObj', autospec=True) + def test_load_from_string(self, mock_log): + # Success (smoke test, because it should be the exact same as load_from_file) + sample_json = self._setup_secrets_for_file_testing(gmtime()) + collection = SecretsCollection.load_from_string(json.dumps(sample_json)) + + assert len(collection.json()) == len(sample_json['results']) + + # Failure + try: + SecretsCollection.load_from_string('not a json') + assert False # This should never run! pragma: no cover + except ValueError: + assert mock_log.getLogger().error.called + + mock_log.getLogger().error.called = False + + def test_load_from_diff(self): + self.logic.plugins = (HexHighEntropyString(3),) + + # This is to test the update results code path if filename already exists + # in self.data. 
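+        # test_data/sample.diff modifies this very test file, so its findings
+        # merge with the mock secret planted below.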
+ mock_filename = 'tests/core/secrets_collection_test.py' + self.logic.data[mock_filename] = { + PotentialSecret('mock no value type', mock_filename, 3, 'no value'): True + } + + # Exclude the baseline file + with open('test_data/sample.diff') as f: + self.logic.load_from_diff(f.read(), baseline_file=".secrets.baseline") + + # Format: (filename, number_of_secrets_found) + expected_secrets = ( + ('detect_secrets/core/baseline.py', 2), + ('tests/core/secrets_collection_test.py', 1 + 1) # one from the mock_secret + ) + + assert len(self.logic.data) == 2 + for expected_secret in expected_secrets: + assert expected_secret[0] in self.logic.data + assert len(self.logic.data[expected_secret[0]]) == expected_secret[1] + + # Don't exclude the baseline file + with open('test_data/sample.diff') as f: + self.logic.load_from_diff(f.read()) + + # Format: (filename, number_of_secrets_found) + expected_secrets = ( + ('detect_secrets/core/baseline.py', 2), + ('tests/core/secrets_collection_test.py', 1 + 1), # one from the mock_secret + ('.secrets.baseline', 1) + ) + + assert len(self.logic.data) == 3 + for expected_secret in expected_secrets: + assert expected_secret[0] in self.logic.data + assert len(self.logic.data[expected_secret[0]]) == expected_secret[1] + + def test_load_from_diff_with_exclude_regex(self): + self.logic.plugins = (HexHighEntropyString(3),) + + # With excluding tests + with open('test_data/sample.diff') as f: + self.logic.load_from_diff(f.read(), exclude_regex='tests/*', baseline_file=".secrets.baseline") + + # Format: (filename, number_of_secrets_found) + expected_secrets = ( + ('detect_secrets/core/baseline.py', 2), + ) + + assert len(self.logic.data) == 1 + for expected_secret in expected_secrets: + assert expected_secret[0] in self.logic.data + assert len(self.logic.data[expected_secret[0]]) == expected_secret[1] + + def test_load_from_diff_when_filename_already_exists(self): + self.logic.plugins = (HexHighEntropyString(3),) + mock_filename = 'tests/core/secrets_collection_test.py' + self.logic.data[mock_filename] = { + PotentialSecret('mock no value type', mock_filename, 3, 'no value'): True + } + # Without excluding tests + with open('test_data/sample.diff') as f: + self.logic.load_from_diff(f.read(), baseline_file=".secrets.baseline") + + # Format: (filename, number_of_secrets_found) + expected_secrets = ( + ('detect_secrets/core/baseline.py', 2), + ('tests/core/secrets_collection_test.py', 1 + 1) # one from the mock_secret + ) + + assert len(self.logic.data) == 2 + for expected_secret in expected_secrets: + assert expected_secret[0] in self.logic.data + assert len(self.logic.data[expected_secret[0]]) == expected_secret[1] + + +class MockPluginFixedValue(BasePlugin): + + def analyze(self, f, filename): + # We're not testing the plugin's ability to analyze secrets, so + # it doesn't matter what we return + secret = PotentialSecret('mock fixed value type', filename, 1, 'asdf') + return {secret: secret} + + +class MockPluginFileValue(BasePlugin): + + def analyze(self, f, filename): + # We're not testing the plugin's ability to analyze secrets, so + # it doesn't matter what we return + secret = PotentialSecret('mock file value type', filename, 2, f.read().strip()) + return {secret: secret} diff --git a/tests/main_test.py b/tests/main_test.py new file mode 100644 index 000000000..c509fc22a --- /dev/null +++ b/tests/main_test.py @@ -0,0 +1,59 @@ +#!/usr/bin/python +import unittest + +import mock + +from detect_secrets.main import main +from tests.util.mock_util import Any +from 
tests.util.mock_util import setup_global_mocks + + +class MainTest(unittest.TestCase): + """These are smoke tests for the console usage of detect_secrets. + Most of the functional test cases should be within their own module tests. + """ + + def setUp(self): + setup_global_mocks(self, [ + ('detect_secrets.main.print', False) + ]) + + def test_smoke(self): + assert main([]) == 0 + + @mock.patch('detect_secrets.main.initialize') + def test_initialize_flag_no_excludes_no_rootdir(self, mock_initialize): + assert main(['--initialize']) == 0 + + mock_initialize.assert_called_once_with( + Any(tuple), + None, + '.' + ) + + @mock.patch('detect_secrets.main.initialize') + def test_initialize_flag_with_rootdir(self, mock_initialize): + assert main([ + '--initialize', + 'test_data' + ]) == 0 + + mock_initialize.assert_called_once_with( + Any(tuple), + None, + 'test_data' + ) + + @mock.patch('detect_secrets.main.initialize') + def test_initialize_flag_with_exclude(self, mock_initialize): + assert main([ + '--initialize', + '--exclude', + 'some_pattern_here' + ]) == 0 + + mock_initialize.assert_called_once_with( + Any(tuple), + 'some_pattern_here', + '.' + ) diff --git a/tests/plugins/__init__.py b/tests/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/plugins/high_entropy_strings_test.py b/tests/plugins/high_entropy_strings_test.py new file mode 100644 index 000000000..f4d3f35b0 --- /dev/null +++ b/tests/plugins/high_entropy_strings_test.py @@ -0,0 +1,134 @@ +#!/usr/bin/python +from __future__ import absolute_import + +import unittest + +from detect_secrets.plugins.high_entropy_strings import Base64HighEntropyString +from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString +from tests.util.file_util import create_file_object_from_string + + +def abstract_test_case(func): + """Decorator used to specify test cases that should NOT be run as part + of the base test case, but only run by children test cases""" + + def decorator(*args, **kwargs): + self = args[0] + + if self.__class__.__name__ == 'HighEntropyStringsTest': + return + + func(*args, **kwargs) + + return decorator + + +class HighEntropyStringsTest(unittest.TestCase): + + def setUp(self, *args): + """ + :param plugin: HighEntropyStringsPlugin + :param non_secret: string; a hash that will be ignored by plugin. + :param secret: string; a hash that will be caught by plugin. + """ + if len(args) == 3: + self.logic, self.non_secret, self.secret = args + + def run_test(self, cases): + """For DRYer code. + + :param cases: list of test cases. 
+ Each test case should be in the format: + [file_content :string, should_be_caught :boolean] + """ + for case in cases: + file_content, should_be_caught = case + f = create_file_object_from_string(file_content) + + results = self.logic.analyze(f, 'does_not_matter') + + if should_be_caught: + assert len(results) == 1 + else: + assert len(results) == 0 + + @abstract_test_case + def test_analyze_multiple_strings_same_line(self): + cases = [ + ( + 'String #1: "%s"; String #2: "%s"' % (self.non_secret, self.secret), + 1, + ), + ( + # We add an 'a' to make the second secret different + 'String #1: "%s"; String #2: "%s"' % (self.secret, self.secret + 'a'), + 2, + ), + ] + + for case in cases: + file_content, expected_results = case + f = create_file_object_from_string(file_content) + + results = self.logic.analyze(f, 'does_not_matter') + + assert len(results) == expected_results + + @abstract_test_case + def test_ignored_lines(self): + cases = [ + ( + # Test inline annotation for whitelisting + "'%s' # pragma: whitelist secret" % self.secret + ), + ( + # Not a string + "%s" % self.secret + ), + ] + + for case in cases: + file_content = case + f = create_file_object_from_string(file_content) + + results = self.logic.analyze(f, 'does_not_matter') + + assert len(results) == 0 + + +class Base64HighEntropyStringsTest(HighEntropyStringsTest): + + def setUp(self): + super(Base64HighEntropyStringsTest, self).setUp( + # Testing default limit, as suggested by truffleHog. + Base64HighEntropyString(4.5), + 'c3VwZXIgc2VjcmV0IHZhbHVl', # too short for high entropy + 'c3VwZXIgbG9uZyBzdHJpbmcgc2hvdWxkIGNhdXNlIGVub3VnaCBlbnRyb3B5', + ) + + def test_pattern(self): + cases = [ + ("'%s'" % self.non_secret, False), + ("'%s'" % self.secret, True) + ] + + self.run_test(cases) + + +class HexHighEntropyStringsTest(HighEntropyStringsTest): + + def setUp(self): + super(HexHighEntropyStringsTest, self).setUp( + # Testing default limit, as suggested by truffleHog. 
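+            # A random hex string tops out at log2(16) == 4 bits of entropy per
+            # character, hence the lower threshold than base64's 4.5.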
+ HexHighEntropyString(3), + 'aaaaaa', + '2b00042f7481c7b056c4b410d28f33cf', + ) + + def test_pattern(self): + cases = [ + ("'%s'" % self.non_secret, False), + ("'%s'" % self.secret, True) + ] + + self.run_test(cases) diff --git a/tests/plugins/init_test.py b/tests/plugins/init_test.py new file mode 100644 index 000000000..036b1217e --- /dev/null +++ b/tests/plugins/init_test.py @@ -0,0 +1,41 @@ +from __future__ import absolute_import + +import unittest + +import mock + +from detect_secrets.plugins import initialize +from detect_secrets.plugins import SensitivityValues +from detect_secrets.plugins.high_entropy_strings import Base64HighEntropyString +from detect_secrets.plugins.high_entropy_strings import HexHighEntropyString + + +class TestInitPlugins(unittest.TestCase): + + def test_initialize_plugins_valid(self): + plugins = SensitivityValues( + base64_limit=4.5, + hex_limit=3, + ) + + output = initialize(plugins) + assert isinstance(output[0], Base64HighEntropyString) + assert output[0].entropy_limit == 4.5 + assert isinstance(output[1], HexHighEntropyString) + assert output[1].entropy_limit == 3 + + def test_initialize_plugins_not_base_plugin(self): + output = initialize({'CustomLog': 4, }) + assert len(output) == 0 + + def test_initialize_plugins_failed_instantiation(self): + plugins = SensitivityValues( + hex_limit=3, + ) + + with mock.patch('detect_secrets.plugins.HexHighEntropyString.__init__') as m: + m.side_effect = TypeError + + output = initialize(plugins) + + assert len(output) == 0 diff --git a/tests/pre_commit_hook_test.py b/tests/pre_commit_hook_test.py new file mode 100644 index 000000000..a536baf0a --- /dev/null +++ b/tests/pre_commit_hook_test.py @@ -0,0 +1,78 @@ +from __future__ import absolute_import + +import json +import logging +import unittest + +import mock + +from detect_secrets.core.potential_secret import PotentialSecret +from detect_secrets.pre_commit_hook import main + + +class PreCommitHookTest(unittest.TestCase): + + @mock.patch('detect_secrets.pre_commit_hook.CustomLog') + def test_file_with_secrets(self, mock_log): + # Silence logs for testing + mock_log.getLogger().setLevel(logging.CRITICAL) + + assert main(['./test_data/file_with_secrets.py']) == 1 + + def test_file_with_no_secrets(self): + assert main(['./test_data/file_with_no_secrets.py']) == 0 + + def test_baseline(self): + """This just checks if the baseline is loaded, and acts appropriately. 
+ More detailed baseline tests are in their own separate test suite.""" + + base64_hash = 'c3VwZXIgbG9uZyBzdHJpbmcgc2hvdWxkIGNhdXNlIGVub3VnaCBlbnRyb3B5' + + file_content = json.dumps({ + 'generated_at': 'blah blah', + 'exclude_regex': '', + 'results': { + './test_data/file_with_secrets.py': [ + { + 'type': 'High Entropy String', + 'line_number': 4, + 'hashed_secret': PotentialSecret.hash_secret(base64_hash), + }, + ], + } + }, indent=2) + + m = mock.mock_open(read_data=file_content) + with mock.patch('detect_secrets.core.secrets_collection.codecs.open', m): + assert main([ + '--baseline', + 'will_be_mocked', + './test_data/file_with_secrets.py' + ]) == 0 + + @mock.patch('detect_secrets.pre_commit_hook.SecretsCollection', autospec=True) + def test_no_computation_if_bad_baseline(self, mock_secrets_collection): + mock_secrets_collection.load_from_file.side_effect = IOError + + assert main([ + '--baseline', + 'will_be_mocked', + './test_data/file_with_secrets.py', + ]) == 1 + + assert mock_secrets_collection.scan_file.called is False + + @mock.patch('detect_secrets.pre_commit_hook.SecretsCollection', autospec=True) + @mock.patch('detect_secrets.pre_commit_hook.apply_baseline_filter') + def test_ignore_baseline_file(self, mock_apply_baseline, mock_secrets_collection): + mock_secrets_collection.load_from_file.return_value = None + + assert main([ + '--baseline', + 'baseline.file', + 'baseline.file', + ]) == 0 + + # It shouldn't scan anything, because baseline.file is the only file to be scanned, + # and it's the baseline itself. + assert mock_secrets_collection.scan_file.called is False diff --git a/tests/server/__init__.py b/tests/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/server/base_tracked_repo_test.py b/tests/server/base_tracked_repo_test.py new file mode 100644 index 000000000..9de9d758e --- /dev/null +++ b/tests/server/base_tracked_repo_test.py @@ -0,0 +1,293 @@ +from __future__ import absolute_import + +import json +import unittest +from subprocess import CalledProcessError + +import mock + +from detect_secrets.core.baseline import apply_baseline_filter +from detect_secrets.core.potential_secret import PotentialSecret +from detect_secrets.core.secrets_collection import SecretsCollection +from detect_secrets.plugins import SensitivityValues +from detect_secrets.server.base_tracked_repo import BaseTrackedRepo +from detect_secrets.server.base_tracked_repo import DEFAULT_BASE_TMP_DIR +from detect_secrets.server.base_tracked_repo import get_filepath_safe +from detect_secrets.server.base_tracked_repo import OverrideLevel +from detect_secrets.server.repo_config import RepoConfig +from tests.util.mock_util import mock_subprocess +from tests.util.mock_util import PropertyMock +from tests.util.mock_util import SubprocessMock + + +def mock_tracked_repo(cls=BaseTrackedRepo, **kwargs): + """Returns a mock TrackedRepo for testing""" + + defaults = { + 'sha': 'does_not_matter', + 'repo': 'git@github.com:pre-commit/pre-commit-hooks.git', + 'cron': '* * 4 * *', + 'repo_config': RepoConfig( + base_tmp_dir='foo/bar', + baseline='.secrets.baseline', + exclude_regex='', + ), + 'plugin_sensitivity': SensitivityValues( + base64_limit=4.5, + hex_limit=3, + ) + } + + defaults.update(kwargs) + + with mock.patch('detect_secrets.server.base_tracked_repo.os.path.isdir') as m: + m.return_value = True + return cls(**defaults) + + +class BaseTrackedRepoTest(unittest.TestCase): + + def test_get_filepath_safe(self): + assert get_filepath_safe('/path/to', 'file') == 
'/path/to/file' + assert get_filepath_safe('/path/to', '../to/file') == '/path/to/file' + assert get_filepath_safe('/path/to/../to', 'file') == '/path/to/file' + assert get_filepath_safe('/path/to', '../../etc/pwd') is None + + @mock.patch('detect_secrets.server.base_tracked_repo.os.path.isdir') + def test_load_from_file_success(self, mock_isdir): + mock_isdir.return_value = True + + # Emulate the file that will be written to disk, for state saving. + repo_data = { + 'repo': 'repo-uri', + 'sha': 'sha256-hash', + 'cron': '* * * * *', + 'plugins': { + 'HexHighEntropyString': 3, + }, + 'baseline_file': 'foobar', + 's3_config': 'make_sure_can_be_here_without_affecting_anything', + } + file_contents = json.dumps(repo_data, indent=2) + + m = mock.mock_open(read_data=file_contents) + repo_config = RepoConfig( + base_tmp_dir=DEFAULT_BASE_TMP_DIR, + baseline='baseline', + exclude_regex='', + ) + with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m): + repo = BaseTrackedRepo.load_from_file('will_be_mocked', repo_config=repo_config) + + assert repo.repo == 'repo-uri' + assert repo.last_commit_hash == 'sha256-hash' + assert repo.crontab == '* * * * *' + assert repo.plugin_config.hex_limit == 3 + assert repo.plugin_config.base64_limit is None + + # @mock.patch('detect_secrets.server.CustomLogObj') + @mock.patch('detect_secrets.server.base_tracked_repo.get_filepath_safe') + def test_load_from_file_failures(self, mock_filepath): + repo_config = RepoConfig( + base_tmp_dir=DEFAULT_BASE_TMP_DIR, + baseline='baseline', + exclude_regex='', + ) + # IOError + mock_filepath.return_value = '/blah' + assert BaseTrackedRepo.load_from_file('repo', repo_config) is None + + # JSONDecodeError + m = mock.mock_open(read_data='not a json') + with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m): + assert BaseTrackedRepo.load_from_file('repo', repo_config) is None + + # TypeError + mock_filepath.return_value = None + assert BaseTrackedRepo.load_from_file('repo', repo_config) is None + + def test_cron(self): + repo = mock_tracked_repo() + assert repo.cron() == '* * 4 * * detect-secrets-server --scan-repo pre-commit/pre-commit-hooks' + + @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output', autospec=True) + def test_scan_no_baseline(self, mock_subprocess_obj): + repo = mock_tracked_repo() + repo.baseline_file = None + + # We don't really care about any **actual** git results, because mocked. + mock_subprocess_obj.side_effect = mock_subprocess(( + SubprocessMock( + expected_input='git show', + mocked_output=b'will be mocked', + ), + )) + secrets = repo.scan() + assert isinstance(secrets, SecretsCollection) + assert len(secrets.data) == 0 + + # `git clone` unnecessary, because already cloned. However, should still work. + mock_subprocess_obj.side_effect = mock_subprocess(( + SubprocessMock( + expected_input='git clone', + mocked_output=b"fatal: destination path 'asdf' already exists", + should_throw_exception=True, + ), + )) + secrets = repo.scan() + assert isinstance(secrets, SecretsCollection) + + # Baseline supplied, but unable to find baseline file. Should still work. 
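+        # This exercises the "fatal: Path ... does not exist" handling, which
+        # treats a missing baseline as a non-breaking error.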
+ repo.baseline_file = 'asdf' + mock_subprocess_obj.side_effect = mock_subprocess(( + SubprocessMock( + expected_input='git show', + mocked_output=b"fatal: Path 'asdf' does not exist", + should_throw_exception=True, + ), + )) + secrets = repo.scan() + assert isinstance(secrets, SecretsCollection) + + @mock.patch('detect_secrets.server.base_tracked_repo.apply_baseline_filter') + @mock.patch('detect_secrets.server.base_tracked_repo.SecretsCollection.load_from_string') + @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output', autospec=True) + def test_scan_with_baseline(self, mock_subprocess_obj, mock_load_from_string, mock_apply): + repo = mock_tracked_repo() + + # Setup secrets + secretA = PotentialSecret('type', 'filenameA', 1, 'blah') + secretB = PotentialSecret('type', 'filenameA', 2, 'curry') + original_secrets = SecretsCollection() + original_secrets.data['filenameA'] = { + secretA: secretA, + secretB: secretB, + } + baseline_secrets = SecretsCollection() + baseline_secrets.data['filenameA'] = { + secretA: secretA, + } + + # Easier than mocking load_from_diff. + mock_apply.side_effect = lambda orig, base, filelist: \ + apply_baseline_filter(original_secrets, baseline_secrets, filelist) + + mock_subprocess_obj.side_effect = mock_subprocess(( + SubprocessMock( + expected_input='git show', + mocked_output=b'will be mocked', + ), + )) + secrets = repo.scan() + + assert len(secrets.data) == 1 + assert secrets.data['filenameA'][secretB] == secretB + + @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output', autospec=True) + def test_scan_bad_input(self, mock_subprocess_obj): + repo = mock_tracked_repo() + + cases = [ + ( + 'git clone', + b'fatal: Could not read from remote repository', + ), + ( + 'git fetch', + b'fatal: Could not read from remote repository', + ), + ( + 'git diff', + b'fatal: some unknown error', + ), + ( + 'git show', + b'fatal: some unknown error', + ), + ] + + for case in cases: + mock_subprocess_obj.side_effect = mock_subprocess(( + SubprocessMock( + expected_input=case[0], + mocked_output=case[1], + should_throw_exception=True, + ), + )) + try: + repo.scan() + assert False + except CalledProcessError: + pass + + @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output', autospec=True) + def test_update(self, mock_subprocess): + mock_subprocess.return_value = b'asdf' + repo = mock_tracked_repo() + + repo.update() + + assert repo.last_commit_hash == 'asdf' + + @mock.patch('detect_secrets.server.base_tracked_repo.os.path.isfile') + def test_save_no_existing_file(self, mock_isfile): + mock_isfile.return_value = False + repo = mock_tracked_repo() + + m = mock.mock_open() + with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m): + repo.save() + + m().write.assert_called_once_with(json.dumps(repo.__dict__, indent=2)) + + @mock.patch('detect_secrets.server.base_tracked_repo.codecs.open') + def test_save_bad_input(self, mock_open): + # Needed for coverage + repo = mock_tracked_repo() + + mock_stub = PropertyMock(return_value=None) + with mock.patch.object(BaseTrackedRepo, 'tracked_file_location', mock_stub): + assert repo.save() is False + assert mock_open.called is False + + @mock.patch('detect_secrets.server.base_tracked_repo.codecs.open') + @mock.patch('detect_secrets.server.base_tracked_repo.os.path.isfile') + def test_save_override_levels(self, mock_isfile, mock_open): + mock_isfile.return_value = True + repo = mock_tracked_repo() + + # If NEVER override, then make sure that's true. 
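+        # (mock_isfile reports the tracked file as existing, so NEVER must refuse.)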
+        assert repo.save(OverrideLevel.NEVER) is False
+
+        mock_stub = mock.Mock()
+        with mock.patch.object(repo, '_prompt_user_override', mock_stub):
+            # If user says NO to override
+            mock_stub.return_value = False
+            assert repo.save() is False
+
+            # If user says YES to override
+            mock_stub.return_value = True
+            assert repo.save() is True
+
+    def test_get_repo_name(self):
+        cases = [
+            (
+                'git@github.com:pre-commit/pre-commit-hooks.git',
+                'pre-commit/pre-commit-hooks',
+            ),
+
+            # Doesn't end with `.git`
+            (
+                'git@github.com:pre-commit/pre-commit-hooks',
+                'pre-commit/pre-commit-hooks',
+            ),
+
+            # No slash
+            (
+                'git@git.example.com:pre-commit-hooks',
+                'pre-commit-hooks',
+            ),
+        ]
+
+        for case in cases:
+            assert mock_tracked_repo(repo=case[0]).name == case[1]
diff --git a/tests/server/local_tracked_repo_test.py b/tests/server/local_tracked_repo_test.py
new file mode 100644
index 000000000..13ef77d65
--- /dev/null
+++ b/tests/server/local_tracked_repo_test.py
@@ -0,0 +1,56 @@
+from __future__ import absolute_import
+
+import subprocess
+import unittest
+
+import mock
+
+from detect_secrets.server.local_tracked_repo import LocalTrackedRepo
+from tests.server.base_tracked_repo_test import mock_tracked_repo as _mock_tracked_repo
+from tests.util.mock_util import mock_subprocess
+from tests.util.mock_util import SubprocessMock
+
+
+def mock_tracked_repo(**kwargs):
+    repo_name = kwargs.get('repo_name') or b'git@github.com:pre-commit/pre-commit-hooks'
+
+    # Need to mock this out, because __init__ runs `git remote get-url origin`
+    with mock.patch(
+        'detect_secrets.server.local_tracked_repo.subprocess.check_output',
+        autospec=True
+    ) as m:
+        m.side_effect = mock_subprocess((
+            SubprocessMock(
+                expected_input='git remote get-url origin',
+                mocked_output=repo_name,
+            ),
+        ))
+
+        output = _mock_tracked_repo(cls=LocalTrackedRepo, **kwargs)
+
+        if 'git_dir' in kwargs:
+            m.assert_called_with([
+                'git',
+                '--git-dir', kwargs.get('git_dir'),
+                'remote',
+                'get-url',
+                'origin'
+            ], stderr=subprocess.STDOUT)
+
+    return output
+
+
+class LocalTrackedRepoTest(unittest.TestCase):
+
+    def test_cron(self):
+        repo = mock_tracked_repo()
+
+        assert repo.cron() == \
+            '* * 4 * * detect-secrets-server --scan-repo pre-commit/pre-commit-hooks --local'
+
+    def test_get_repo_name(self):
+        assert mock_tracked_repo(
+            repo='/Users/morpheus/hooks/pre-commit-hooks',
+            git_dir='/Users/morpheus/hooks/pre-commit-hooks/.git',
+            repo_name=b'git@github.com:pre-commit/pre-commit-hooks',
+        ).name == 'pre-commit/pre-commit-hooks'
diff --git a/tests/server/s3_tracked_repo_test.py b/tests/server/s3_tracked_repo_test.py
new file mode 100644
index 000000000..e2775bb71
--- /dev/null
+++ b/tests/server/s3_tracked_repo_test.py
@@ -0,0 +1,153 @@
+from __future__ import absolute_import
+
+import hashlib
+import unittest
+
+import mock
+
+from detect_secrets.plugins import SensitivityValues
+from detect_secrets.server.base_tracked_repo import DEFAULT_BASE_TMP_DIR
+from detect_secrets.server.base_tracked_repo import OverrideLevel
+from detect_secrets.server.repo_config import RepoConfig
+from detect_secrets.server.s3_tracked_repo import S3Config
+from detect_secrets.server.s3_tracked_repo import S3TrackedRepo
+from tests.server.base_tracked_repo_test import mock_tracked_repo as _mock_tracked_repo
+from tests.util.mock_util import PropertyMock
+
+
+def mock_tracked_repo(**kwargs):
+    additional_s3_args = {
+        's3_creds_file': 'credentials.sample.json',
+        'bucket_name': 'bucket',
+        'prefix': 'prefix',
+    }
+    additional_s3_args.update(kwargs)
+
+    config = S3Config(
+        s3_creds_file=additional_s3_args['s3_creds_file'],
+        bucket_name=additional_s3_args['bucket_name'],
+        prefix=additional_s3_args['prefix'],
+    )
+
+    with mock.patch.object(S3TrackedRepo, '_download'), \
+            mock.patch.object(S3TrackedRepo, '_initialize_s3_client'):
+        return _mock_tracked_repo(cls=S3TrackedRepo, s3_config=config, **kwargs)
+
+
+class S3TrackedRepoTest(unittest.TestCase):
+
+    def test_load_from_file_success(self):
+        repo_name = 'name'
+        internal_filename = hashlib.sha512(
+            repo_name.encode('utf-8')).hexdigest()
+
+        mock_download = mock.Mock()
+        mock_super = mock.Mock()
+        with mock.patch.object(S3TrackedRepo, '_download', mock_download), \
+                mock.patch.object(S3TrackedRepo, '_load_from_file', mock_super), \
+                mock.patch.object(S3TrackedRepo, '_initialize_s3_client'):
+            S3TrackedRepo.load_from_file(
+                repo_name,
+                RepoConfig(
+                    base_tmp_dir=DEFAULT_BASE_TMP_DIR,
+                    baseline='baseline',
+                    exclude_regex='',
+                ),
+                S3Config(
+                    s3_creds_file='s3_creds_file',
+                    bucket_name='bucket_name',
+                    prefix='prefix_value',
+                ),
+            )
+
+            mock_download.assert_called_once_with(
+                'bucket_name',
+                'prefix_value',
+                internal_filename,
+                '%s/tracked/%s.json' % (DEFAULT_BASE_TMP_DIR, internal_filename)
+            )
+
+    def test_save_file_fail_uploads_if_not_in_s3(self):
+        repo = mock_tracked_repo()
+
+        mock_upload = mock.Mock()
+        mock_save = mock.Mock(return_value=False)
+        mock_exists = mock.Mock(return_value=False)
+        with mock.patch.object(repo, '_upload', mock_upload), \
+                mock.patch.object(repo, '_parent_save', mock_save), \
+                mock.patch.object(repo, '_does_file_exist', mock_exists):
+            repo.save()
+
+            assert mock_upload.called is True
+
+    def test_save_file_normal_success(self):
+        repo = mock_tracked_repo()
+
+        mock_parent_save = mock.Mock(return_value=True)
+        mock_upload = mock.Mock()
+        with mock.patch.object(repo, '_parent_save', mock_parent_save), \
+                mock.patch.object(repo, '_upload', mock_upload):
+            repo.save()
+
+            assert mock_upload.called is True
+
+    def test_save_file_already_exists_on_s3(self):
+        repo = mock_tracked_repo()
+
+        mock_parent_save = mock.Mock(return_value=True)
+        mock_file_exist = mock.Mock(return_value=True)
+        mock_upload = mock.Mock()
+        with mock.patch.object(repo, '_parent_save', mock_parent_save), \
+                mock.patch.object(repo, '_upload', mock_upload), \
+                mock.patch.object(repo, '_does_file_exist', mock_file_exist):
+
+            # Make sure to override if the file exists.
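+            # (a plain save() should still upload, replacing the existing S3 object)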
+            repo.save()
+
+            assert mock_upload.called is True
+
+            # Make sure **not** to override if override == NEVER
+            mock_upload.called = False
+
+            repo.save(OverrideLevel.NEVER)
+
+            assert mock_upload.called is False
+
+            # Make sure to still upload if the file doesn't exist
+            mock_upload.called = False
+            mock_file_exist.return_value = False
+
+            repo.save(OverrideLevel.NEVER)
+
+            assert mock_upload.called is True
+
+    def test_s3_key(self):
+        for prefix_name in [
+            'prefix',
+            'prefix/',
+        ]:
+            repo = mock_tracked_repo(prefix=prefix_name)
+
+            mock_stub = PropertyMock(return_value='internal_filename')
+            with mock.patch.object(S3TrackedRepo, 'internal_filename', mock_stub):
+                assert repo.s3_key == 'prefix/internal_filename.json'
+
+    def test_modify_tracked_file_contents(self):
+        data = {
+            'plugins': {
+                'HexHighEntropyString': 3,
+            },
+            's3_config': {
+                's3_creds_file': 'filename',
+                'bucket_name': 'bucket',
+                'prefix': 'prefix',
+            },
+        }
+
+        output = S3TrackedRepo._modify_tracked_file_contents(data)
+
+        assert isinstance(output['plugin_sensitivity'], SensitivityValues)
+        assert output['plugin_sensitivity'].hex_limit == 3
+        assert isinstance(output['s3_config'], S3Config)
+        assert output['s3_config'].bucket_name == 'bucket'
+        assert output['s3_config'].prefix == 'prefix'
diff --git a/tests/server_main_test.py b/tests/server_main_test.py
new file mode 100644
index 000000000..e507f86e6
--- /dev/null
+++ b/tests/server_main_test.py
@@ -0,0 +1,493 @@
+#!/usr/bin/python
+from __future__ import absolute_import
+
+import hashlib
+import json
+import unittest
+
+import mock
+
+from detect_secrets.core.secrets_collection import SecretsCollection
+from detect_secrets.plugins import SensitivityValues
+from detect_secrets.server.base_tracked_repo import BaseTrackedRepo
+from detect_secrets.server.local_tracked_repo import LocalTrackedRepo
+from detect_secrets.server.repo_config import RepoConfig
+from detect_secrets.server.s3_tracked_repo import S3Config
+from detect_secrets.server.s3_tracked_repo import S3LocalTrackedRepo
+from detect_secrets.server.s3_tracked_repo import S3TrackedRepo
+from detect_secrets.server_main import initialize_repos_from_repo_yaml
+from detect_secrets.server_main import main
+from detect_secrets.server_main import parse_args
+from detect_secrets.server_main import parse_s3_config
+from detect_secrets.server_main import parse_sensitivity_values
+from tests.util.mock_util import mock_subprocess
+from tests.util.mock_util import SubprocessMock
+
+
+class ServerTest(unittest.TestCase):
+
+    @staticmethod
+    def assert_sensitivity_values(actual, **expected_values):
+        assert isinstance(actual, SensitivityValues)
+        for key in actual._fields:
+            if key in expected_values:
+                assert expected_values[key] == getattr(actual, key)
+            else:
+                assert getattr(actual, key) is None
+
+    def _mock_repo_config(self):
+        return RepoConfig(
+            base_tmp_dir='default_base_tmp_dir',
+            baseline='baseline',
+            exclude_regex='',
+        )
+
+    def test_parse_sensitivity_values_usage_defaults(self):
+        mock_args = parse_args([])
+
+        self.assert_sensitivity_values(
+            parse_sensitivity_values(mock_args),
+            base64_limit=4.5,
+            hex_limit=3,
+        )
+
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_parse_sensitivity_values_config_file_overrides_default_values(self, mock_data):
+        mock_data.return_value = {
+            'default': {
+                'plugins': {
+                    'HexHighEntropyString': 4,
+                }
+            }
+        }
+
+        mock_args = parse_args(['--config-file', 'will_be_mocked'])
+
+        self.assert_sensitivity_values(
+            parse_sensitivity_values(mock_args),
+            base64_limit=4.5,
+            hex_limit=4,
+        )
+
+    def test_parse_sensitivity_values_cli_overrides_default_values(self):
+        mock_args = parse_args(['--base64-limit', '2'])
+
+        self.assert_sensitivity_values(
+            parse_sensitivity_values(mock_args),
+            base64_limit=2,
+            hex_limit=3,
+        )
+
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_parse_sensitivity_values_config_file_overrides_cli(self, mock_data):
+        mock_args = parse_args(
+            ['--base64-limit', '3', '--config-file', 'will_be_mocked'])
+        mock_data.return_value = {
+            'default': {
+                'plugins': {
+                    'Base64HighEntropyString': 2,
+                }
+            }
+        }
+
+        self.assert_sensitivity_values(
+            parse_sensitivity_values(mock_args),
+            base64_limit=2,
+            hex_limit=3,
+        )
+
+    def test_parse_s3_config_fail(self):
+        # No file supplied
+        mock_args = parse_args([])
+        assert parse_s3_config(mock_args) is None
+
+        # Bad initialization of S3Config
+        m = mock.mock_open(read_data='{}')
+        mock_args = parse_args(['--s3-config-file', 'will_be_mocked'])
+        with mock.patch('detect_secrets.server_main.codecs.open', m):
+            assert parse_s3_config(mock_args) is None
+
+    def test_parse_s3_config_success(self):
+        mock_args = parse_args(['--s3-config-file', 'will_be_mocked'])
+        data = {
+            's3_creds_file': 's3_creds_file.json',
+            'bucket_name': 'bucket_name',
+            'prefix': 'prefix',
+        }
+        m = mock.mock_open(read_data=json.dumps(data))
+        with mock.patch('detect_secrets.server_main.codecs.open', m):
+            output = parse_s3_config(mock_args)
+
+        assert isinstance(output, S3Config)
+        assert output.bucket_name == 'bucket_name'
+        assert output.prefix == 'prefix'
+
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_initialize_repos_from_repo_yaml_no_tracked_repos(self, mock_data):
+        mock_data.return_value = {
+            'nothing': 'important'
+        }
+
+        assert initialize_repos_from_repo_yaml(
+            'will_be_mocked',
+            SensitivityValues(),
+            self._mock_repo_config(),
+        ) == []
+
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_initialize_repos_from_repo_yaml_no_s3_config(self, mock_data):
+        mock_data.return_value = {
+            'tracked': [
+                {
+                    'sha': 'does_not_matter',
+                    'repo': 'does_not_matter',
+                    's3_backed': True,
+                }
+            ]
+        }
+
+        assert initialize_repos_from_repo_yaml(
+            'will_be_mocked',
+            SensitivityValues(),
+            self._mock_repo_config(),
+        ) == []
+
+    @mock.patch('detect_secrets.server.local_tracked_repo.subprocess')
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_initialize_repos_from_repo_yaml_success(self, mock_data, mock_subprocess_obj):
+        def _create_mock_tracked_repo_repr(**kwargs):
+            defaults = {
+                'sha': 'does_not_matter',
+                'repo': 'does_not_matter',
+            }
+
+            defaults.update(kwargs)
+
+            return defaults
+
+        mock_data.return_value = {
+            'tracked': [
+                _create_mock_tracked_repo_repr(
+                    # Test that it can also be overridden here.
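+                    # (repo-level plugin values should win over the global
+                    # SensitivityValues passed in below)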
+                    plugins={
+                        'Base64HighEntropyString': 2,
+                    },
+                    baseline_file='is_included',
+                ),
+                _create_mock_tracked_repo_repr(
+                    # Test local repo
+                    is_local_repo=True,
+                ),
+                _create_mock_tracked_repo_repr(
+                    # Test S3 remote repo
+                    s3_backed=True,
+                ),
+                _create_mock_tracked_repo_repr(
+                    # Test S3 local repo
+                    is_local_repo=True,
+                    s3_backed=True,
+                ),
+            ]
+        }
+
+        with mock.patch.object(S3TrackedRepo, '_initialize_s3_client'):
+            output = initialize_repos_from_repo_yaml(
+                'will_be_mocked',
+                SensitivityValues(
+                    base64_limit=1,
+                    hex_limit=2,
+                ),
+                self._mock_repo_config(),
+                S3Config(
+                    s3_creds_file='filename',
+                    bucket_name='bucket',
+                    prefix='prefix',
+                )
+            )
+
+        assert isinstance(output[0], BaseTrackedRepo)
+        assert isinstance(output[1], LocalTrackedRepo)
+        assert isinstance(output[2], S3TrackedRepo)
+        assert isinstance(output[3], S3LocalTrackedRepo)
+
+        assert output[0].plugin_config.base64_limit == 2
+        assert output[0].baseline_file == 'is_included'
+        assert output[1].plugin_config.base64_limit == 1
+
+    @mock.patch('detect_secrets.server_main.print')
+    @mock.patch('detect_secrets.server.local_tracked_repo.subprocess.check_output')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.save')
+    @mock.patch('detect_secrets.server_main.open_config_file')
+    def test_main_initialize_success(self, mock_data, mock_save, mock_repo_url, mock_print):
+        mock_save.return_value = True
+        mock_repo_url.return_value = b'git@github.com:some/random-repo.git'
+        mock_data.return_value = {
+            'tracked': [
+                {
+                    'repo': 'git@github.com:yelp/detect-secrets.git',
+                    'sha': 'some_sha_value',
+                    'cron': '1 2 3 4 5',
+                },
+                {
+                    'repo': '/file/to/local/repo',
+                    'is_local_repo': True,
+                    'sha': 'some_other_value',
+                    'cron': '2 3 4 5 6',
+                },
+            ]
+        }
+
+        assert main(['--initialize']) == 0
+        mock_print.assert_has_calls([
+            mock.call('# detect-secrets scanner'),
+            mock.call(
+                '1 2 3 4 5 detect-secrets-server --scan-repo yelp/detect-secrets'),
+            mock.call(
+                '2 3 4 5 6 detect-secrets-server --scan-repo some/random-repo --local'),
+        ])
+        assert mock_print.call_count == 3
+
+    @mock.patch('detect_secrets.server_main.print')
+    def test_main_initialize_failures(self, mock_print):
+        with mock.patch('detect_secrets.server_main.initialize_repos_from_repo_yaml') as m:
+            m.side_effect = IOError
+            assert main(['--initialize']) == 1
+
+        with mock.patch('detect_secrets.server_main.initialize_repos_from_repo_yaml') as m:
+            m.return_value = []
+            assert main(['--initialize']) == 0
+            assert mock_print.call_count == 0
+
+    @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output')
+    def test_main_add_repo_remote(self, mock_subprocess_obj):
+        mock_subprocess_obj.side_effect = mock_subprocess((
+            # mock out `clone_and_fetch_repo`
+            SubprocessMock(
+                expected_input='git clone',
+                mocked_output=b"fatal: destination path 'asdf' already exists",
+            ),
+            SubprocessMock(
+                expected_input='git rev-parse --abbrev-ref',
+                mocked_output=b'master',
+            ),
+            SubprocessMock(
+                expected_input='git fetch -q origin',
+                mocked_output=b'',
+            ),
+
+            # mock out `update`
+            SubprocessMock(
+                expected_input='git rev-parse HEAD',
+                mocked_output=b'new-sha-hash',
+            )
+        ))
+
+        m = mock.mock_open()
+        with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m):
+            assert main([
+                '--add-repo',
+                'git@github.com:yelp/detect-secrets.git',
+                '--base64-limit',
+                '2'
+            ]) == 0
+
+        m().write.assert_called_once_with(json.dumps({
+            'sha': 'new-sha-hash',
+            'repo': 'git@github.com:yelp/detect-secrets.git',
+            'plugins': {
+                'base64_limit': 2.0,  # supplied CLI value
+                'hex_limit': 3,  # default value
+            },
+            'cron': '',
+            'baseline_file': '',
+        }, indent=2))
+
+    @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output')
+    def test_main_add_repo_local(self, mock_subprocess_obj):
+        mock_subprocess_obj.side_effect = mock_subprocess((
+            # mock out `clone_and_fetch_repo`
+            SubprocessMock(
+                expected_input='git clone',
+                mocked_output=b"fatal: destination path 'asdf' already exists",
+            ),
+            SubprocessMock(
+                expected_input='git rev-parse --abbrev-ref',
+                mocked_output=b'master',
+            ),
+            SubprocessMock(
+                expected_input='git fetch -q origin',
+                mocked_output=b'',
+            ),
+
+            # mock out `update`
+            SubprocessMock(
+                expected_input='git rev-parse HEAD',
+                mocked_output=b'new-sha-hash',
+            )
+        ))
+
+        m = mock.mock_open()
+        with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m):
+            assert main([
+                '--add-repo',
+                '/file/to/local/repo',
+                '--local',
+                '--baseline',
+                '.baseline',
+            ]) == 0
+
+        m().write.assert_called_once_with(json.dumps({
+            'sha': 'new-sha-hash',
+            'repo': '/file/to/local/repo',
+            'plugins': {
+                'base64_limit': 4.5,
+                'hex_limit': 3,
+            },
+            'cron': '',
+            'baseline_file': '.baseline',
+        }, indent=2))
+
+    @mock.patch('detect_secrets.server.s3_tracked_repo.S3TrackedRepo.S3')
+    @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output')
+    def test_main_add_repo_s3(self, mock_subprocess_obj, mock_s3_obj):
+        mock_subprocess_obj.side_effect = mock_subprocess((
+            # mock out `_get_repo_name`
+            SubprocessMock(
+                expected_input='git remote get-url origin',
+                mocked_output=b'git@github.com:yelp/detect-secrets',
+            ),
+
+            # mock out `update`
+            SubprocessMock(
+                expected_input='git rev-parse HEAD',
+                mocked_output=b'new-sha-hash',
+            )
+        ))
+
+        mock_s3_config = {
+            's3_creds_file': 'filename',
+            'bucket_name': 'bucketman',
+            'prefix': 'mister',
+        }
+
+        final_output = mock.mock_open()
+        s3_config = mock.mock_open(read_data=json.dumps(mock_s3_config))
+        with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', final_output), \
+                mock.patch('detect_secrets.server_main.codecs.open', s3_config), \
+                mock.patch(
+                    'detect_secrets.server.s3_tracked_repo.S3TrackedRepo._initialize_s3_client'
+                ):
+            assert main([
+                '--add-repo',
+                'git@github.com:yelp/detect-secrets.git',
+                '--s3-config-file',
+                'will-be-mocked',
+            ]) == 0
+
+        mock_s3_obj.list_objects_v2.assert_called_once_with(
+            Bucket='bucketman',
+            Prefix='mister/%s.json' % hashlib.sha512(
+                'yelp/detect-secrets'.encode('utf-8')
+            ).hexdigest(),
+        )
+
+        assert mock_s3_obj.upload_file.call_count == 1
+
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.load_from_file')
+    def test_main_scan_repo_unconfigured_repo(self, mock_load_from_file):
+        mock_load_from_file.return_value = None
+        assert main(['--scan-repo', 'will-be-mocked']) == 1
+
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.scan')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo._read_tracked_file')
+    def test_main_scan_repo_scan_failed(self, mock_read_file, mock_scan):
+        mock_read_file.return_value = {
+            'sha': 'does_not_matter',
+            'repo': 'repo_name',
+            'plugins': {
+                'base64_limit': 3,
+            },
+            'cron': '* * * * *',
+            'baseline_file': '.secrets.baseline',
+        }
+
+        mock_scan.return_value = None
+        assert main(['--scan-repo', 'will-be-mocked']) == 1
+
+    @mock.patch('detect_secrets.server.base_tracked_repo.subprocess.check_output', autospec=True)
+    @mock.patch('detect_secrets.server_main.CustomLogObj.getLogger')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.scan')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo._read_tracked_file')
+    def test_main_scan_repo_scan_success_no_results_found(
+            self,
+            mock_file,
+            mock_scan,
+            mock_log,
+            mock_subprocess_obj
+    ):
+        mock_file.return_value = {
+            'sha': 'does_not_matter',
+            'repo': 'repo_name',
+            'plugins': {
+                'base64_limit': 3,
+            },
+            'cron': '* * * * *',
+            'baseline_file': '.secrets.baseline',
+        }
+        mock_scan.return_value = SecretsCollection()
+
+        mock_subprocess_obj.side_effect = mock_subprocess((
+            SubprocessMock(
+                expected_input='git rev-parse HEAD',
+                mocked_output=b'new_sha'
+            ),
+        ))
+
+        m = mock.mock_open()
+        with mock.patch('detect_secrets.server.base_tracked_repo.codecs.open', m):
+            assert main(['--scan-repo', 'will-be-mocked']) == 0
+
+        mock_log().info.assert_called_with(
+            'SCAN COMPLETE - STATUS: clean for %s',
+            'repo_name',
+        )
+
+        m().write.assert_called_once_with(json.dumps({
+            'sha': 'new_sha',
+            'repo': 'repo_name',
+            'plugins': {
+                'base64_limit': 3,
+                'hex_limit': None,
+            },
+            'cron': '* * * * *',
+            'baseline_file': '.secrets.baseline',
+        }, indent=2))
+
+    @mock.patch('detect_secrets.server_main.CustomLogObj.getLogger')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.scan')
+    @mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo._read_tracked_file')
+    def test_main_scan_repo_scan_success_secrets_found(self, mock_file, mock_scan, mock_log):
+        mock_file.return_value = {
+            'sha': 'does_not_matter',
+            'repo': 'repo_name',
+            'plugins': {
+                'base64_limit': 3,
+            },
+            'cron': '* * * * *',
+            'baseline_file': '.secrets.baseline',
+        }
+
+        mock_secret_collection = SecretsCollection()
+        mock_secret_collection.data['junk'] = 'data'
+        mock_scan.return_value = mock_secret_collection
+
+        with mock.patch('detect_secrets.server_main.PySensuYelpHook') as m, \
+                mock.patch('detect_secrets.server.base_tracked_repo.BaseTrackedRepo.update') as u:
+            assert main(['--scan-repo', 'will-be-mocked']) == 0
+
+            assert u.call_count == 0
+            assert m.call_count == 1
+
+    def test_main_no_args(self):
+        # Needed for coverage
+        assert main([]) == 0
diff --git a/tests/util/__init__.py b/tests/util/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/util/file_util.py b/tests/util/file_util.py
new file mode 100644
index 000000000..51a171da9
--- /dev/null
+++ b/tests/util/file_util.py
@@ -0,0 +1,36 @@
+#!/usr/bin/python
+"""
+    This is a collection of utility functions for easier, DRY testing.
+"""
+import sys
+
+if sys.version_info[0] == 2:  # pragma: no cover
+    import cStringIO as io
+else:  # pragma: no cover
+    import io
+
+
+def create_file_object_from_string(string):
+    return io.StringIO(string)
+
+
+def create_file_object_that_throws_unicode_decode_error(string):
+
+    class BadUnicodeFile(object):
+        """For Python 2 compatibility, we can't extend io.StringIO and then
+        override the __next__ function, so we need this wrapper instead."""
+
+        def __init__(self, string):
+            self.obj = io.StringIO(string)
+
+        def __iter__(self):
+            return self
+
+        def __next__(self):
+            raise UnicodeDecodeError('encoding type', b'subject', 0, 1, 'exception message')
+
+        if sys.version_info[0] == 2:  # pragma: no cover
+            def next(self):
+                return self.__next__()
+
+    return BadUnicodeFile(string)
diff --git a/tests/util/mock_util.py b/tests/util/mock_util.py
new file mode 100644
index 000000000..736a83adb
--- /dev/null
+++ b/tests/util/mock_util.py
@@ -0,0 +1,104 @@
+"""
+    This is a collection of utility functions for easier, DRY testing.
+"""
+from collections import namedtuple
+from subprocess import CalledProcessError
+
+import mock
+
+
+_SubprocessMock = namedtuple(
+    'SubprocessMock',
+    [
+        'expected_input',
+        'mocked_output',
+        'should_throw_exception',
+    ]
+)
+
+
+class SubprocessMock(_SubprocessMock):
+    """For use with mock_subprocess.
+
+    :type expected_input: string
+    :param expected_input: only return mocked_output if the input matches this
+
+    :type mocked_output: mixed
+    :param mocked_output: value to return when expected_input matches
+
+    :type should_throw_exception: bool
+    :param should_throw_exception: if True, raises subprocess.CalledProcessError,
+        with mocked_output as the error message
+    """
+    def __new__(cls, expected_input, mocked_output, should_throw_exception=False):
+        return super(SubprocessMock, cls).__new__(
+            cls,
+            expected_input,
+            mocked_output,
+            should_throw_exception,
+        )
+
+
+class PropertyMock(mock.Mock):
+    """Allows the mocking of class methods decorated with @property"""
+
+    def __get__(self, instance, owner):
+        return self()
+
+
+def setup_global_mocks(obj, mocks=[]):  # pragma: no cover
+    """
+    Mocks out global objects, for general test cases.
+    :param obj: unittest.TestCase
+    :param mocks: mixed; either modules_to_mock_out :string, or
+        (modules_to_mock_out :string, autospec :boolean)
+    """
+    for item in mocks:
+        autospec = True
+        if not isinstance(item, str) and len(item) > 1:
+            autospec = item[1]
+            item = item[0]
+
+        m = mock.patch(item, autospec=autospec)
+
+        obj.addCleanup(m.stop)
+        m.start()
+
+
+def Any(cls):
+    """Used to call assert_called_with with any argument.
+
+    Usage: Any(list) => allows any list to pass as input
+    """
+    class Any(cls):
+        def __eq__(self, other):
+            return isinstance(other, cls)
+    return Any()
+
+
+def mock_subprocess(case_tuple):
+    """We perform several subprocess.check_output calls, but want to mock only
+    one of them at a time. This function helps us do that.
+
+    :type case_tuple: tuple of SubprocessMock
+    :param case_tuple: See docstring for SubprocessMock
+    """
+    def fn(inputs, **kwargs):
+        if len(inputs) >= 2 and inputs[1] == '--git-dir':
+            # Remove `--git-dir <path>` from the git command.
+            # This conditional is just for convenience / readability.
+            inputs = inputs[0:1] + inputs[3:]
+
+        str_input = ' '.join(
+            map(lambda x: x.decode('utf-8')
+                if not isinstance(x, str) else x, inputs)
+        )
+        for tup in case_tuple:
+            if not str_input.startswith(tup.expected_input):
+                # We don't care what is returned, if we're not mocking it.
+                continue
+
+            if tup.should_throw_exception:
+                raise CalledProcessError(1, '', tup.mocked_output)
+
+            return tup.mocked_output
+
+        # Default return value is just a byte-string.
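+        # (subprocess.check_output returns bytes, so the fallback does too)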
+        return b''
+
+    return fn
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 000000000..c25273ae6
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,28 @@
+[tox]
+project = detect_secrets
+# These should match the Travis env list
+envlist = py27,py35,py36,pypy
+tox_pip_extensions_ext_pip_custom_platform = true
+tox_pip_extensions_ext_venv_update = true
+
+[testenv]
+passenv = SSH_AUTH_SOCK
+deps = -rrequirements-dev.txt
+commands =
+    coverage erase
+    coverage run -m pytest tests
+    coverage report --show-missing --fail-under 99
+    pre-commit run --all-files
+
+[testenv:venv]
+basepython = /usr/bin/python3.6
+envdir = venv
+commands =
+    pre-commit install -f --install-hooks
+
+[testenv:pre-commit]
+deps = pre-commit>=0.16.3
+commands = pre-commit {posargs}
+
+[pep8]
+ignore = E501
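-- 
The SubprocessMock / mock_subprocess pair in tests/util/mock_util.py is the backbone of
these tests: it turns a tuple of expected-command prefixes into a side_effect for
subprocess.check_output, matching by prefix and returning bytes. A minimal sketch of how
a new test could wire it up (the patched module path and the mocked SHA below are
illustrative placeholders, not prescribed by this patch):

    import mock

    from tests.util.mock_util import mock_subprocess
    from tests.util.mock_util import SubprocessMock

    with mock.patch(
        'detect_secrets.server.base_tracked_repo.subprocess.check_output'
    ) as checked_output:
        checked_output.side_effect = mock_subprocess((
            SubprocessMock(
                expected_input='git rev-parse HEAD',
                mocked_output=b'mocked-sha',
            ),
        ))
        # Any `git rev-parse HEAD` call under this patch now returns
        # b'mocked-sha'; unmatched commands fall through to b''.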