diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 6943e95999bba..ae36a9bba6ece 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -396,11 +397,15 @@ public boolean allowTenantOperation(String tenantName, AuthenticationDataSource authData) { try { return allowTenantOperationAsync( - tenantName, operation, originalRole, role, authData).get(); + tenantName, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -521,11 +526,15 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName, AuthenticationDataSource authData) { try { return allowNamespacePolicyOperationAsync( - namespaceName, policy, operation, originalRole, role, authData).get(); + namespaceName, policy, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -585,11 +594,15 @@ public Boolean allowTopicPolicyOperation(TopicName topicName, AuthenticationDataSource authData) { try { return allowTopicPolicyOperationAsync( - topicName, policy, operation, originalRole, role, authData).get(); + topicName, policy, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -667,9 +680,10 @@ public Boolean allowTopicOperation(TopicName topicName, TopicOperation operation, String originalRole, String role, - AuthenticationDataSource authData) { + AuthenticationDataSource authData) throws Exception { try { - return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(); + return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { throw new RestException(e); } catch (ExecutionException e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b28a51014ad4e..4ff236af5ac27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -233,7 +233,7 @@ protected void validateAdminAndClientPermission() { validateAdminAccessForTenant(topicName.getTenant()); } catch (Exception ve) { try { - checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData()); + checkAuthorizationAsync(pulsar(), topicName, clientAppId(), clientAuthData()); } catch (RestException re) { throw re; } catch (Exception e) { @@ -3559,46 +3559,55 @@ public static CompletableFuture getPartitionedTopicMet PulsarService pulsar, String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture metadataFuture = new CompletableFuture<>(); - try { - // (1) authorize client - try { - checkAuthorization(pulsar, topicName, clientAppId, authenticationData); - } catch (RestException e) { - try { - validateAdminAccessForTenant(pulsar, - clientAppId, originalPrincipal, topicName.getTenant(), authenticationData); - } catch (RestException authException) { - log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString()); - throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", - clientAppId, topicName.toString(), authException.getMessage())); - } - } catch (Exception ex) { - // throw without wrapping to PulsarClientException that considers: unknown error marked as internal - // server error - log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId, - topicName.toString(), ex.getMessage(), ex); - throw ex; - } + CompletableFuture authorizationFuture = new CompletableFuture<>(); + checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData) + .thenRun(() -> authorizationFuture.complete(null)) + .exceptionally(e -> { + Throwable throwable = FutureUtil.unwrapCompletionException(e); + if (throwable instanceof RestException) { + validateAdminAccessForTenantAsync(pulsar, + clientAppId, originalPrincipal, topicName.getTenant(), authenticationData) + .thenRun(() -> { + authorizationFuture.complete(null); + }).exceptionally(ex -> { + Throwable throwable2 = FutureUtil.unwrapCompletionException(ex); + if (throwable2 instanceof RestException) { + log.warn("Failed to authorize {} on topic {}", clientAppId, topicName); + authorizationFuture.completeExceptionally(new PulsarClientException( + String.format("Authorization failed %s on topic %s with error %s", + clientAppId, topicName, throwable2.getMessage()))); + } else { + authorizationFuture.completeExceptionally(throwable2); + } + return null; + }); + } else { + // throw without wrapping to PulsarClientException that considers: unknown error marked as + // internal server error + log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable); + authorizationFuture.completeExceptionally(throwable); + } + return null; + }); - // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can - // serve/redirect request else fail partitioned-metadata-request so, client fails while creating - // producer/consumer - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) - .thenCompose(res -> pulsar.getBrokerService() - .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) - .thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName, - metadata.partitions); - } - metadataFuture.complete(metadata); - }).exceptionally(ex -> { - metadataFuture.completeExceptionally(ex.getCause()); - return null; - }); - } catch (Exception ex) { - metadataFuture.completeExceptionally(ex); - } + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can + // serve/redirect request else fail partitioned-metadata-request so, client fails while creating + // producer/consumer + authorizationFuture.thenCompose(__ -> + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())) + .thenCompose(res -> + pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) + .thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName, + metadata.partitions); + } + metadataFuture.complete(metadata); + }) + .exceptionally(e -> { + metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + return null; + }); return metadataFuture; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index dab1b293e9008..967059c07184b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,23 +220,14 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe cluster); } validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(), - differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false)); + differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, + requestId, false)); } else { // (2) authorize client - try { - checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); - } catch (RestException authException) { - log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString()); - validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, - authException.getMessage(), requestId)); - return; - } catch (Exception e) { - log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString()); - validationFuture.completeExceptionally(e); - return; - } - // (3) validate global namespace - checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject()) + checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> { + // (3) validate global namespace + checkLocalOrGetPeerReplicationCluster(pulsarService, + topicName.getNamespaceObject()) .thenAccept(peerClusterData -> { if (peerClusterData == null) { // (4) all validation passed: initiate lookup @@ -247,21 +239,36 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl()) && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) { validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError, - "Redirected cluster's brokerService url is not configured", requestId)); + "Redirected cluster's brokerService url is not configured", + requestId)); return; } validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(), - peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, + peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, + requestId, false)); - }).exceptionally(ex -> { - validationFuture.complete( - newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId)); - return null; - }); + validationFuture.complete( + newLookupErrorResponse(ServerError.MetadataError, + FutureUtil.unwrapCompletionException(ex).getMessage(), requestId)); + return null; + }); + }) + .exceptionally(e -> { + Throwable throwable = FutureUtil.unwrapCompletionException(e); + if (throwable instanceof RestException) { + log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName); + validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, + throwable.getMessage(), requestId)); + } else { + log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName); + validationFuture.completeExceptionally(throwable); + } + return null; + }); } }).exceptionally(ex -> { - validationFuture.completeExceptionally(ex); + validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 7b5f455ac970b..20652d14a539c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -245,6 +245,86 @@ protected void validateAdminAccessForTenant(String tenant) { } } + protected static CompletableFuture validateAdminAccessForTenantAsync( + PulsarService pulsar, String clientAppId, + String originalPrincipal, String tenant, + AuthenticationDataSource authenticationData) { + if (log.isDebugEnabled()) { + log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant, + (isClientAuthenticated(clientAppId)), clientAppId); + } + return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant) + .thenCompose(tenantInfoOptional -> { + if (!tenantInfoOptional.isPresent()) { + throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); + } + TenantInfo tenantInfo = tenantInfoOptional.get(); + if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration() + .isAuthorizationEnabled()) { + if (!isClientAuthenticated(clientAppId)) { + throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); + } + validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, + originalPrincipal); + if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) { + AuthorizationService authorizationService = + pulsar.getBrokerService().getAuthorizationService(); + return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, + authenticationData) + .thenCompose(isTenantAdmin -> { + String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}"; + if (!isTenantAdmin) { + return authorizationService.isSuperUser(clientAppId, authenticationData) + .thenCombine(authorizationService.isSuperUser(originalPrincipal, + authenticationData), + (proxyAuthorized, originalPrincipalAuthorized) -> { + if (!proxyAuthorized || !originalPrincipalAuthorized) { + throw new RestException(Status.UNAUTHORIZED, + String.format( + "Proxy not authorized to access " + + "resource (proxy:%s,original:%s)" + , clientAppId, originalPrincipal)); + } else { + if (log.isDebugEnabled()) { + log.debug(debugMsg, originalPrincipal, + clientAppId, tenant); + } + return null; + } + }); + } else { + if (log.isDebugEnabled()) { + log.debug(debugMsg, originalPrincipal, clientAppId, tenant); + } + return CompletableFuture.completedFuture(null); + } + }); + } else { + return pulsar.getBrokerService() + .getAuthorizationService() + .isSuperUser(clientAppId, authenticationData) + .thenCompose(isSuperUser -> { + if (!isSuperUser) { + return pulsar.getBrokerService().getAuthorizationService() + .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData); + } else { + return CompletableFuture.completedFuture(true); + } + }).thenAccept(authorized -> { + if (!authorized) { + throw new RestException(Status.UNAUTHORIZED, + "Don't have permission to administrate resources on this tenant"); + } else { + log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant); + } + }); + } + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant, AuthenticationDataSource authenticationData) @@ -795,18 +875,22 @@ private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, return null; } - protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role, - AuthenticationDataSource authenticationData) throws Exception { + protected static CompletableFuture checkAuthorizationAsync(PulsarService pulsarService, + TopicName topicName, String role, + AuthenticationDataSource authenticationData) { if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { // No enforcing of authorization policies - return; + return CompletableFuture.completedFuture(null); } // get zk policy manager - if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName, - TopicOperation.LOOKUP, null, role, authenticationData)) { - log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role); - throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace"); - } + return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, + TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> { + if (!allow) { + log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role); + throw new RestException(Status.UNAUTHORIZED, + "Don't have permission to connect to this namespace"); + } + }); } // Used for unit tests access