forked from openid/python-openid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstoretest.py
397 lines (315 loc) · 12.7 KB
/
storetest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
from openid.association import Association
from openid.cryptutil import randomString
from openid.store.nonce import mkNonce, split
import unittest
import string
import time
import socket
import random
import os
# Host name of the database server that the MySQL and PostgreSQL store
# tests in this file connect to.
db_host = 'dbtest'

# Alphabet used for random association handles: every character of
# string.printable that is not whitespace.
allowed_handle = ''.join(
    [char for char in string.printable if char not in string.whitespace])
def generateHandle(n):
    """Return randomString(n, allowed_handle).

    The result is a random string restricted to the non-whitespace
    printable characters collected in allowed_handle.  n is handed to
    randomString unchanged (presumably the desired length -- confirm
    against openid.cryptutil.randomString).
    """
    alphabet = allowed_handle
    return randomString(n, alphabet)
# Secrets come straight from randomString; in this file it is only ever
# called with a single integer argument, e.g. generateSecret(20).
generateSecret = randomString
def getTmpDbName():
    """Build a name for a throwaway test database.

    The name is made of the local host name (with every '.' and '-'
    replaced by '_'), the current process id and a random integer,
    followed by the fixed suffix '_openid_test'.
    """
    safe_host = socket.gethostname()
    for unwanted in ('.', '-'):
        safe_host = safe_host.replace(unwanted, '_')

    pid = os.getpid()
    # Random component drawn from [1, current Unix time).
    salt = random.randrange(1, int(time.time()))
    return "%s_%d_%s_openid_test" % (safe_host, pid, salt)
def testStore(store):
    """Check that a store offers the minimum API an OpenID store needs.

    Call this function with an empty store.  The association methods
    (storeAssociation, getAssociation, removeAssociation and
    cleanupAssociations) are exercised first, then the nonce methods
    (useNonce and cleanupNonces).

    Raises AssertionError if the store does not work as expected.

    OpenIDStore -> NoneType
    """
    ### Association functions
    now = int(time.time())
    server_url = 'http://www.myopenid.com/openid'

    def genAssoc(issued, lifetime=600):
        # 'issued' is an offset in seconds relative to 'now'.
        secret = generateSecret(20)
        handle = generateHandle(128)
        return Association(handle, secret, now + issued, lifetime,
                           'HMAC-SHA1')

    def checkRetrieve(url, handle=None, expected=None):
        # Fetch an association and compare it with the expected one
        # (None means "nothing must be found").
        found = store.getAssociation(url, handle)
        assert found == expected, (found, expected)
        if expected is None:
            return
        if found is expected:
            print ('Unexpected: retrieved a reference to the expected '
                   'value instead of a new object')
        assert found.handle == expected.handle
        assert found.secret == expected.secret

    def checkRemove(url, handle, expected):
        # removeAssociation must report (truthily) whether the
        # association was there to be removed.
        was_present = store.removeAssociation(url, handle)
        assert bool(expected) == bool(was_present)

    assoc = genAssoc(issued=0)

    # Nothing has been stored yet, so nothing may be found.
    checkRetrieve(server_url)

    # Once stored, the association comes back -- and keeps coming back
    # on repeated lookups.
    store.storeAssociation(server_url, assoc)
    for _ in range(2):
        checkRetrieve(server_url, None, assoc)

    # Storing the same association again has no ill effect.
    store.storeAssociation(server_url, assoc)
    checkRetrieve(server_url, None, assoc)

    # Removal with an unknown handle reports "not present" ...
    checkRemove(server_url, assoc.handle + 'x', False)

    # ... and so does removal with an unknown server URL.
    checkRemove(server_url + 'x', assoc.handle, False)

    # Removing the stored association reports "present" exactly once.
    checkRemove(server_url, assoc.handle, True)
    checkRemove(server_url, assoc.handle, False)

    # Put assoc back in the store.
    store.storeAssociation(server_url, assoc)

    # assoc2 is issued later than assoc and also expires later.
    assoc2 = genAssoc(issued=1)
    store.storeAssociation(server_url, assoc2)

    # Without a handle, the association with the latest issue date for
    # this server_url is returned.
    checkRetrieve(server_url, None, assoc2)

    # Both associations remain reachable through their own handles.
    checkRetrieve(server_url, assoc.handle, assoc)
    checkRetrieve(server_url, assoc2.handle, assoc2)

    # assoc3 is issued latest but expires before assoc and assoc2.  The
    # store must pick by issue date, not by expiration.
    assoc3 = genAssoc(issued=2, lifetime=100)
    store.storeAssociation(server_url, assoc3)

    def checkAll(newest, first, second, third):
        # Look up the handle-less default, then assoc, assoc2 and assoc3
        # by handle, in that order.
        checkRetrieve(server_url, None, newest)
        checkRetrieve(server_url, assoc.handle, first)
        checkRetrieve(server_url, assoc2.handle, second)
        checkRetrieve(server_url, assoc3.handle, third)

    checkAll(assoc3, assoc, assoc2, assoc3)

    checkRemove(server_url, assoc2.handle, True)
    checkAll(assoc3, assoc, None, assoc3)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc3.handle, True)
    checkAll(assoc, assoc, None, None)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc.handle, True)
    checkRemove(server_url, assoc3.handle, False)
    checkAll(None, None, None, None)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc.handle, False)
    checkRemove(server_url, assoc3.handle, False)

    ### test expired associations
    # Stored below as:
    #   server 1: one valid and one expired association
    #   server 2: one expired association
    #   server 3: one valid association
    valid_long = genAssoc(issued=-3600, lifetime=7200)
    valid_recent = genAssoc(issued=-5)
    expired_a = genAssoc(issued=-7200, lifetime=3600)
    expired_b = genAssoc(issued=-7200, lifetime=3600)

    store.cleanupAssociations()
    placements = [
        ('1', valid_long),
        ('1', expired_a),
        ('2', expired_b),
        ('3', valid_recent),
    ]
    for suffix, stored_assoc in placements:
        store.storeAssociation(server_url + suffix, stored_assoc)

    # Exactly the two expired associations must be reported as cleaned.
    cleaned = store.cleanupAssociations()
    assert cleaned == 2, cleaned

    ### Nonce functions

    def checkUseNonce(nonce, expected, nonce_url, msg=''):
        # Split the nonce and check that useNonce accepts or rejects it
        # as expected (compared by truthiness).
        stamp, salt = split(nonce)
        actual = store.useNonce(nonce_url, stamp, salt)
        assert bool(actual) == bool(expected), \
            "%r != %r: %s" % (actual, expected, msg)

    for url in [server_url, '']:
        # Random nonce (not in store)
        nonce1 = mkNonce()

        # A nonce is accepted the first time it is used ...
        checkUseNonce(nonce1, True, url)

        # ... and rejected on every later use.
        checkUseNonce(nonce1, False, url)
        checkUseNonce(nonce1, False, url)

        # Nonces from when the universe was an hour old should not pass
        # these days.
        old_nonce = mkNonce(3600)
        checkUseNonce(old_nonce, False, url,
                      "Old nonce (%r) passed." % (old_nonce,))

    old_nonce1 = mkNonce(now - 20000)
    old_nonce2 = mkNonce(now - 10000)
    recent_nonce = mkNonce(now - 600)

    from openid.store import nonce as nonceModule

    # SKEW is changed on the nonce module itself, so always restore it.
    orig_skew = nonceModule.SKEW
    try:
        # With SKEW at 0 this is presumably meant to flush the nonces
        # left over from the checks above.
        nonceModule.SKEW = 0
        store.cleanupNonces()

        # Set SKEW high so stores will keep our nonces.
        nonceModule.SKEW = 100000
        for fresh in (old_nonce1, old_nonce2, recent_nonce):
            assert store.useNonce(server_url, *split(fresh))

        # With a one hour SKEW only the two old nonces get cleaned.
        nonceModule.SKEW = 3600
        cleaned = store.cleanupNonces()
        assert cleaned == 2, "Cleaned %r nonces." % (cleaned,)

        nonceModule.SKEW = 100000
        # A roundabout method of checking that the old nonces were
        # cleaned is to see if we're allowed to add them again.
        for readded in (old_nonce1, old_nonce2):
            assert store.useNonce(server_url, *split(readded))

        # The recent nonce wasn't cleaned, so it should still fail.
        assert not store.useNonce(server_url, *split(recent_nonce))
    finally:
        nonceModule.SKEW = orig_skew
def test_filestore():
    """Run the store compliance test against a FileOpenIDStore.

    The store is created in a fresh temporary directory.  That directory
    is removed only when testStore() and store.cleanup() both succeed;
    if either raises, the exception propagates and the directory is left
    on disk.
    """
    from openid.store import filestore
    import tempfile
    import shutil

    try:
        scratch_dir = tempfile.mkdtemp()
    except AttributeError:
        # tempfile offers no mkdtemp(): fall back to os.tmpnam() and
        # create the directory by hand.
        import os
        scratch_dir = os.tmpnam()
        os.mkdir(scratch_dir)

    file_store = filestore.FileOpenIDStore(scratch_dir)

    # Straight-line on purpose: a failure above the rmtree() call skips
    # the directory removal.
    testStore(file_store)
    file_store.cleanup()
    shutil.rmtree(scratch_dir)
def test_sqlite():
    """Run the store compliance test against an in-memory SQLiteStore.

    The test is skipped silently when pysqlite2 cannot be imported.
    """
    from openid.store import sqlstore
    try:
        from pysqlite2 import dbapi2 as sqlite
    except ImportError:
        # No SQLite driver available: nothing to test.
        return

    connection = sqlite.connect(':memory:')
    sqlite_store = sqlstore.SQLiteStore(connection)
    sqlite_store.createTables()
    testStore(sqlite_store)
def test_mysql():
    """Run the store compliance test against a MySQLStore.

    The test is skipped silently when the MySQLdb module cannot be
    imported, and skipped with a printed message when connecting to the
    server named by the module-level db_host fails with MySQL client
    error 2005.  Any other connection error is re-raised.

    Otherwise a uniquely named scratch database is created, the store
    tables are created in it and testStore() is run.  The database is
    dropped and the connection closed even when the test fails.
    """
    from openid.store import sqlstore
    try:
        import MySQLdb
    except ImportError:
        # No MySQL driver available: nothing to test.
        return

    import sys

    db_user = 'openid_test'
    db_passwd = ''
    db_name = getTmpDbName()

    # Change this connect line to use the right user and password
    try:
        conn = MySQLdb.connect(user=db_user, passwd=db_passwd,
                               host=db_host)
    except MySQLdb.OperationalError:
        # Fetch the exception through sys.exc_info() rather than the
        # "except X, why" form, which is a syntax error on Python 3.
        why = sys.exc_info()[1]
        # NOTE(review): 2005 is presumably CR_UNKNOWN_HOST from the
        # MySQL client error list -- confirm.
        if why.args and why.args[0] == 2005:
            print ('Skipping MySQL store test (cannot connect '
                   'to test server on host %r)' % (db_host,))
            return
        raise

    try:
        conn.query('CREATE DATABASE %s;' % db_name)
        try:
            conn.query('USE %s;' % db_name)

            # OK, we're in the right environment. Create store and
            # create the tables.
            store = sqlstore.MySQLStore(conn)
            store.createTables()

            # At last, we get to run the test.
            testStore(store)
        finally:
            # Remove the database. If you want to do post-mortem on a
            # failing test, comment out this line.
            conn.query('DROP DATABASE %s;' % db_name)
    finally:
        # Do not leave the server connection open behind us.
        conn.close()
def test_postgresql():
    """
    Tests the PostgreSQLStore on the PostgreSQL database cluster
    (version 7.4 or later) running on the host named by the module-level
    db_host.  To run this test, you must have:

    - The 'psycopg' python module (version 1.1) installed

    - PostgreSQL running on db_host

    - An 'openid_test' user account in your database cluster, which
      you can create by running 'createuser -Ad openid_test' as the
      'postgres' user

    - Trust auth for the 'openid_test' account, which you can activate
      by adding the following line to your pg_hba.conf file:

      local all openid_test trust

    The test is skipped silently when psycopg cannot be imported.

    This test connects to the database cluster three times:

    - To the 'template1' database, to create the test database

    - To the test database, to run the store tests

    - To the 'template1' database once more, to drop the test database

    The test database is dropped even when the store tests fail.
    """
    from openid.store import sqlstore
    try:
        import psycopg
    except ImportError:
        # No PostgreSQL driver available: nothing to test.
        return

    db_name = getTmpDbName()
    db_user = 'openid_test'

    # Connect once to create the database; reconnect to access the
    # new database.
    conn_create = psycopg.connect(database='template1', user=db_user,
                                  host=db_host)
    try:
        conn_create.autocommit()

        # Create the test database.
        cursor = conn_create.cursor()
        cursor.execute('CREATE DATABASE %s;' % (db_name,))
    finally:
        conn_create.close()

    # From here on the test database exists, so everything runs under a
    # finally clause that drops it again.
    try:
        # Connect to the test database.
        conn_test = psycopg.connect(database=db_name, user=db_user,
                                    host=db_host)
        try:
            # OK, we're in the right environment. Create the store
            # instance and create the tables.
            store = sqlstore.PostgreSQLStore(conn_test)
            store.createTables()

            # At last, we get to run the test.
            testStore(store)
        finally:
            # Disconnect.
            conn_test.close()
    finally:
        # It takes a little time for the close() call above to take
        # effect, so we'll wait for a second before trying to remove
        # the database. (Maybe this is because we're using a UNIX
        # socket to connect to postgres rather than TCP?)
        time.sleep(1)

        # Remove the database now that the test is over.
        conn_remove = psycopg.connect(database='template1', user=db_user,
                                      host=db_host)
        try:
            conn_remove.autocommit()
            cursor = conn_remove.cursor()
            cursor.execute('DROP DATABASE %s;' % (db_name,))
        finally:
            conn_remove.close()
def test_memstore():
    """Run the store compliance test against a new MemoryStore."""
    from openid.store import memstore

    memory_store = memstore.MemoryStore()
    testStore(memory_store)
# Every store test defined in this module; pyUnitTests() wraps each
# entry in a unittest.FunctionTestCase.
test_functions = [
    test_filestore,
    test_sqlite,
    test_mysql,
    test_postgresql,
    test_memstore,
    ]
def pyUnitTests():
    """Return a unittest.TestSuite covering every store test.

    Each function listed in the module-level test_functions is wrapped
    in a unittest.FunctionTestCase.
    """
    # Build a real list (not a lazy map object) so the cases are
    # materialised on every Python version.
    cases = [unittest.FunctionTestCase(func) for func in test_functions]
    return unittest.TestSuite(cases)
if __name__ == '__main__':
    # Run every store test with the text runner and turn the outcome
    # into the process exit status: 0 on success, 1 otherwise.
    import sys

    all_tests = pyUnitTests()
    text_runner = unittest.TextTestRunner()
    outcome = text_runner.run(all_tests)
    if not outcome.wasSuccessful():
        sys.exit(1)
    sys.exit(0)