forked from OpenLEADR/openleadr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_certificates.py
134 lines (116 loc) · 5.56 KB
/
test_certificates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import pytest
import os
from functools import partial
from openleadr import OpenADRServer, OpenADRClient, enable_default_logging
from openleadr.utils import certificate_fingerprint
from openleadr import errors
from async_timeout import timeout
enable_default_logging()
CA_CERT = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'certificates', 'dummy_ca.crt')
VTN_CERT = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'certificates', 'dummy_vtn.crt')
VTN_KEY = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'certificates', 'dummy_vtn.key')
VEN_CERT = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'certificates', 'dummy_ven.crt')
VEN_KEY = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'certificates', 'dummy_ven.key')
with open(VEN_CERT) as file:
ven_fingerprint = certificate_fingerprint(file.read())
with open(VTN_CERT) as file:
vtn_fingerprint = certificate_fingerprint(file.read())
async def lookup_fingerprint(ven_id):
return ven_fingerprint
async def on_create_party_registration(payload, future):
if payload['fingerprint'] != ven_fingerprint:
raise errors.FingerprintMismatch("The fingerprint of your TLS connection does not match the expected fingerprint. Your VEN is not allowed to register.")
else:
future.set_result(True)
return 'ven1234', 'reg5678'
@pytest.mark.asyncio
@pytest.mark.parametrize("disable_signature", [False, True])
async def test_ssl_certificates(disable_signature):
loop = asyncio.get_event_loop()
registration_future = loop.create_future()
server = OpenADRServer(vtn_id='myvtn',
http_cert=VTN_CERT,
http_key=VTN_KEY,
http_ca_file=CA_CERT,
cert=VTN_CERT,
key=VTN_KEY,
fingerprint_lookup=lookup_fingerprint)
server.add_handler('on_create_party_registration', partial(on_create_party_registration,
future=registration_future))
await server.run_async()
#await asyncio.sleep(1)
# Run the client
client = OpenADRClient(ven_name='myven',
vtn_url='https://localhost:8080/OpenADR2/Simple/2.0b',
cert=VEN_CERT,
key=VEN_KEY,
ca_file=CA_CERT,
vtn_fingerprint=vtn_fingerprint, disable_signature=disable_signature)
await client.run()
# Wait for the registration to be triggered
result = await asyncio.wait_for(registration_future, 1.0)
assert client.registration_id == 'reg5678'
await client.stop()
await server.stop()
#await asyncio.sleep(0)
@pytest.mark.asyncio
async def test_ssl_certificates_wrong_cert():
loop = asyncio.get_event_loop()
registration_future = loop.create_future()
server = OpenADRServer(vtn_id='myvtn',
http_cert=VTN_CERT,
http_key=VTN_KEY,
http_ca_file=CA_CERT,
cert=VTN_CERT,
key=VTN_KEY,
fingerprint_lookup=lookup_fingerprint)
server.add_handler('on_create_party_registration', partial(on_create_party_registration,
future=registration_future))
await server.run_async()
#await asyncio.sleep(1)
# Run the client
client = OpenADRClient(ven_name='myven',
vtn_url='https://localhost:8080/OpenADR2/Simple/2.0b',
cert=VTN_CERT,
key=VTN_KEY,
ca_file=CA_CERT,
vtn_fingerprint=vtn_fingerprint)
await client.run()
# Wait for the registration to be triggered
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(registration_future, timeout=0.5)
assert client.registration_id is None
await client.stop()
await server.stop()
await asyncio.sleep(0)
@pytest.mark.asyncio
async def test_ssl_certificates_wrong_fingerprint(caplog):
loop = asyncio.get_event_loop()
registration_future = loop.create_future()
server = OpenADRServer(vtn_id='myvtn',
http_cert=VTN_CERT,
http_key=VTN_KEY,
http_ca_file=CA_CERT,
cert=VTN_CERT,
key=VTN_KEY,
fingerprint_lookup=lookup_fingerprint)
server.add_handler('on_create_party_registration', partial(on_create_party_registration,
future=registration_future))
await server.run_async()
#await asyncio.sleep(1)
# Run the client
client = OpenADRClient(ven_name='myven',
vtn_url='https://localhost:8080/OpenADR2/Simple/2.0b',
cert=VEN_CERT,
key=VEN_KEY,
ca_file=CA_CERT,
vtn_fingerprint='00:11:22:33:44:55:66:77:88:99')
await client.run()
# Wait for the registration to be triggered
result = await asyncio.wait_for(registration_future, 1.0)
assert client.registration_id is None
assert ("The certificate fingerprint was incorrect. Expected: 00:11:22:33:44:55:66:77:88:99; "
"Received: 39:0F:7F:45:84:D6:24:49:FE:B7. Ignoring message.") in [rec.message for rec in caplog.records]
await client.stop()
await server.stop()