Skip to content

Commit

Permalink
Fixes authorization when using client credentials. (Azure#6275)
Browse files Browse the repository at this point in the history
* Adding environment variables to tests.yml

* Adding test dependency on azure-identity.

* Adding ability to create builder using credentials.

* Fix AAD scope.

* Update CBS node to also take scope in addition to the audience.

* Adding test for CBS channel.

* Add tests for AzureTokenManagerProvider.
  • Loading branch information
conniey authored Nov 12, 2019
1 parent a044649 commit 92cbbd7
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.core.amqp;

import com.azure.core.credential.TokenCredential;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand All @@ -18,10 +19,11 @@ public interface CBSNode extends EndpointStateNotifier, Closeable {
/**
* Authorizes the caller with the CBS node to access resources for the {@code audience}.
*
* @param audience Resource that the callee needs access to.
* @param audience The audience to which the token applies. This can be the path within the AMQP message broker.
* @param scopes The requested scopes for the {@link TokenCredential}.
* @return A Mono that completes with the callee's expiration date if it is successful and errors if
* authorization was unsuccessful. Once the expiration date has elapsed, the callee needs to reauthorize with the
* CBS node.
*/
Mono<OffsetDateTime> authorize(String audience);
Mono<OffsetDateTime> authorize(String audience, String scopes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ public class ActiveClientTokenManager implements TokenManager {
private final AtomicBoolean hasDisposed = new AtomicBoolean();
private final Mono<CBSNode> cbsNode;
private final String tokenAudience;
private final String scopes;
private final Timer timer;
private final Flux<AmqpResponseCode> authorizationResults;
private FluxSink<AmqpResponseCode> sink;

// last refresh interval in milliseconds.
private AtomicLong lastRefreshInterval = new AtomicLong();

public ActiveClientTokenManager(Mono<CBSNode> cbsNode, String tokenAudience) {
public ActiveClientTokenManager(Mono<CBSNode> cbsNode, String tokenAudience, String scopes) {
this.timer = new Timer(tokenAudience + "-tokenManager");
this.cbsNode = cbsNode;
this.tokenAudience = tokenAudience;
this.scopes = scopes;
this.authorizationResults = Flux.create(sink -> {
if (hasDisposed.get()) {
sink.complete();
Expand Down Expand Up @@ -74,7 +76,7 @@ public Mono<Long> authorize() {
"Cannot authorize with CBS node when this token manager has been disposed of."));
}

return cbsNode.flatMap(cbsNode -> cbsNode.authorize(tokenAudience))
return cbsNode.flatMap(cbsNode -> cbsNode.authorize(tokenAudience, scopes))
.map(expiresOn -> {
final Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Generates the correct resource scope to access Azure messaging resources given the authorization type.
*/
public class AzureTokenManagerProvider implements TokenManagerProvider {
private static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";

private final ClientLogger logger = new ClientLogger(AzureTokenManagerProvider.class);
private final CBSAuthorizationType authorizationType;
Expand Down Expand Up @@ -43,8 +43,9 @@ public AzureTokenManagerProvider(CBSAuthorizationType authorizationType, String
*/
@Override
public TokenManager getTokenManager(Mono<CBSNode> cbsNodeMono, String resource) {
final String audience = getResourceString(resource);
return new ActiveClientTokenManager(cbsNodeMono, audience);
final String scopes = getResourceString(resource);
final String tokenAudience = String.format(Locale.US, TOKEN_AUDIENCE_FORMAT, host, resource);
return new ActiveClientTokenManager(cbsNodeMono, tokenAudience, scopes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public enum CBSAuthorizationType {
/**
* Authorize with CBS through a shared access signature.
*/
SHARED_ACCESS_SIGNATURE("sastoken"),
SHARED_ACCESS_SIGNATURE("servicebus.windows.net:sastoken"),
/**
* Authorize with CBS using a JSON web token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@

import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public class CBSChannel extends EndpointStateNotifierBase implements CBSNode {
private static final String PUT_TOKEN_OPERATION = "operation";
private static final String PUT_TOKEN_OPERATION_VALUE = "put-token";
private static final String PUT_TOKEN_TYPE = "type";
private static final String PUT_TOKEN_TYPE_VALUE_FORMAT = "servicebus.windows.net:%s";
private static final String PUT_TOKEN_AUDIENCE = "name";
static final String PUT_TOKEN_OPERATION = "operation";
static final String PUT_TOKEN_OPERATION_VALUE = "put-token";
static final String PUT_TOKEN_TYPE = "type";
static final String PUT_TOKEN_AUDIENCE = "name";

private final TokenCredential credential;
private final Mono<RequestResponseChannel> cbsChannelMono;
Expand All @@ -44,17 +42,17 @@ public CBSChannel(Mono<RequestResponseChannel> responseChannelMono, TokenCredent
}

@Override
public Mono<OffsetDateTime> authorize(final String tokenAudience) {
public Mono<OffsetDateTime> authorize(String tokenAudience, String scopes) {
final Message request = Proton.message();
final Map<String, Object> properties = new HashMap<>();
properties.put(PUT_TOKEN_OPERATION, PUT_TOKEN_OPERATION_VALUE);
properties.put(PUT_TOKEN_TYPE, String.format(Locale.ROOT, PUT_TOKEN_TYPE_VALUE_FORMAT,
authorizationType.getTokenType()));
properties.put(PUT_TOKEN_TYPE, authorizationType.getTokenType());
properties.put(PUT_TOKEN_AUDIENCE, tokenAudience);

final ApplicationProperties applicationProperties = new ApplicationProperties(properties);
request.setApplicationProperties(applicationProperties);

return credential.getToken(new TokenRequestContext().addScopes(tokenAudience)).flatMap(accessToken -> {
return credential.getToken(new TokenRequestContext().addScopes(scopes)).flatMap(accessToken -> {
request.setBody(new AmqpValue(accessToken.getToken()));

return cbsChannelMono.flatMap(x -> x.sendWithAck(request))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

public class ActiveClientTokenManagerTest {
private static final String AUDIENCE = "an-audience-test";
private static final String SCOPES = "scopes-test";
private static final Duration TIMEOUT = Duration.ofSeconds(4);

@Mock
Expand All @@ -50,9 +51,9 @@ public void teardown() {
public void getAuthorizationResults() {
// Arrange
final Mono<CBSNode> cbsNodeMono = Mono.fromCallable(() -> cbsNode);
when(cbsNode.authorize(any())).thenReturn(getNextExpiration(3));
when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(3));

final ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE);
final ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES);

// Act & Assert
StepVerifier.create(tokenManager.getAuthorizationResults())
Expand All @@ -74,11 +75,11 @@ public void getAuthorizationResultsSuccessFailure() {
final Mono<CBSNode> cbsNodeMono = Mono.fromCallable(() -> cbsNode);
final IllegalArgumentException error = new IllegalArgumentException("Some error");

when(cbsNode.authorize(any())).thenReturn(getNextExpiration(2),
when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(2),
getNextExpiration(2), Mono.error(error));

// Act & Assert
try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE)) {
try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES)) {
StepVerifier.create(tokenManager.getAuthorizationResults())
.then(() -> tokenManager.authorize().block(TIMEOUT))
.expectNext(AmqpResponseCode.ACCEPTED)
Expand All @@ -97,9 +98,9 @@ public void getAuthorizationResultsSuccessFailure() {
public void cannotAuthorizeDisposedInstance() {
// Arrange
final Mono<CBSNode> cbsNodeMono = Mono.fromCallable(() -> cbsNode);
when(cbsNode.authorize(any())).thenReturn(getNextExpiration(2));
when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(2));

final ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE);
final ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES);
tokenManager.authorize().then(Mono.fromRunnable(tokenManager::close)).block();

// Act & Assert
Expand All @@ -119,12 +120,12 @@ public void getAuthorizationResultsRetriableError() {
final AmqpException error = new AmqpException(true, ErrorCondition.TIMEOUT_ERROR, "Timed out",
new ErrorContext("Test-context-namespace"));

when(cbsNode.authorize(any())).thenReturn(getNextExpiration(3), Mono.error(error),
when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(3), Mono.error(error),
getNextExpiration(5), getNextExpiration(10),
getNextExpiration(45));

// Act & Assert
try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE)) {
try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES)) {
StepVerifier.create(tokenManager.getAuthorizationResults())
.then(() -> tokenManager.authorize().block(TIMEOUT))
.expectError(AmqpException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,45 @@

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.CBSNode;
import com.azure.core.credential.AccessToken;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Locale;

import static com.azure.core.amqp.implementation.AzureTokenManagerProvider.TOKEN_AUDIENCE_FORMAT;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;

public class AzureTokenManagerProviderTest {
private static final String HOST_NAME = "foobar.windows.net";

@Mock
private CBSNode cbsNode;

@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
}

@AfterEach
public void teardown() {
Mockito.framework().clearInlineMocks();
}

@Test
public void constructorNullType() {
assertThrows(NullPointerException.class, () -> new AzureTokenManagerProvider(null, HOST_NAME, "something."));
Expand Down Expand Up @@ -55,4 +84,55 @@ public void getResourceString(CBSAuthorizationType authorizationType) {
Assertions.fail("This authorization type is unknown: " + authorizationType);
}
}

/**
* Verifies that for SAS token credentials, the scope is the exact same as the audience because the token credential
* is generated from it.
*/
@Test
public void getCorrectTokenManagerSasToken() {
// Arrange
final String aadScope = "some-active-directory-scope";
final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(CBSAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST_NAME, aadScope);
final String entityPath = "event-hub-test-2/partition/2";
final AccessToken token = new AccessToken("a-new-access-token", OffsetDateTime.now().plusMinutes(10));
final String tokenAudience = String.format(Locale.US, TOKEN_AUDIENCE_FORMAT, HOST_NAME, entityPath);

when(cbsNode.authorize(argThat(audience -> audience.equals(tokenAudience)), argThat(scope -> scope.equals(tokenAudience))))
.thenReturn(Mono.just(token.getExpiresAt()));

// Act
final TokenManager tokenManager = provider.getTokenManager(Mono.just(cbsNode), entityPath);

// Assert
StepVerifier.create(tokenManager.authorize())
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofSeconds(10));
}

/**
* Verifies that for JWT token credentials, the scope is the the one that we expect from Azure AAD scope.
*/
@Test
public void getCorrectTokenManagerJwt() {
// Arrange
final String aadScope = "some-active-directory-scope";
final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(CBSAuthorizationType.JSON_WEB_TOKEN, HOST_NAME, aadScope);
final String entityPath = "event-hub-test-2/partition/2";
final AccessToken token = new AccessToken("a-new-access-token", OffsetDateTime.now().plusMinutes(10));
final String tokenAudience = String.format(Locale.US, TOKEN_AUDIENCE_FORMAT, HOST_NAME, entityPath);

when(cbsNode.authorize(argThat(audience -> audience.equals(tokenAudience)), argThat(scope -> scope.equals(aadScope))))
.thenReturn(Mono.just(token.getExpiresAt()));

// Act
final TokenManager tokenManager = provider.getTokenManager(Mono.just(cbsNode), entityPath);

// Assert
StepVerifier.create(tokenManager.authorize())
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofSeconds(10));
}
}
Loading

0 comments on commit 92cbbd7

Please sign in to comment.