forked from OpenLEADR/openleadr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_failures.py
332 lines (289 loc) · 14.4 KB
/
test_failures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
from openleadr import OpenADRClient, OpenADRServer
from openleadr.utils import generate_id, certificate_fingerprint
from openleadr import messaging, errors, objects
from datetime import datetime, timezone, timedelta
import pytest
from aiohttp import web
import os
import logging
import asyncio
from datetime import timedelta
from base64 import b64encode
import re
from lxml import etree
@pytest.mark.asyncio
async def test_http_level_error(start_server):
    """Run a client against the URL ``http://this.is.an.error`` and close it.

    The test contains no explicit assertions: it passes as long as neither
    ``run()`` nor closing the client's HTTP session raises.
    """
    ven = OpenADRClient(ven_name=VEN_NAME, vtn_url="http://this.is.an.error")
    ven.on_event = _client_on_event
    await ven.run()
    # Close the HTTP session directly, as this test does not call stop().
    http_session = ven.client_session
    await http_session.close()
@pytest.mark.asyncio
async def test_xml_schema_error(start_server, caplog):
    """Send a query whose ``requestID`` element was stripped and check the log.

    The request is expected to yield ``(None, {})`` and the first captured log
    line starting with "Non-OK status 400" must mention "XML failed validation".
    """
    payload = messaging.create_message("oadrQueryRegistration", request_id='req1234')
    # Drop the requestID element from the generated XML before sending it.
    payload = payload.replace(
        '<requestID xmlns="http://docs.oasis-open.org/ns/energyinterop/201110/payloads">req1234</requestID>',
        '',
    )
    ven = OpenADRClient(ven_name='myven', vtn_url=f'http://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b')
    outcome = await ven._perform_request('EiRegisterParty', payload)
    assert outcome == (None, {})

    status_400_lines = [record.message for record in caplog.records
                        if record.message.startswith("Non-OK status 400")]
    # There must be at least one such line, and the first one is the one checked.
    assert status_400_lines
    assert "XML failed validation" in status_400_lines[0]
@pytest.mark.asyncio
async def test_wrong_endpoint(start_server, caplog):
    """Post an ``oadrQueryRegistration`` message to the ``OadrPoll`` service.

    The reply must be an ``oadrResponse`` whose response code is 459.
    """
    query = messaging.create_message("oadrQueryRegistration", request_id='req1234')
    ven = OpenADRClient(ven_name='myven', vtn_url=f'http://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b')
    reply = await ven._perform_request('OadrPoll', query)
    response_type, response_payload = reply
    assert response_type == 'oadrResponse'
    assert response_payload['response']['response_code'] == 459
@pytest.mark.asyncio
async def test_vtn_no_create_party_registration_handler(caplog):
    """Check the warnings logged when the VTN has no registration handler.

    A server is started without an ``on_create_party_registration`` handler
    and a client is run against it. Two messages must then appear in the
    captured log: the client's "No VEN ID received" abort message and the
    server's hint about implementing the handler.

    The server is created without an explicit ``http_port`` while the client
    targets port 8080 -- presumably the server's default port; confirm
    against ``OpenADRServer``.
    """
    caplog.set_level(logging.WARNING)
    server = OpenADRServer(vtn_id='myvtn')
    client = OpenADRClient(ven_name='myven', vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
    await server.run_async()
    try:
        await client.run()
    finally:
        # Always shut the server down, even when the client run fails, so a
        # failure here cannot leave a listener behind for later tests.
        await server.stop()
    await client.stop()
    assert 'No VEN ID received from the VTN, aborting.' in caplog.messages
    assert ("You should implement and register your own on_create_party_registration "
            "handler if you want VENs to be able to connect to you. This handler will "
            "receive a registration request and should return either 'False' (if the "
            "registration is denied) or a (ven_id, registration_id) tuple if the "
            "registration is accepted.") in caplog.messages
# NOTE(review): a test with this exact name is defined again further down in
# this file; the later definition rebinds the name, so only one of the two
# copies is collected by pytest. One of them should be removed or renamed.
@pytest.mark.asyncio
async def test_invalid_signature_error(start_server_with_signatures, caplog):
    """Send a poll whose signature value was overwritten and expect a 403 log.

    The request must yield ``(None, {})`` and the first captured log line that
    starts with "Non-OK status 403 when performing a request" must contain
    "Invalid Signature".
    """
    ven = OpenADRClient(ven_name='myven',
                        vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                        cert=VEN_CERT,
                        key=VEN_KEY,
                        vtn_fingerprint=VTN_FINGERPRINT)
    signed_poll = ven._create_message('oadrPoll', ven_id='ven123')

    # Swap the real signature value for the base64 of an arbitrary string.
    fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
    tampered_poll = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', signed_poll)

    outcome = await ven._perform_request('OadrPoll', tampered_poll)
    assert outcome == (None, {})

    forbidden_lines = [record.message for record in caplog.records
                       if record.message.startswith("Non-OK status 403 when performing a request")]
    assert forbidden_lines
    assert "Invalid Signature" in forbidden_lines[0]
def problematic_handler(*args, **kwargs):
    """Handler stub that ignores its arguments and always raises ``Exception("BOOM")``."""
    failure = Exception("BOOM")
    raise failure
@pytest.mark.asyncio
async def test_server_handler_exception(caplog):
    """Check that a handler raising a plain exception surfaces as a 500 log.

    The server's ``on_create_party_registration`` handler is
    ``problematic_handler``, which always raises. After running a client
    against it, at least one captured log message must start with
    "Non-OK status 500 when performing a request".
    """
    server = OpenADRServer(vtn_id=VTN_ID,
                           http_port=SERVER_PORT)
    server.add_handler('on_create_party_registration', problematic_handler)
    await server.run_async()
    try:
        client = OpenADRClient(ven_name='myven',
                               vtn_url=f'http://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b')
        await client.run()
        await client.stop()
    finally:
        # Always shut the server down so a failure above cannot leave
        # SERVER_PORT occupied for the tests that follow.
        await server.stop()
    assert any(message.startswith('Non-OK status 500 when performing a request')
               for message in caplog.messages), \
        "expected a 'Non-OK status 500 when performing a request' log message"
def protocol_error_handler(*args, **kwargs):
    """Handler stub that ignores its arguments and always raises ``errors.OutOfSequenceError``."""
    failure = errors.OutOfSequenceError()
    raise failure
@pytest.mark.asyncio
async def test_throw_protocol_error(caplog):
    """Check that a protocol error raised by a handler is logged by the client.

    The server's ``on_create_party_registration`` handler is
    ``protocol_error_handler``, which raises ``errors.OutOfSequenceError``.
    After running a client against it, the captured log must contain the
    "450: OUT OF SEQUENCE" message asserted below.
    """
    server = OpenADRServer(vtn_id=VTN_ID,
                           http_port=SERVER_PORT)
    server.add_handler('on_create_party_registration', protocol_error_handler)
    await server.run_async()
    try:
        client = OpenADRClient(ven_name='myven',
                               vtn_url=f'http://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b')
        await client.run()
        await client.stop()
    finally:
        # Always shut the server down so a failure above cannot leave
        # SERVER_PORT occupied for the tests that follow.
        await server.stop()
    assert 'We got a non-OK OpenADR response from the server: 450: OUT OF SEQUENCE' in caplog.messages
# NOTE(review): this is a second definition of ``test_invalid_signature_error``;
# an earlier test in this file has the same name and the same body. Because
# this one rebinds the name, the earlier copy is never collected. Consider
# deleting one of the two.
@pytest.mark.asyncio
async def test_invalid_signature_error(start_server_with_signatures, caplog):
    """Post a poll with a forged ``ds:SignatureValue`` and inspect the logs.

    Expects ``_perform_request`` to return ``(None, {})`` and the first log
    line beginning with "Non-OK status 403 when performing a request" to
    include "Invalid Signature".
    """
    client_options = dict(ven_name='myven',
                          vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                          cert=VEN_CERT,
                          key=VEN_KEY,
                          vtn_fingerprint=VTN_FINGERPRINT)
    client = OpenADRClient(**client_options)
    message = client._create_message('oadrPoll', ven_id='ven123')
    fake_sig = b64encode("HelloThere".encode('utf-8')).decode('utf-8')
    message = re.sub(r'<ds:SignatureValue>.*?</ds:SignatureValue>', f'<ds:SignatureValue>{fake_sig}</ds:SignatureValue>', message)
    result = await client._perform_request('OadrPoll', message)
    assert result == (None, {})

    # Pick the first matching log line, or None when there is none at all.
    first_403 = next(
        (text for text in (rec.message for rec in caplog.records)
         if text.startswith("Non-OK status 403 when performing a request")),
        None,
    )
    assert first_403 is not None
    assert "Invalid Signature" in first_403
def test_replay_protect_message_too_old(caplog):
    """Check that a message is rejected as too old when the allowed age is zero.

    ``messaging.REPLAY_PROTECT_MAX_TIME_DELTA`` is temporarily set to zero
    seconds, so a freshly signed poll message must make
    ``messaging._verify_replay_protect`` raise ``ValueError`` with the
    message 'The message was signed too long ago.'.
    """
    client = OpenADRClient(ven_name='myven',
                           vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                           cert=VEN_CERT,
                           key=VEN_KEY,
                           vtn_fingerprint=VTN_FINGERPRINT)
    original_delta = messaging.REPLAY_PROTECT_MAX_TIME_DELTA
    messaging.REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=0)
    try:
        message = client._create_message('oadrPoll', ven_id='ven123')
        tree = etree.fromstring(message.encode('utf-8'))
        with pytest.raises(ValueError) as err:
            messaging._verify_replay_protect(tree)
        assert str(err.value) == 'The message was signed too long ago.'
    finally:
        # Restore the module-level setting even when an assertion above fails;
        # otherwise every later test would run with a zero-second window.
        messaging.REPLAY_PROTECT_MAX_TIME_DELTA = original_delta
def test_replay_protect_repeated_message(caplog):
    """Verify the same signed message twice; the second attempt must be rejected.

    The first ``_verify_replay_protect`` call is expected to succeed, the
    second to raise ``ValueError`` with the message asserted below.
    """
    ven = OpenADRClient(ven_name='myven',
                        vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                        cert=VEN_CERT,
                        key=VEN_KEY,
                        vtn_fingerprint=VTN_FINGERPRINT)
    signed_xml = ven._create_message('oadrPoll', ven_id='ven123')
    parsed = etree.fromstring(signed_xml.encode('utf-8'))

    # First pass: must not raise.
    messaging._verify_replay_protect(parsed)

    # Second pass with the very same tree.
    with pytest.raises(ValueError) as excinfo:
        messaging._verify_replay_protect(parsed)
    reason = str(excinfo.value)
    assert reason == 'This combination of timestamp and nonce was already used.'
def test_replay_protect_missing_nonce(caplog):
    """Strip the ``dsp:nonce`` element from a signed message and verify it.

    ``_verify_replay_protect`` must raise ``ValueError`` with the
    "Missing 'nonce' element" message asserted below.
    """
    ven = OpenADRClient(ven_name='myven',
                        vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                        cert=VEN_CERT,
                        key=VEN_KEY,
                        vtn_fingerprint=VTN_FINGERPRINT)
    signed_xml = ven._create_message('oadrPoll', ven_id='ven123')
    without_nonce = re.sub('<dsp:nonce>.*?</dsp:nonce>', '', signed_xml)
    parsed = etree.fromstring(without_nonce.encode('utf-8'))
    with pytest.raises(ValueError) as excinfo:
        messaging._verify_replay_protect(parsed)
    reason = str(excinfo.value)
    assert reason == "Missing 'nonce' element in ReplayProtect in incoming message."
def test_replay_protect_malformed_nonce(caplog):
    """Verify messages with a damaged or absent ReplayProtect section.

    Two variants are checked in order: the signed message with its
    ``dsp:timestamp`` element removed, and that same message with the
    ``dsp:ReplayProtect`` element removed as well. Both must make
    ``_verify_replay_protect`` raise ``ValueError`` with the same message.
    """
    ven = OpenADRClient(ven_name='myven',
                        vtn_url=f'https://localhost:{SERVER_PORT}/OpenADR2/Simple/2.0b',
                        cert=VEN_CERT,
                        key=VEN_KEY,
                        vtn_fingerprint=VTN_FINGERPRINT)
    signed_xml = ven._create_message('oadrPoll', ven_id='ven123')
    without_timestamp = re.sub('<dsp:timestamp>.*?</dsp:timestamp>', '', signed_xml)
    # The second variant is derived from the first, not from the pristine message.
    without_replay_protect = re.sub('<dsp:ReplayProtect>.*?</dsp:ReplayProtect>', '', without_timestamp)

    for broken_xml in (without_timestamp, without_replay_protect):
        parsed = etree.fromstring(broken_xml.encode('utf-8'))
        with pytest.raises(ValueError) as excinfo:
            messaging._verify_replay_protect(parsed)
        assert str(excinfo.value) == "Missing or malformed ReplayProtect element in the message signature."
def test_server_add_unknown_handler(caplog):
    """Register a handler under an unknown name and check the ``NameError`` text."""
    expected_text = ("Unknown handler 'unknown_name'. Correct handler names are: "
                     "'on_created_event', 'on_request_event', 'on_register_report', "
                     "'on_create_report', 'on_created_report', 'on_request_report', "
                     "'on_update_report', 'on_poll', 'on_query_registration', "
                     "'on_create_party_registration', 'on_cancel_party_registration'.")
    vtn = OpenADRServer(vtn_id='myvtn')
    with pytest.raises(NameError) as excinfo:
        vtn.add_handler('unknown_name', print)
    assert str(excinfo.value) == expected_text
def test_server_add_event_with_invalid_signal_type():
    """``add_event`` with signal type 'x-LoadControlCapacity' must raise ``ValueError``."""
    vtn = OpenADRServer(vtn_id='myvtn')
    with pytest.raises(ValueError):
        # The interval is built inside the raises-block, as in the call itself.
        interval = objects.Interval(dtstart=datetime.now(timezone.utc),
                                    duration=timedelta(seconds=10),
                                    signal_payload=1.0)
        vtn.add_event(ven_id='ven123',
                      signal_name='simple',
                      signal_type='x-LoadControlCapacity',
                      intervals=[interval])
def test_server_add_event_with_invalid_signal_name():
    """``add_event`` with signal name 'invalid' must raise ``ValueError``."""
    vtn = OpenADRServer(vtn_id='myvtn')
    with pytest.raises(ValueError):
        # The interval is built inside the raises-block, as in the call itself.
        interval = objects.Interval(dtstart=datetime.now(timezone.utc),
                                    duration=timedelta(seconds=10),
                                    signal_payload=1.0)
        vtn.add_event(ven_id='ven123',
                      signal_name='invalid',
                      signal_type='x-loadControlCapacity',
                      intervals=[interval])
def test_server_add_event_with_custom_signal_name():
    """``add_event`` with signal name 'x-invalid' is accepted and stored.

    Afterwards exactly one event must be queued under ``server.events['ven123']``.
    """
    vtn = OpenADRServer(vtn_id='myvtn')
    interval = objects.Interval(dtstart=datetime.now(timezone.utc),
                                duration=timedelta(seconds=10),
                                signal_payload=1.0)
    vtn.add_event(ven_id='ven123',
                  signal_name='x-invalid',
                  signal_type='x-loadControlCapacity',
                  intervals=[interval])
    queued_for_ven = vtn.events['ven123']
    assert len(queued_for_ven) == 1
def test_server_add_event_with_no_intervals():
    """``add_event`` must raise ``ValueError`` for each unusable ``intervals`` value.

    The values tried, in order, are ``None``, an empty list, and a plain dict.
    """
    vtn = OpenADRServer(vtn_id='myvtn')
    unusable_intervals = (None, [], {"dtstart": "2021-01-01"})
    for bad_value in unusable_intervals:
        with pytest.raises(ValueError):
            vtn.add_event(ven_id='ven123',
                          signal_name='x-invalid',
                          signal_type='x-loadControlCapacity',
                          intervals=bad_value)
##########################################################################################
# Shared settings used by the tests and fixtures in this module.
SERVER_PORT = 8001
VEN_NAME = 'myven'
VEN_ID = '1234abcd'
VTN_ID = "TestVTN"

# Certificates live in a "certificates" directory one level above this file's directory.
_CERT_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "certificates")
VEN_CERT = os.path.join(_CERT_DIR, "dummy_ven.crt")
VEN_KEY = os.path.join(_CERT_DIR, "dummy_ven.key")
VTN_CERT = os.path.join(_CERT_DIR, "dummy_vtn.crt")
VTN_KEY = os.path.join(_CERT_DIR, "dummy_vtn.key")
CA_FILE = os.path.join(_CERT_DIR, "dummy_ca.crt")

# Fingerprints are computed once, at import time, from the certificate files.
with open(VEN_CERT) as file:
    VEN_FINGERPRINT = certificate_fingerprint(file.read())
with open(VTN_CERT) as file:
    VTN_FINGERPRINT = certificate_fingerprint(file.read())
async def _on_create_party_registration(payload):
    """Accept a registration request and build the reply for it.

    Returns a ``(message_type, message_payload)`` tuple. The payload echoes
    the incoming ``request_id``, uses the module's fixed ``VEN_ID`` and a
    freshly generated registration id, and requests a one-second poll
    frequency.
    """
    registration_id = generate_id()
    response = {'response_code': 200,
                'response_description': 'OK',
                'request_id': payload['request_id']}
    profile = {'profile_name': '2.0b',
               'transports': {'transport_name': 'simpleHttp'}}
    reply = {'response': response,
             'ven_id': VEN_ID,
             'registration_id': registration_id,
             'profiles': [profile],
             'requested_oadr_poll_freq': timedelta(seconds=1)}
    return 'oadrCreatedPartyRegistration', reply
async def _client_on_event(event):
    """No-op event callback: ignores ``event`` and returns ``None``."""
    return None
async def _client_on_report(report):
    """No-op report callback: ignores ``report`` and returns ``None``."""
    return None
def fingerprint_lookup(ven_id):
    """Return the module-level ``VEN_FINGERPRINT`` regardless of ``ven_id``."""
    known_fingerprint = VEN_FINGERPRINT
    return known_fingerprint
@pytest.fixture
async def start_server():
    """Fixture: run a VTN on ``SERVER_PORT`` for the duration of a test.

    The server gets ``_on_create_party_registration`` as its registration
    handler, is started before the test body and stopped afterwards.
    """
    vtn = OpenADRServer(http_port=SERVER_PORT, vtn_id=VTN_ID)
    vtn.add_handler('on_create_party_registration', _on_create_party_registration)
    await vtn.run_async()
    yield
    # Teardown: runs once the test using this fixture has finished.
    await vtn.stop()
@pytest.fixture
async def start_server_with_signatures():
    """Fixture: run a VTN configured with certificates for the duration of a test.

    The VTN certificate and key are passed both as ``cert``/``key`` and as
    ``http_cert``/``http_key``, together with the CA file and the module's
    ``fingerprint_lookup``. The server is started before the test body and
    stopped afterwards.
    """
    server_options = {
        'vtn_id': VTN_ID,
        'cert': VTN_CERT,
        'key': VTN_KEY,
        'http_cert': VTN_CERT,
        'http_key': VTN_KEY,
        'http_ca_file': CA_FILE,
        'http_port': SERVER_PORT,
        'fingerprint_lookup': fingerprint_lookup,
    }
    vtn = OpenADRServer(**server_options)
    vtn.add_handler('on_create_party_registration', _on_create_party_registration)
    await vtn.run_async()
    yield
    # Teardown: runs once the test using this fixture has finished.
    await vtn.stop()