Skip to content

Commit

Permalink
Add Type checking and enforce lint (DataDog#98)
Browse files Browse the repository at this point in the history
* start integrating type testing

* start integrating type testing

* type fixing

* type fixing

* revert previous change

* revert previous change

* revert previous change

* revert previous change

* revert previous change

* type codebase

* type codebase

* type codebase

* fight mypy

* fix lint and enforce it in CI

* CI concurency

* add code quality instructions in README

* use makefile
  • Loading branch information
vdeturckheim authored Dec 5, 2022
1 parent a1d064c commit 8e814de
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 123 deletions.
37 changes: 31 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,37 @@ permissions:
contents: read

jobs:

type-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Type check with mypy
run: make type-check

lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Lint with flake8
run: make lint

unit-tests:
runs-on: ubuntu-latest

Expand All @@ -25,12 +56,6 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data --ignore=E203,W503
- name: Semgrep rules unit tests
run: make test-semgrep-rules
- name: Python unit tests
Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

test: test-semgrep-rules test-metadata-rules test-core

type-check:
mypy --install-types --non-interactive guarddog

lint:
flake8 guarddog --count --select=E9,F63,F7,F82 --show-source --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data
flake8 guarddog --count --max-line-length=120 --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data --ignore=E203,W503

test-semgrep-rules:
semgrep --metrics off --quiet --test --config guarddog/analyzer/sourcecode tests/analyzer/sourcecode

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ Running unit tests against Semgrep rules: `make test-semgrep-rules` (tests are [

Running unit tests against package metadata heuristics: `make test-metadata-rules` (tests are [here](https://github.com/DataDog/guarddog/tree/main/tests/analyzer/metadata)).

### Code quality checks

Run the type checker with
```shell
mypy --install-types --non-interactive guarddog
```
and the linter with
```shell
flake8 guarddog --count --select=E9,F63,F7,F82 --show-source --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data
flake8 guarddog --count --max-line-length=120 --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data --ignore=E203,W503
```

### Adding new source code heuristics

TBD
Expand Down
2 changes: 1 addition & 1 deletion guarddog/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from guarddog.scanners.package_scanner import PackageScanner
from guarddog.scanners.package_scanner import PackageScanner # NOQA
26 changes: 13 additions & 13 deletions guarddog/analyzer/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from pathlib import Path

from semgrep.semgrep_main import invoke_semgrep
from semgrep.semgrep_main import invoke_semgrep # type: ignore

from guarddog.analyzer.metadata.potentially_compromised_email_domain import PotentiallyCompromisedEmailDomainDetector
from guarddog.analyzer.metadata.empty_information import EmptyInfoDetector
Expand Down Expand Up @@ -62,7 +62,7 @@ def get_rules(file_extension, path):
"release_zero": ReleaseZeroDetector()
}

def analyze(self, path, info=None, rules=None) -> dict[str]:
def analyze(self, path, info=None, rules=None) -> dict:
"""
Analyzes a package in the given path
Expand Down Expand Up @@ -99,7 +99,6 @@ def analyze(self, path, info=None, rules=None) -> dict[str]:

metadata_results = self.analyze_metadata(info, metadata_rules)
sourcecode_results = self.analyze_sourcecode(path, sourcecode_rules)


# Concatenate dictionaries together
issues = metadata_results["issues"] + sourcecode_results["issues"]
Expand All @@ -108,7 +107,7 @@ def analyze(self, path, info=None, rules=None) -> dict[str]:

return {"issues": issues, "errors": errors, "results": results, "path": path}

def analyze_metadata(self, info, rules=None) -> dict[str]:
def analyze_metadata(self, info, rules=None) -> dict:
"""
Analyzes the metadata of a given package
Expand Down Expand Up @@ -136,7 +135,7 @@ def analyze_metadata(self, info, rules=None) -> dict[str]:

return {"results": results, "errors": errors, "issues": issues}

def analyze_sourcecode(self, path, rules=None) -> tuple[dict, int]:
def analyze_sourcecode(self, path, rules=None) -> dict:
"""
Analyzes the source code of a given package
Expand All @@ -150,17 +149,18 @@ def analyze_sourcecode(self, path, rules=None) -> tuple[dict, int]:
targetpath = Path(path)
all_rules = rules if rules is not None else self.sourcecode_ruleset

results = {rule: {} for rule in all_rules}
results = {rule: {} for rule in all_rules} # type: dict
errors = {}
issues = 0

if rules is None:
# No rule specified, run all rules
try:
response = invoke_semgrep(Path(self.sourcecode_path), [targetpath], exclude=self.exclude, no_git_ignore=True)
response = invoke_semgrep(Path(self.sourcecode_path), [targetpath], exclude=self.exclude,
no_git_ignore=True)
rule_results = self._format_semgrep_response(response, targetpath=targetpath)
issues += len(rule_results)

results = results | rule_results
except Exception as e:
errors["rules-all"] = f"failed to run rule: {str(e)}"
Expand Down Expand Up @@ -206,7 +206,7 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
"""

results = {}

for result in response["results"]:
rule_name = rule or result["check_id"].split(".")[-1]
code_snippet = result["extra"]["lines"]
Expand All @@ -222,8 +222,8 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
if rule_name not in result:
results[rule_name] = []
results[rule_name].append({
'location': location,
'code': code,
'location': location,
'code': code,
'message': result["extra"]["message"]
})

Expand All @@ -233,6 +233,6 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
def trim_code_snippet(self, code):
THRESHOLD = 250
if len(code) > THRESHOLD:
return code[:THRESHOLD-10] + '...' + code[len(code)-10:]
return code[: THRESHOLD - 10] + '...' + code[len(code) - 10:]
else:
return code
return code
9 changes: 7 additions & 2 deletions guarddog/analyzer/metadata/detector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from abc import abstractmethod
from typing import Optional


class Detector:
def __init__(self) -> None:
pass

# returns (ruleMatches, message)
def detect(self, package_info) -> tuple[bool, str]:
pass
@abstractmethod
def detect(self, package_info) -> tuple[bool, Optional[str]]:
pass
2 changes: 1 addition & 1 deletion guarddog/analyzer/metadata/empty_information.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def detect(self, package_info) -> tuple[bool, str]:
"""

sanitized_description = package_info["info"]["description"].strip()
return len(sanitized_description) == 0, 'This package has an empty description on PyPi'
return len(sanitized_description) == 0, 'This package has an empty description on PyPi'
20 changes: 15 additions & 5 deletions guarddog/analyzer/metadata/potentially_compromised_email_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"""

from datetime import datetime
from typing import Optional

import whois
import whois # type: ignore
from dateutil import parser
from packaging import version

Expand All @@ -24,7 +25,7 @@ class PotentiallyCompromisedEmailDomainDetector(Detector):
def __init__(self) -> None:
super()

def _get_domain_creation_date(self, email_domain) -> tuple[datetime, bool]:
def _get_domain_creation_date(self, email_domain) -> tuple[Optional[datetime], bool]:
"""
Gets the creation date of an email address domain
Expand Down Expand Up @@ -57,7 +58,7 @@ def _get_domain_creation_date(self, email_domain) -> tuple[datetime, bool]:

return creation_dates, True

def _get_project_latest_release_date(self, releases) -> datetime:
def _get_project_latest_release_date(self, releases) -> Optional[datetime]:
"""
Gets the most recent release date of a Python project
Expand All @@ -80,6 +81,7 @@ def _get_project_latest_release_date(self, releases) -> datetime:
upload_time_text = version_release[0]["upload_time_iso_8601"]
release_date = parser.isoparse(upload_time_text).replace(tzinfo=None)
return release_date
return None

def detect(self, package_info) -> tuple[bool, str]:
"""
Expand Down Expand Up @@ -113,7 +115,15 @@ def detect(self, package_info) -> tuple[bool, str]:
latest_project_release = self._get_project_latest_release_date(releases)
domain_creation_date, domain_exists = self._get_domain_creation_date(email_domain)
if not domain_exists:
return True, "The maintainer's email domain does not exist and can likely be registered by an attacker to compromise the maintainer's PyPi account"
return True, "The maintainer's email domain does not exist and can likely be registered by an attacker to" \
" compromise the maintainer's PyPi account"
if domain_creation_date is None:
return False, "No e-mail domain creation date found"
return latest_project_release < domain_creation_date, "The domain name of the maintainer's email address was re-registered after the latest release of this package. This can be an indicator that this is a custom domain that expired, and was leveraged by an attacker to compromise the package owner's PyPi account."
if latest_project_release is None:
return False, "Could not find latest release date"
return latest_project_release < domain_creation_date, "The domain name of the maintainer's email address was" \
" re-registered after the latest release of this " \
"package. This can be an indicator that this is a" \
" custom domain that expired, and was leveraged by" \
" an attacker to compromise the package owner's PyPi" \
" account."
3 changes: 2 additions & 1 deletion guarddog/analyzer/metadata/release_zero.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ def __init__(self) -> None:
super()

def detect(self, package_info) -> tuple[bool, str]:
return package_info["info"]['version'] in ['0.0.0', '0.0'], "The package has its latest release version to 0.0.0"
return (package_info["info"]['version'] in ['0.0.0', '0.0'],
"The package has its latest release version to 0.0.0")
10 changes: 6 additions & 4 deletions guarddog/analyzer/metadata/typosquatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from datetime import datetime, timedelta
from itertools import permutations
from typing import Optional

import requests

Expand Down Expand Up @@ -239,7 +240,7 @@ def get_typosquatted_package(self, package_name) -> list[str]:

return typosquatted

def detect(self, package_info) -> tuple[bool, str]:
def detect(self, package_info) -> tuple[bool, Optional[str]]:
"""
Uses a package's information from PyPI's JSON API to determine the
package is attempting a typosquatting attack
Expand All @@ -254,6 +255,7 @@ def detect(self, package_info) -> tuple[bool, str]:
"""
similar_package_names = self.get_typosquatted_package(package_info["info"]["name"])
if len(similar_package_names) > 0:
return True, "This package closely ressembles the following package names, and might be a typosquatting attempt: " + ", ".join(similar_package_names)

return False, None
return True, "This package closely ressembles the following package names, and might be a typosquatting " \
"attempt: " + ", ".join(similar_package_names)

return False, None
36 changes: 23 additions & 13 deletions guarddog/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ALL_RULES = analyzer.sourcecode_ruleset | analyzer.metadata_ruleset
EXIT_CODE_ISSUES_FOUND = 1


@click.group
def cli():
"""Guard Dog cli tool to detect PyPI malware"""
Expand All @@ -27,7 +28,8 @@ def cli():
@cli.command("verify")
@click.argument("path")
@click.option("--json", default=False, is_flag=True, help="Dump the output as JSON to standard out")
@click.option("--exit-non-zero-on-finding", default=False, is_flag=True, help="Exit with a non-zero status code if at least one issue is identified")
@click.option("--exit-non-zero-on-finding", default=False, is_flag=True, help="Exit with a non-zero status code if at "
"least one issue is identified")
def verify(path, json, exit_non_zero_on_finding):
"""Verify a requirements.txt file
Expand All @@ -37,24 +39,27 @@ def verify(path, json, exit_non_zero_on_finding):
scanner = RequirementsScanner()
results = scanner.scan_local(path)
for result in results:
identifier = result['dependency'] if result['version'] is None else f"{result['dependency']} version {result['version']}"
identifier = result['dependency'] if result['version'] is None \
else f"{result['dependency']} version {result['version']}"
if not json:
print_scan_results(result.get('result'), identifier)

if json:
import json as js
print(js.dumps(results))

if exit_non_zero_on_finding:
exit_with_status_code(results)


@cli.command("scan")
@click.argument("identifier")
@click.option("-v", "--version", default=None, help="Specify a version to scan")
@click.option("-r", "--rules", multiple=True, type=click.Choice(ALL_RULES, case_sensitive=False))
@click.option("-x", "--exclude-rules", multiple=True, type=click.Choice(ALL_RULES, case_sensitive=False))
@click.option("--json", default=False, is_flag=True, help="Dump the output as JSON to standard out")
@click.option("--exit-non-zero-on-finding", default=False, is_flag=True, help="Exit with a non-zero status code if at least one issue is identified")
@click.option("--exit-non-zero-on-finding", default=False, is_flag=True, help="Exit with a non-zero status code if at "
"least one issue is identified")
def scan(identifier, version, rules, exclude_rules, json, exit_non_zero_on_finding):
"""Scan a package
Expand Down Expand Up @@ -91,6 +96,7 @@ def scan(identifier, version, rules, exclude_rules, json, exit_non_zero_on_findi
if exit_non_zero_on_finding:
exit_with_status_code(results)


# Determines if the input passed to the 'scan' command is a local package name
def is_local_package(input):
identifier_is_path = re.search(r"(.{0,2}\/)+.+", input)
Expand All @@ -102,24 +108,28 @@ def print_scan_results(results, identifier):
num_issues = results.get('issues')

if num_issues == 0:
print("Found " + colored('0 potentially malicious indicators', 'green', attrs=['bold']) + " scanning " + colored(identifier, None, attrs=['bold']))
print("Found " + colored('0 potentially malicious indicators', 'green',
attrs=['bold']) + " scanning " + colored(identifier, None, attrs=['bold']))
print()
return

print("Found " + colored(str(num_issues) + ' potentially malicious indicators', 'red', attrs=['bold']) + " in " + colored(identifier, None, attrs=['bold']))

print("Found " + colored(str(num_issues) + ' potentially malicious indicators', 'red',
attrs=['bold']) + " in " + colored(identifier, None, attrs=['bold']))
print()

results = results.get('results', [])
for finding in results:
description = results[finding]
if type(description) == str: # package metadata
if type(description) == str: # package metadata
print(colored(finding, None, attrs=['bold']) + ': ' + description)
print()
elif type(description) == list: # semgrep rule result:
elif type(description) == list: # semgrep rule result:
source_code_findings = description
print(colored(finding, None, attrs=['bold']) + ': found ' + str(len(source_code_findings)) + ' source code matches')
print(colored(finding, None,
attrs=['bold']) + ': found ' + str(len(source_code_findings)) + ' source code matches')
for finding in source_code_findings:
print(' * ' + finding['message'] + ' at ' + finding['location'] + '\n ' + format_code_line_for_output(finding['code']))
print(' * ' + finding['message']
+ ' at ' + finding['location'] + '\n ' + format_code_line_for_output(finding['code']))
print()


Expand All @@ -131,4 +141,4 @@ def format_code_line_for_output(code):
def exit_with_status_code(results):
num_issues = results.get('issues', 0)
if num_issues > 0:
exit(EXIT_CODE_ISSUES_FOUND)
exit(EXIT_CODE_ISSUES_FOUND)
Loading

0 comments on commit 8e814de

Please sign in to comment.