Skip to content

Commit

Permalink
[Feature] Support for Export phase 2 Filters and Endpoints (#826)
Browse files Browse the repository at this point in the history
* removed unused field asset_tag from compliance export

* added tags to schema

* added network_id to schema

* added list jobs endpoint for compliance export

* removed the extra character
  • Loading branch information
aseemsavio authored Aug 6, 2024
1 parent eeb4cab commit 18fb8d9
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 10 deletions.
22 changes: 20 additions & 2 deletions tenable/io/exports/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,6 @@ def compliance(self, **kwargs) -> Union[ExportsIterator, UUID]:
Returns Compliance findings for the specified list of plugin names.
plugin_id (list[int], optional):
Returns Compliance findings for the specified list of plugin IDs.
asset_tags (list[str], optional):
Returns Compliance findings for the specified list of asset tags.
audit_name (str, optional):
Restricts compliance findings to those associated with the specified audit.
audit_file_name (str, optional):
Expand All @@ -384,6 +382,11 @@ def compliance(self, **kwargs) -> Union[ExportsIterator, UUID]:
Vulnerability Management on or after the specified unix timestamp.
state (list[str], optional):
Restricts compliance findings to those associated with the provided list of states, such as open, reopened and fixed.
tags (list[tuple[str, list[str]]], optional):
A list of tag pairs to filter the results on. The tag pairs
should be presented as ``('CATEGORY', ['VALUE'])``.
network_id (str, optional):
Returns Compliance findings for the specified network ID.
num_findings (int):
The number of findings to return per chunk of data. If left
unspecified, the default is ``5000``.
Expand Down Expand Up @@ -551,3 +554,18 @@ def vulns(self, **kwargs) -> Union[ExportsIterator, UUID]:
... )
'''
return self._export('vulns', VulnExportSchema(), **kwargs)

def list_compliance_export_jobs(self):
"""
Returns a list of the last 1,000 compliance export requests along with their statuses
and related metadata.
Returns:
:obj:`list`:
List of job records.
Examples:
>>> for compliance_job in tio.exports.list_compliance_export_jobs():
... pprint(compliance_job)
"""
return self._api.get('compliance/export/status').json()["exports"]
23 changes: 21 additions & 2 deletions tenable/io/exports/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@ def serialize_tags(data: Dict) -> Dict:
data[tag_name].append(tag[1])
return data

def serialize_compliance_tags(data: Dict) -> Dict:
"""
Converts the tag tuples into a list of objects
"""
tags = data.pop("tags", [])
modified_tags = []
for tag in tags:
category = tag[0]
values = tag[1]
modified_tags.append({
"category": category,
"values": values
})

if tags:
data["tags"] = modified_tags

return data

class AssetExportSchema(Schema):
'''
Expand Down Expand Up @@ -119,22 +137,23 @@ class ComplianceExportSchema(Schema):
ipv6_addresses = fields.List(fields.Str())
plugin_name = fields.List(fields.Str())
plugin_id = fields.List(fields.Int())
asset_tags = fields.List(fields.Str())
audit_name = fields.Str()
audit_file_name = fields.Str()
compliance_results = fields.List(fields.Str())
last_observed = fields.Int()
indexed_at = fields.Int()
since = fields.Int()
state = fields.List(fields.Str())
tags = fields.List(fields.Tuple((fields.Str(), fields.List(fields.Str()))))
network_id = fields.Str()

# Other params
asset = fields.List(fields.UUID())
num_findings = fields.Int(dump_default=5000)

@post_dump
def post_serialization(self, data, **kwargs): # noqa PLR0201 PLW0613
data = serialize_tags(data)
data = serialize_compliance_tags(data)
data = envelope(data, 'filters', excludes=['asset',
'num_findings'
])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
User-Agent:
- Integration/1.0 (pytest; pytenable-automated-testing; Build/unknown) pyTenable/1.5.0
(Restfly/1.4.7; Python/3.10.6; Darwin/arm64)
X-APIKeys:
- accessKey=TIO_ACCESS_KEY;secretKey=TIO_SECRET_KEY
method: GET
uri: https://cloud.tenable.com/compliance/export/status
response:
body:
string: '{"exports":[]}'
headers:
Accept-Ranges:
- bytes
Cache-Control:
- no-store
Connection:
- keep-alive
Content-Length:
- '14'
Content-Type:
- application/json; charset=utf-8
Date:
- Mon, 05 Aug 2024 02:21:24 GMT
Expect-CT:
- enforce, max-age=86400
Pragma:
- no-cache
Referrer-Policy:
- strict-origin-when-cross-origin
Set-Cookie:
- nginx-cloud-site-id=qa-develop; path=/; HttpOnly; SameSite=Strict; Secure
Strict-Transport-Security:
- max-age=63072000; includeSubDomains
Vary:
- origin
X-Content-Type-Options:
- nosniff
X-Download-Options:
- noopen
X-Frame-Options:
- DENY
X-Gateway-Site-ID:
- service-nginx-router-us-east-1-eng-5689f7f454-gbj9n
X-Path-Handler:
- tenable-io
X-Request-Uuid:
- '"f1adf991e7bd765c92b6c693a08dc846"'
X-Xss-Protection:
- 1; mode=block
status:
code: 200
message: OK
version: 1
5 changes: 5 additions & 0 deletions tests/io/exports/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,8 @@ def test_export_adoption(tvm):
assert UUID(job_id) == tvm.exports.initiate_export('vulns')
with pytest.raises(RequestConflictError):
tvm.exports.initiate_export('vulns', adopt_existing=False)

@pytest.mark.vcr()
def test_list_compliance_export_jobs_returns_a_list(api):
jobs = api.exports.list_compliance_export_jobs()
assert isinstance(jobs, list)
17 changes: 11 additions & 6 deletions tests/io/exports/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def compliance_export():


@pytest.fixture
def compliance_export_phase_1_schema():
def compliance_export_phase_1_and_2_schema():
"""
Example compliance export request with phase 1 filters
"""
Expand All @@ -100,14 +100,17 @@ def compliance_export_phase_1_schema():
'ipv6_addresses': ['2001:0db8:85a3:0000:0000:8a2e:0370:7334'],
'plugin_name': ['Debian dla-3719 : php-seclib - security update', 'Debian dsa-5607 : chromium - security update'],
'plugin_id': [189491, 189490],
'asset_tags': ['tag-a', 'tag-b'],
'audit_name': 'my-audit-name',
'audit_file_name': 'my-audit-file-name',
'compliance_results': ['PASSED'],
'last_observed': 1635798607,
'indexed_at': 1635798607,
'since': 1635798607,
'state': ['Active']
'state': ['Active'],
'tags': [
('Category', ['value1', 'value2'])
],
'network_id': 'd6797cf4-42b9-4cad-8591-9dd91c3f0fc3'
}


Expand Down Expand Up @@ -272,12 +275,14 @@ def test_asset_export_schema_without_open_ports(asset_export_with_out_open_ports
schema_dump = schema.dump(schema.load(asset_export_with_out_open_ports))
assert "include_open_ports" not in schema_dump

def test_compliance_export_phase_1_filters(compliance_export_phase_1_schema):
def test_compliance_export_phase_1_and_2_filters(compliance_export_phase_1_and_2_schema):
"""
Test Compliance Export Phase 1 Filter Schema
"""
schema = ComplianceExportSchema()
schema_dump = schema.dump(schema.load(compliance_export_phase_1_schema))
schema_dump = schema.dump(schema.load(compliance_export_phase_1_and_2_schema))

# checking random element
assert schema_dump["filters"]["state"][0] == "Active"
assert schema_dump["filters"]["state"][0] == "Active"
assert len(schema_dump["filters"]["tags"]) == 1
assert schema_dump["filters"]["network_id"] == "d6797cf4-42b9-4cad-8591-9dd91c3f0fc3"

0 comments on commit 18fb8d9

Please sign in to comment.