Skip to content

Commit

Permalink
Issue 6702: add isSuperUser in AuthorizationProvider interface (apach…
Browse files Browse the repository at this point in the history
…e#6708)

Fixes apache#6702

### Motivation

Most of the methods in AuthorizationProvider take AuthenticationDataSource for authorization decision. However isSuperUser still takes the string role token. This has to be changed.

### Modifications

Add needed method
  • Loading branch information
jiazhai authored Apr 13, 2020
1 parent dcb7499 commit 797d89b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
}

/**
* Check if specified role is a super user
* @param role the role to check
* @param authenticationData authentication data related to the role
* @return a CompletableFuture containing a boolean in which true means the role is a super user
* and false if it is not
*/
default CompletableFuture<Boolean> isSuperUser(String role,
AuthenticationDataSource authenticationData,
ServiceConfiguration serviceConfiguration) {
return isSuperUser(role, serviceConfiguration);
}

/**
* Check if specified role is an admin of the tenant
* @param tenant the tenant to check
Expand Down Expand Up @@ -137,7 +150,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc

/**
* Grant permission to roles that can access subscription-admin api
*
*
* @param namespace
* @param subscriptionName
* @param roles
Expand All @@ -147,7 +160,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc
*/
CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
String authDataJson);

/**
* Revoke subscription admin-api access for a role
* @param namespace
Expand All @@ -157,7 +170,7 @@ CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace
*/
CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
String role, String authDataJson);

/**
* Grant authorization-action permission on a topic to the given client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService
}
}

public CompletableFuture<Boolean> isSuperUser(String user) {
public CompletableFuture<Boolean> isSuperUser(String user, AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isSuperUser(user, conf);
return provider.isSuperUser(user, authenticationData, conf);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}
Expand Down Expand Up @@ -111,7 +111,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set

/**
* Grant permission to roles that can access subscription-admin api
*
*
* @param namespace
* @param subscriptionName
* @param roles
Expand All @@ -130,7 +130,7 @@ public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName na

/**
* Revoke subscription admin-api access for a role
*
*
* @param namespace
* @param subscriptionName
* @param role
Expand All @@ -143,7 +143,7 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

/**
* Grant authorization-action permission on a topic to the given client
*
Expand Down Expand Up @@ -180,7 +180,7 @@ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String ro
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> {
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
Expand All @@ -207,7 +207,7 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
return CompletableFuture.completedFuture(true);
}
if (provider != null) {
return provider.isSuperUser(role, conf).thenComposeAsync(isSuperUser -> {
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
Expand Down Expand Up @@ -187,11 +186,11 @@ protected void validateSuperUserAccess() {
try {
proxyAuthorizedFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(appId);
.isSuperUser(appId, clientAuthData());

originalPrincipalAuthorizedFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(originalPrincipal);
.isSuperUser(originalPrincipal, clientAuthData());

if (!proxyAuthorizedFuture.get() || !originalPrincipalAuthorizedFuture.get()) {
throw new RestException(Status.UNAUTHORIZED,
Expand All @@ -206,7 +205,7 @@ protected void validateSuperUserAccess() {
} else {
if (config().isAuthorizationEnabled() && !pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(appId)
.isSuperUser(appId, clientAuthData())
.join()) {
throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access");
}
Expand Down Expand Up @@ -266,9 +265,9 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
try {
AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId);
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId, authenticationData);

isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal);
isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal, authenticationData);

boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get();
boolean originalPrincipalAuthorized
Expand All @@ -286,7 +285,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
} else {
if (!pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(clientAppId)
.isSuperUser(clientAppId, authenticationData)
.join()) {
if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
throw new RestException(Status.UNAUTHORIZED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ public void testNonExistentTopic() throws Exception {
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

// Test producer creation
resetChannel();
Expand Down Expand Up @@ -546,7 +546,7 @@ public void testClusterAccess() throws Exception {
providerField.set(authorizationService, authorizationProvider);
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class), Mockito.anyString(),
any(AuthAction.class));

Expand Down Expand Up @@ -574,7 +574,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
providerField.setAccessible(true);
PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
providerField.set(authorizationService, authorizationProvider);
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

// Test producer creation
resetChannel();
Expand Down

0 comments on commit 797d89b

Please sign in to comment.