diff --git a/.gitignore b/.gitignore index efcf5512..252b08cb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .vunnel.yaml -./data/ +/data/ +/backup/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 65b66532..4e7826c5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -45,7 +45,7 @@ repos: hooks: - id: system name: black - entry: poetry run black . + entry: poetry run black src tests pass_filenames: false language: system @@ -53,10 +53,10 @@ repos: hooks: - id: system name: isort - entry: poetry run isort . + entry: poetry run isort tests src --filter-files pass_filenames: false language: system - + files: \.py$ - repo: local hooks: - id: system diff --git a/poetry.lock b/poetry.lock index 1d28d765..ff22c588 100644 --- a/poetry.lock +++ b/poetry.lock @@ -163,15 +163,19 @@ optional = false python-versions = "*" [[package]] -name = "dacite" -version = "1.6.0" -description = "Simple creation of data classes from dictionaries." +name = "dataclass-wizard" +version = "0.22.2" +description = "Marshal dataclasses to/from JSON. Use field properties with initial values. Construct a dataclass schema with JSON input." category = "main" optional = false -python-versions = ">=3.6" +python-versions = "*" + +[package.dependencies] +typing-extensions = {version = ">=3.7.4.2", markers = "python_version <= \"3.9\""} [package.extras] -dev = ["black", "coveralls", "mypy", "pylint", "pytest (>=5)", "pytest-cov"] +timedelta = ["pytimeparse (>=1.1.7)"] +yaml = ["PyYAML (>=5.3)"] [[package]] name = "defusedxml" @@ -768,7 +772,7 @@ python-versions = "*" name = "typing-extensions" version = "4.4.0" description = "Backported and Experimental Type Hints for Python 3.7+" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" @@ -825,7 +829,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "1.1" python-versions = ">=3.8.1,<4.0" -content-hash = "9dbaaf7d6f2b85ee34d97b6436d82019c58f6544196d1e5753dc66aca1b380b2" +content-hash = "b68a490fc1b02dc6a29a1f3bc9fcaa7511e5cf157405782226bba819c622a236" [metadata.files] attrs = [ @@ -943,9 +947,9 @@ cvss = [ {file = "cvss-2.5-py2.py3-none-any.whl", hash = "sha256:c1a48f8a6024642b986c51182a79d7aa2c05d1d75c57ad3496b9ab0451a8e89a"}, {file = "cvss-2.5.tar.gz", hash = "sha256:63f648cffb2647498cf28646a7004fe0f48872c9ba7d8653bebd710409f8ba0e"}, ] -dacite = [ - {file = "dacite-1.6.0-py3-none-any.whl", hash = "sha256:4331535f7aabb505c732fa4c3c094313fc0a1d5ea19907bf4726a7819a68b93f"}, - {file = "dacite-1.6.0.tar.gz", hash = "sha256:d48125ed0a0352d3de9f493bf980038088f45f3f9d7498f090b50a847daaa6df"}, +dataclass-wizard = [ + {file = "dataclass-wizard-0.22.2.tar.gz", hash = "sha256:211f842e5e9a8ace50ba891ef428cd78c82579fb98024f80f3e630ca8d1946f6"}, + {file = "dataclass_wizard-0.22.2-py2.py3-none-any.whl", hash = "sha256:49be36ecc64bc5a1e9a35a6bad1d71d33b6b9b06877404931a17c6a3a6dfbb10"}, ] defusedxml = [ {file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"}, diff --git a/pyproject.toml b/pyproject.toml index 7cb13508..dce1e813 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,8 +23,8 @@ ijson = "^3.1.4" xxhash = "^3.1.0" cvss = "^2.5" python-dateutil = "^2.8.2" -dacite = "^1.6.0" defusedxml = "^0.7.1" +dataclass-wizard = "^0.22.2" [tool.poetry.group.dev.dependencies] pytest = "^7.2.0" @@ -57,6 +57,7 @@ force_grid_wrap = 0 
use_parentheses = true ensure_newline_before_comments = true line_length = 100 +skip_gitignore = true [tool.pylint.messages_control] disable = [ @@ -121,11 +122,6 @@ exclude = '''(?x)( | ^tests/.*$ # any tests )''' -[[tool.mypy.overrides]] -# https://github.com/konradhalas/dacite/issues/133 -module = "dacite" -implicit_reexport = true - [tool.black] line-length = 130 exclude = ''' @@ -141,6 +137,8 @@ exclude = ''' | buck-out | build | dist + | data + | backup )/ ) ''' diff --git a/schema/vulnerability/os/schema-1.0.0.json b/schema/vulnerability/os/schema-1.0.0.json index 77117f7d..1add15db 100644 --- a/schema/vulnerability/os/schema-1.0.0.json +++ b/schema/vulnerability/os/schema-1.0.0.json @@ -107,6 +107,31 @@ "Metadata": { "type": "object", "properties": { + "Issued": { + "type": "string" + }, + "RefId": { + "type": "string" + }, + "CVE": { + "type": "array", + "items": [ + { + "type": "object", + "properties": { + "Name": { + "type": "string" + }, + "Link": { + "type": "string" + } + }, + "required": [ + "Name" + ] + } + ] + }, "NVD": { "type": "object", "properties": { diff --git a/src/vunnel/__main__.py b/src/vunnel/__main__.py new file mode 100644 index 00000000..e39ac6d8 --- /dev/null +++ b/src/vunnel/__main__.py @@ -0,0 +1,3 @@ +from vunnel.cli import run + +run() diff --git a/src/vunnel/cli/cli.py b/src/vunnel/cli/cli.py index 784897a9..18189857 100644 --- a/src/vunnel/cli/cli.py +++ b/src/vunnel/cli/cli.py @@ -27,7 +27,7 @@ def cli(ctx, verbose: bool, config_path: str) -> None: # type: ignore log_format = "%(log_color)s %(asctime)s %(name)s [%(levelname)s] %(message)s" if ctx.obj.log.slim: - log_format = "%(log_color)s [%(levelname)s] %(message)s" + log_format = "%(log_color)s %(message)s" logging.config.dictConfig( { @@ -129,7 +129,7 @@ def status_provider(cfg: config.Application, provider_names: str) -> None: ├── Inputs: {len(state.input.files)} files │ {state.input.timestamp} └── Results: {len(state.results.files)} files - {state.results.timestamp} + {state.results.timestamp} """ print(tmpl) except FileNotFoundError: diff --git a/src/vunnel/cli/config.py b/src/vunnel/cli/config.py index fbd398de..15019a27 100644 --- a/src/vunnel/cli/config.py +++ b/src/vunnel/cli/config.py @@ -2,10 +2,10 @@ from dataclasses import dataclass, field, fields from typing import Any -import dacite import yaml +from dataclass_wizard import fromdict -from vunnel import provider, providers +from vunnel import providers @dataclass @@ -53,19 +53,9 @@ def load(path: str = ".vunnel.yaml") -> Application: # noqa try: with open(path, encoding="utf-8") as f: app_object = yaml.safe_load(f.read()) - cfg = dacite.from_dict( + cfg = fromdict( Application, app_object, - config=dacite.Config( - cast=[ - provider.OnErrorAction, - provider.InputStatePolicy, - provider.ResultStatePolicy, - ], - # type_hooks={ - # - # } - ), ) if cfg is None: raise FileNotFoundError("parsed empty config") diff --git a/src/vunnel/providers/alpine/parser.py b/src/vunnel/providers/alpine/parser.py index 3f015d7f..bbbe6cd2 100644 --- a/src/vunnel/providers/alpine/parser.py +++ b/src/vunnel/providers/alpine/parser.py @@ -76,8 +76,8 @@ def _download(self, skip_if_exists=False): os.remove(os.path.join(self.secdb_dir_path, "alpine-secdb-master.tar.gz")) if skip_if_exists and os.path.exists(self.secdb_dir_path): - self.logger.warning( - "skip_if_exists flag enabled and found source under {}. Skipping download".format(self.secdb_dir_path) + self.logger.debug( + "'skip_if_exists' flag enabled and found source under {}. 
Skipping download".format(self.secdb_dir_path) ) else: links = [] @@ -85,7 +85,7 @@ def _download(self, skip_if_exists=False): if not os.path.exists(self.secdb_dir_path): os.makedirs(self.secdb_dir_path, exist_ok=True) - self.logger.info("Downloading alpine secdb metadata from: {}".format(self.metadata_url)) + self.logger.info("downloading alpine secdb metadata from: {}".format(self.metadata_url)) r = requests.get(self.metadata_url, timeout=self.download_timeout) if r.status_code == 200: try: @@ -94,21 +94,21 @@ def _download(self, skip_if_exists=False): parser.feed(r.text) links = parser.links except: - self.logger.warning("Unable to html parse secdb landing page content for links") + self.logger.warning("unable to html parse secdb landing page content for links") if not links: - self.logger.debug("String parsing secdb landing page content for links") + self.logger.debug("string parsing secdb landing page content for links") links = re.findall(self._link_finder_regex_, r.text) else: r.raise_for_status() except Exception: - self.logger.exception("Error downloading or parsing alpine secdb metadata") + self.logger.exception("error downloading or parsing alpine secdb metadata") raise if links: - self.logger.debug("Found release specific secdb links: {}".format(links)) + self.logger.debug("found release specific secdb links: {}".format(links)) else: - raise Exception("Unable to find release specific secdb links") + raise Exception("unable to find release specific secdb links") for link in links: if link not in ignore_links: @@ -130,7 +130,7 @@ def _download(self, skip_if_exists=False): else: r.raise_for_status() except: - self.logger.exception("Ignoring error processing secdb for {}".format(link)) + self.logger.exception("ignoring error processing secdb for {}".format(link)) def _load(self): """ @@ -156,7 +156,7 @@ def _load(self): for dbtype in self._db_types: secdb_yaml_path = os.path.join(self.secdb_dir_path, f, "{}.yaml".format(dbtype)) if os.path.exists(secdb_yaml_path): - self.logger.debug("Loading secdb data from: {}".format(secdb_yaml_path)) + self.logger.debug("loading secdb data from: {}".format(secdb_yaml_path)) with open(secdb_yaml_path, "r") as FH: yaml_data = yaml.safe_load(FH) dbtype_data_dict[dbtype] = yaml_data @@ -180,7 +180,7 @@ def _normalize(self, release, dbtype_data_dict): vuln_dict = {} for dbtype, data in dbtype_data_dict.items(): - self.logger.debug("Normalizing {}:{}".format(release, dbtype)) + self.logger.info("processing {}:{}".format(release, dbtype)) if data["packages"]: for el in data["packages"]: @@ -251,6 +251,5 @@ def get(self, skip_if_exists: bool = False): self._download(skip_if_exists) for release, dbtype_data_dict in self._load(): - print(release) # normalize the loaded data yield release, self._normalize(release, dbtype_data_dict) diff --git a/src/vunnel/providers/amazon/parser.py b/src/vunnel/providers/amazon/parser.py index 4ff7ce9b..fcb03213 100644 --- a/src/vunnel/providers/amazon/parser.py +++ b/src/vunnel/providers/amazon/parser.py @@ -261,7 +261,13 @@ def map_to_vulnerability(version, alas, fixed_in): v.NamespaceName = namespace + ":" + version v.Description = "" v.Severity = severity_map.get(alas.sev, "Unknown") - v.Metadata = {"CVE": alas.cves if alas.cves else []} + v.Metadata = { + "CVE": [], + } + + if alas.cves: + v.Metadata["CVE"] = [{"Name": cve} for cve in alas.cves] + v.Link = alas.url for item in fixed_in: f = FixedIn() diff --git a/src/vunnel/providers/ubuntu/__init__.py b/src/vunnel/providers/ubuntu/__init__.py index 
ff187579..ea494f39 100644 --- a/src/vunnel/providers/ubuntu/__init__.py +++ b/src/vunnel/providers/ubuntu/__init__.py @@ -13,6 +13,7 @@ class Config: request_timeout: int = 125 additional_versions: dict[str, str] = field(default_factory=lambda: {}) enable_rev_history: bool = True + max_workers: int = 5 class Provider(provider.Provider): @@ -28,6 +29,7 @@ def __init__(self, root: str, config: Config): logger=self.logger, additional_versions=self.config.additional_versions, enable_rev_history=self.config.enable_rev_history, + max_workers=self.config.max_workers, ) @classmethod @@ -41,7 +43,7 @@ def update(self) -> list[str]: writer.write( identifier=f"{namespace}-{vuln_id}".lower(), schema=self.schema, - payload=record, + payload={"Vulnerability": record}, ) return self.parser.urls diff --git a/src/vunnel/providers/ubuntu/git.py b/src/vunnel/providers/ubuntu/git.py index 7ebae891..4f2146eb 100644 --- a/src/vunnel/providers/ubuntu/git.py +++ b/src/vunnel/providers/ubuntu/git.py @@ -1,9 +1,9 @@ # flake8: noqa -import copy import logging import os import re +import shlex import shutil import subprocess # nosec import tempfile @@ -14,136 +14,125 @@ GitCommitSummary = namedtuple("GitCommitSummary", ["sha", "updated", "deleted"]) GitRevision = namedtuple("GitRevision", ["sha", "status", "file"]) -logger = logging.getLogger(__name__) - -class GitWrapper(object): +class GitWrapper: __active_retired_filename_regex__ = re.compile(r"(active|retired)/CVE-\S+") __cve_id_regex__ = re.compile(r"CVE-\S+") - - _check_cmd_ = ["git", "--version"] - _is_git_repo_cmd_ = ["git", "rev-parse", "--is-inside-work-tree"] - _clone_cmd_ = ["git", "clone", "-b", "master", "{src}", "{dest}"] - _check_out_cmd_ = ["git", "checkout", "master"] - _pull_cmd_ = ["git", "pull", "-f"] - _fetch_cmd_ = ["git", "fetch", "--all"] - _pull_ff_only_cmd_ = ["git", "pull", "--ff-only"] - _reset_head_cmd_ = ["git", "reset", "--hard", "origin/master"] - _change_set_cmd_ = [ - "git", - "log", - "--no-renames", - "--no-merges", - "--name-status", - "--format=oneline", - "{from_rev}..{to_rev}", - ] - _rev_history_cmd_ = [ - "git", - "log", - "--no-merges", - "--follow", - "--name-status", - "--format=oneline", - "{from_rev}..", - "--", - "{file}", - ] - _get_rev_content_cmd_ = ["git", "show", "{sha}:{file}"] - _head_rev_cmd_ = ["git", "rev-parse", "HEAD"] - - def __init__(self, source, checkout_dest, workspace=None): + _check_cmd_ = "git --version" + _is_git_repo_cmd_ = "git rev-parse --is-inside-work-tree" + _clone_cmd_ = "git clone -b master {src} {dest}" + _check_out_cmd_ = "git checkout master" + _pull_cmd_ = "git pull -f" + _fetch_cmd_ = "git fetch --all" + _pull_ff_only_cmd_ = "git pull --ff-only" + _reset_head_cmd_ = "git reset --hard origin/master" + _write_graph_ = "git commit-graph write --reachable --changed-paths" + _change_set_cmd_ = "git log --no-renames --no-merges --name-status --format=oneline {from_rev}..{to_rev}" + _rev_history_cmd_ = "git log --no-merges --name-status --format=oneline {from_rev} -- {file}" + _get_rev_content_cmd_ = "git show {sha}:{file}" + _head_rev_cmd_ = "git rev-parse HEAD" + + def __init__(self, source, checkout_dest, workspace=None, logger=None): self.src = source self.dest = checkout_dest self.workspace = workspace if workspace else tempfile.gettempdir() - @classmethod - def check(cls, destination): + if not logger: + logger = logging.getLogger(self.__class__.__name__) + self.logger = logger + try: - out = GitWrapper._exec_cmd(cls._check_cmd_) - logger.debug("git executable verified using cmd: {}, 
output: {}".format(" ".join(cls._check_cmd_), out)) + out = self._exec_cmd(self._check_cmd_) + self.logger.trace("git executable verified using cmd: {}, output: {}".format(self._check_cmd_, out)) except: - logger.warn('Could not find required "git" executable. Please install git on host') - return False + self.logger.exception('could not find required "git" executable. Please install git on host') + raise + def _check(self, destination): try: - cmd = copy.copy(cls._is_git_repo_cmd_) - out = GitWrapper._exec_cmd(cmd, cwd=destination) - logger.debug("Check for git repository, cmd: {}, output: {}".format(" ".join(cmd), out.decode())) + cmd = self._is_git_repo_cmd_ + out = self._exec_cmd(cmd, cwd=destination) + self.logger.debug("check for git repository, cmd: {}, output: {}".format(cmd, out.decode())) except: - logger.warn("git working tree not found at {}".format(destination)) + self.logger.warning("git working tree not found at {}".format(destination)) return False return True @utils.retry_with_backoff() def init_repo(self, force=False): - if self.check(self.dest) and not force: - logger.debug("Found git repository at {} Skipping init".format(self.dest)) - else: - try: - if os.path.exists(self.dest): - logger.debug("Found {}. Deleting it before cloning git repository") - shutil.rmtree(self.dest, ignore_errors=True) + if force: + if os.path.exists(self.dest): + self.logger.debug("deleting existing repository") + shutil.rmtree(self.dest, ignore_errors=True) - logger.info("Cloning git repository {} to {}".format(self.src, self.dest)) + if self._check(self.dest): + self.logger.debug("found git repository at {}".format(self.dest)) + self.sync_with_upstream() + return - cmd = copy.copy(self._clone_cmd_) - cmd[4] = cmd[4].format(src=self.src) - cmd[5] = cmd[5].format(dest=self.dest) + try: + self.logger.info("cloning git repository {} to {}".format(self.src, self.dest)) - out = GitWrapper._exec_cmd(cmd) - logger.debug("Initialized git repo, cmd: {}, output: {}".format(" ".join(cmd), out.decode())) - except: - logger.exception("Failed to clone initialize git repository {} to {}".format(self.src, self.dest)) - raise + cmd = self._clone_cmd_.format(src=self.src, dest=self.dest) + out = self._exec_cmd(cmd) + + self.logger.debug("initialized git repo, cmd: {}, output: {}".format(cmd, out.decode())) + self._write_graph() + except: + self.logger.exception("failed to clone initialize git repository {} to {}".format(self.src, self.dest)) + raise @utils.retry_with_backoff() def sync_with_upstream(self): try: try: - GitWrapper._exec_cmd(self._check_out_cmd_, cwd=self.dest) + self._exec_cmd(self._check_out_cmd_, cwd=self.dest) except: # nosec pass - out = GitWrapper._exec_cmd(self._pull_ff_only_cmd_, cwd=self.dest) - logger.debug("Synced with upstream git repo, output: {}".format(out.decode())) + out = self._exec_cmd(self._pull_ff_only_cmd_, cwd=self.dest) + self.logger.debug("synced with upstream git repo, output: {}".format(out.decode())) + self._write_graph() except: - logger.exception("Failed to git pull") + self.logger.exception("failed to git pull") raise - def get_merged_change_set(self, from_rev, to_rev=None): + def _write_graph(self): + """ + writes out a binary representation of the commit graph as a git object, enabling + huge performance gains when traversing the graph (e.g. 
git log) + """ try: - logger.debug("Fetching changes set between revisions {} - {}".format(from_rev, to_rev)) + self.logger.debug("writing commit graph") + out = self._exec_cmd(self._write_graph_, cwd=self.dest) + except: + self.logger.exception("failed to write commit graph") + raise - cmd = copy.copy(self._change_set_cmd_) - cmd[6] = cmd[6].format(from_rev=from_rev, to_rev=to_rev if to_rev else "") + def get_merged_change_set(self, from_rev, to_rev=None): + try: + self.logger.trace("fetching change set between revisions {} - {}".format(from_rev, to_rev)) - out = GitWrapper._exec_cmd(cmd, cwd=self.dest) + cmd = self._change_set_cmd_.format(from_rev=from_rev, to_rev=to_rev if to_rev else "") + out = self._exec_cmd(cmd, cwd=self.dest) commit_list = self._parse_log(out.decode()) return self._compute_change_set(commit_list) except: - logger.exception("Failed to compute logically modified and removed CVEs between commits") + self.logger.exception("failed to compute logically modified and removed CVEs between commits") raise def get_revision_history(self, cve_id, file_path, from_rev=None): try: - logger.debug("Fetching revision history for {}".format(file_path)) - - cmd = copy.copy(self._rev_history_cmd_) - cmd[8] = cmd[8].format(file=file_path) - - if from_rev: - cmd[6] = cmd[6].format(from_rev=from_rev) - else: - del cmd[6] + self.logger.trace("fetching revision history for {}".format(file_path)) - out = GitWrapper._exec_cmd(cmd, cwd=self.dest) + cmd = self._rev_history_cmd_.format(file=file_path, from_rev=f"{from_rev}.." if from_rev else "") + out = self._exec_cmd(cmd, cwd=self.dest) return self._parse_revision_history(cve_id, out.decode()) except: - logger.exception("Failed to fetch the revision history for {}".format(file_path)) + self.logger.exception("failed to fetch the revision history for {}".format(file_path)) raise def get_content(self, git_rev): @@ -151,26 +140,24 @@ def get_content(self, git_rev): raise ValueError("Input must be a GitRevision") try: - logger.debug("Fetching content for {} from git commit {}".format(git_rev.file, git_rev.sha)) - - cmd = copy.copy(self._get_rev_content_cmd_) - cmd[2] = cmd[2].format(sha=git_rev.sha, file=git_rev.file) + # self.logger.trace("fetching content for {} from git commit {}".format(git_rev.file, git_rev.sha)) - out = GitWrapper._exec_cmd(cmd, cwd=self.dest) + cmd = self._get_rev_content_cmd_.format(sha=git_rev.sha, file=git_rev.file) + out = self._exec_cmd(cmd, cwd=self.dest) return out.decode().splitlines() except: - logger.exception("Failed to get content for {} from git commit {}".format(git_rev.file, git_rev.sha)) + self.logger.exception("failed to get content for {} from git commit {}".format(git_rev.file, git_rev.sha)) def get_current_rev(self): try: - rev = GitWrapper._exec_cmd(self._head_rev_cmd_, cwd=self.dest) + rev = self._exec_cmd(self._head_rev_cmd_, cwd=self.dest) return rev.decode().strip() if isinstance(rev, bytes) else rev except: - logger.exception() + self.logger.exception("failed to get current revision") - @staticmethod - def _parse_revision_history(cve_id, history): + @classmethod + def _parse_revision_history(cls, cve_id, history): """ eabaf525ae78eea3cd9f6063721afd1111efcd5c ran process_cves R100 active/CVE-2017-16011 ignored/CVE-2017-16011 @@ -194,7 +181,7 @@ def _parse_revision_history(cve_id, history): for x in range(0, len(history_lines), 2): # TODO check status is not D and file name matches CVE-* - rev = GitWrapper._parse_revision(history_lines[x : x + 2]) + rev = cls._parse_revision(history_lines[x : x + 2]) # don't include deleted
revisions if rev.status.startswith("D"): @@ -264,8 +251,7 @@ def _compute_change_set(commit_list): return modified, removed - @staticmethod - def _parse_log(ordered_changes): + def _parse_log(self, ordered_changes): """ Input in the form @@ -299,7 +285,7 @@ def _parse_log(ordered_changes): if components and len(components) > 1: if len(components[0]) > 5: # indicates this is a commit sha since length greater than any change status if commit_lines: # encountered next commit, process the stored one first - c = GitWrapper._parse_normalized_commit(commit_lines) + c = self._parse_normalized_commit(commit_lines, self.logger) commits_list.append(c) if c else None del commit_lines[:] # else: # encountered the first commit line, keep going @@ -313,14 +299,13 @@ def _parse_log(ordered_changes): # process the last commit if any if commit_lines: - c = GitWrapper._parse_normalized_commit(commit_lines) + c = self._parse_normalized_commit(commit_lines, self.logger) commits_list.append(c) if c else None del commit_lines[:] return commits_list - @staticmethod - def _parse_normalized_commit(commit_lines): + def _parse_normalized_commit(self, commit_lines, logger): """ A list of lists where each inner list represents a line in the commit log [ @@ -344,7 +329,7 @@ def _parse_normalized_commit(commit_lines): deleted = {} for components in commit_lines[1:]: - if re.match(GitWrapper.__active_retired_filename_regex__, components[1]): + if re.match(self.__active_retired_filename_regex__, components[1]): cve_id = components[1].split("/", 1)[1] if components[0] == "A" or components[0] == "M": # CVE added to or modified in active or retired directory @@ -357,7 +342,7 @@ def _parse_normalized_commit(commit_lines): deleted[cve_id] = components[1] else: # either not a commit line or an irrelevant file, ignore it - logger.warn("Encountered unknown change symbol {}".format(components[0])) + self.logger.warn("encountered unknown change symbol {}".format(components[0])) else: # not a match pass @@ -368,8 +353,7 @@ def _parse_normalized_commit(commit_lines): else: return None - @staticmethod - def _exec_cmd(cmd, *args, **kwargs): + def _exec_cmd(self, cmd, *args, **kwargs): """ Run a command with errors etc handled :param cmd: list of arguments (including command name, e.g. 
['ls', '-l]) @@ -378,11 +362,12 @@ def _exec_cmd(cmd, *args, **kwargs): :return: """ try: - logger.debug("Executing: {}".format(cmd)) + self.logger.trace("running: {}".format(cmd)) + cmd_list = shlex.split(cmd) if "stdout" in kwargs: - return subprocess.check_call(cmd, *args, **kwargs) # nosec + return subprocess.check_call(cmd_list, *args, **kwargs) # nosec else: - return subprocess.check_output(cmd, *args, **kwargs) # nosec + return subprocess.check_output(cmd_list, *args, **kwargs) # nosec except Exception as e: - logger.exception("Error executing command: {}".format(cmd)) + self.logger.exception("error executing command: {}".format(cmd)) raise e diff --git a/src/vunnel/providers/ubuntu/parser.py b/src/vunnel/providers/ubuntu/parser.py index f2ce42e5..aa84c295 100644 --- a/src/vunnel/providers/ubuntu/parser.py +++ b/src/vunnel/providers/ubuntu/parser.py @@ -1,10 +1,12 @@ # flake8: noqa +import concurrent.futures import copy import enum import json import logging import os +import queue import re import time from collections import namedtuple @@ -446,7 +448,7 @@ def map_parsed(parsed_cve, logger=None): # Map keyed by namespace name vulns = {} if not (parsed_cve.get("Candidate") or parsed_cve.get("Name")): - logger.error("Could not find a Candidate or Name for parsed cve: {}".format(parsed_cve)) + logger.error("could not find a Candidate or Name for parsed cve: {}".format(parsed_cve)) return [] for p in parsed_cve.get("patches"): @@ -483,7 +485,7 @@ def map_parsed(parsed_cve, logger=None): pkg.Version = p.get("version") if pkg.Version is None: logger.warn( - 'Found CVE {} in ubuntu version {} with "released" status for pkg {} but no version for release. Released patches should have version info, but missing in source data. Marking package as not vulnerable'.format( + 'found CVE {} in ubuntu version {} with "released" status for pkg {} but no version for release. Released patches should have version info, but missing in source data. 
Marking package as not vulnerable'.format( r.Name, r.NamespaceName, pkg.Name ) ) @@ -614,27 +616,21 @@ def __init__( logger: Optional[logging.Logger] = None, additional_versions: Optional[dict[str, str]] = None, enable_rev_history: bool = True, + max_workers: int = 5, ): self.vc_workspace = os.path.join(workspace, self._vc_working_dir) self.norm_workspace = os.path.join(workspace, self._normalized_cve_dir) - self.git_wrapper = GitWrapper(source=self._git_src, checkout_dest=self.vc_workspace) if not logger: logger = logging.getLogger(self.__class__.__name__) self.logger = logger self.urls = [self._git_https] + self.git_wrapper = GitWrapper(source=self._git_src, checkout_dest=self.vc_workspace, logger=logger) + if additional_versions: ubuntu_version_names.update(additional_versions) self.enable_rev_history = enable_rev_history - - def _get_git_repo(self): - self.logger.debug("Checking for git repository") - if not GitWrapper.check(self.vc_workspace): - self.logger.info("Initializing git repository from {}".format(self._git_src)) - self.git_wrapper.init_repo(force=True) - else: - self.logger.info("Found git repository and pulling updates") - self.git_wrapper.sync_with_upstream() + self._max_workers = max_workers def fetch(self, skip_if_exists=False): # setup merged workspace @@ -642,7 +638,7 @@ def fetch(self, skip_if_exists=False): os.makedirs(self.norm_workspace) # sync git repo - self._get_git_repo() + self.git_wrapper.init_repo() # get the last processed revision if available and the current latest revision last_rev = self._load_last_processed_rev() @@ -654,7 +650,7 @@ def fetch(self, skip_if_exists=False): self._save_last_processed_rev(current_rev) # load merged state and map it to vulnerabilities - self.logger.debug("Loading processed CVE content and transforming into to vulnerabilities") + self.logger.debug("loading processed CVE content and transforming into vulnerabilities") for merged_cve in self._merged_cve_iterator(): for v in map_parsed(merged_cve, self.logger): @@ -662,44 +658,83 @@ def fetch(self, skip_if_exists=False): def _process_data(self, vc_dir, to_rev, from_rev=None): self.logger.debug( - "Processing data from git repository: {}, from revision: {}, to revision: {}".format(vc_dir, from_rev, to_rev) + "processing data from git repository: {}, from revision: {}, to revision: {}".format(vc_dir, from_rev, to_rev) ) # gather a list of changed files if the last repo revision processed is available updated_paths = [] deleted_ids = [] if from_rev and to_rev and from_rev != to_rev: - self.logger.debug("Fetching changes to CVEs between revisions {} and {}".format(from_rev, to_rev)) + self.logger.debug("fetching changes to CVEs between revisions {} and {}".format(from_rev, to_rev)) modified, removed = self.git_wrapper.get_merged_change_set(from_rev=from_rev, to_rev=to_rev) updated_paths = list(modified.values()) if modified else [] deleted_ids = list(removed.keys()) if removed else [] - self.logger.debug("Detected {} CVE updates (add/modify/rename)".format(len(updated_paths))) - self.logger.debug("Detected {} CVE deletions".format(len(deleted_ids))) + self.logger.debug("detected {} CVE updates (add/modify/rename)".format(len(updated_paths))) + self.logger.debug("detected {} CVE deletions".format(len(deleted_ids))) # Load cves from active and retired directories and spool merged state to disk - for d in [self._active_cve_dir, self._retired_cve_dir]: - cve_dir = os.path.join(vc_dir, d) - for cve_id in filter(lambda x: _cve_filename_regex.match(x), os.listdir(cve_dir)): - f =
os.path.join(cve_dir, cve_id) - cve_rel_path = "/".join([d, cve_id]) - - if cve_rel_path in updated_paths: - # merge cves updated since last revision or all if the last processed revision is not available - self.logger.debug("CVE updated since last run, processing {}".format(cve_rel_path)) - merged_cve = self._merge_cve(cve_id, cve_rel_path, f, to_rev) - elif not self._merged_cve_exists(cve_id): - # merge may be required since the saved state is not found - self.logger.debug("CVE merged state not found, processing {}".format(cve_rel_path)) - merged_cve = self._merge_cve(cve_id, cve_rel_path, f, to_rev) - else: - # merge may be required if new distros were added - merged_cve = self._reprocess_merged_cve(cve_id, cve_rel_path) + # note: this is an IO bound operation, so a thread pool will suffice for now + # but look to a process pool if this becomes a bottleneck + proc_exception = None + with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_workers) as executor: + + def worker(fn, cve_id, *args, **kwargs): + try: + return fn(cve_id, *args, **kwargs) + except: + self.logger.exception("error processing {}".format(cve_id)) + raise + + futures = [] + + for d in [self._active_cve_dir, self._retired_cve_dir]: + cve_dir = os.path.join(vc_dir, d) + for cve_id in sorted(filter(lambda x: _cve_filename_regex.match(x), os.listdir(cve_dir))): + f = os.path.join(cve_dir, cve_id) + cve_rel_path = "/".join([d, cve_id]) + + future = executor.submit(worker, self._process_cve, cve_id, cve_rel_path, f, to_rev, updated_paths) + futures.append(future) + + # wait for all the futures to complete + done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION) + + if len(done) > 0 and len(done) != len(futures): + future = done.pop() + proc_exception = future.exception() + if proc_exception: + self.logger.error(f"one task failed with: {proc_exception} (shutting down)") + + # cancel any scheduled tasks + for future in futures: + future.cancel() + + if proc_exception: + raise proc_exception # Remove merged state of deleted cves for cve_id in deleted_ids: self.logger.debug("{} is no longer relevant, deleting merged CVE state if any".format(cve_id)) self._delete_merged_cve(cve_id) + def _process_cve(self, cve_id, cve_rel_path, f, to_rev, updated_paths): + if cve_rel_path in updated_paths: + # merge cves updated since last revision or all if the last processed revision is not available + # self.logger.debug("CVE updated since last run, processing {}".format(cve_rel_path)) + + return self._merge_cve(cve_id, cve_rel_path, f, to_rev) + + if not self._merged_cve_exists(cve_id): + # merge may be required since the saved state is not found + # self.logger.debug("CVE merged state not found, processing {}".format(cve_rel_path)) + + return self._merge_cve(cve_id, cve_rel_path, f, to_rev) + + # merge may be required if new distros were added + # self.logger.debug("reprocessing merged CVE {}".format(cve_rel_path)) + + return self._reprocess_merged_cve(cve_id, cve_rel_path) + def _load_last_processed_rev(self): last_processed_rev_path = os.path.join(self.norm_workspace, self._last_processed_rev_file_git) @@ -752,11 +787,12 @@ def _reprocess_merged_cve(self, cve_id, cve_rel_path): :param cve_rel_path: :return: """ + self.logger.debug("reprocessing merged CVE {}".format(cve_rel_path)) saved_state = self._load_merged_cve(cve_id) if not saved_state: - self.logger.warn("No saved state fround for {}".format(cve_id)) + self.logger.warning("no saved state found for {}".format(cve_id)) return #
reprocess only ignored patches @@ -764,11 +800,11 @@ def _reprocess_merged_cve(self, cve_id, cve_rel_path): # Found patches that can be merged and or can't be resolved from the saved state, could be a new namespace if merged_patches or to_be_merged_map: - self.logger.debug("Found unresolved patches in previously merged state, could be a new distro") + self.logger.debug("found unresolved patches in previously merged state, could be a new distro") # Process revision history for eol-ed packages that need to be merged if to_be_merged_map: if self.enable_rev_history: - self.logger.debug("Attempting to resolve patches using reivsion history for {}".format(cve_rel_path)) + self.logger.debug("attempting to resolve patches using revision history for {}".format(cve_rel_path)) (resolved_patches, pending_dpt_list, cve_latest_rev,) = self._resolve_patches_using_history( cve_id=cve_id, cve_rel_path=cve_rel_path, @@ -778,7 +814,7 @@ def _reprocess_merged_cve(self, cve_id, cve_rel_path): merged_patches.extend(resolved_patches) if pending_dpt_list: self.logger.debug( - "Exhausted all revisions for {} but could not resolve patches: {}".format( + "exhausted all revisions for {} but could not resolve patches: {}".format( cve_rel_path, [to_be_merged_map[x] for x in pending_dpt_list], ) @@ -791,7 +827,7 @@ def _reprocess_merged_cve(self, cve_id, cve_rel_path): if cve_latest_rev: saved_state[self._last_processed_rev_key_git] = cve_latest_rev else: - self.logger.debug("Revision history processing is disabled. Merging unresolved patches as they are") + self.logger.debug("revision history processing is disabled. Merging unresolved patches as they are") merged_patches.extend(to_be_merged_map.values()) # pulling this outside of the revision history block for fixing ENTERPRISE-195. saved state should be updated if there are mergeable or to-be-merged packages @@ -847,7 +883,7 @@ def _merge_cve(self, cve_id, cve_rel_path, cve_abs_path, repo_current_rev): :param repo_current_rev: :return: """ - self.logger.debug("Parsing cve file: {}".format(cve_abs_path)) + self.logger.debug("merging CVE {}".format(cve_rel_path)) with open(cve_abs_path) as cve_file: raw_content = cve_file.readlines() @@ -875,7 +911,7 @@ def _merge_cve(self, cve_id, cve_rel_path, cve_abs_path, repo_current_rev): merged_patches.extend(resolved_patches) if pending_dpt_list: self.logger.debug( - "Exhausted all revisions for {} but could not resolve patches: {}".format( + "exhausted all revisions for {} but could not resolve patches: {}".format( cve_rel_path, [to_be_merged_map[x] for x in pending_dpt_list], ) @@ -887,7 +923,7 @@ def _merge_cve(self, cve_id, cve_rel_path, cve_abs_path, repo_current_rev): else: # merge with saved state if any if saved_cve: - self.logger.debug("Revision history processing is disabled. Resolving patches using saved cve state") + self.logger.debug("revision history processing is disabled. Resolving patches using saved cve state") rev_matched_map = filter_merged_patches(saved_cve, list(to_be_merged_map.keys())) # merge resolved and unresolved patches merged_patches.extend(list(rev_matched_map.values())) @@ -896,7 +932,7 @@ def _merge_cve(self, cve_id, cve_rel_path, cve_abs_path, repo_current_rev): rev_matched_map.clear() else: self.logger.debug( - "Revision history processing is disabled and no saved state found. Skipping patch resolution" + "revision history processing is disabled and no saved state found. 
Skipping patch resolution" ) merged_patches.extend(list(to_be_merged_map.values())) @@ -921,7 +957,7 @@ def _resolve_patches_using_history( saved_state=None, ): t = time.time() - self.logger.debug("Processing cve revision history for: {}".format(cve_rel_path)) + self.logger.debug("processing CVE revision history for: {}".format(cve_rel_path)) # setup metrics metrics = { @@ -985,11 +1021,11 @@ def _resolve_patches_using_history( del rev_raw_content[:] else: # no revs for processing - self.logger.debug("No previous revisions found") + self.logger.debug("no previous revisions found") # then merge with saved state if there are things that still need to be merged. This is a one time thing to short circuit fetching revs if pending_dpt_list and saved_state: - self.logger.debug("Resolving patches using saved cve state before processing any more revisions") + self.logger.debug("resolving patches using saved cve state before processing any more revisions") rev_matched_map = filter_merged_patches(saved_state, pending_dpt_list) resolved_patches.extend(list(rev_matched_map.values())) pending_dpt_list = [x for x in pending_dpt_list if x not in rev_matched_map] @@ -1002,7 +1038,7 @@ def _resolve_patches_using_history( # So continue with process revision: fetch content, parse, filter patches that need to be merged and so on if pending_dpt_list: # fetch the entire revision history for this file, needed for computing the merge - self.logger.debug("Unresolved patches found even after merge with saved state, walking entire revision history") + self.logger.debug("unresolved patches found even after merge with saved state, walking entire revision history") all_revs = self.git_wrapper.get_revision_history(cve_id, cve_rel_path, None) before_revs = all_revs[len(since_revs) :] if since_revs else all_revs @@ -1042,7 +1078,7 @@ def _resolve_patches_using_history( self.logger.debug("Merge with saved state resolved all relevant patches") metrics["time_elapsed"] = int((time.time() - t) * 1000) / float(1000) - self.logger.debug("Metrics from processing revision history for {}: {}".format(cve_rel_path, metrics)) + self.logger.trace("metrics from processing revision history for {}: {}".format(cve_rel_path, metrics)) return resolved_patches, pending_dpt_list, cve_latest_rev diff --git a/src/vunnel/workspace.py b/src/vunnel/workspace.py index 1da322a7..a2d5c1dc 100644 --- a/src/vunnel/workspace.py +++ b/src/vunnel/workspace.py @@ -5,9 +5,9 @@ from dataclasses import asdict, dataclass, field from typing import Any -import dacite import rfc3339 import xxhash +from dataclass_wizard import fromdict from vunnel import schema as schemaDef @@ -65,15 +65,9 @@ def datetime_hook(t: str) -> datetime.datetime: return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S%z") with open(metadata_path, "r", encoding="utf-8") as f: - return dacite.from_dict( + return fromdict( WorkspaceState, json.load(f), - config=dacite.Config( - type_hooks={ - datetime.datetime: datetime_hook, - datetime.datetime | None: datetime_hook, # type: ignore - }, - ), ) def write(self, root: str) -> str: diff --git a/tests/unit/providers/ubuntu/test_git_wrapper.py b/tests/unit/providers/ubuntu/test_git_wrapper.py index 780abf9c..d5f22d25 100644 --- a/tests/unit/providers/ubuntu/test_git_wrapper.py +++ b/tests/unit/providers/ubuntu/test_git_wrapper.py @@ -129,7 +129,9 @@ def test_parse_log(self): with open(self._git_change_log_file_, "r") as f: git_commit_log = f.read() - commits = GitWrapper._parse_log(git_commit_log) + wrapper = GitWrapper(self._workspace_, 
self._workspace_) + + commits = wrapper._parse_log(git_commit_log) self.assertEqual(len(commits), len(self._commit_changes_)) @@ -143,9 +145,11 @@ def test_compute_change_set(self): with open(self._git_change_log_file_, "r") as f: git_commit_log = f.read() - commits = GitWrapper._parse_log(git_commit_log) + wrapper = GitWrapper(self._workspace_, self._workspace_) + + commits = wrapper._parse_log(git_commit_log) - modified, removed = GitWrapper._compute_change_set(commits) + modified, removed = wrapper._compute_change_set(commits) self.assertEqual(modified, self._overall_changes_["modified"]) self.assertEqual(removed, self._overall_changes_["removed"]) @@ -156,7 +160,9 @@ def test_parse_revision_history(self): with open(self._git_rev_log_file_, "r") as f: git_rev_log = f.read() - revs = GitWrapper._parse_revision_history(self._rev_log_cve_, git_rev_log) + wrapper = GitWrapper(self._workspace_, self._workspace_) + + revs = wrapper._parse_revision_history(self._rev_log_cve_, git_rev_log) self.assertEqual(len(revs), len(self._revisions_))