diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 5cba6895a2238..1ad8fbe3e0981 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -228,6 +228,40 @@ public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks); } + private CompletableFuture allowConsumeOrProduceOpsAsync(NamespaceName namespaceName, + String role, + AuthenticationDataSource authenticationData) { + CompletableFuture finalResult = new CompletableFuture<>(); + allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume) + .whenComplete((consumeAuthorized, e) -> { + if (e == null) { + if (consumeAuthorized) { + finalResult.complete(consumeAuthorized); + return; + } + } else { + if (log.isDebugEnabled()) { + log.debug("Namespace [{}] Role [{}] exception occurred while trying to check Consume " + + "permission. {}", namespaceName, role, e.getCause()); + } + } + allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.produce) + .whenComplete((produceAuthorized, ex) -> { + if (ex == null) { + finalResult.complete(produceAuthorized); + } else { + if (log.isDebugEnabled()) { + log.debug("Namespace [{}] Role [{}] exception occurred while trying to check " + + "Produce permission. {}", namespaceName, role, ex.getCause()); + } + finalResult.completeExceptionally(ex.getCause()); + } + }); + }); + + return finalResult; + } + private CompletableFuture allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData, AuthAction authAction) { @@ -550,6 +584,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam namespaceName, role, authData, AuthAction.packages); case GET_TOPIC: case GET_TOPICS: + return allowConsumeOrProduceOpsAsync(namespaceName, role, authData); case UNSUBSCRIBE: case CLEAR_BACKLOG: return allowTheSpecifiedActionOpsAsync( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index d140da2fca35d..00faf964bf7b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -454,6 +454,12 @@ public void testClearBacklogPermission() throws Exception { assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions() .get(subscriptionName).getMsgBacklog(), 0); + superAdmin.namespaces().revokePermissionsOnNamespace(namespace, subscriptionRole); + superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, + Sets.newHashSet(AuthAction.produce)); + assertEquals(sub1Admin.topics().getPartitionedTopicList(namespace), + Lists.newArrayList(topicName)); + log.info("-- Exiting {} test --", methodName); }