-
Notifications
You must be signed in to change notification settings - Fork 42
/
prime.py
135 lines (112 loc) · 4.64 KB
/
prime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import urllib.request
from urllib.parse import urlencode, urlsplit
from json import loads
from socket import timeout
from ssl import _create_unverified_context
from base64 import b64encode
TIMEOUT = 20
def gen_auth(username, password):
'Generate basic authorization using username and password'
return b64encode(f'{username}:{password}'.encode('utf-8')).decode()
def make_url(base_url, endpoint, resource):
'Corsair creates URLs using this method'
base_url = urlsplit(base_url)
path = base_url.path + f'/{endpoint}/{resource}'
path = path.replace('//', '/')
path = path[:-1] if path.endswith('/') else path
return base_url._replace(path=path).geturl()
class Api(object):
def __init__(self, base_url, username, password, tls_verify=True):
self.base_url = base_url if base_url[-1] != '/' else base_url[:-1]
self.auth = gen_auth(username, password)
self.tls_verify = tls_verify
self.credentials = (self.base_url, self.auth, self.tls_verify)
self.data = Endpoint(self.credentials, 'data')
self.op = Endpoint(self.credentials, 'op')
class Endpoint(object):
def __init__(self, credentials, endpoint):
self.base_url = credentials[0]
self.endpoint = endpoint
self.resource = ''
self.auth = credentials[1]
self.tls_verify = credentials[2]
def read(self, _resource, **filters):
self.resource = f'{_resource}.json' # will only deal with JSON outputs
first_result = 0 if 'firstResult' not in filters else filters['firstResult']
max_results = 1000 if 'maxResults' not in filters else filters['maxResults']
filters.update({'firstResult':first_result, 'maxResults':max_results})
req = Request(make_url(self.base_url, self.endpoint, self.resource),
self.auth, self.tls_verify)
try:
res = req.get(**filters)
except timeout:
raise Exception('Operation timedout')
return loads(res.read()) # test for possible Prime errors
class Request(object):
def __init__(self, url, auth, tls_verify):
self.url = url
self.auth = auth
self.timeout = TIMEOUT
self.context = None if tls_verify else _create_unverified_context()
self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Basic {self.auth}'
}
def get(self, **filters):
url = f'{self.url}?{self.dotted_filters(**filters)}' if filters else self.url
req = urllib.request.Request(url, headers=self.headers, method='GET')
return urllib.request.urlopen(req, timeout=self.timeout, context=self.context)
def dotted_filters(self, **filters):
'Prime filters start with a dot'
if not filters:
return ''
else:
return f'.{urlencode(filters).replace("&", "&.")}'
class Prime(object):
def __init__(self, address, username, password, tls_verify, unknown):
self.prime = Api(address, username, password, tls_verify)
self.unknown = unknown
self.hosts = list()
def run(self, access_points=False):
'''Extracts devices from Cisco Prime
access_points: if set to True, will try to get APs data
Returns False for no errors or True if errors occurred
'''
errors = False
devices = self.get_devices('Devices')
for device in devices:
try:
self.hosts.append((
device['devicesDTO']['ipAddress'],
device['devicesDTO']['deviceName']
))
except KeyError:
errors = True
if access_points:
aps = self.get_devices('AccessPoints')
for ap in aps:
try:
self.hosts.append((
ap['accessPointsDTO']['ipAddress']['address'],
ap['accessPointsDTO']['model']
))
except KeyError:
errors = True
return errors
def get_devices(self, resource):
'This function is used to support run()'
raw = list()
res = self.prime.data.read(resource, full='true')
count = res['queryResponse']['@count']
last = res['queryResponse']['@last']
raw.extend(res['queryResponse']['entity'])
while last < count - 1:
first_result = last + 1
last += 1000
res = self.prime.data.read(
resource,
full='true',
firstResult=first_result
)
raw.extend(res['queryResponse']['entity'])
return raw