Skip to content

Commit

Permalink
[pulsar-client]Add a optional params scope for pulsar oauth2 client (a…
Browse files Browse the repository at this point in the history
…pache#11931)

### Motivation

In some scenarios (e.g. azure cloud), when the client exchanges tokens with the server, an optional scope parameter is required, this pr fixes this issue, to ensure compatibility, when the user does not fill in this parameter, all behavior is the same as before.

### Modifications

* Add an optional parameter scope when exchanges token

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
tuteng authored Sep 9, 2021
1 parent bef3757 commit ac5114f
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,28 @@ public final class AuthenticationFactoryOAuth2 {
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
return clientCredentials(issuerUrl, credentialsUrl, audience, null);
}

/**
* Authenticate with client credentials.
*
* @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
* @param audience the audience identifier
* @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
* case-sensitive strings. The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.audience(audience)
.scope(scope)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,24 @@ class ClientCredentialsFlow extends FlowBase {
public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
public static final String CONFIG_PARAM_AUDIENCE = "audience";
public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
public static final String CONFIG_PARAM_SCOPE = "scope";

private static final long serialVersionUID = 1L;

private final String audience;
private final String privateKey;
private final String scope;

private transient ClientCredentialsExchanger exchanger;

private boolean initialized = false;

@Builder
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) {
super(issuerUrl);
this.audience = audience;
this.privateKey = privateKey;
this.scope = scope;
}

@Override
Expand All @@ -87,6 +90,7 @@ public TokenResult authenticate() throws PulsarClientException {
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
Expand Down Expand Up @@ -116,10 +120,13 @@ public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
// This is an optional parameter
String scope = params.get(CONFIG_PARAM_SCOPE);
return ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.audience(audience)
.privateKey(privateKeyUrl)
.scope(scope)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ public class ClientCredentialsExchangeRequest {

@JsonProperty("audience")
private String audience;

@JsonProperty("scope")
private String scope;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.asynchttpclient.AsyncHttpClient;
Expand All @@ -47,22 +48,46 @@ public class TokenClient implements ClientCredentialsExchanger {
private final AsyncHttpClient httpClient;

public TokenClient(URL tokenUrl) {
this.tokenUrl = tokenUrl;
this(tokenUrl, null);
}

DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
AsyncHttpClientConfig config = confBuilder.build();
httpClient = new DefaultAsyncHttpClient(config);
TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
if (httpClient == null) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
AsyncHttpClientConfig config = confBuilder.build();
this.httpClient = new DefaultAsyncHttpClient(config);
} else {
this.httpClient = httpClient;
}
this.tokenUrl = tokenUrl;
}

@Override
public void close() throws Exception {
httpClient.close();
}

/**
* Constructing http request parameters.
* @param bodyMap List of parameters to be requested.
* @return Generate the final request body from a map.
*/
String buildClientCredentialsBody(Map<String, String> bodyMap) {
return bodyMap.entrySet().stream()
.map(e -> {
try {
return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
} catch (UnsupportedEncodingException e1) {
throw new RuntimeException(e1);
}
})
.collect(Collectors.joining("&"));
}

/**
* Performs a token exchange using client credentials.
* @param req the client credentials request details.
Expand All @@ -76,15 +101,10 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re
bodyMap.put("client_id", req.getClientId());
bodyMap.put("client_secret", req.getClientSecret());
bodyMap.put("audience", req.getAudience());
String body = bodyMap.entrySet().stream()
.map(e -> {
try {
return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
} catch (UnsupportedEncodingException e1) {
throw new RuntimeException(e1);
}
})
.collect(Collectors.joining("&"));
if (!StringUtils.isBlank(req.getScope())) {
bodyMap.put("scope", req.getScope());
}
String body = buildClientCredentialsBody(bodyMap);

try {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.auth.oauth2.protocol;

import com.google.gson.Gson;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.net.URL;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Token client exchange token mock test.
*/
public class TokenClientTest {

@Test
@SuppressWarnings("unchecked")
public void exchangeClientCredentialsSuccessByScopeTest() throws
IOException, TokenExchangeException, ExecutionException, InterruptedException {
DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
URL url = new URL("http://localhost");
TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
Map<String, String> bodyMap = new TreeMap<>();
ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
.audience("test-audience")
.clientId("test-client-id")
.clientSecret("test-client-secret")
.scope("test-scope")
.build();
bodyMap.put("grant_type", "client_credentials");
bodyMap.put("client_id", request.getClientId());
bodyMap.put("client_secret", request.getClientSecret());
bodyMap.put("audience", request.getAudience());
bodyMap.put("scope", request.getScope());
String body = tokenClient.buildClientCredentialsBody(bodyMap);
BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
Response response = mock(Response.class);
ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
when(listenableFuture.get()).thenReturn(response);
when(response.getStatusCode()).thenReturn(200);
TokenResult tokenResult = new TokenResult();
tokenResult.setAccessToken("test-access-token");
tokenResult.setIdToken("test-id");
when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
TokenResult tr = tokenClient.exchangeClientCredentials(request);
Assert.assertNotNull(tr);
}

@Test
@SuppressWarnings("unchecked")
public void exchangeClientCredentialsSuccessByNoScopeTest() throws
IOException, TokenExchangeException, ExecutionException, InterruptedException {
DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
URL url = new URL("http://localhost");
TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
Map<String, String> bodyMap = new TreeMap<>();
ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder()
.audience("test-audience")
.clientId("test-client-id")
.clientSecret("test-client-secret")
.build();
bodyMap.put("grant_type", "client_credentials");
bodyMap.put("client_id", request.getClientId());
bodyMap.put("client_secret", request.getClientSecret());
bodyMap.put("audience", request.getAudience());
String body = tokenClient.buildClientCredentialsBody(bodyMap);
BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
Response response = mock(Response.class);
ListenableFuture<Response> listenableFuture = mock(ListenableFuture.class);
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
when(listenableFuture.get()).thenReturn(response);
when(response.getStatusCode()).thenReturn(200);
TokenResult tokenResult = new TokenResult();
tokenResult.setAccessToken("test-access-token");
tokenResult.setIdToken("test-id");
when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
TokenResult tr = tokenClient.exchangeClientCredentials(request);
Assert.assertNotNull(tr);
}
}

0 comments on commit ac5114f

Please sign in to comment.