-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathapp.py
572 lines (450 loc) · 17.8 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
from quart import request
from ldap.modlist import addModlist, modifyModlist
import asyncio, base64, quart, functools, io, ldap, ldif, sys, types
from typing import *
# The Quart application; static assets are served from the 'dist' folder
app = quart.Quart(__name__, static_folder='dist')
# Configuration is loaded from the object named 'settings'
app.config.from_object('settings')
# Constant to add technical attributes in LDAP search results
WITH_OPERATIONAL_ATTRS = ('*','+')
# HTTP 401 headers
UNAUTHORIZED = { 'WWW-Authenticate': 'Basic realm="Login Required", charset="UTF-8"' }
# Special fields
# Photo attributes are never taken from JSON payloads in entry()
PHOTO = ['jpegPhoto', 'thumbnailPhoto']
# Password attributes are only taken from JSON payloads on PUT in entry()
PASSWORDS = ('userPassword',)
# Special syntaxes
# Attributes with this syntax OID are probed for printable text in _entry()
OCTET_STRING = '1.3.6.1.4.1.1466.115.121.1.40'
def connected(view: Callable):
    '''Open an LDAP connection around a view.

    The connection is stored in request.ldap while the wrapped view runs
    and is released afterwards, whether the view returns or raises.
    Any ldap.LDAPError is turned into a HTTP 500 response.
    '''

    @functools.wraps(view)
    async def wrapped_view(**values):
        try:
            # Set up LDAP connection
            url = app.config['LDAP_URL']
            request.ldap = ldap.initialize(url)
            try:
                # #43 TLS, see https://stackoverflow.com/a/8795694
                if app.config['USE_TLS'] or app.config['INSECURE_TLS']:
                    cert_level = ldap.OPT_X_TLS_NEVER \
                        if app.config['INSECURE_TLS'] \
                        else ldap.OPT_X_TLS_DEMAND
                    request.ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, cert_level)
                    # See https://stackoverflow.com/a/38136255
                    request.ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
                    if not url.startswith('ldaps://'):
                        request.ldap.start_tls_s()

                # Call the view function
                return await view(**values)
            finally:
                # Release the connection on success and on failure alike;
                # it used to stay open whenever the view raised
                request.ldap.unbind_s()

        except ldap.LDAPError as err:
            # The first argument is expected to be a dict with 'info' and
            # 'desc' keys; exceptions raised without arguments carry none
            args = err.args[0] \
                if err.args and isinstance(err.args[0], dict) else {}
            quart.abort(500, args.get('info', '')
                + ': ' + args.get('desc', ''))

    return wrapped_view
def authenticated(view: Callable):
    'Decorator: only run a view after a successful LDAP simple bind'

    def login_required():
        'Build the HTTP 401 response that asks the browser for credentials'
        return quart.Response(
            'Please log in', 401, UNAUTHORIZED)

    @functools.wraps(view)
    async def wrapped_view(**values):
        resolve_dn = app.config['GET_BIND_DN']
        bind_dn = resolve_dn(request.authorization) if resolve_dn else None

        # Neither a configured DN nor HTTP credentials: nothing to try
        if bind_dn is None and request.authorization is None:
            return login_required()

        try:
            resolve_password = app.config['GET_BIND_PASSWORD']
            if not bind_dn:
                # Look up the DN from the user name
                bind_dn = await search_user_by_attr()
            password = resolve_password(request.authorization)

            # Try authenticating
            await empty(request.ldap.simple_bind(bind_dn, password))

            # On success, call the view function
            return await view(**values)

        except ldap.INVALID_CREDENTIALS:
            return login_required()

    return wrapped_view
async def search_user_by_attr():
    '''Find the bind DN for the user named in the HTTP credentials.

    Raises ldap.INVALID_CREDENTIALS unless exactly one entry matches.
    '''
    try:
        build_filter = app.config['GET_BIND_DN_FILTER']
        base_dn = app.config['BASE_DN']
        user_filter = build_filter(request.authorization)
        msgid = request.ldap.search(base_dn, ldap.SCOPE_SUBTREE, user_filter)
        user_dn, _ = await unique(msgid)
    except ValueError:
        # unique() signals zero or multiple matches with a ValueError
        raise ldap.INVALID_CREDENTIALS({
            'desc': 'Invalid user',
            'info': "User '%s' unknown" % request.authorization.username})
    return user_dn
def no_cache(view: Callable) -> Callable:
    '''View decorator to prevent browser caching. Must precede @api.

    The wrapped view is awaited and must return a response object with a
    mutable `headers` mapping; the caching headers are set on it and the
    same object is returned.
    '''

    @functools.wraps(view)
    async def wrapped_view(**values) -> 'quart.Response':
        resp = await view(**values)
        resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        resp.headers['Pragma'] = 'no-cache'
        resp.headers['Expires'] = '0'
        return resp

    return wrapped_view
def api(view: Callable) -> Callable:
    ''' View decorator for JSON endpoints.
        Forces authentication.

        The view runs inside connected() and authenticated(). Anything it
        returns that is not already a quart.Response is sent as JSON.
    '''
    # Build the decorator chain once instead of on every request
    guarded = connected(authenticated(view))

    @functools.wraps(view)
    async def wrapped_view(**values) -> 'quart.Response':
        data = await guarded(**values)
        # isinstance also lets Response subclasses pass through untouched
        if not isinstance(data, quart.Response):
            data = quart.jsonify(data)
        return data

    return wrapped_view
async def result(msgid: int) -> AsyncGenerator[Tuple[str, Dict[str, List[bytes]]], None]:
    '''Yield the results of an asynchronous LDAP operation one by one.

    The connection is polled without blocking, so other requests can be
    served while results are pending.
    '''
    while True:
        kind, entries = request.ldap.result(msgid=msgid, all=0, timeout=0)

        if kind is None:
            # Nothing available yet: wait 10 ms before polling again
            await asyncio.sleep(0.01)
            continue

        if entries == []:
            # An empty data list ends the result stream
            return

        yield entries[0]
async def unique(msgid: int) -> Tuple[str, Dict[str, List[bytes]]]:
    '''Wait for an operation that must deliver exactly one result.

    Raises ValueError for zero results or more than one result; in the
    latter case the operation is abandoned first.
    '''
    found = []
    async for item in result(msgid):
        if found:
            # A second result arrived
            request.ldap.abandon(msgid)
            raise ValueError("Expected unique result")
        found.append(item)

    if not found:
        raise ValueError("Expected unique result")
    return found[0]
async def empty(msgid: int) -> None:
    '''Wait for an operation that must not deliver any result.

    Raises ValueError (after abandoning the operation) if one arrives.
    '''
    try:
        await result(msgid).__anext__()
    except StopAsyncIteration:
        # The stream ended without yielding anything, as expected
        return

    request.ldap.abandon(msgid)
    raise ValueError("Unexpected result")
@app.route('/')
async def index() -> quart.Response:
    'Deliver index.html from the static folder as the start page'
    page = await static_file('index.html')
    return page
@app.route('/<path:filename>')
async def static_file(filename: str) -> quart.Response:
    'Deliver a file from the static folder of the application'
    asset = await app.send_static_file(filename)
    return asset
@app.route('/api/whoami')
@no_cache
@api
async def whoami() -> str:
    '''DN of the current user.

    Returns the identity reported by the server with a leading 'dn:'
    removed. Other occurrences of 'dn:' inside the identity are kept.
    '''
    prefix = 'dn:'
    identity = request.ldap.whoami_s()
    # Only strip the prefix; str.replace() would also mangle a DN
    # that happens to contain 'dn:' somewhere in a value
    if identity.startswith(prefix):
        return identity[len(prefix):]
    return identity
@app.route('/api/tree/<path:basedn>')
@no_cache
@api
async def tree(basedn: str) -> List[Dict[str, Any]]:
    '''List directory entries.

    The pseudo DN 'base' selects the configured BASE_DN entry itself,
    any other DN lists the entries one level below it.
    '''
    if basedn == 'base':
        return await _tree(app.config['BASE_DN'], ldap.SCOPE_BASE)
    return await _tree(basedn, ldap.SCOPE_ONELEVEL)
async def _tree(basedn: str, scope: int) -> List[Dict[str, Any]]:
    '''Collect the tree nodes found by searching basedn with the given scope.

    Each node carries its DN, its structural object class and a flag
    telling whether it has children.
    '''
    nodes = []
    msgid = request.ldap.search(
        basedn, scope, attrlist=WITH_OPERATIONAL_ATTRS)
    async for dn, attrs in result(msgid):
        structural = attrs['structuralObjectClass'][0].decode()
        has_children = attrs['hasSubordinates'][0] == b'TRUE'
        nodes.append({
            'dn': dn,
            'structuralObjectClass': structural,
            'hasSubordinates': has_children,
        })
    return nodes
def _is_text(values) -> bool:
    'Check whether all byte strings decode to printable text'
    try:
        return all(val.decode().isprintable() for val in values)
    except UnicodeDecodeError:
        return False


def _entry(res: Tuple[str, Any]) -> Dict[str, Any]:
    '''Prepare an LDAP entry for transmission.

    res is a (dn, attributes) pair with lists of byte strings as
    attribute values. The returned dictionary has the attribute values
    under 'attrs' (base64 for binary attributes, text otherwise) and
    schema-derived details under 'meta'.

    Reads app.schema, which must have been loaded before.
    '''
    dn, attrs = res
    # Work on a shallow copy so the caller's dictionary is not modified
    attrs = dict(attrs)
    models = ldap.schema.models

    ocs = {oc.decode() for oc in attrs['objectClass']}
    must_attrs, _may_attrs = app.schema.attribute_types(ocs)
    # Names of the object classes of kind 0
    soc = [oc.names[0]
        for oc in (app.schema.get_obj(models.ObjectClass, o) for o in ocs)
        if oc.kind == 0]
    aux = {app.schema.get_obj(models.ObjectClass, a).names[0]
        for a in app.schema.get_applicable_aux_classes(soc[0])}

    #23 suppress userPassword
    if 'userPassword' in attrs:
        attrs['userPassword'] = [b'*****']

    # Filter out binary attributes
    binary = set()
    for attr, values in attrs.items():
        obj = app.schema.get_obj(models.AttributeType, attr)
        syntax = None
        if obj is not None and obj.syntax and obj.syntax != OCTET_STRING:
            syntax = app.schema.get_obj(models.LDAPSyntax, obj.syntax)

        if syntax is None:
            # Octet strings are not used consistently, and some attributes
            # have no usable syntax in the schema.
            # Try to decode as text and treat as binary on failure.
            # This is an explicit check: the former assert would have been
            # stripped when running with -O
            if not _is_text(values):
                binary.add(attr)
        elif syntax.not_human_readable:
            # Check human-readable flag in schema
            binary.add(attr)

    return {
        'attrs': { k: [base64.b64encode(val).decode()
                       if k in binary else val.decode()
                       for val in values]
                   for k, values in attrs.items() },
        'meta': {
            'dn': dn,
            'required': [app.schema.get_obj(models.AttributeType, a).names[0]
                         for a in must_attrs],
            'aux': sorted(aux - ocs),
            'binary': sorted(binary),
            'hints': {},
            'autoFilled': [],
        }
    }
@app.route('/api/entry/<path:dn>', methods=('GET', 'POST', 'DELETE', 'PUT'))
@no_cache
@api
async def entry(dn: str) -> Optional[dict]:
    '''Read, modify, create or delete a directory entry.

    GET returns the prepared entry, or None if the DN was not found.
    POST applies the JSON payload as modifications and returns the
    changed attribute names. PUT creates the entry from the JSON payload.
    DELETE removes the entry together with its subtree.
    POST and PUT without a JSON body are rejected with HTTP 400.
    '''
    req = None
    if request.is_json:
        payload = await request.get_json()
        # Copy JSON payload into a dictionary of non-empty byte strings
        req = { k: [s.encode() for s in filter(None, v)]
                for k, v in payload.items()
                if k not in PHOTO
                and (k not in PASSWORDS or request.method == 'PUT') }

    if req is None and request.method in ('POST', 'PUT'):
        # Without a payload there is nothing to apply; this used to fail
        # further down with a NameError
        quart.abort(400)

    if request.method == 'GET':
        try:
            return _entry(await unique(request.ldap.search(dn, ldap.SCOPE_BASE)))
        except ValueError:
            return None

    elif request.method == 'POST':
        # Get previous values from directory
        res = await unique(request.ldap.search(dn, ldap.SCOPE_BASE))
        mods = { k: v for k, v in res[1].items() if k in req }
        modlist = modifyModlist(mods, req)

        if modlist: # Apply changes
            await empty(request.ldap.modify(dn, modlist))
        # Send changed keys back
        return { 'changed' : sorted(set(m[1] for m in modlist)) }

    elif request.method == 'PUT':
        # Create new object
        modlist = addModlist(req)
        if modlist:
            await empty(request.ldap.add(dn, modlist))
        return { 'changed' : ['dn'] } # Dummy

    elif request.method == 'DELETE':
        # Reverse tree order, so children are deleted before their parents
        nodes = sorted(await _tree(dn, ldap.SCOPE_SUBTREE), key=_dn_order)
        for node in reversed(nodes):
            await empty(request.ldap.delete(node['dn']))

    return None # for mypy
@app.route('/api/blob/<attr>/<int:index>/<path:dn>', methods=('GET', 'DELETE', 'PUT'))
@no_cache
@api
async def blob(attr: str, index: int, dn: str):
    '''Download, upload or delete one value of an attribute.

    GET sends the value at the given index as a file download, PUT adds
    the uploaded 'blob' file in front of the existing values and DELETE
    removes the value at the given index. Unknown entries, attributes or
    indexes result in HTTP 404.
    '''
    try:
        _dn, attrs = await unique(request.ldap.search(dn, ldap.SCOPE_BASE))
    except ValueError:
        quart.abort(404)

    method = request.method

    if method == 'GET':
        if attr not in attrs or len(attrs[attr]) <= index:
            quart.abort(404)
        resp = quart.Response(attrs[attr][index],
            content_type='application/octet-stream')
        resp.headers['Content-Disposition'] = \
            'attachment; filename="%s-%d.bin"' % (attr, index)
        return resp

    if method == 'PUT':
        uploaded = [(await request.files)['blob'].read()]
        if attr in attrs:
            # Drop the attribute and re-add it with the new value first
            changes = [(ldap.MOD_DELETE, attr, None),
                       (ldap.MOD_ADD, attr, uploaded + attrs[attr])]
        else:
            changes = [(ldap.MOD_ADD, attr, uploaded)]
        await empty(request.ldap.modify(dn, changes))

    elif method == 'DELETE':
        if attr not in attrs or len(attrs[attr]) <= index:
            quart.abort(404)
        # Drop all values, then restore the remaining ones
        await empty(request.ldap.modify(dn, [(ldap.MOD_DELETE, attr, None)]))
        remaining = attrs[attr][:index] + attrs[attr][index + 1:]
        if remaining:
            await empty(request.ldap.modify(dn, [(ldap.MOD_ADD, attr, remaining)]))

    return { 'changed' : [attr] } # dummy
@app.route('/api/ldif/<path:dn>')
@no_cache
@connected
@authenticated
async def ldifDump(dn: str) -> quart.Response:
    '''Dump an entry and its subtree as LDIF.

    The download is named after the value of the first RDN of the
    requested DN.
    '''
    out = io.StringIO()
    writer = ldif.LDIFWriter(out)

    # A separate loop variable keeps the requested DN intact; reusing `dn`
    # named the download after the last entry of the dump
    async for entry_dn, attrs in result(request.ldap.search(dn, ldap.SCOPE_SUBTREE)):
        writer.unparse(entry_dn, attrs)

    # Fall back to the whole RDN if it has no '=' instead of failing
    rdn = dn.split(',')[0]
    name = rdn.partition('=')[2] or rdn

    resp = quart.Response(out.getvalue(), content_type='text/plain')
    resp.headers['Content-Disposition'] = \
        'attachment; filename="%s.ldif"' % name
    return resp
class LDIFReader(ldif.LDIFParser):
    'LDIF parser that adds every parsed record to a directory connection'

    def __init__(self, input, con):
        'Parse the bytes in input and add the records through con'
        super().__init__(io.BytesIO(input))
        self.con = con
        # Number of records added so far
        self.count = 0

    def handle(self, dn, entry):
        'Add one parsed record to the directory and count it'
        modlist = addModlist(entry)
        self.con.add_s(dn, modlist)
        self.count += 1
@app.route('/api/ldif', methods=('POST',))
@no_cache
@api
async def ldifUpload() -> quart.Response:
    'Import the LDIF in the request body; returns the number of added records'
    payload = await request.data
    importer = LDIFReader(payload, request.ldap)
    importer.parse()
    return importer.count
@app.route('/api/rename', methods=('POST',))
@no_cache
@api
async def rename() -> None:
    '''Rename an entry.

    Expects a JSON object with the current 'dn' and the new 'rdn'.
    '''
    params = await request.get_json()
    msgid = request.ldap.rename(params['dn'], params['rdn'], delold=0)
    await empty(msgid)
    return 'OK'
def _ename(entry: dict) -> Optional[str]:
    'Return the first cn value of an entry as text, or None without a cn'
    if 'cn' not in entry or not entry['cn']:
        return None
    first = entry['cn'][0]
    return first.decode()
@app.route('/api/entry/password/<path:dn>', methods=('POST',))
@no_cache
@api
async def passwd(dn: str) -> Optional[Union[bool, str]]:
    '''Check or change the password of an entry.

    The JSON payload selects the operation:
    - 'check': bind as dn with the given password and return whether
      this worked.
    - 'new1': set this password (verifying 'old' if given) and return the
      stored userPassword value, or remove the password and return ''
      if 'new1' is empty.
    Anything else returns None.
    '''
    if request.is_json:
        args = await request.get_json()

        if 'check' in args:
            # A simple bind with an empty password is an unauthenticated
            # bind, which servers may accept for any DN. It proves nothing
            # about the password.
            if not args['check']:
                return False

            # NOTE(review): unlike connected(), this connection does not
            # apply the USE_TLS / INSECURE_TLS settings - confirm intent
            con = ldap.initialize(app.config['LDAP_URL'])
            try:
                con.simple_bind_s(dn, args['check'])
                return True
            except ldap.INVALID_CREDENTIALS:
                return False
            finally:
                # Also release the connection after a failed bind
                con.unbind_s()

        elif 'new1' in args:
            if args['new1']:
                await empty(request.ldap.passwd(dn, args.get('old') or None, args['new1']))
                _dn, attrs = await unique(
                    request.ldap.search(dn, ldap.SCOPE_BASE))
                return attrs['userPassword'][0].decode()
            else:
                # Delete the password attribute (1 is the 'delete' operation)
                await empty(request.ldap.modify(dn, [(1, 'userPassword', None)]))
                return ''

    return None # mypy
@app.route('/api/search/<path:query>')
@no_cache
@api
async def search(query: str) -> List[dict]:
    '''Search the directory below BASE_DN.

    Queries containing '=' are used as LDAP filters, anything else is
    inserted into the configured SEARCH_PATTERNS. Queries shorter than
    SEARCH_QUERY_MIN yield no results.
    '''
    patterns = app.config['SEARCH_PATTERNS']
    if len(query) < app.config['SEARCH_QUERY_MIN']:
        return []

    if '=' not in query:
        # Build default query: any of the patterns may match
        alternatives = ''.join(p % query for p in patterns)
        query = '(|%s)' % alternatives
    elif '(' not in query:
        # Search specific attributes: wrap a bare assertion in parentheses
        query = '(%s)' % query

    # Collect results
    matches : List[dict] = []
    msgid = request.ldap.search(
        app.config['BASE_DN'], ldap.SCOPE_SUBTREE, query)
    async for dn, attrs in result(msgid):
        label = _ename(attrs) or dn
        matches.append({ 'dn': dn, 'name': label })
        if len(matches) >= app.config['SEARCH_MAX']:
            break
    return matches
def _dn_order(node):
    'Sort key for tree ordering: the lower-cased DN parts, last part first'
    parts = node['dn'].lower().split(',')
    return tuple(parts[::-1])
@app.route('/api/subtree/<path:dn>')
@no_cache
@api
async def subtree(dn: str) -> List[str]:
    '''List the nodes below a DN in tree order.

    Each node gets a 'level' key with its depth relative to dn.
    The entry for dn itself is left out.
    '''
    base_depth = len(dn.split(','))
    nodes = []
    for node in sorted(await _tree(dn, ldap.SCOPE_SUBTREE), key=_dn_order):
        if node['dn'] != dn:
            node['level'] = len(node['dn'].split(',')) - base_depth
            nodes.append(node)
    return nodes
@app.route('/api/range/<attribute>')
@no_cache
@api
async def attribute_range(attribute: str) -> Dict[str, int]:
    '''Summarize the values of a numeric attribute like uidNumber or gidNumber.

    Returns the smallest ('min') and largest ('max') value in use below
    BASE_DN and the smallest unused value above the minimum ('next'),
    or an empty dictionary if the attribute is not used at all.
    Only the first value of each entry is considered.
    '''
    used = set()
    async for _dn, attrs in result(request.ldap.search(
            app.config['BASE_DN'], ldap.SCOPE_SUBTREE,
            '(%s=*)' % attribute, attrlist=(attribute,))):
        used.add(int(attrs[attribute][0]))

    if not used:
        return {}

    minimum, maximum = min(used), max(used)

    # Walk up from the minimum to the first gap. This needs at most
    # len(used) steps, while building set(range(minimum, maximum + 2))
    # would allocate the whole span between the extremes
    candidate = minimum
    while candidate in used:
        candidate += 1

    return { 'min' : minimum, 'max': maximum, 'next' : candidate }
### LDAP Schema ###
# Schema cache; stays None until schema() loads it on first use
app.schema = None
def _schema(schema_class):
    '''Iterate over the schema objects of a type.

    Obsolete objects are skipped, except for LDAP syntaxes,
    which are always included.
    '''
    is_syntax = schema_class is ldap.schema.models.LDAPSyntax
    for oid in app.schema.listall(schema_class):
        obj = app.schema.get_obj(schema_class, oid)
        if not is_syntax and obj.obsolete:
            continue
        yield obj
def _el(obj) -> dict:
    '''Basic information about a schema element.

    The 'name' is the first name of the element with a lower-case
    first letter, 'sup' is sorted.
    '''
    first_name = obj.names[0]
    info = {}
    info['oid'] = obj.oid
    info['name'] = first_name[:1].lower() + first_name[1:]
    info['names'] = obj.names
    info['desc'] = obj.desc
    info['obsolete'] = bool(obj.obsolete)
    info['sup'] = sorted(obj.sup)
    return info
# Object class constants
# Maps the numeric `kind` of a schema object class to the name sent by _oc()
SCHEMA_OC_KIND = {
0: 'structural',
1: 'abstract',
2: 'auxiliary',
}
def _oc(obj) -> dict:
    '''Information about an object class.

    Extends the basic element information with the sorted optional and
    required attributes and the name of the object class kind.
    '''
    return {
        **_el(obj),
        'may' : sorted(obj.may),
        'must' : sorted(obj.must),
        'kind' : SCHEMA_OC_KIND[obj.kind],
    }
# Attribute usage constants
# Maps the numeric `usage` of a schema attribute type to the name sent by _at()
SCHEMA_ATTR_USAGE = {
0: 'userApplications',
1: 'directoryOperation',
2: 'distributedOperation',
3: 'dSAOperation',
}
def _at(obj) -> dict:
    '''Information about an attribute type.

    Extends the basic element information with flags, the usage name and
    the matching rules and syntax of the attribute.
    '''
    return {
        **_el(obj),
        'single_value' : bool(obj.single_value),
        'no_user_mod' : bool(obj.no_user_mod),
        'usage' : SCHEMA_ATTR_USAGE[obj.usage],
        # FIXME avoid null values below
        'equality' : obj.equality,
        'syntax' : obj.syntax,
        'substr' : obj.substr,
        'ordering' : obj.ordering,
    }
def _syntax(obj) -> dict:
    'Information about an attribute syntax: OID, description, binary flag'
    binary = bool(obj.not_human_readable)
    return dict(
        oid=obj.oid,
        desc=obj.desc,
        not_human_readable=binary,
    )
def _dict(key: str, items) -> dict:
    '''Index items by the lower-cased value of one of their keys.

    Later items replace earlier ones with the same lower-cased value.
    '''
    index = {}
    for obj in items:
        index[obj[key].lower()] = obj
    return index
@app.route('/api/schema')
@no_cache
@api
async def schema() -> dict:
    '''Dump the schema.

    The schema is read from the directory on first use and kept in
    app.schema afterwards. The result has the attribute types and object
    classes by lower-cased name and the syntaxes by OID.
    '''
    models = ldap.schema.models

    # Load schema into the app
    if app.schema is None:
        # See: https://hub.packtpub.com/python-ldap-applications-part-4-ldap-schema/
        msgid = request.ldap.search(app.config['SCHEMA_DN'],
            ldap.SCOPE_BASE, attrlist=WITH_OPERATIONAL_ATTRS)
        _dn, subschema_entry = await unique(msgid)

        # See: https://www.python-ldap.org/en/latest/reference/ldap-schema.html
        app.schema = ldap.schema.SubSchema(subschema_entry, check_uniqueness=2)

    attributes = _dict('name', map(_at, _schema(models.AttributeType)))
    object_classes = _dict('name', map(_oc, _schema(models.ObjectClass)))
    syntaxes = _dict('oid', map(_syntax, _schema(models.LDAPSyntax)))
    return {
        'attributes': attributes,
        'objectClasses': object_classes,
        'syntaxes': syntaxes,
    }