diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 06291ee007052..411c253bbaa5d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -546,6 +546,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages); break; case GET_TOPICS: + case UNSUBSCRIBE: isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData); break; default: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index f94f6b787120e..1af36f540bce9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -20,6 +20,8 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Maps; @@ -27,6 +29,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -51,6 +54,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TenantOperation; @@ -174,6 +178,7 @@ public void testSubscriberPermission() throws Exception { final String tenantRole = "tenant-role"; final String subscriptionRole = "sub1-role"; final String subscriptionName = "sub1"; + final String subscriptionName2 = "sub2"; final String namespace = "my-property/my-ns-sub-auth"; final String topicName = "persistent://" + namespace + "/my-topic"; Authentication adminAuthentication = new ClientAuthentication("superUser"); @@ -201,7 +206,18 @@ public void testSubscriberPermission() throws Exception { superAdmin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test"))); superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); - tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, + + // subscriptionRole doesn't have topic-level authorization, so it will fail to get topic stats-internal info + try { + sub1Admin.topics().getInternalStats(topicName, true); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith( + "Unauthorized to validateTopicOperation for operation [GET_STATS]")); + } + + // grant topic consume authorization to the subscriptionRole + tenantAdmin.topics().grantPermission(topicName, subscriptionRole, Collections.singleton(AuthAction.consume)); replacePulsarClient(PulsarClient.builder() @@ -211,7 +227,17 @@ public void testSubscriberPermission() throws Exception { // (1) Create subscription name Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) .subscribe(); + Consumer consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName2) + .subscribe(); consumer.close(); + consumer2.close(); + + List subscriptions = sub1Admin.topics().getSubscriptions(topicName); + assertEquals(subscriptions.size(), 2); + + // now, subscriptionRole have consume authorization on topic, so it will successfully get topic internal stats + PersistentTopicInternalStats internalStats = superAdmin.topics().getInternalStats(topicName, true); + assertNotNull(internalStats); // verify tenant is able to perform all subscription-admin api tenantAdmin.topics().skipAllMessages(topicName, subscriptionName); @@ -227,10 +253,24 @@ public void testSubscriberPermission() throws Exception { tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10); tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest); + // subscriptionRole doesn't have namespace-level authorization, so it will fail to unsubscribe namespace + try { + sub1Admin.namespaces().unsubscribeNamespace(namespace, subscriptionName2); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith( + "Unauthorized to validateNamespaceOperation for operation [UNSUBSCRIBE]")); + } + // grant namespace-level authorization to the subscriptionRole tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, Collections.singleton(AuthAction.consume)); + // now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace + superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2); + subscriptions = sub1Admin.topics().getSubscriptions(topicName); + assertEquals(subscriptions.size(), 1); + // subscriptionRole has namespace-level authorization sub1Admin.topics().resetCursor(topicName, subscriptionName, 10); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java deleted file mode 100644 index 75786290027aa..0000000000000 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/admin/GetMetadataOfTopicWithAuthTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.tests.integration.auth.admin; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.fail; -import com.google.common.io.Files; -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.tests.TestRetrySupport; -import org.apache.pulsar.tests.integration.containers.PulsarContainer; -import org.apache.pulsar.tests.integration.containers.ZKContainer; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; -import org.apache.pulsar.tests.integration.utils.DockerUtils; -import org.elasticsearch.common.collect.Set; -import org.testcontainers.containers.Network; -import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -/** - * GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and without the proper permission. - */ -@Slf4j -public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport { - - private static final String CLUSTER_PREFIX = "get-metadata-auth"; - private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key"; - private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key"; - - private static final String SUPER_USER_ROLE = "super-user"; - private String superUserAuthToken; - private static final String PROXY_ROLE = "proxy"; - private String proxyAuthToken; - private static final String REGULAR_USER_ROLE = "client"; - private String clientAuthToken; - private File publicKeyFile; - - private PulsarCluster pulsarCluster; - private PulsarContainer cmdContainer; - - @Override - @BeforeClass(alwaysRun = true) - protected void setup() throws Exception { - incrementSetupNumber(); - // Before starting the cluster, generate the secret key and the token - // Use Zk container to have 1 container available before starting the cluster - final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6)); - final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6)); - cmdContainer = new ZKContainer<>(cliContainerName); - cmdContainer - .withNetwork(Network.newNetwork()) - .withNetworkAliases(ZKContainer.NAME) - .withEnv("zkServers", ZKContainer.NAME); - cmdContainer.start(); - - createKeysAndTokens(cmdContainer); - - PulsarClusterSpec spec = PulsarClusterSpec.builder() - .numBookies(2) - .numBrokers(2) - .numProxies(1) - .clusterName(clusterName) - .brokerEnvs(getBrokerSettingsEnvs()) - .proxyEnvs(getProxySettingsEnvs()) - .brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER)) - .proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER)) - .build(); - - pulsarCluster = PulsarCluster.forSpec(spec); - pulsarCluster.start(); - } - - @Override - @AfterClass(alwaysRun = true) - public void cleanup() { - markCurrentSetupNumberCleaned(); - if (cmdContainer != null) { - cmdContainer.stop(); - } - if (pulsarCluster != null) { - pulsarCluster.stop(); - } - } - - private Map getBrokerSettingsEnvs() { - Map envs = new HashMap<>(); - envs.put("authenticationEnabled", "true"); - envs.put("authenticationProviders", AuthenticationProviderToken.class.getName()); - envs.put("authorizationEnabled", "true"); - envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE)); - envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName()); - envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken)); - envs.put("authenticationRefreshCheckSeconds", "1"); - envs.put("authenticateOriginalAuthData", "true"); - envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER); - return envs; - } - - private Map getProxySettingsEnvs() { - Map envs = new HashMap<>(); - envs.put("authenticationEnabled", "true"); - envs.put("authenticationProviders", AuthenticationProviderToken.class.getName()); - envs.put("authorizationEnabled", "true"); - envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName()); - envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken)); - envs.put("authenticationRefreshCheckSeconds", "1"); - envs.put("forwardAuthorizationCredentials", "true"); - envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER); - return envs; - } - - protected void createKeysAndTokens(PulsarContainer container) throws Exception { - container - .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair", - "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER, - "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER); - - byte[] publicKeyBytes = DockerUtils - .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(), - "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER) - .getStdout(); - - publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp")); - Files.write(publicKeyBytes, publicKeyFile); - - clientAuthToken = container - .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create", - "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER, - "--subject", REGULAR_USER_ROLE) - .getStdout().trim(); - log.info("Created client token: {}", clientAuthToken); - - superUserAuthToken = container - .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create", - "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER, - "--subject", SUPER_USER_ROLE) - .getStdout().trim(); - log.info("Created super-user token: {}", superUserAuthToken); - - proxyAuthToken = container - .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create", - "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER, - "--subject", PROXY_ROLE) - .getStdout().trim(); - log.info("Created proxy token: {}", proxyAuthToken); - } - - @Test - public void testGetMetadataOfTopicWithLookupPermission() throws Exception { - @Cleanup - PulsarAdmin superUserAdmin = PulsarAdmin.builder() - .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) - .authentication(AuthenticationFactory.token(superUserAuthToken)) - .build(); - - @Cleanup - PulsarAdmin clientAdmin = PulsarAdmin.builder() - .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) - .authentication(AuthenticationFactory.token(clientAuthToken)) - .build(); - - // create partitioned topic - superUserAdmin.topics().createPartitionedTopic("public/default/test", 1); - - // do some operation without grant any permissions - try { - clientAdmin.topics().getInternalStats("public/default/test-partition-0", true); - fail("get internal stats and metadata operation should fail because the client hasn't permission to do"); - } catch (PulsarAdminException e) { - assertEquals(e.getStatusCode(), 401); - } - - // grant consume/produce permission to the role - superUserAdmin.topics().grantPermission("public/default/test", - REGULAR_USER_ROLE, Set.of(AuthAction.consume)); - - // then do some get internal stats and metadata operations again, it should success - PersistentTopicInternalStats internalStats = clientAdmin.topics() - .getInternalStats("public/default/test-partition-0", true); - assertNotNull(internalStats); - } -}