diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 57147e72b9c78..0403f34c26ac7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -225,12 +226,19 @@ CompletableFuture grantPermissionAsync(TopicName topicName, Set */ + @Deprecated default CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) { - return isTenantAdmin(tenantName, role, null, authData); + return allowTenantOperationAsync( + tenantName, + StringUtils.isBlank(originalRole) ? role : originalRole, + operation, + authData + ); } + @Deprecated default Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) { try { @@ -242,27 +250,94 @@ default Boolean allowTenantOperation(String tenantName, String originalRole, Str } } + /** + * Check if a given role is allowed to execute a given operation on the tenant. + * + * @param tenantName tenant name + * @param role role name + * @param operation tenant operation + * @param authData authenticated data of the role + * @return a completable future represents check result + */ + default CompletableFuture allowTenantOperationAsync(String tenantName, String role, + TenantOperation operation, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture(new IllegalStateException( + String.format("allowTenantOperation(%s) on tenant %s is not supported by the Authorization" + + " provider you are using.", + operation.toString(), tenantName))); + } + + default Boolean allowTenantOperation(String tenantName, String role, TenantOperation operation, + AuthenticationDataSource authData) { + try { + return allowTenantOperationAsync(tenantName, role, operation, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); + } + } + + /** + * Check if a given role is allowed to execute a given operation on the namespace. + * + * @param namespaceName namespace name + * @param role role name + * @param operation namespace operation + * @param authData authenticated data + * @return a completable future represents check result + */ + default CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new IllegalStateException("NamespaceOperation is not supported by the Authorization provider you are using.")); + } + + default Boolean allowNamespaceOperation(NamespaceName namespaceName, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + try { + return allowNamespaceOperationAsync(namespaceName, role, operation, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); + } + } + /** * Grant authorization-action permission on a namespace to the given client + * * @param namespaceName - * @param originalRole role not overriden by proxy role if request do pass through proxy - * @param role originalRole | proxyRole if the request didn't pass through proxy + * @param role * @param operation * @param authData * @return CompletableFuture */ - default CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, - String role, NamespaceOperation operation, - AuthenticationDataSource authData) { - return FutureUtil.failedFuture( - new IllegalStateException( - String.format("NamespaceOperation(%s) on namespace(%s) by role(%s) is not supported" + - " by the Authorization provider you are using.", - operation.toString(), namespaceName.toString(), role == null ? "null" : role))); + @Deprecated + default CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + String originalRole, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return allowNamespaceOperationAsync( + namespaceName, + StringUtils.isBlank(originalRole) ? role : originalRole, + operation, + authData + ); } - default Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, - NamespaceOperation operation, AuthenticationDataSource authData) { + @Deprecated + default Boolean allowNamespaceOperation(NamespaceName namespaceName, + String originalRole, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { try { return allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData).get(); } catch (InterruptedException e) { @@ -272,6 +347,39 @@ default Boolean allowNamespaceOperation(NamespaceName namespaceName, String orig } } + /** + * Check if a given role is allowed to execute a given policy operation on the namespace. + * + * @param namespaceName namespace name + * @param policy policy name + * @param operation policy operation + * @param role role name + * @param authData authenticated data + * @return a completable future represents check result + */ + default CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new IllegalStateException("NamespacePolicyOperation is not supported by the Authorization provider you are using.")); + } + + default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { + try { + return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); + } + } + /** * Grant authorization-action permission on a namespace to the given client * @param namespaceName @@ -281,16 +389,32 @@ default Boolean allowNamespaceOperation(NamespaceName namespaceName, String orig * @param authData * @return CompletableFuture */ - default CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, - PolicyOperation operation, String originalRole, - String role, AuthenticationDataSource authData) { - return isTenantAdmin(namespaceName.getTenant(), role, null, authData); + @Deprecated + default CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + return allowNamespacePolicyOperationAsync( + namespaceName, + policy, + operation, + StringUtils.isBlank(originalRole) ? role : originalRole, + authData + ); } - default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, - String originalRole, String role, AuthenticationDataSource authData) { + @Deprecated + default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { try { - return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData).get(); + return allowNamespacePolicyOperationAsync( + namespaceName, policy, operation, originalRole, role, authData).get(); } catch (InterruptedException e) { throw new RestException(e); } catch (ExecutionException e) { @@ -298,6 +422,35 @@ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, Polic } } + /** + * Check if a given role is allowed to execute a given topic operation on the topic. + * + * @param topic topic name + * @param role role name + * @param operation topic operation + * @param authData authenticated data + * @return CompletableFuture + */ + default CompletableFuture allowTopicOperationAsync(TopicName topic, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new IllegalStateException("TopicOperation is not supported by the Authorization provider you are using.")); + } + + default Boolean allowTopicOperation(TopicName topicName, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + try { + return allowTopicOperationAsync(topicName, role, operation, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); + } + } /** * Grant authorization-action permission on a topic to the given client @@ -308,27 +461,26 @@ default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, Polic * @param authData * @return CompletableFuture */ - default CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role, - TopicOperation operation, - AuthenticationDataSource authData) { - switch (operation) { - case PRODUCE: - return canProduceAsync(topic, role, authData); - case CONSUME: - return canConsumeAsync(topic, role, authData, null); - case LOOKUP: - return canLookupAsync(topic, role, authData); - default: - return FutureUtil.failedFuture( - new IllegalStateException( - String.format("TopicOperation(%s) on topic(%s) by role(%s) is not supported" + - " by the Authorization provider you are using.", - operation.toString(), topic.toString(), role == null ? "null" : null))); - } + @Deprecated + default CompletableFuture allowTopicOperationAsync(TopicName topic, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { + return allowTopicOperationAsync( + topic, + StringUtils.isBlank(originalRole) ? role : originalRole, + operation, + authData + ); } - default Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, - AuthenticationDataSource authData) { + @Deprecated + default Boolean allowTopicOperation(TopicName topicName, + String originalRole, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { try { return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get(); } catch (InterruptedException e) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index b91d6163066a8..afa85eea0523e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.authorization; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; -import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.common.naming.NamespaceName; @@ -35,12 +37,10 @@ import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - import static java.util.concurrent.TimeUnit.SECONDS; /** @@ -341,45 +341,84 @@ public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, return provider.allowSinkOpsAsync(namespaceName, role, authenticationData); } + private static void validateOriginalPrincipal(Set proxyRoles, String authenticatedPrincipal, + String originalPrincipal) { + if (proxyRoles.contains(authenticatedPrincipal)) { + // Request has come from a proxy + if (StringUtils.isBlank(originalPrincipal)) { + log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal); + throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy."); + } + if (proxyRoles.contains(originalPrincipal)) { + log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles); + throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role"); + } + } + } + + private boolean isProxyRole(String role) { + return role != null && conf.getProxyRoles().contains(role); + } + /** * Grant authorization-action permission on a tenant to the given client * - * @param tenantName - * @param operation - * @param originalRole - * @param role + * @param tenantName tenant name + * @param operation tenant operation + * @param role role name * @param authData * additional authdata in json for targeted authorization provider * @return IllegalArgumentException when tenant not found * @throws IllegalStateException * when failed to grant permission */ - public CompletableFuture allowTenantOperationAsync(String tenantName, TenantOperation operation, - String originalRole, String role, - AuthenticationDataSource authData) { + public CompletableFuture allowTenantOperationAsync(String tenantName, + TenantOperation operation, + String role, + AuthenticationDataSource authData) { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.allowTenantOperationAsync(tenantName, originalRole, role, operation, authData); + return provider.allowTenantOperationAsync(tenantName, role, operation, authData); } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + "allowTenantOperationAsync")); } - public Boolean allowTenantOperation(String tenantName, TenantOperation operation, String orignalRole, String role, - AuthenticationDataSource authData) { - if (!this.conf.isAuthorizationEnabled()) { - return true; + public CompletableFuture allowTenantOperationAsync(String tenantName, + TenantOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole); + if (isProxyRole(role)) { + CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync( + tenantName, operation, role, authData); + CompletableFuture isOriginalAuthorizedFuture = allowTenantOperationAsync( + tenantName, operation, originalRole, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return allowTenantOperationAsync(tenantName, operation, role, authData); } + } - if (provider != null) { - return provider.allowTenantOperation(tenantName, orignalRole, role, operation, authData); + public boolean allowTenantOperation(String tenantName, + TenantOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + try { + return allowTenantOperationAsync( + tenantName, operation, originalRole, role, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); } - - throw new IllegalStateException("No authorization provider configured for allowTenantOperation"); } /** @@ -387,7 +426,6 @@ public Boolean allowTenantOperation(String tenantName, TenantOperation operation * * @param namespaceName * @param operation - * @param originalRole * @param role * @param authData * additional authdata in json for targeted authorization provider @@ -397,31 +435,51 @@ public Boolean allowTenantOperation(String tenantName, TenantOperation operation */ public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, NamespaceOperation operation, - String originalRole, String role, + String role, AuthenticationDataSource authData) { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.allowNamespaceOperationAsync(namespaceName, originalRole, role, operation, authData); + return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData); } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + "allowNamespaceOperationAsync")); } - public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation, - String originalPrincipal, String role, AuthenticationDataSource authData) { - if (!this.conf.isAuthorizationEnabled()) { - return true; + public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + NamespaceOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole); + if (isProxyRole(role)) { + CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync( + namespaceName, operation, role, authData); + CompletableFuture isOriginalAuthorizedFuture = allowNamespaceOperationAsync( + namespaceName, operation, originalRole, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return allowNamespaceOperationAsync(namespaceName, operation, role, authData); } + } - if (provider != null) { - return provider.allowNamespaceOperation(namespaceName, originalPrincipal, role, operation, authData); + public boolean allowNamespaceOperation(NamespaceName namespaceName, + NamespaceOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + try { + return allowNamespaceOperationAsync( + namespaceName, operation, originalRole, role, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); } - - throw new IllegalStateException("No authorization provider configured for allowNamespaceOperation"); } /** @@ -429,7 +487,6 @@ public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOpe * * @param namespaceName * @param operation - * @param originalRole * @param role * @param authData * additional authdata in json for targeted authorization provider @@ -437,33 +494,56 @@ public Boolean allowNamespaceOperation(NamespaceName namespaceName, NamespaceOpe * @throws IllegalStateException * when failed to grant permission */ - public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, - PolicyOperation operation, String originalRole, - String role, AuthenticationDataSource authData) { + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, originalRole, role, authData); + return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + "allowNamespacePolicyOperationAsync")); } - public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, - PolicyOperation operation, String originalPrincipal, String role, - AuthenticationDataHttps authData) { - if (!this.conf.isAuthorizationEnabled()) { - return true; + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole); + if (isProxyRole(role)) { + CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( + namespaceName, policy, operation, role, authData); + CompletableFuture isOriginalAuthorizedFuture = allowNamespacePolicyOperationAsync( + namespaceName, policy, operation, originalRole, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); } + } - if (provider != null) { - return provider.allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal, role, authData); + public boolean allowNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + try { + return allowNamespacePolicyOperationAsync( + namespaceName, policy, operation, originalRole, role, authData).get(); + } catch (InterruptedException e) { + throw new RestException(e); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); } - - throw new IllegalStateException("No authorization provider configured for allowNamespacePolicyOperation"); } /** @@ -478,32 +558,43 @@ public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, Policy * @throws IllegalStateException * when failed to grant permission */ - public CompletableFuture allowTopicOperationAsync(TopicName topicName, TopicOperation operation, - String originalRole, String role, + public CompletableFuture allowTopicOperationAsync(TopicName topicName, + TopicOperation operation, + String role, AuthenticationDataSource authData) { + if (log.isDebugEnabled()) { + log.debug("Check if role {} is allowed to execute topic operation {} on topic {}", + role, operation, topicName); + } if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } if (provider != null) { - return provider.allowTopicOperationAsync(topicName, originalRole, role, operation, authData); + CompletableFuture allowFuture = + provider.allowTopicOperationAsync(topicName, role, operation, authData); + if (log.isDebugEnabled()) { + return allowFuture.whenComplete((allowed, exception) -> { + if (exception == null) { + if (allowed) { + log.debug("Topic operation {} on topic {} is allowed: role = {}", + operation, topicName, role); + } else{ + log.debug("Topic operation {} on topic {} is NOT allowed: role = {}", + operation, topicName, role); + } + } else { + log.debug("Failed to check if topic operation {} on topic {} is allowed:" + + " role = {}", + operation, topicName, role, exception); + } + }); + } else { + return allowFuture; + } } return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured for " + "allowTopicOperationAsync")); } - - public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation, - String orignalRole, String role, - AuthenticationDataSource authData) { - if (!this.conf.isAuthorizationEnabled()) { - return true; - } - - if (provider != null) { - return provider.allowTopicOperation(topicName, orignalRole, role, operation, authData); - } - - throw new IllegalStateException("No authorization provider configured for allowTopicOperation"); - } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index d7cea441192cb..66d0c2e70a6cb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal; import java.io.IOException; import java.util.Collections; @@ -29,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import com.google.common.base.Joiner; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -39,8 +39,6 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.Policies; -import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal; - import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -528,38 +526,43 @@ private void validatePoliciesReadOnlyAccess() { } @Override - public CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role, - TenantOperation operation, - AuthenticationDataSource authData) { - return validateTenantAdminAccess(tenantName, originalRole, role, authData); + public CompletableFuture allowTenantOperationAsync(String tenantName, + String role, + TenantOperation operation, + AuthenticationDataSource authData) { + return validateTenantAdminAccess(tenantName, role, authData); } @Override - public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, - String role, NamespaceOperation operation, - AuthenticationDataSource authData) { - return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData); + public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, + String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + return validateTenantAdminAccess(namespaceName.getTenant(), role, authData); } @Override - public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, - PolicyOperation operation, String originalRole, - String role, AuthenticationDataSource authData) { - return validateTenantAdminAccess(namespaceName.getTenant(), originalRole, role, authData); + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, + String role, + AuthenticationDataSource authData) { + return validateTenantAdminAccess(namespaceName.getTenant(), role, authData); } @Override - public CompletableFuture allowTopicOperationAsync(TopicName topicName, String originalRole, String role, + public CompletableFuture allowTopicOperationAsync(TopicName topicName, + String role, TopicOperation operation, AuthenticationDataSource authData) { CompletableFuture isAuthorizedFuture; switch (operation) { - case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData); + case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, authData); break; - case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData); + case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, role, authData); break; - case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, StringUtils.isBlank(originalRole) ? role : originalRole, authData, authData.getSubscription()); + case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription()); break; default: isAuthorizedFuture = FutureUtil.failedFuture( new IllegalStateException("TopicOperation is not supported.")); @@ -568,7 +571,14 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, CompletableFuture isSuperUserFuture = isSuperUser(role, authData, conf); return isSuperUserFuture - .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized); + .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> { + if (log.isDebugEnabled()) { + log.debug("Verify if role {} is allowed to {} to topic {}:" + + " isSuperUser={}, isAuthorized={}", + role, operation, topicName, isSuperUser, isAuthorized); + } + return isSuperUser || isAuthorized; + }); } private static String path(String... parts) { @@ -578,43 +588,20 @@ private static String path(String... parts) { return sb.toString(); } - private CompletableFuture validateTenantAdminAccess(String tenantName, String originalRole, String role, + private CompletableFuture validateTenantAdminAccess(String tenantName, + String role, AuthenticationDataSource authData) { try { TenantInfo tenantInfo = configCache.propertiesCache() .get(path(POLICIES, tenantName)) .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist")); - validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole); - - if (role != null && conf.getProxyRoles().contains(role)) { - // role check - CompletableFuture isRoleSuperUserFuture = isSuperUser(role, authData, conf); - CompletableFuture isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData); - CompletableFuture isRoleAuthorizedFuture = isRoleSuperUserFuture - .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) -> - isRoleSuperUser || isRoleTenantAdmin); - - // originalRole check - CompletableFuture isOriginalRoleSuperUserFuture = isSuperUser(originalRole, authData, conf); - CompletableFuture isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole, - tenantInfo, authData); - CompletableFuture isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture - .thenCombine(isOriginalRoleTenantAdminFuture, (isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) -> - isOriginalRoleSuperUser || isOriginalRoleTenantAdmin); - - // merging - return isRoleAuthorizedFuture - .thenCombine(isOriginalRoleAuthorizedFuture, (isRoleAuthorized, isOriginalRoleAuthorized) -> - isRoleAuthorized && isOriginalRoleAuthorized); - } else { - // role check - CompletableFuture isRoleSuperUserFuture = isSuperUser(role, authData, conf); - CompletableFuture isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData); - return isRoleSuperUserFuture - .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) -> - isRoleSuperUser || isRoleTenantAdmin); - } + // role check + CompletableFuture isRoleSuperUserFuture = isSuperUser(role, authData, conf); + CompletableFuture isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData); + return isRoleSuperUserFuture + .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) -> + isRoleSuperUser || isRoleTenantAdmin); } catch (KeeperException.NoNodeException e) { log.warn("Failed to get tenant info data for non existing tenant {}", tenantName); throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"); @@ -624,18 +611,4 @@ private CompletableFuture validateTenantAdminAccess(String tenantName, } } - private static void validateOriginalPrincipal(Set proxyRoles, String authenticatedPrincipal, - String originalPrincipal) { - if (proxyRoles.contains(authenticatedPrincipal)) { - // Request has come from a proxy - if (StringUtils.isBlank(originalPrincipal)) { - log.warn("Original principal empty in request authenticated as {}", authenticatedPrincipal); - throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be empty if the request is via proxy."); - } - if (proxyRoles.contains(originalPrincipal)) { - log.warn("Original principal {} cannot be a proxy role ({})", originalPrincipal, proxyRoles); - throw new RestException(Response.Status.UNAUTHORIZED, "Original principal cannot be a proxy role"); - } - } - } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java index 6b4fc8cdc6dcf..8555f34ec7481 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.web; -import static com.google.common.base.Preconditions.checkState; - import java.io.IOException; import javax.servlet.Filter; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 612c7a3bee83f..e66d29ccd7efc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2616,6 +2616,36 @@ public static CompletableFuture getPartitionedTopicMet return metadataFuture; } + /** + * Get partitioned topic metadata without checking the permission. + */ + public static CompletableFuture unsafeGetPartitionedTopicMetadataAsync( + PulsarService pulsar, TopicName topicName) { + CompletableFuture metadataFuture = new CompletableFuture(); + + String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getNamespace(), + topicName.getDomain().toString(), topicName.getEncodedLocalName()); + + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can + // serve/redirect request else fail partitioned-metadata-request so, client fails while creating + // producer/consumer + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) + .thenCompose(res -> pulsar.getBrokerService() + .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) + .thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("Total number of partitions for topic {} is {}", topicName, + metadata.partitions); + } + metadataFuture.complete(metadata); + }).exceptionally(ex -> { + metadataFuture.completeExceptionally(ex.getCause()); + return null; + }); + + return metadataFuture; + } + /** * Get the Topic object reference from the Pulsar broker */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 2131b18799e9a..03bbe12c6b675 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -152,7 +152,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); this.appId = appId; - this.authenticationData = cnx.authenticationData; + this.authenticationData = cnx.getAuthenticationData(); this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl(); PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 771cd21d05385..258186be0625f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -101,7 +101,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName this.epoch = epoch; this.closeFuture = new CompletableFuture<>(); this.appId = appId; - this.authenticationData = cnx.authenticationData; + this.authenticationData = cnx.getAuthenticationData(); this.msgIn = new Rate(); this.chuckedMessageRate = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 15b3c97ed05bc..8949db440ee36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata; +import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; @@ -141,6 +142,7 @@ public class ServerCnx extends PulsarHandler { // In case of proxy, if the authentication credentials are forwardable, // it will hold the credentials of the original client AuthenticationState originalAuthState; + AuthenticationDataSource originalAuthData; private boolean pendingAuthChallengeResponse = false; // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow @@ -273,6 +275,65 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) { // // Incoming commands handling // //// + private CompletableFuture isTopicOperationAllowed(TopicName topicName, TopicOperation operation) { + CompletableFuture isProxyAuthorizedFuture; + CompletableFuture isAuthorizedFuture; + if (service.isAuthorizationEnabled()) { + if (originalPrincipal != null) { + isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( + topicName, operation, originalPrincipal, getAuthenticationData()); + } else { + isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); + } + isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync( + topicName, operation, authRole, authenticationData); + } else { + isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); + isAuthorizedFuture = CompletableFuture.completedFuture(true); + } + return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> { + if (!isProxyAuthorized) { + log.error("OriginalRole {} is not authorized to perform operation {} on topic {}", + originalPrincipal, operation, topicName); + } + if (!isAuthorized) { + log.error("Role {} is not authorized to perform operation {} on topic {}", + authRole, operation, topicName); + } + return isProxyAuthorized && isAuthorized; + }); + } + + private CompletableFuture isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) { + CompletableFuture isProxyAuthorizedFuture; + CompletableFuture isAuthorizedFuture; + if (service.isAuthorizationEnabled()) { + if (authenticationData == null) { + authenticationData = new AuthenticationDataCommand("", subscriptionName); + } else { + authenticationData.setSubscription(subscriptionName); + } + if (originalAuthData != null) { + originalAuthData.setSubscription(subscriptionName); + } + return isTopicOperationAllowed(topicName, operation); + } else { + isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); + isAuthorizedFuture = CompletableFuture.completedFuture(true); + } + return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> { + if (!isProxyAuthorized) { + log.error("OriginalRole {} is not authorized to perform operation {} on topic {}, subscription {}", + originalPrincipal, operation, topicName, subscriptionName); + } + if (!isAuthorized) { + log.error("Role {} is not authorized to perform operation {} on topic {}, subscription {}", + authRole, operation, topicName, subscriptionName); + } + return isProxyAuthorized && isAuthorized; + }); + } + @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); @@ -297,18 +358,10 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupSemaphore.release(); return; } - CompletableFuture isProxyAuthorizedFuture; - if (service.isAuthorizationEnabled() && originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData); - } else { - isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); - } - String finalOriginalPrincipal = originalPrincipal; - isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { - if (isProxyAuthorized) { + isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> { + if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, - finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData, + getPrincipal(), getAuthenticationData(), requestId, advertisedListenerName).handle((lookupResponse, ex) -> { if (ex == null) { ctx.writeAndFlush(lookupResponse); @@ -324,14 +377,14 @@ protected void handleLookup(CommandLookupTopic lookup) { }); } else { final String msg = "Proxy Client is not authorized to Lookup"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); + log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); } return null; }).exceptionally(ex -> { final String msg = "Exception occured while trying to authorize lookup"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName, ex); + log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName, ex); ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); return null; @@ -369,19 +422,10 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa lookupSemaphore.release(); return; } - CompletableFuture isProxyAuthorizedFuture; - if (service.isAuthorizationEnabled() && originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.LOOKUP, originalPrincipal, authRole, authenticationData); - } else { - isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); - } - String finalOriginalPrincipal = originalPrincipal; - isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { - if (isProxyAuthorized) { - getPartitionedTopicMetadata(getBrokerService().pulsar(), - authRole, finalOriginalPrincipal, authenticationData, - topicName).handle((metadata, ex) -> { + isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> { + if (isAuthorized) { + unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName) + .handle((metadata, ex) -> { if (ex == null) { int partitions = metadata.partitions; ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); @@ -407,7 +451,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa }); } else { final String msg = "Proxy Client is not authorized to Get Partition Metadata"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); + log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); ctx.writeAndFlush( Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); @@ -415,7 +459,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa return null; }).exceptionally(ex -> { final String msg = "Exception occured while trying to authorize get Partition Metadata"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); + log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); return null; @@ -505,6 +549,9 @@ private State doAuthentication(AuthData clientData, String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; AuthData brokerData = authState.authenticate(clientData); + if (log.isDebugEnabled()) { + log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); + } if (authState.isComplete()) { // Authentication has completed. It was either: @@ -520,7 +567,7 @@ private State doAuthentication(AuthData clientData, if (log.isDebugEnabled()) { log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", - remoteAddress, authMethod, authRole, originalPrincipal); + remoteAddress, authMethod, this.authRole, originalPrincipal); } if (state != State.Connected) { @@ -607,8 +654,12 @@ protected void handleConnect(CommandConnect connect) { checkArgument(state == State.Start); if (log.isDebugEnabled()) { - log.debug("Received CONNECT from {}, auth enabled: {}", - remoteAddress, service.isAuthenticationEnabled()); + log.debug("Received CONNECT from {}, auth enabled: {}:" + + " has original principal = {}, original principal = {}", + remoteAddress, + service.isAuthenticationEnabled(), + connect.hasOriginalPrincipal(), + connect.getOriginalPrincipal()); } String clientVersion = connect.getClientVersion(); @@ -656,6 +707,12 @@ protected void handleConnect(CommandConnect connect) { authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); authenticationData = authState.getAuthDataSource(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Authenticate role : {}", remoteAddress, + authState != null ? authState.getAuthRole() : null); + } + state = doAuthentication(clientData, clientProtocolVersion, clientVersion); // This will fail the check if: @@ -684,9 +741,18 @@ protected void handleConnect(CommandConnect connect) { AuthData.of(connect.getOriginalAuthData().getBytes()), remoteAddress, sslSession); + originalAuthData = originalAuthState.getAuthDataSource(); originalPrincipal = originalAuthState.getAuthRole(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal); + } } else { originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null; + if (log.isDebugEnabled()) { + log.debug("[{}] Authenticate original role (forwarded from proxy): {}", + remoteAddress, originalPrincipal); + } } } catch (Exception e) { String msg = "Unable to authenticate"; @@ -737,6 +803,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return; } + if (log.isDebugEnabled()) { + log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", + remoteAddress, authRole, originalPrincipal); + } + if (invalidOriginalPrincipal(originalPrincipal)) { final String msg = "Valid Proxy Client role should be provided while subscribing "; log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, @@ -765,33 +836,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final boolean forceTopicCreation = subscribe.getForceTopicCreation(); final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null; - CompletableFuture isProxyAuthorizedFuture; - if (service.isAuthorizationEnabled() && originalPrincipal != null) { - authenticationData.setSubscription(subscriptionName); - isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData); - } else { - isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); - } - isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { - if (isProxyAuthorized) { - CompletableFuture authorizationFuture; - if (service.isAuthorizationEnabled()) { - if (authenticationData == null) { - authenticationData = new AuthenticationDataCommand("", subscriptionName); - } else { - authenticationData.setSubscription(subscriptionName); - } - authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.CONSUME, originalPrincipal, authRole, authenticationData); - } else { - authorizationFuture = CompletableFuture.completedFuture(true); - } - - authorizationFuture.thenApply(isAuthorized -> { + CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( + topicName, + subscriptionName, + TopicOperation.CONSUME + ); + isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { - log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole); + log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, getPrincipal()); } log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); @@ -919,24 +972,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { }); } else { String msg = "Client is not authorized to subscribe"; - log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); + log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal()); ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; - }).exceptionally(e -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); - log.warn(msg); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); - return null; - }); - } else { - final String msg = "Proxy Client is not authorized to subscribe"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); - } - return null; }).exceptionally(ex -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole); + String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal()); if (ex.getCause() instanceof PulsarServerException) { log.info(msg); } else { @@ -989,27 +1030,13 @@ protected void handleProducer(final CommandProducer cmdProducer) { return; } - CompletableFuture isProxyAuthorizedFuture; - if (service.isAuthorizationEnabled() && originalPrincipal != null) { - isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData); - } else { - isProxyAuthorizedFuture = CompletableFuture.completedFuture(true); - } - isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { - if (isProxyAuthorized) { - CompletableFuture authorizationFuture; - if (service.isAuthorizationEnabled()) { - authorizationFuture = service.getAuthorizationService().allowTopicOperationAsync(topicName, - TopicOperation.PRODUCE, originalPrincipal, authRole, authenticationData); - } else { - authorizationFuture = CompletableFuture.completedFuture(true); - } - - authorizationFuture.thenApply(isAuthorized -> { + CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( + topicName, TopicOperation.PRODUCE + ); + isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { - log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole); + log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, getPrincipal()); } CompletableFuture producerFuture = new CompletableFuture<>(); CompletableFuture existingProducerFuture = producers.putIfAbsent(producerId, @@ -1092,7 +1119,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); schemaVersionFuture.thenAccept(schemaVersion -> { - Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, + Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName); try { @@ -1153,24 +1180,12 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); } else { String msg = "Client is not authorized to Produce"; - log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); + log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal()); ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; - }).exceptionally(e -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole); - log.warn(msg); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage())); - return null; - }); - } else { - final String msg = "Proxy Client is not authorized to Produce"; - log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); - } - return null; }).exceptionally(ex -> { - String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole); + String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), getPrincipal()); log.warn(msg); ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage())); return null; @@ -2043,7 +2058,11 @@ public AuthenticationState getAuthState() { } public AuthenticationDataSource getAuthenticationData() { - return authenticationData; + return originalAuthData != null ? originalAuthData : authenticationData; + } + + public String getPrincipal() { + return originalPrincipal != null ? originalPrincipal : authRole; } public AuthenticationProvider getAuthenticationProvider() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index fb7877e50e818..edbc93c41d73b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantOperation; -import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -422,10 +421,12 @@ static boolean isValidCluster(PulsarService pulsarService, String cluster) {// I * will throw an exception to redirect to assigned owner or leader; if authoritative is true then it will try to * acquire all the namespace bundles. * - * @param fqnn - * @param authoritative - * @param readOnly - * @param bundleData + * @param tenant tenant name + * @param cluster cluster name + * @param namespace namespace name + * @param authoritative if it is an authoritative request + * @param readOnly if the request is read-only + * @param bundleData bundle data */ protected void validateNamespaceOwnershipWithBundles(String tenant, String cluster, String namespace, boolean authoritative, boolean readOnly, BundlesData bundleData) { @@ -582,11 +583,8 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ * client to the appropriate broker. If no broker owns the namespace yet, this function will try to acquire the * ownership by default. * + * @param topicName topic name * @param authoritative - * - * @param tenant - * @param cluster - * @param namespace */ protected void validateTopicOwnership(TopicName topicName, boolean authoritative) { NamespaceService nsService = pulsar().getNamespaceService(); @@ -794,31 +792,33 @@ protected static boolean isLeaderBroker(PulsarService pulsar) { protected static final int NOT_IMPLEMENTED = 501; public void validateTenantOperation(String tenant, TenantOperation operation) { - if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { + if (pulsar().getConfiguration().isAuthenticationEnabled() + && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); } - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTenantOperation( - tenant, operation, originalPrincipal(), clientAppId(), clientAuthData()); - + boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() + .allowTenantOperation(tenant, operation, originalPrincipal(), clientAppId(), clientAuthData()); if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for" + - " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]", + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateTenantOperation for" + + " originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]", originalPrincipal(), clientAppId(), operation.toString(), tenant)); } } } public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) { - if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { + if (pulsar().getConfiguration().isAuthenticationEnabled() + && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); } - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), clientAppId(), clientAuthData()); + boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() + .allowNamespaceOperation(namespaceName, operation, originalPrincipal(), + clientAppId(), clientAuthData()); if (!isAuthorized) { throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for" + @@ -827,14 +827,18 @@ public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOpe } } - public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) { - if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { + public void validateNamespacePolicyOperation(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation) { + if (pulsar().getConfiguration().isAuthenticationEnabled() + && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); } - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowNamespacePolicyOperation(namespaceName, policy, operation, originalPrincipal(), clientAppId(), clientAuthData()); + boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() + .allowNamespacePolicyOperation(namespaceName, policy, operation, + originalPrincipal(), clientAppId(), clientAuthData()); if (!isAuthorized) { throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for" + @@ -842,20 +846,4 @@ public void validateNamespacePolicyOperation(NamespaceName namespaceName, Policy } } } - - public void validateTopicOperation(TopicName topicName, TopicOperation operation) { - if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { - if (!isClientAuthenticated(clientAppId())) { - throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); - } - - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), clientAuthData()); - - if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" + - " operation [%s] on topic [%s]", operation.toString(), topicName)); - } - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 94a02f64db651..fd563401c196a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -75,6 +75,7 @@ import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.ServerCnx.State; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -180,6 +181,8 @@ public void setup() throws Exception { doReturn(zkCache).when(pulsar).getLocalZkCacheService(); brokerService = spy(new BrokerService(pulsar)); + BrokerInterceptor interceptor = mock(BrokerInterceptor.class); + doReturn(interceptor).when(brokerService).getInterceptor(); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(executor).when(pulsar).getOrderedExecutor(); @@ -474,7 +477,7 @@ public void testProducerOnNotOwnedTopic() throws Exception { public void testProducerCommandWithAuthorizationPositive() throws Exception { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthenticationEnabled(); resetChannel(); @@ -605,7 +608,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { public void testProducerCommandWithAuthorizationNegative() throws Exception { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -1195,7 +1198,7 @@ public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { public void testSubscribeCommandWithAuthorizationPositive() throws Exception { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -1217,7 +1220,7 @@ public void testSubscribeCommandWithAuthorizationPositive() throws Exception { public void testSubscribeCommandWithAuthorizationNegative() throws Exception { AuthorizationService authorizationService = mock(AuthorizationService.class); doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), Mockito.any(), Mockito.any()); doReturn(authorizationService).when(brokerService).getAuthorizationService(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 180142c05a036..687b08d911809 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -22,20 +22,18 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.fail; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.io.IOException; -import java.net.URI; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - import javax.naming.AuthenticationException; import lombok.Cleanup; - import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; @@ -54,16 +52,12 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.common.util.RestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class); @@ -435,7 +429,8 @@ public void close() throws IOException { } @Override - public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) { + public CompletableFuture isSuperUser(String role, + ServiceConfiguration serviceConfiguration) { Set superUserRoles = serviceConfiguration.getSuperUserRoles(); return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false); } @@ -509,32 +504,38 @@ public CompletableFuture isTenantAdmin(String tenant, String role, Tena } @Override - public CompletableFuture allowTenantOperationAsync(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) { + public CompletableFuture allowTenantOperationAsync( + String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) { return CompletableFuture.completedFuture(true); } @Override - public Boolean allowTenantOperation(String tenantName, String originalRole, String role, TenantOperation operation, AuthenticationDataSource authData) { + public Boolean allowTenantOperation( + String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) { return true; } @Override - public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) { + public CompletableFuture allowNamespaceOperationAsync( + NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) { return CompletableFuture.completedFuture(true); } @Override - public Boolean allowNamespaceOperation(NamespaceName namespaceName, String originalRole, String role, NamespaceOperation operation, AuthenticationDataSource authData) { + public Boolean allowNamespaceOperation( + NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) { return null; } @Override - public CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) { + public CompletableFuture allowTopicOperationAsync( + TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) { return CompletableFuture.completedFuture(true); } @Override - public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) { + public Boolean allowTopicOperation( + TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) { return true; } } @@ -566,18 +567,10 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider { @Override - public Boolean allowTopicOperation(TopicName topicName, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) { - try { - return allowTopicOperationAsync(topicName, originalRole, role, operation, authData).get(); - } catch (InterruptedException e) { - throw new RestException(e); - } catch (ExecutionException e) { - throw new RestException(e.getCause()); - } - } - - @Override - public CompletableFuture allowTopicOperationAsync(TopicName topic, String originalRole, String role, TopicOperation operation, AuthenticationDataSource authData) { + public CompletableFuture allowTopicOperationAsync(TopicName topic, + String role, + TopicOperation operation, + AuthenticationDataSource authData) { CompletableFuture future = new CompletableFuture<>(); if (authData.hasSubscription()) { String subscription = authData.getSubscription(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 56a933bb5f5dd..697ddf9617830 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -62,7 +62,11 @@ import org.slf4j.LoggerFactory; class AdminProxyHandler extends ProxyServlet { + private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class); + + private static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal"; + private static final Set functionRoutes = new HashSet<>(Arrays.asList( "/admin/v3/function", "/admin/v2/function", @@ -334,7 +338,7 @@ protected void addProxyHeaders(HttpServletRequest clientRequest, Request proxyRe super.addProxyHeaders(clientRequest, proxyRequest); String user = (String) clientRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName); if (user != null) { - proxyRequest.header("X-Original-Principal", user); + proxyRequest.header(ORIGINAL_PRINCIPAL_HEADER, user); } } }