Skip to content

Commit

Permalink
Added feature - TenableAD Alert APIs
Browse files Browse the repository at this point in the history
Added feature - TenableAD Alert APIs

minor changes

implemented alert iterator for list api
  • Loading branch information
tushar-balwani authored and SteveMcGrath committed May 21, 2024
1 parent 8639fd4 commit 6ec94ab
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/api/ad/alert.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.ad.alert.api
1 change: 1 addition & 0 deletions tenable/ad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
:glob:
about
alert
api_keys
attack_types
category
Expand Down
Empty file added tenable/ad/alert/__init__.py
Empty file.
166 changes: 166 additions & 0 deletions tenable/ad/alert/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
'''
Alerts
=============
Methods described in this section relate to the alerts API.
These methods can be accessed at ``TenableAD.alerts``.
.. rst-class:: hide-signature
.. autoclass:: AlertsAPI
:members:
'''
from typing import List, Dict
from tenable.ad.alert.schema import AlertSchema, AlertParamsSchema
from tenable.ad.base.iterator import ADIterator
from tenable.base.endpoint import APIEndpoint


class AlertIterator(ADIterator):
'''
The alert iterator provides a scalable way to work through alert list
result sets of any size. The iterator will walk through each page of data,
returning one record at a time. If it reaches the end of a page of
records, then it will request the next page of information and then
continue to return records from the next page (and the next, and the next)
until the counter reaches the total number of records that the API has
reported.
'''


class AlertsAPI(APIEndpoint):
_path = 'alerts'
_schema = AlertSchema()

def list_by_profile(self,
profile_id: str,
**kwargs
) -> AlertIterator:
'''
Retrieve all alert instances
Args:
profile_id (str):
The profile instance identifier.
archived (optional, bool):
is alert archived?
read (optional, bool):
is alert read?
page (optional, int):
The page number user wants to retrieve.
per_page (optional, int):
The number of records per page user wants to retrieve.
max_items (optional, int):
The maximum number of records to return before
stopping iteration.
max_pages (optional, int):
The maximum number of pages to request before throwing
stopping iteration.
Returns:
:obj:`AlertIterator`:
An iterator that handles the page management of the requested
records.
Examples:
>>> for alert in tad.alerts.list_by_profile(
... profile_id='1',
... archived=False,
... read=False,
... page=1,
... per_page=20,
... max_pages=10,
... max_items=200
... ):
... pprint(alert)
'''
schema = AlertParamsSchema()
params = schema.dump(self._schema.load(kwargs))

return AlertIterator(
api=self._api,
_path=f'profiles/{profile_id}/alerts',
num_pages=params.get('page'),
_per_page=params.get('perPage'),
_query=params,
_schema=schema,
max_pages=params.pop('maxPages', None),
max_items=params.pop('maxItems', None)
)

def details(self,
alert_id: str
) -> Dict:
'''
Retrieves the details of a specific alert.
Args:
alert_id (str):
The alert instance identifier.
Returns:
dict:
the alert object.
Examples:
>>> tad.alerts.details(
... alert_id='1'
... )
'''
return self._schema.load(self._get(f"{alert_id}"))

def update(self,
alert_id: str,
**kwargs
) -> Dict:
'''
Update alert instance
Args:
alert_id (str):
The alert instance identifier.
archived (optional, bool):
is alert archived?
read (optional, bool):
is alert read?
Returns:
dict:
The updated alert object.
Example:
>>> tad.alerts.update(
... alert_id='1',
... archived=False,
... read=False
... )
'''
payload = self._schema.dump(self._schema.load(kwargs))
return self._schema.load(self._patch(f"{alert_id}", json=payload))

def update_on_profile(self,
profile_id: str,
**kwargs
) -> None:
'''
Update alerts for one profile
Args:
profile_id (str):
The alert instance identifier.
archived (optional, bool):
is alert archived?
read (optional, bool):
is alert read?
Returns:
None:
Example:
>>> tad.alerts.update_on_profile(
... profile_id='1',
... archived=False,
... read=False
... )
'''
payload = self._schema.dump(self._schema.load(kwargs))
self._api.patch(f"profiles/{profile_id}/alerts", json=payload)
21 changes: 21 additions & 0 deletions tenable/ad/alert/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from marshmallow import fields
from tenable.ad.base.schema import CamelCaseSchema, BoolStr


class AlertSchema(CamelCaseSchema):
id = fields.Int()
deviance_id = fields.Int()
archived = fields.Bool()
read = fields.Bool()
date = fields.DateTime()
directory_id = fields.Int()
infrastructure_id = fields.Int()
page = fields.Int(allow_none=True, dump_default=1)
per_page = fields.Int(allow_none=True)
max_pages = fields.Int(allow_none=True)
max_items = fields.Int(allow_none=True)


class AlertParamsSchema(AlertSchema):
archived = BoolStr()
read = BoolStr()
51 changes: 51 additions & 0 deletions tenable/ad/base/iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from restfly import APIIterator


class ADIterator(APIIterator):
'''
The following methods allows us to iterate through pages and get data
Attributes:
_api (restfly.session.APISession):
The APISession object that will be used for querying for the
data.
_path (str):
The URL for API call.
_schema (object):
The marshmallow schema object for deserialized response.
_method (str):
The API request method. supported values are ``get`` and ``post``.
default is ``get``
_query (dict):
The query params for API call.
_payload (dict):
The payload object for API call. it is applicable only for
post method.
'''
_api = None
_query = None
_payload = None
_method = None
_per_page = None
_path = None
_schema = None

def _get_page(self) -> None:
'''
Request the next page of data
'''
# The first thing that we need to do is construct the query with the
# current page and per_page
query = self._query
query['page'] = self.num_pages
query['perPage'] = self._per_page

# Lets make the actual call at this point.
if self._method == 'post':
self.page = self._schema.load(
self._api.post(self._path, params=query, json=self._payload),
many=True)
else:
self.page = self._schema.load(
self._api.get(self._path, params=query),
many=True)
7 changes: 7 additions & 0 deletions tenable/ad/base/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,10 @@ class BoolInt(fields.Boolean):
'''Schema to return an integer value for given boolean value'''
def _serialize(self, value, attr, obj, **kwargs) -> int:
return int(value) if value else 0


class BoolStr(fields.Boolean):
'''Schema to return a string value for given boolean value'''

def _serialize(self, value, attr, obj, **kwargs) -> str:
return str(value).lower() if isinstance(value, bool) else None
9 changes: 9 additions & 0 deletions tenable/ad/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tenable.base.platform import APIPlatform

from .about import AboutAPI
from .alert.api import AlertsAPI
from .api_keys import APIKeyAPI
from .attack_types.api import AttackTypesAPI
from .category.api import CategoryAPI
Expand Down Expand Up @@ -61,6 +62,14 @@ def about(self):
'''
return AboutAPI(self)

@property
def alerts(self):
'''
The interface object for the
:doc:`Tenable.ad Alerts APIs <alert>`.
'''
return AlertsAPI(self)

@property
def api_keys(self):
'''
Expand Down
Loading

0 comments on commit 6ec94ab

Please sign in to comment.