forked from python/pythondotorg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresources.py
123 lines (97 loc) · 3.89 KB
/
resources.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from tastypie.authentication import ApiKeyAuthentication
from tastypie.authorization import Authorization
from tastypie.exceptions import Unauthorized
from tastypie.http import HttpUnauthorized
from tastypie.resources import ModelResource
from tastypie.throttle import CacheThrottle
from django.contrib.auth import get_user_model
class ApiKeyOrGuestAuthentication(ApiKeyAuthentication):
def _unauthorized(self):
# Allow guests anyway
return True
def is_authenticated(self, request, **kwargs):
"""
Copypasted from tastypie, modified to avoid issues with app-loading and
custom user model.
"""
User = get_user_model()
username_field = User.USERNAME_FIELD
try:
username, api_key = self.extract_credentials(request)
except ValueError:
return self._unauthorized()
if not username or not api_key:
return self._unauthorized()
try:
lookup_kwargs = {username_field: username}
user = User.objects.get(**lookup_kwargs)
except (User.DoesNotExist, User.MultipleObjectsReturned):
return self._unauthorized()
if not self.check_active(user):
return False
key_auth_check = self.get_key(user, api_key)
if key_auth_check and not isinstance(key_auth_check, HttpUnauthorized):
request.user = user
return key_auth_check
def get_identifier(self, request):
if request.user.is_authenticated:
return super().get_identifier(request)
else:
# returns a combination of IP address and hostname.
return "%s_%s" % (request.META.get('REMOTE_ADDR', 'noaddr'), request.META.get('REMOTE_HOST', 'nohost'))
def check_active(self, user):
return True
class StaffAuthorization(Authorization):
"""
Everybody can read everything. Staff users can write everything.
"""
def read_list(self, object_list, bundle):
# Everybody can read
return object_list
def read_detail(self, object_list, bundle):
# Everybody can read
return True
def create_list(self, object_list, bundle):
if bundle.request.user.is_staff:
return object_list
else:
raise Unauthorized("Operation restricted to staff users.")
def create_detail(self, object_list, bundle):
return bundle.request.user.is_staff
def update_list(self, object_list, bundle):
if bundle.request.user.is_staff:
return object_list
else:
raise Unauthorized("Operation restricted to staff users.")
def update_detail(self, object_list, bundle):
return bundle.request.user.is_staff
def delete_list(self, object_list, bundle):
if not bundle.request.user.is_staff:
raise Unauthorized("Operation restricted to staff users.")
else:
return object_list
def delete_detail(self, object_list, bundle):
if not bundle.request.user.is_staff:
raise Unauthorized("Operation restricted to staff users.")
else:
return True
class OnlyPublishedAuthorization(StaffAuthorization):
"""
Only staff users can see unpublished objects.
"""
def read_list(self, object_list, bundle):
if not bundle.request.user.is_staff:
return object_list.filter(is_published=True)
else:
return super().read_list(object_list, bundle)
def read_detail(self, object_list, bundle):
if not bundle.request.user.is_staff:
return bundle.obj.is_published
else:
return super().read_detail(object_list, bundle)
class GenericResource(ModelResource):
class Meta:
authentication = ApiKeyOrGuestAuthentication()
authorization = StaffAuthorization()
throttle = CacheThrottle(throttle_at=600) # default is 150 req/hr
abstract = True