test-rules-processingengine.py
# -*- coding: utf-8 -*-
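"""Regression tests for the ScoutSuite rule processing engine.

For each rule in the default AWS ruleset that has a matching configuration
under data/rule-configs/, a single-rule ruleset is built, the ProcessingEngine
is run against the stored configuration, and the reported findings are
compared with the expected results under data/rule-results/.
"""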
import json
import os

from opinel.utils.console import configPrintException, printError

from ScoutSuite.core.processingengine import ProcessingEngine
from ScoutSuite.core.ruleset import Ruleset


class DummyObject(object):
    # Bare container: test configuration keys are attached as attributes so the
    # object can stand in for a real cloud provider when running the engine.
    pass


class TestAWSScout2RulesProcessingEngine:

    def setup(self):
        configPrintException(True)
        # Build a fake ruleset
        # filename = 'foobar.json'
        # ruleset = {'rules': {}}
        # ruleset['rules'][filename] = []
        # ruleset['rules'][filename].append({'enabled': True, 'level': 'danger'})
        # pass
        # finding_rules = Ruleset(profile_name, filename=args.ruleset, ip_ranges=args.ip_ranges)
        # pe = ProcessingEngine(finding_rules)
        # pe.run(aws_config)
        # TODO

    # Check that one test case per finding rule exists (rules are taken from the
    # default ruleset)
    def test_all_finding_rules(self):
        test_dir = os.path.dirname(os.path.realpath(__file__))
        test_ruleset_file_name = os.path.join(test_dir, 'data/ruleset-test.json')
        # FIXME this is only for AWS
        with open(os.path.join(test_dir, '../ScoutSuite/providers/aws/rules/rulesets/default.json'), 'rt') as f:
            ruleset = json.load(f)
        rule_counters = {'found': 0, 'tested': 0, 'verified': 0}
        for file_name in ruleset['rules']:
            rule_counters['found'] += 1
            test_config_file_name = os.path.join(test_dir, 'data/rule-configs/%s' % file_name)
            if not os.path.isfile(test_config_file_name):
                continue
            rule_counters['tested'] += 1
            # Build a single-rule ruleset so the engine only evaluates the rule under test
            test_ruleset = {'rules': {}, 'about': 'regression test'}
            test_ruleset['rules'][file_name] = []
            rule = ruleset['rules'][file_name][0]
            rule['enabled'] = True
            test_ruleset['rules'][file_name].append(rule)
            with open(test_ruleset_file_name, 'wt') as f:
                f.write(json.dumps(test_ruleset, indent=4))
            # printError('Ruleset ::')
            # printError(str(test_ruleset))
            rules = Ruleset(filename=test_ruleset_file_name)
            pe = ProcessingEngine(rules)
            # Load the stored test configuration onto a dummy provider and run the engine
            with open(test_config_file_name, 'rt') as f:
                dummy_provider = DummyObject()
                test_config_dict = json.load(f)
                for key in test_config_dict:
                    setattr(dummy_provider, key, test_config_dict[key])
                pe.run(dummy_provider)
            # Rule file names are prefixed with the service name (e.g. 'iam-...')
            service = file_name.split('-')[0]
            findings = dummy_provider.services[service]['findings']
            findings = findings[list(findings.keys())[0]]['items']
            test_result_file_name = os.path.join(test_dir, 'data/rule-results/%s' % file_name)
            if not os.path.isfile(test_result_file_name):
                printError('Expected findings:')
                printError(json.dumps(findings, indent=4))
                continue
            rule_counters['verified'] += 1
            with open(test_result_file_name, 'rt') as f:
                items = json.load(f)
            try:
                assert set(findings) == set(items)
            except AssertionError:
                # Dump both sides to make the mismatch easy to compare, then re-raise
                printError('Expected items:\n %s' % json.dumps(sorted(items)))
                printError('Reported items:\n %s' % json.dumps(sorted(findings)))
                raise
        printError('Existing rules: %d' % rule_counters['found'])
        printError('Processed rules: %d' % rule_counters['tested'])
        printError('Verified rules: %d' % rule_counters['verified'])