-
Notifications
You must be signed in to change notification settings - Fork 203
/
Copy pathauthority.py
225 lines (204 loc) · 11 KB
/
authority.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import json
try:
from urllib.parse import urlparse
except ImportError: # Fall back to Python 2
from urlparse import urlparse
import logging
logger = logging.getLogger(__name__)
# Endpoints were copied from here
# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
AZURE_US_GOVERNMENT = "login.microsoftonline.us"
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_PUBLIC = "login.microsoftonline.com"
WORLD_WIDE = 'login.microsoftonline.com' # There was an alias login.windows.net
WELL_KNOWN_AUTHORITY_HOSTS = set([
WORLD_WIDE,
AZURE_CHINA,
'login-us.microsoftonline.com',
AZURE_US_GOVERNMENT,
])
WELL_KNOWN_B2C_HOSTS = [
"b2clogin.com",
"b2clogin.cn",
"b2clogin.us",
"b2clogin.de",
"ciamlogin.com",
]
_CIAM_DOMAIN_SUFFIX = ".ciamlogin.com"
class AuthorityBuilder(object):
def __init__(self, instance, tenant):
"""A helper to save caller from doing string concatenation.
Usage is documented in :func:`application.ClientApplication.__init__`.
"""
self._instance = instance.rstrip("/")
self._tenant = tenant.strip("/")
def __str__(self):
return "https://{}/{}".format(self._instance, self._tenant)
class Authority(object):
"""This class represents an (already-validated) authority.
Once constructed, it contains members named "*_endpoint" for this instance.
TODO: It will also cache the previously-validated authority instances.
"""
_domains_without_user_realm_discovery = set([])
def __init__(
self, authority_url, http_client,
validate_authority=True,
instance_discovery=None,
oidc_authority_url=None,
):
"""Creates an authority instance, and also validates it.
:param validate_authority:
The Authority validation process actually checks two parts:
instance (a.k.a. host) and tenant. We always do a tenant discovery.
This parameter only controls whether an instance discovery will be
performed.
"""
self._http_client = http_client
if oidc_authority_url:
logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
tenant_discovery_endpoint = self._initialize_oidc_authority(
oidc_authority_url)
else:
logger.debug("Initializing with Entra authority: %s", authority_url)
tenant_discovery_endpoint = self._initialize_entra_authority(
authority_url, validate_authority, instance_discovery)
try:
openid_config = tenant_discovery(
tenant_discovery_endpoint,
self._http_client)
except ValueError:
error_message = (
"Unable to get OIDC authority configuration for {url} "
"because its OIDC Discovery endpoint is unavailable at "
"{url}/.well-known/openid-configuration ".format(url=oidc_authority_url)
if oidc_authority_url else
"Unable to get authority configuration for {}. "
"Authority would typically be in a format of "
"https://login.microsoftonline.com/your_tenant "
"or https://tenant_name.ciamlogin.com "
"or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy. "
.format(authority_url)
) + " Also please double check your tenant name or GUID is correct."
raise ValueError(error_message)
logger.debug(
'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
self.authorization_endpoint = openid_config['authorization_endpoint']
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
self._is_b2c = True # Not exactly true, but
# OIDC Authority was designed for CIAM which is the next gen of B2C.
# Besides, application.py uses this to bypass broker.
self._is_known_to_developer = True # Not really relevant, but application.py uses this to bypass authority validation
return oidc_authority_url + "/.well-known/openid-configuration"
def _initialize_entra_authority(
self, authority_url, validate_authority, instance_discovery):
# :param instance_discovery:
# By default, the known-to-Microsoft validation will use an
# instance discovery endpoint located at ``login.microsoftonline.com``.
# You can customize the endpoint by providing a url as a string.
# Or you can turn this behavior off by passing in a False here.
if isinstance(authority_url, AuthorityBuilder):
authority_url = str(authority_url)
authority, self.instance, tenant = canonicalize(authority_url)
is_ciam = self.instance.endswith(_CIAM_DOMAIN_SUFFIX)
self.is_adfs = tenant.lower() == 'adfs' and not is_ciam
parts = authority.path.split('/')
self._is_b2c = any(
self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format( # Note: This URL seemingly returns V1 endpoint only
WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
# See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
# and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
) if instance_discovery in (None, True) else instance_discovery
if instance_discovery_endpoint and not (
is_known_to_microsoft or self._is_known_to_developer):
payload = _instance_discovery(
"https://{}{}/oauth2/v2.0/authorize".format(
self.instance, authority.path),
self._http_client,
instance_discovery_endpoint)
if payload.get("error") == "invalid_instance":
raise ValueError(
"invalid_instance: "
"The authority you provided, %s, is not whitelisted. "
"If it is indeed your legit customized domain name, "
"you can turn off this check by passing in "
"instance_discovery=False"
% authority_url)
tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
else:
tenant_discovery_endpoint = authority._replace(
path="{prefix}{version}/.well-known/openid-configuration".format(
prefix=tenant if is_ciam and len(authority.path) <= 1 # Path-less CIAM
else authority.path, # In B2C, it is "/tenant/policy"
version="" if self.is_adfs else "/v2.0",
)
).geturl() # Keeping original port and query. Query is useful for test.
return tenant_discovery_endpoint
def user_realm_discovery(self, username, correlation_id=None, response=None):
# It will typically return a dict containing "ver", "account_type",
# "federation_protocol", "cloud_audience_urn",
# "federation_metadata_url", "federation_active_auth_url", etc.
if self.instance not in self.__class__._domains_without_user_realm_discovery:
resp = response or self._http_client.get(
"https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
netloc=self.instance, username=username),
headers={'Accept': 'application/json',
'client-request-id': correlation_id},)
if resp.status_code != 404:
resp.raise_for_status()
return json.loads(resp.text)
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow
def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
authority = urlparse(authority_or_auth_endpoint)
if authority.scheme == "https":
parts = authority.path.split("/")
first_part = parts[1] if len(parts) >= 2 and parts[1] else None
if authority.hostname.endswith(_CIAM_DOMAIN_SUFFIX): # CIAM
# Use path in CIAM authority. It will be validated by OIDC Discovery soon
tenant = first_part if first_part else "{}.onmicrosoft.com".format(
# Fallback to sub domain name. This variation may not be advertised
authority.hostname.rsplit(_CIAM_DOMAIN_SUFFIX, 1)[0])
return authority, authority.hostname, tenant
# AAD
if len(parts) >= 2 and parts[1]:
return authority, authority.hostname, parts[1]
raise ValueError(
"Your given address (%s) should consist of "
"an https url with a minimum of one segment in a path: e.g. "
"https://login.microsoftonline.com/{tenant} "
"or https://{tenant_name}.ciamlogin.com/{tenant} "
"or https://{tenant_name}.b2clogin.com/{tenant_name}.onmicrosoft.com/policy"
% authority_or_auth_endpoint)
def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
resp = http_client.get(
instance_discovery_endpoint,
params={'authorization_endpoint': url, 'api-version': '1.0'},
**kwargs)
return json.loads(resp.text)
def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
# Returns Openid Configuration
resp = http_client.get(tenant_discovery_endpoint, **kwargs)
if resp.status_code == 200:
return json.loads(resp.text) # It could raise ValueError
if 400 <= resp.status_code < 500:
# Nonexist tenant would hit this path
# e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
raise ValueError("OIDC Discovery failed on {}. HTTP status: {}, Error: {}".format(
tenant_discovery_endpoint,
resp.status_code,
resp.text, # Expose it as-is b/c OIDC defines no error response format
))
# Transient network error would hit this path
resp.raise_for_status()
raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
"Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))