forked from City-of-Turku/tunnistamo
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Remove ApiScope.get_data_for_request and CombinedApiScopeData, since they are not needed anymore. * Add oidc_apis.api_tokens containing get_api_tokens_by_access_token function. * Move the OIDC Claims classes to oidc_apis * Remove TunnistamoTokenModule. It's not needed anymore, since we don't have to modify the ID Token. * Add an endpoint for getting the API tokens. * Do also some minor clean-ups.
- Loading branch information
1 parent
9338ebf
commit 939c574
Showing
8 changed files
with
223 additions
and
196 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import datetime | ||
from collections import defaultdict | ||
|
||
from django.utils import timezone | ||
from oidc_provider.lib.utils.token import create_id_token, encode_id_token | ||
|
||
from .models import ApiScope | ||
from .scopes import get_userinfo_by_scopes | ||
|
||
|
||
def get_api_tokens_by_access_token(token, request=None): | ||
""" | ||
Get API Tokens for given Access Token. | ||
:type token: oidc_provider.models.Token | ||
:param token: Access Token to get the API tokens for | ||
:type request: django.http.HttpRequest|None | ||
:param request: Optional request object for resolving issuer URLs | ||
:rtype: dict[str,str] | ||
:return: Dictionary of the API tokens with API identifer as the key | ||
""" | ||
# Limit scopes to known and allowed API scopes | ||
known_api_scopes = ApiScope.objects.by_identifiers(token.scope) | ||
allowed_api_scopes = known_api_scopes.allowed_for_client(token.client) | ||
|
||
# Group API scopes by the API identifiers | ||
scopes_by_api = defaultdict(list) | ||
for api_scope in allowed_api_scopes: | ||
scopes_by_api[api_scope.api.identifier].append(api_scope) | ||
|
||
return { | ||
api_identifier: generate_api_token(scopes, token, request) | ||
for (api_identifier, scopes) in scopes_by_api.items() | ||
} | ||
|
||
|
||
def generate_api_token(api_scopes, token, request=None): | ||
assert api_scopes | ||
api = api_scopes[0].api | ||
req_scopes = api.required_scopes | ||
userinfo = get_userinfo_by_scopes(token.user, req_scopes, token.client) | ||
id_token = create_id_token(token.user, aud=api.identifier, request=request) | ||
payload = {} | ||
payload.update(userinfo) | ||
payload.update(id_token) | ||
payload.update(_get_api_authorization_claims(api_scopes)) | ||
payload['exp'] = _get_api_token_expires_at(token) | ||
return encode_id_token(payload, token.client) | ||
|
||
|
||
def _get_api_authorization_claims(api_scopes): | ||
claims = defaultdict(list) | ||
for api_scope in api_scopes: | ||
field = api_scope.api.domain.identifier | ||
claims[field].append(api_scope.relative_identifier) | ||
return dict(claims) | ||
|
||
|
||
def _get_api_token_expires_at(token): | ||
# TODO: Should API tokens have a separate expire time? | ||
return int(_datetime_to_timestamp(token.expires_at)) | ||
|
||
|
||
def _datetime_to_timestamp(dt): | ||
if timezone.is_naive(dt): | ||
tz = timezone.get_default_timezone() | ||
dt = timezone.make_aware(dt, tz) | ||
return (dt - _EPOCH).total_seconds() | ||
|
||
|
||
_EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from django.utils.translation import ugettext_lazy as _ | ||
from oidc_provider.lib.claims import ScopeClaims, StandardScopeClaims | ||
|
||
from .models import ApiScope | ||
from .utils import combine_uniquely | ||
|
||
|
||
class ApiScopeClaims(ScopeClaims): | ||
@classmethod | ||
def get_scopes_info(cls, scopes=[]): | ||
scopes_by_identifier = { | ||
api_scope.identifier: api_scope | ||
for api_scope in ApiScope.objects.by_identifiers(scopes) | ||
} | ||
api_scopes = (scopes_by_identifier.get(scope) for scope in scopes) | ||
return [ | ||
{ | ||
'scope': api_scope.identifier, | ||
'name': api_scope.name, | ||
'description': api_scope.description, | ||
} | ||
for api_scope in api_scopes if api_scope | ||
] | ||
|
||
|
||
class GithubUsernameScopeClaims(ScopeClaims): | ||
info_github_username = ( | ||
_("GitHub username"), _("Access to your GitHub username.")) | ||
|
||
def scope_github_username(self): | ||
social_accounts = self.user.socialaccount_set | ||
github_account = social_accounts.filter(provider='github').first() | ||
if not github_account: | ||
return {} | ||
github_data = github_account.extra_data | ||
return { | ||
'github_username': github_data.get('login'), | ||
} | ||
|
||
|
||
class CombinedScopeClaims(ScopeClaims): | ||
combined_scope_claims = [ | ||
StandardScopeClaims, | ||
GithubUsernameScopeClaims, | ||
ApiScopeClaims, | ||
] | ||
|
||
@classmethod | ||
def get_scopes_info(cls, scopes=[]): | ||
extended_scopes = cls._extend_scope(scopes) | ||
scopes_info_map = {} | ||
for claim_cls in cls.combined_scope_claims: | ||
for info in claim_cls.get_scopes_info(extended_scopes): | ||
scopes_info_map[info['scope']] = info | ||
return [ | ||
scopes_info_map[scope] | ||
for scope in extended_scopes | ||
if scope in scopes_info_map | ||
] | ||
|
||
@classmethod | ||
def _extend_scope(cls, scopes): | ||
required_scopes = cls._get_all_required_scopes_by_api_scopes(scopes) | ||
extended_scopes = combine_uniquely(scopes, sorted(required_scopes)) | ||
return extended_scopes | ||
|
||
@classmethod | ||
def _get_all_required_scopes_by_api_scopes(cls, scopes): | ||
api_scopes = ApiScope.objects.by_identifiers(scopes) | ||
apis = {x.api for x in api_scopes} | ||
return set(sum((list(api.required_scopes) for api in apis), [])) | ||
|
||
def create_response_dic(self): | ||
result = super(CombinedScopeClaims, self).create_response_dic() | ||
token = FakeToken.from_claims(self) | ||
for claim_cls in self.combined_scope_claims: | ||
claim = claim_cls(token) | ||
result.update(claim.create_response_dic()) | ||
return result | ||
|
||
|
||
class FakeToken(object): | ||
""" | ||
Object that adapts a token. | ||
ScopeClaims constructor needs a token, but really uses just its | ||
user, scope and client attributes. This adapter makes it possible | ||
to create a token like object from those three attributes or from a | ||
claims object (which doesn't store the token) allowing it to be | ||
passed to a ScopeClaims constructor. | ||
""" | ||
def __init__(self, user, scope, client): | ||
self.user = user | ||
self.scope = scope | ||
self.client = client | ||
|
||
@classmethod | ||
def from_claims(cls, claims): | ||
return cls(claims.user, claims.scopes, claims.client) | ||
|
||
|
||
def get_userinfo_by_scopes(user, scopes, client=None): | ||
token = FakeToken(user, scopes, client) | ||
return _get_userinfo_by_token(token) | ||
|
||
|
||
def _get_userinfo_by_token(token): | ||
return CombinedScopeClaims(token).create_response_dic() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from collections import OrderedDict | ||
|
||
|
||
def combine_uniquely(iterable1, iterable2): | ||
""" | ||
Combine unique items of two sequences preserving order. | ||
:type seq1: Iterable[Any] | ||
:type seq2: Iterable[Any] | ||
:rtype: list[Any] | ||
""" | ||
result = OrderedDict.fromkeys(iterable1) | ||
for item in iterable2: | ||
result[item] = None | ||
return list(result.keys()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from django.http import JsonResponse | ||
from django.views.decorators.http import require_http_methods | ||
from oidc_provider.lib.utils.oauth2 import protected_resource_view | ||
|
||
from .api_tokens import get_api_tokens_by_access_token | ||
|
||
|
||
@require_http_methods(['GET']) | ||
@protected_resource_view(['openid']) | ||
def get_api_tokens_view(request, token, *args, **kwargs): | ||
""" | ||
Get the authorized API Tokens. | ||
:type token: oidc_provider.models.Token | ||
:rtype: JsonResponse | ||
""" | ||
api_tokens = get_api_tokens_by_access_token(token, request=request) | ||
response = JsonResponse(api_tokens, status=200) | ||
response['Access-Control-Allow-Origin'] = '*' | ||
response['Cache-Control'] = 'no-store' | ||
response['Pragma'] = 'no-cache' | ||
return response |
Oops, something went wrong.