-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathauthorization.py
58 lines (44 loc) · 1.87 KB
/
authorization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Utilities for API authorization."""
from typing import Optional
import casbin
import casbin_sqlalchemy_adapter
from settings import settings
# CASBIN Dependency -----------------------------------------------------------
def init_enforcer():
    """Build a new Casbin enforcer backed by the SQLAlchemy policy adapter.

    The adapter is created from ``settings.casbin_database_uri`` and the
    enforcer is configured with ``settings.casbin_model``.

    Returns:
        A newly constructed ``casbin.Enforcer``.
    """
    policy_store = casbin_sqlalchemy_adapter.Adapter(
        settings.casbin_database_uri
    )
    enforcer = casbin.Enforcer(settings.casbin_model, policy_store)
    return enforcer
def get_enforcer():
    """Yield a Casbin enforcer for use as a dependency.

    A new enforcer is built with ``init_enforcer`` each time this generator
    is started.

    Yields:
        The enforcer returned by ``init_enforcer``.
    """
    # NOTE: no teardown runs after the yield. A try/finally variant that
    # called ``close()`` on the enforcer existed here but is disabled.
    yield init_enforcer()
# CASBIN Helpers --------------------------------------------------------------
# TODO: Support multi-tenant mode (teams)
def check_access(enforcer: Optional[casbin.Enforcer], user, path, method):
    """Ask the enforcer whether ``user`` may perform ``method`` on ``path``.

    Args:
        enforcer: Casbin enforcer to query. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.
        user: Subject passed to the enforcer.
        path: Resource passed to the enforcer.
        method: Action passed to the enforcer.

    Returns:
        The result of ``enforcer.enforce(user, path, method)``.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    return enforcer.enforce(user, path, method)
def add_policy(enforcer: Optional[casbin.Enforcer], user, path, method):
    """Grant ``user`` permission to perform ``method`` on ``path``.

    Args:
        enforcer: Casbin enforcer to update. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.
        user: Subject receiving the permission.
        path: Resource the permission applies to.
        method: Action the permission allows.

    Returns:
        The result of ``enforcer.add_permission_for_user(user, path, method)``.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    return enforcer.add_permission_for_user(user, path, method)
def delete_policy(
    enforcer: Optional[casbin.Enforcer],
    user: str = '',
    path: str = '',
    method: str = '',
):
    """Remove policy rules matching the given ``user``/``path``/``method``.

    The three values are forwarded, starting at field index 0, to
    ``enforcer.remove_filtered_policy``.

    NOTE(review): empty strings are presumably treated by casbin as
    "match any value", so calling this with every filter left at its
    default may remove all policy rules -- confirm before relying on it.

    Args:
        enforcer: Casbin enforcer to update. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.
        user: Subject filter; defaults to ``''``.
        path: Resource filter; defaults to ``''``.
        method: Action filter; defaults to ``''``.

    Returns:
        The result of ``enforcer.remove_filtered_policy(0, user, path, method)``.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    return enforcer.remove_filtered_policy(0, user, path, method)
def get_policies(
    enforcer: Optional[casbin.Enforcer],
    user: str = '',
    path: str = '',
    method: str = '',
) -> list:
    """List policy rules matching the given ``user``/``path``/``method``.

    The three values are forwarded, starting at field index 0, to
    ``enforcer.get_filtered_policy``. Each returned rule is converted to a
    dict built from its first three fields.

    Args:
        enforcer: Casbin enforcer to query. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.
        user: Subject filter; defaults to ``''``.
        path: Resource filter; defaults to ``''``.
        method: Action filter; defaults to ``''``.

    Returns:
        A list of dicts with the keys ``'user'``, ``'path'`` and
        ``'method'``, one per rule returned by the enforcer.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    rules = enforcer.get_filtered_policy(0, user, path, method)
    # TODO: Make sure the /protocol/* cases are handled properly.
    return [
        {'user': rule[0], 'path': rule[1], 'method': rule[2]}
        for rule in rules
    ]
def get_roles(enforcer: Optional[casbin.Enforcer], user):
    """Look up the roles the enforcer reports for ``user``.

    Args:
        enforcer: Casbin enforcer to query. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.
        user: Subject whose roles are requested.

    Returns:
        The result of ``enforcer.get_roles_for_user(user)``.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    return enforcer.get_roles_for_user(user)
def get_all_roles(enforcer: Optional[casbin.Enforcer]):
    """Return every role known to the enforcer.

    Args:
        enforcer: Casbin enforcer to query. ``None`` is accepted, in which
            case a new enforcer is built with ``init_enforcer``.

    Returns:
        The result of ``enforcer.get_all_roles()``.
    """
    if enforcer is None:
        # Fall back to a freshly built enforcer when the caller has none.
        enforcer = init_enforcer()
    return enforcer.get_all_roles()