Allows overriding CPE configurations on NVD records (#502)
* wip: add input writer to nvd provider
* feat: get new nvd_writer from input and generate reader from store
* wip: move input db writing to be during update. This way, we don't have to loop over the entire input table to write an output table.
* chore: lint fix
* add download overrides
* update for latest overrides layout
* bump nvd provider to version 2. This forces any preload state of version 2 to be used if the current workspace state is v1, which is needed because the nvd-input db will not exist and would otherwise result in a full NVD API pull.
* fix static analysis
* add and fix tests

Signed-off-by: Will Murphy <[email protected]>
Signed-off-by: Christopher Phillips <[email protected]>
Signed-off-by: Alex Goodman <[email protected]>
Signed-off-by: Weston Steimel <[email protected]>
Co-authored-by: Will Murphy <[email protected]>
Co-authored-by: Christopher Phillips <[email protected]>
Co-authored-by: Alex Goodman <[email protected]>
1 parent: fc1a2e0
Commit: f4dbceb
Showing 11 changed files with 458 additions and 24 deletions.
@@ -0,0 +1,109 @@
from __future__ import annotations

import glob
import logging
import os
import tarfile
from typing import TYPE_CHECKING, Any

from orjson import loads

from vunnel.utils import http

if TYPE_CHECKING:
    from vunnel.workspace import Workspace


class NVDOverrides:
    __file_name__ = "nvd-overrides.tar.gz"
    __extract_name__ = "nvd-overrides"

    def __init__(  # noqa: PLR0913
        self,
        enabled: bool,
        url: str,
        workspace: Workspace,
        logger: logging.Logger | None = None,
        download_timeout: int = 125,
    ):
        self.enabled = enabled
        self.__url__ = url
        self.workspace = workspace
        self.download_timeout = download_timeout
        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger
        self.__filepaths_by_cve__: dict[str, str] | None = None

    @property
    def url(self) -> str:
        return self.__url__

    def download(self) -> None:
        if not self.enabled:
            self.logger.debug("overrides are not enabled, skipping download...")
            return

        req = http.get(self.__url__, self.logger, stream=True, timeout=self.download_timeout)

        file_path = os.path.join(self.workspace.input_path, self.__file_name__)
        with open(file_path, "wb") as fp:
            for chunk in req.iter_content():
                fp.write(chunk)

        untar_file(file_path, self._extract_path)

    @property
    def _extract_path(self) -> str:
        return os.path.join(self.workspace.input_path, self.__extract_name__)

    def _build_files_by_cve(self) -> dict[str, Any]:
        filepaths_by_cve__: dict[str, str] = {}
        for path in glob.glob(os.path.join(self._extract_path, "**/data/**/", "CVE-*.json"), recursive=True):
            cve_id = os.path.basename(path).removesuffix(".json").upper()
            filepaths_by_cve__[cve_id] = path

        return filepaths_by_cve__

    def cve(self, cve_id: str) -> dict[str, Any] | None:
        if not self.enabled:
            return None

        if self.__filepaths_by_cve__ is None:
            self.__filepaths_by_cve__ = self._build_files_by_cve()

        # TODO: implement in-memory index
        path = self.__filepaths_by_cve__.get(cve_id.upper())
        if path and os.path.exists(path):
            with open(path) as f:
                return loads(f.read())
        return None

    def cves(self) -> list[str]:
        if not self.enabled:
            return []

        if self.__filepaths_by_cve__ is None:
            self.__filepaths_by_cve__ = self._build_files_by_cve()

        return list(self.__filepaths_by_cve__.keys())


def untar_file(file_path: str, extract_path: str) -> None:
    with tarfile.open(file_path, "r:gz") as tar:

        def filter_path_traversal(tarinfo: tarfile.TarInfo, path: str) -> tarfile.TarInfo | None:
            # we do not expect any relative file paths that would result in the clean
            # path being different from the original path
            # e.g.
            #   expected: results/results.db
            #   unexpected: results/../../../../etc/passwd
            # we filter (drop) any such entries

            if tarinfo.name != os.path.normpath(tarinfo.name):
                return None
            return tarinfo

        # note: we have a filter that drops any entries that would result in a path traversal
        # which is what S202 is referring to (linter isn't smart enough to understand this)
        tar.extractall(path=extract_path, filter=filter_path_traversal)  # noqa: S202
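
For context, here is a minimal sketch of how a caller might exercise the NVDOverrides helper added above. The URL, logger name, and the surrounding function are illustrative assumptions and are not part of this commit; only the constructor arguments and the download(), cves(), and cve() methods come from the diff.

import logging

from vunnel.workspace import Workspace

# NVDOverrides is the class defined in the new module above.


def apply_overrides(workspace: Workspace) -> None:
    # hypothetical wiring; the overrides URL below is a placeholder, not a real default
    overrides = NVDOverrides(
        enabled=True,
        url="https://example.com/nvd-overrides.tar.gz",
        workspace=workspace,
        logger=logging.getLogger("nvd-overrides"),
    )

    # download the tarball into the workspace input path and extract it to
    # <input>/nvd-overrides, dropping any path-traversal entries
    overrides.download()

    # walk every CVE that has an override file (data/**/CVE-*.json in the archive)
    for cve_id in overrides.cves():
        record = overrides.cve(cve_id)  # parsed JSON dict, or None if the file is missing
        if record is not None:
            ...  # merge the overridden CPE configuration into the matching NVD record (provider-specific)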