forked from City-of-Turku/tunnistamo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
100 lines (77 loc) · 3.46 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import logging
from django.contrib.auth import get_user_model
from rest_framework import serializers
from rest_framework.exceptions import APIException, PermissionDenied
from rest_framework.mixins import CreateModelMixin, DestroyModelMixin, ListModelMixin
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import GenericViewSet
from tunnistamo.api_common import DeviceGeneratedJWTAuthentication, OidcTokenAuthentication, ScopePermission
from .helmet_requests import (
HelmetConnectionException, HelmetGeneralException, HelmetImproperlyConfiguredException, validate_patron
)
from .models import UserIdentity
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# The user model class as returned by Django's get_user_model().
User = get_user_model()
class NotImplementedResponse(APIException):
    """API error reported with HTTP status 501 and detail 'Not implemented.'.

    Raised by ``validate_credentials_helmet`` when ``validate_patron`` signals
    ``HelmetImproperlyConfiguredException``.
    """

    default_detail = 'Not implemented.'
    status_code = 501
class ThirdPartyAuthenticationFailed(APIException):
    """API error reported with HTTP status 401.

    Raised by ``validate_credentials_helmet`` with a dict carrying ``code``
    and ``detail`` keys when the credentials could not be validated
    (connection failure, general failure, or invalid credentials).
    """

    status_code = 401
def validate_credentials_helmet(identifier, secret):
    """Check a patron's credentials through ``validate_patron``.

    Returns ``None`` when ``validate_patron`` yields a truthy result.

    Raises:
        NotImplementedResponse: ``validate_patron`` raised
            ``HelmetImproperlyConfiguredException`` (logged at error level).
        ThirdPartyAuthenticationFailed: with code
            ``authentication_service_unavailable`` on
            ``HelmetConnectionException``, ``unidentified_error`` on
            ``HelmetGeneralException`` (both logged at warning level), or
            ``invalid_credentials`` when the validation result is falsy.
    """
    try:
        is_valid = validate_patron(identifier, secret)
    except HelmetImproperlyConfiguredException as exc:
        logger.error(exc)
        raise NotImplementedResponse()
    except HelmetConnectionException as exc:
        message = 'Cannot validate patron from helmet, got connection exception: {}'.format(exc)
        logger.warning(message)
        failure = {
            'code': 'authentication_service_unavailable',
            'detail': 'Connection to authentication service timed out',
        }
        raise ThirdPartyAuthenticationFailed(failure)
    except HelmetGeneralException as exc:
        message = 'Cannot validate patron from helmet, got general exception: {}'.format(exc)
        logger.warning(message)
        failure = {
            'code': 'unidentified_error',
            'detail': 'Unidentified error',
        }
        raise ThirdPartyAuthenticationFailed(failure)
    else:
        # The service answered; a truthy answer means the credentials are fine.
        if is_valid:
            return
        raise ThirdPartyAuthenticationFailed({
            'code': 'invalid_credentials',
            'detail': 'Invalid user credentials',
        })
class UserIdentitySerializer(serializers.ModelSerializer):
    """Serializer for ``UserIdentity`` exposing id, identifier, service and secret.

    ``secret`` is declared write-only, and ``create`` does not store it.
    """

    secret = serializers.CharField(write_only=True)

    class Meta:
        model = UserIdentity
        fields = ('id', 'identifier', 'service', 'secret')

    def create(self, validated_data):
        """Create or update the identity matching the given service and user.

        The row is looked up by ``service`` and ``user``; its ``identifier``
        is set from ``validated_data``. Returns the resulting instance.
        """
        lookup = {
            'service': validated_data['service'],
            'user': validated_data['user'],
        }
        new_values = {'identifier': validated_data['identifier']}
        identity, _created = self.Meta.model.objects.update_or_create(
            defaults=new_values, **lookup
        )
        return identity
# List / create / delete endpoint for the requesting user's UserIdentity rows.
# Requires an authenticated request (device-generated JWT or OIDC token) and
# the 'identities' scope.
# Documented with comments rather than docstrings: DRF derives the view
# description from the view class docstring, so adding one would change output.
class UserIdentityViewSet(ListModelMixin, CreateModelMixin, DestroyModelMixin, GenericViewSet):
    queryset = UserIdentity.objects.all()
    serializer_class = UserIdentitySerializer
    authentication_classes = (DeviceGeneratedJWTAuthentication, OidcTokenAuthentication)
    permission_classes = (IsAuthenticated, ScopePermission)
    required_scopes = ('identities',)

    def get_queryset(self):
        # Only the requesting user's own identities are ever visible.
        return self.queryset.filter(user=self.request.user)

    def list(self, request, *args, **kwargs):
        # Standard list response, plus an 'X-Nonce' header echoing the nonce
        # of the request's auth object when it has one.
        response = super(UserIdentityViewSet, self).list(request, *args, **kwargs)
        # The nonce is optional: default to None so an auth object without a
        # `nonce` attribute (or request.auth being None) does not raise
        # AttributeError. Presumably not every configured authentication
        # class provides one -- TODO confirm.
        nonce = getattr(request.auth, 'nonce', None)
        if nonce is not None:
            response['X-Nonce'] = nonce
        return response

    def perform_create(self, serializer):
        # Take the secret out of the validated data before saving; it is only
        # used to verify the credentials.
        data = serializer.validated_data
        secret = data.pop('secret')
        # Raises an APIException subclass if the credentials cannot be
        # validated, in which case nothing is saved.
        validate_credentials_helmet(data['identifier'], secret)
        serializer.save(user=self.request.user)

    def perform_destroy(self, instance):
        # Refuse to delete an identity that belongs to another user.
        if instance.user != self.request.user:
            raise PermissionDenied()
        instance.delete()