forked from mail-in-a-box/free_tls_certificates
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
282 lines (228 loc) · 10.6 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# -*- coding: utf8 -*-
import unittest
import sys
import re
import os
import os.path
import time
import multiprocessing
import tempfile
import shutil
import requests.exceptions
import acme.messages
from free_tls_certificates import client
from free_tls_certificates.utils import get_certificate_domains
# Directory URL of the ACME server the tests talk to.
ACME_SERVER = "http://0.0.0.0:4000/directory"

# le.wtf is coded to have a high rate limit in the default Boulder test files
domains = ["x1.le.wtf"]

# A text (non-bytes) string, built the right way for the running interpreter.
# The tests pass it where the client is expected to reject non-bytes input.
if sys.version_info >= (3,):
    unicode_string = "my str instance is not a bytes instance"
else:
    unicode_string = unicode("my unicode instance is not a bytes instance")
def run():
    """Run the MyTest suite against a temporary domain-validation web server.

    A temporary directory is created with three sub-directories (certificate
    output, ACME challenge files, ACME account data).  The challenge
    directory is served over HTTP from a child process while the tests run.
    Afterwards the child process is stopped and reaped, the listening socket
    is closed, and the temporary directory is removed, even if the tests
    raise.

    Returns:
        The result object returned by ``unittest.TextTestRunner().run()``,
        so that callers can check ``wasSuccessful()``.
    """
    tempdir = tempfile.mkdtemp()
    try:
        # Where should we store things?
        output_dir = os.path.join(tempdir, 'output')
        challenges_dir = os.path.join(tempdir, 'acme-challenges')
        account_dir = os.path.join(tempdir, 'acme-account')
        for directory in (output_dir, challenges_dir, account_dir):
            os.mkdir(directory)

        # Start the domain validation server. It serves the virtual path
        # '/.well-known/acme-challenge/' from challenges_dir.
        httpd = create_dv_server(challenges_dir)
        try:
            httpd_proc = multiprocessing.Process(target=httpd.serve_forever)
            httpd_proc.start()
            try:
                # Tell the test case where its working directories are,
                # then run it.
                MyTest.output_dir = output_dir
                MyTest.challenges_dir = challenges_dir
                MyTest.account_dir = account_dir
                suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyTest)
                return unittest.TextTestRunner().run(suite)
            finally:
                # Stop the server process and wait for it so that no zombie
                # process is left behind.
                httpd_proc.terminate()
                httpd_proc.join()
        finally:
            # Release this process's copy of the listening socket.
            httpd.server_close()
    finally:
        shutil.rmtree(tempdir)
class MyTest(unittest.TestCase):
    """End-to-end tests of the certificate client against ACME_SERVER.

    The ``output_dir``, ``challenges_dir`` and ``account_dir`` class
    attributes must be assigned before the tests run (``run()`` does this).
    Files written to ``challenges_dir`` are what the domain validation web
    server hands out.

    The tests are order dependent: ``test__main`` agrees to the terms of
    service and must run before the others.
    """

    # Upper bound, in seconds, on how long a test polls for a certificate
    # before giving up instead of hanging forever.
    issue_timeout = 120

    def do_issue(self, domains=domains, validation_method=client.HTTPValidation(port=5002), **kwargs):
        # Call client.issue_certificate() with this test run's account
        # directory, output file locations and ACME server. Extra keyword
        # arguments are passed straight through. Whatever the client raises
        # propagates to the caller; the tests rely on that.
        client.issue_certificate(
            domains,
            self.account_dir,
            validation_method=validation_method,
            certificate_file=os.path.join(self.output_dir, "certificate.crt"),
            certificate_chain_file=os.path.join(self.output_dir, "chain.crt"),
            acme_server=ACME_SERVER,
            **kwargs)

    def _issue_until_ready(self, **kwargs):
        # Call do_issue(**kwargs) once a second for as long as the client
        # answers WaitABit. Returns as soon as a call succeeds; fails the
        # test when issue_timeout seconds pass without success.
        deadline = time.time() + self.issue_timeout
        while True:
            try:
                self.do_issue(**kwargs)
                return
            except client.WaitABit:
                if time.time() >= deadline:
                    self.fail(
                        "certificate was not issued within %d seconds"
                        % self.issue_timeout)
                time.sleep(1)

    # This method needs to occur first because the other tests depend on the
    # ACME terms of service being agreed to, so we use two _'s to make it
    # lexicographically first.
    def test__main(self):
        # Call the first time. It raises an exception telling us the
        # URL to the terms of service agreement the user needs to agree to.
        with self.assertRaises(client.NeedToAgreeToTOS) as cm:
            self.do_issue()
        tos_url = cm.exception.url

        # Now agree. But it'll raise an exception telling us we need
        # to make a file available at a certain URL.
        with self.assertRaises(client.NeedToTakeAction) as cm:
            self.do_issue(agree_to_tos_url=tos_url)
        actions = cm.exception.actions

        # It should give us as many actions as domains we asked to verify.
        self.assertEqual(len(actions), len(domains))

        for action in actions:
            # Check that each action is a HTTP validation file request.
            self.assertIsInstance(action, client.NeedToInstallFile)
            self.assertTrue(re.match(r"http://[^/]+/.well-known/acme-challenge/", action.url))
            self.assertTrue(re.match(r"^[A-Za-z0-9\._-]{60,100}$", action.contents))
            self.assertTrue(re.match(r"^[A-Za-z0-9_-]{40,50}$", action.file_name))

            # Create the file so we can pass validation. We write it to the
            # directory that our local HTTP server is serving.
            fn = os.path.join(self.challenges_dir, action.file_name)
            with open(fn, 'w') as f:
                f.write(action.contents)

        # Try to get the certificate again, but it'll tell us to wait while
        # the ACME server processes the request.
        with self.assertRaises(client.WaitABit):
            self.do_issue()

        # Now actually wait until the certificate is issued.
        self._issue_until_ready()

        # Check that the certificate is valid.
        cert = load_cert_chain(os.path.join(self.output_dir, 'certificate.crt'))
        self.assertEqual(len(cert), 1)  # one element in certificate file
        cert_domains = get_certificate_domains(cert[0])
        self.assertEqual(cert_domains[0], domains[0])
        self.assertEqual(set(cert_domains), set(domains))

        # Check that the chain is valid.
        chain = load_cert_chain(os.path.join(self.output_dir, 'chain.crt'))
        self.assertEqual(len(chain), 1)  # one element in chain
        chain_names = get_certificate_domains(chain[0])
        self.assertEqual(chain_names[0], 'happy hacker fake CA')

        # Check that the certificate is signed by the first element in the chain.
        self.assertEqual(cert[0].issuer, chain[0].subject)

    def test_i8n_domain(self):
        domains = [u"tëst.le.wtf", u"tëst2.le.wtf"]

        # The main test already agreed to the TOS...

        # Get the challenge details.
        with self.assertRaises(client.InvalidDomainName) as cm:
            self.do_issue(domains=domains)

        # LE doesn't yet support internationalized domains, but we should get
        # back this error telling us.
        self.assertIn("Internationalized domain names", str(cm.exception))

    def test_invalid_domain(self):
        # TOS is already agreed to by main test.
        with self.assertRaises(client.InvalidDomainName):
            self.do_issue(domains=["test.invalid"])

    def test_challenge_fails(self):
        # Submit a challenge immediately, even though we haven't yet
        # installed a file.
        vm = client.HTTPValidation(port=5002, verify_first=False)
        with self.assertRaises(client.WaitABit):
            self.do_issue(domains=["fail.le.wtf"], validation_method=vm)

        # Give the Boulder server a chance to evaluate the challenge
        # and go from pending status to invalid status.
        time.sleep(5)

        # Try to issue, but it will fail now.
        with self.assertRaises(client.ChallengeFailed):
            self.do_issue(domains=["fail.le.wtf"], validation_method=vm)

        # The failed challenge is removed from the cache so that further
        # attempts from scratch aren't blocked.

        # Get a new challenge. Write the challenge response file.
        with self.assertRaises(client.NeedToTakeAction) as cm:
            self.do_issue(domains=["fail.le.wtf"])
        for action in cm.exception.actions:
            fn = os.path.join(self.challenges_dir, action.file_name)
            with open(fn, 'w') as f:
                f.write(action.contents)

        # Submit and wait.
        with self.assertRaises(client.WaitABit):
            self.do_issue(domains=["fail.le.wtf"], validation_method=vm)

        # Get the certificate.
        self._issue_until_ready(domains=["fail.le.wtf"])

    def test_invalid_private_key_argument(self):
        # We're already authorized by the main test to issue the certificate.
        with self.assertRaises(ValueError):
            self.do_issue(private_key=unicode_string)

    def test_invalid_csr_argument(self):
        # We're already authorized by the main test to issue the certificate.
        with self.assertRaises(ValueError):
            self.do_issue(csr=unicode_string)

    def test_driver(self):
        # Run the driver program to issue the certificate.
        import subprocess

        # Start from the current environment (so PATH, HOME, etc. survive)
        # and only override PYTHONPATH so the child can import the package
        # from the working directory.
        child_env = dict(os.environ)
        child_env["PYTHONPATH"] = os.pathsep.join(["."] + sys.path)

        private_key_file = os.path.join(self.output_dir, 'driver_private.key')
        certificate_file = os.path.join(self.output_dir, 'driver_certificate.crt')

        subprocess.check_call([
            sys.executable, "free_tls_certificates/driver.py",
            "--server", ACME_SERVER,
            ] + domains + [
            private_key_file,
            certificate_file,
            self.challenges_dir,
            self.account_dir,
            ], env=child_env)

        # Check that the private key was written.
        self.assertTrue(os.path.exists(private_key_file))

        # Check that the certificate is valid.
        cert = load_cert_chain(certificate_file)
        self.assertEqual(len(cert), 2)  # two elements in chain
        cert_domains = get_certificate_domains(cert[0])
        self.assertEqual(cert_domains[0], domains[0])
        self.assertEqual(set(cert_domains), set(domains))

        # Check that the chain is valid.
        chain_names = get_certificate_domains(cert[1])
        self.assertEqual(chain_names[0], 'happy hacker fake CA')

        # Check that the certificate is signed by the first element in the chain.
        self.assertEqual(cert[0].issuer, cert[1].subject)
def load_cert_chain(pemfile):
    """Load *pemfile* via ``utils.load_certificate(..., with_chain=True)``.

    The tests treat the return value as a sequence of certificates.
    """
    from free_tls_certificates import utils

    return utils.load_certificate(pemfile, with_chain=True)
def create_dv_server(challenges_dir):
    """Build the HTTP server that answers Boulder's domain validation requests.

    The server answers ``GET /.well-known/acme-challenge/<name>`` with the
    contents of the file ``<name>`` located directly inside
    *challenges_dir*.  Every other request path gets a 404.  That includes
    paths outside the well-known prefix, names containing a path separator,
    and ``.`` / ``..``.  A request can therefore never read a file outside
    *challenges_dir*.

    Args:
        challenges_dir: Directory holding the challenge response files.

    Returns:
        A server object bound to port 5002 on all interfaces.  It is not
        serving yet; the caller runs ``serve_forever()`` on it.
    """
    import os.path

    if sys.version_info < (3,):
        from BaseHTTPServer import BaseHTTPRequestHandler
        from SocketServer import TCPServer as HTTPServer
    else:
        from http.server import BaseHTTPRequestHandler
        from http.server import HTTPServer

    root_path = "/.well-known/acme-challenge/"

    def translate_path(path):
        # Map a request path to a file inside challenges_dir, or return
        # None when the request must not be served.
        path = path.split("?", 1)[0].split("#", 1)[0]
        if not path.startswith(root_path):
            return None

        # Strip the well-known prefix so we serve only that directory.
        name = path[len(root_path):]

        # Accept a single plain file name only. Anything that could step
        # out of challenges_dir (separators, '.', '..') is rejected.
        if name in ("", ".", "..") or "\0" in name:
            return None
        if "/" in name or "\\" in name:
            return None
        return os.path.join(challenges_dir, name)

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self):
            fn = translate_path(self.path)
            body = None
            if fn is not None:
                try:
                    with open(fn, "rb") as f:
                        body = f.read()
                except (IOError, OSError):
                    # Missing file, directory, or unreadable: treat as 404.
                    body = None
            if body is None:
                self.send_error(404)
                return
            self.send_response(200)
            self.send_header("Content-Type", "application/octet-stream")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return HTTPServer(('', 5002), Handler)
if __name__ == "__main__":
    # Script entry point. If run() hands back a unittest result, turn a
    # failed run into a non-zero exit status so that shells and CI jobs can
    # notice it. A None return leaves the exit status at 0.
    outcome = run()
    if outcome is not None and not outcome.wasSuccessful():
        sys.exit(1)