Skip to content

Commit

Permalink
[Authorization] Support GET_METADATA topic op after enable auth (apac…
Browse files Browse the repository at this point in the history
…he#12656)

### Motivation
Currently, we can get the internal stats of a topic through `bin/pulsar-admin topics stats-internal tn1/ns1/tp1` and also get ledger metadata by specifying flag `--metadata`.

However I found that `PulsarAuthorizationProvider` lacks support for topic operation `GET_METADATA` when verifying the role's authorization, code as below: 
https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L1162-L1164
https://github.com/apache/pulsar/blob/08a49c06bff4a52d26319a114961aed6cb6c4791/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L567-L596

The purpose of this PR is to support role with `lookup` topic authorization to `GET_METADATA` of ledger.
  • Loading branch information
yuruguo authored Nov 8, 2021
1 parent 213f14c commit e476735
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
switch (operation) {
case LOOKUP:
case GET_STATS:
case GET_METADATA:
isAuthorizedFuture = canLookupAsync(topicName, role, authData);
break;
case PRODUCE:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.auth.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import com.google.common.io.Files;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.elasticsearch.common.collect.Set;
import org.testcontainers.containers.Network;
import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* GetMetadataOfTopicWithAuthTest will test Getmetadata operation with and without the proper permission.
*/
@Slf4j
public class GetMetadataOfTopicWithAuthTest extends TestRetrySupport {

private static final String CLUSTER_PREFIX = "get-metadata-auth";
private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";

private static final String SUPER_USER_ROLE = "super-user";
private String superUserAuthToken;
private static final String PROXY_ROLE = "proxy";
private String proxyAuthToken;
private static final String REGULAR_USER_ROLE = "client";
private String clientAuthToken;
private File publicKeyFile;

private PulsarCluster pulsarCluster;
private PulsarContainer cmdContainer;

@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
// Before starting the cluster, generate the secret key and the token
// Use Zk container to have 1 container available before starting the cluster
final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
cmdContainer = new ZKContainer<>(cliContainerName);
cmdContainer
.withNetwork(Network.newNetwork())
.withNetworkAliases(ZKContainer.NAME)
.withEnv("zkServers", ZKContainer.NAME);
cmdContainer.start();

createKeysAndTokens(cmdContainer);

PulsarClusterSpec spec = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(2)
.numProxies(1)
.clusterName(clusterName)
.brokerEnvs(getBrokerSettingsEnvs())
.proxyEnvs(getProxySettingsEnvs())
.brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
.proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
.build();

pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}

@Override
@AfterClass(alwaysRun = true)
public void cleanup() {
markCurrentSetupNumberCleaned();
if (cmdContainer != null) {
cmdContainer.stop();
}
if (pulsarCluster != null) {
pulsarCluster.stop();
}
}

private Map<String, String> getBrokerSettingsEnvs() {
Map<String, String> envs = new HashMap<>();
envs.put("authenticationEnabled", "true");
envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
envs.put("authorizationEnabled", "true");
envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
envs.put("authenticationRefreshCheckSeconds", "1");
envs.put("authenticateOriginalAuthData", "true");
envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
return envs;
}

private Map<String, String> getProxySettingsEnvs() {
Map<String, String> envs = new HashMap<>();
envs.put("authenticationEnabled", "true");
envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
envs.put("authorizationEnabled", "true");
envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
envs.put("authenticationRefreshCheckSeconds", "1");
envs.put("forwardAuthorizationCredentials", "true");
envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
return envs;
}

protected void createKeysAndTokens(PulsarContainer container) throws Exception {
container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
"--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);

byte[] publicKeyBytes = DockerUtils
.runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
"/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
.getStdout();

publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
Files.write(publicKeyBytes, publicKeyFile);

clientAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", REGULAR_USER_ROLE)
.getStdout().trim();
log.info("Created client token: {}", clientAuthToken);

superUserAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", SUPER_USER_ROLE)
.getStdout().trim();
log.info("Created super-user token: {}", superUserAuthToken);

proxyAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", PROXY_ROLE)
.getStdout().trim();
log.info("Created proxy token: {}", proxyAuthToken);
}

@Test
public void testGetMetadataOfTopicWithLookupPermission() throws Exception {
@Cleanup
PulsarAdmin superUserAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
.authentication(AuthenticationFactory.token(superUserAuthToken))
.build();

@Cleanup
PulsarAdmin clientAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
.authentication(AuthenticationFactory.token(clientAuthToken))
.build();

// create partitioned topic
superUserAdmin.topics().createPartitionedTopic("public/default/test", 1);

// do some operation without grant any permissions
try {
clientAdmin.topics().getInternalStats("public/default/test-partition-0", true);
fail("get internal stats and metadata operation should fail because the client hasn't permission to do");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 401);
}

// grant consume/produce permission to the role
superUserAdmin.topics().grantPermission("public/default/test",
REGULAR_USER_ROLE, Set.of(AuthAction.consume));

// then do some get internal stats and metadata operations again, it should success
PersistentTopicInternalStats internalStats = clientAdmin.topics()
.getInternalStats("public/default/test-partition-0", true);
assertNotNull(internalStats);
}
}

0 comments on commit e476735

Please sign in to comment.