#!/usr/bin/env python
import sys
from argparse import ArgumentParser
import csv
from dynatrace_api import DynatraceApi
import logging
import logging.config
import time
import re
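def getProperty(entity, propertyName):
    """
    Retrieves the value of a property from an entity if it exists, otherwise an empty string
    param: dictionary entity: the entity from which the property should be retrieved;
        its 'properties' member, when present, is a dictionary keyed by property name
    param: string propertyName: the property to be retrieved
    return: the value of the property, or an empty string if the entity has no
        'properties' member or the property doesn't exist in it.
    """
    # Two-level .get(): an entity without a 'properties' member used to raise
    # KeyError here; it is now treated like any other missing property.
    return entity.get('properties', {}).get(propertyName, "")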
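start_time = time.time()

# Get the Dynatrace Environment (URL), the API Token and default parameters.
# NOTE(review): the token is passed on the command line, so it is visible to
# other local users via the process list; an environment variable or a file
# would be a safer channel if that matters in your deployment.
parser = ArgumentParser()
parser.add_argument("-e", "--env", dest="environment", help="The Dynatrace Environment to query", required=True)
parser.add_argument("-t", "--token", dest="token", help="The Dynatrace API Token to use", required=True)
parser.add_argument("--debug", dest="debug", help="Set log level to debug", action='store_true')
parser.add_argument("-k", "--insecure", dest="insecure", help="Skip SSL certificate validation", action='store_true')
args = parser.parse_args()
env = args.environment
apiToken = args.token
verifySSL = not args.insecure
debug = args.debug

# basicConfig() sets the root logger's level itself, so the level must be
# chosen here: a setLevel(DEBUG) issued before basicConfig(level=INFO) is
# overwritten and --debug would have no effect.
logging.basicConfig(filename='output.log',
                    level=logging.DEBUG if debug else logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("=" * 200)
# Redact the API token (Dynatrace tokens start with "dt0c01.") before the
# command line is written to the log file.
logging.info("Running %s ", re.sub(r"dt0c01\.[\S]+", "dt0c01.XXX", " ".join(sys.argv)))
logging.info("=" * 200)
if not verifySSL:
    # Leave a trace in the log whenever the server certificate is not checked.
    logging.warning("SSL certificate validation disabled (--insecure); the server certificate will not be verified")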
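def getProperty(entity, propertyName):
    """
    Retrieves the value of a property from an entity if it exists, otherwise an empty string
    param: dictionary entity: the entity from which the property should be retrieved;
        its 'properties' member, when present, is a dictionary keyed by property name
    param: string propertyName: the property to be retrieved
    return: the value of the property, or an empty string if the entity has no
        'properties' member or the property doesn't exist in it.
    """
    # Look up both levels with .get() so that an entity lacking a
    # 'properties' member yields "" instead of raising KeyError.
    return entity.get('properties', {}).get(propertyName, "")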
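dynatraceApi = DynatraceApi(env, apiToken, verifySSL)
attacks = dynatraceApi.getAttacks()
pgiIds = set()  # NOTE: bound but never read in this script; kept for compatibility

# Column layout of the CSV. Every data row is padded to this width so the
# file stays rectangular even when no container group or cluster is known
# for the affected process.
header = ['ID', 'Timestamp', 'Type', 'State', 'Source IP', 'Process Name', 'Container Name', 'Pod Name',
          'Image Name', 'Workload Name', 'Namespace', 'Container Base Name', 'Container IP',
          'Kubernetes Cluster Name']


def _row_for_attack(attack):
    """
    Build one CSV row for an attack, padded to len(header) columns.
    param: dictionary attack: one entry as returned by dynatraceApi.getAttacks()
    return: list: the row values, in header order; columns beyond the process
        name are filled only when the affected process group instance runs in
        a container group (and the last one only when that container group
        belongs to a known cluster), the rest are empty strings.
    """
    pgi = attack['affectedEntities']['processGroupInstance']
    fields = [
        attack['displayId'],
        attack['timestamp'],
        attack['attackType'],
        attack['state'],
        attack['attacker']['sourceIp'],
        pgi['name'],
    ]
    container = dynatraceApi.getContainerGroupForPGI(pgi['id'])
    if container:
        fields += [
            container['displayName'],
            getProperty(container, 'podName'),
            getProperty(container, 'containerImageName'),
            getProperty(container, 'workloadName'),
            getProperty(container, 'namespaceName'),
            # assumes these two properties are lists of strings — TODO confirm
            ','.join(getProperty(container, 'containerNames')),
            ','.join(getProperty(container, 'ipAddress')),
        ]
        cluster = dynatraceApi.getClusterForCGI(container['entityId'])
        if cluster:
            fields.append(cluster['displayName'])
    # Pad short rows so the CSV never has fewer cells than the header.
    fields += [''] * (len(header) - len(fields))
    return fields


# NOTE(review): names and IDs are written verbatim as returned by the API;
# a spreadsheet opening this file may treat cells starting with '=', '+',
# '-' or '@' as formulas, so neutralise them before importing into such a
# tool if that matters for your consumers.
with open('attack_details.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, delimiter=",", quoting=csv.QUOTE_ALL)
    writer.writerow(header)
    for attack in attacks:
        writer.writerow(_row_for_attack(attack))

end_time = time.time()
print('')
print(f'Script completed successfully, took {(end_time - start_time):.2f}s')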