Skip to content

Commit

Permalink
allow consume permission to do GetTopics op (apache#12600)
Browse files Browse the repository at this point in the history
Fixes apache#12423

### Motivation
Regex subscription requires to get the topics list of given namespace with GetTopicsOfNamespace request, but this request requires tenant admin permission which will block the regex consumers who only have consume permission.

### Modifications
This PR added the consume permission check for GetTopicsOfNamespace, which allows consumers get the topics list with consume permission.
  • Loading branch information
freeznet authored Nov 5, 2021
1 parent a6b1b34 commit 7e078aa
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, Stri
CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
* Allow consume operations with in this namespace
* @param namespaceName The namespace that the consume operations can be executed in
* @param role The role to check
* @param authenticationData authentication data related to the role
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return authorize(authenticationData, r -> super.allowSinkOpsAsync(namespaceName, r, authenticationData));
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return authorize(authenticationData, r -> super.allowConsumeOpsAsync(namespaceName, r, authenticationData));
}

@Override
public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume);
}

private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData,
AuthAction authAction) {
Expand Down Expand Up @@ -525,6 +530,9 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case PACKAGES:
isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
break;
case GET_TOPICS:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
break;
default:
isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return roleAuthorizedAsync(role);
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData) {
return roleAuthorizedAsync(role);
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
String authDataJson) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return null;
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return null;
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return null;
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.auth.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import com.google.common.io.Files;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.elasticsearch.common.collect.Set;
import org.testcontainers.containers.Network;
import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
* GetTopicsOfNamespaceWithAuthTest will test GetTopics operation with and without the proper permission.
*/
@Slf4j
public class GetTopicsOfNamespaceWithAuthTest extends TestRetrySupport {

private static final String CLUSTER_PREFIX = "get-topics-auth";
private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";

private static final String SUPER_USER_ROLE = "super-user";
private String superUserAuthToken;
private static final String PROXY_ROLE = "proxy";
private String proxyAuthToken;
private static final String REGULAR_USER_ROLE = "client";
private String clientAuthToken;
private File publicKeyFile;

private PulsarCluster pulsarCluster;
private PulsarContainer cmdContainer;

@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
// Before starting the cluster, generate the secret key and the token
// Use Zk container to have 1 container available before starting the cluster
final String clusterName = String.format("%s-%s", CLUSTER_PREFIX, RandomStringUtils.randomAlphabetic(6));
final String cliContainerName = String.format("%s-%s", "cli", RandomStringUtils.randomAlphabetic(6));
cmdContainer = new ZKContainer<>(cliContainerName);
cmdContainer
.withNetwork(Network.newNetwork())
.withNetworkAliases(ZKContainer.NAME)
.withEnv("zkServers", ZKContainer.NAME);
cmdContainer.start();

createKeysAndTokens(cmdContainer);

PulsarClusterSpec spec = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(2)
.numProxies(1)
.clusterName(clusterName)
.brokerEnvs(getBrokerSettingsEnvs())
.proxyEnvs(getProxySettingsEnvs())
.brokerMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
.proxyMountFiles(Collections.singletonMap(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER))
.build();

pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}

@Override
@AfterClass(alwaysRun = true)
public void cleanup() {
markCurrentSetupNumberCleaned();
if (cmdContainer != null) {
cmdContainer.stop();
}
if (pulsarCluster != null) {
pulsarCluster.stop();
}
}

private Map<String, String> getBrokerSettingsEnvs() {
Map<String, String> envs = new HashMap<>();
envs.put("authenticationEnabled", "true");
envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
envs.put("authorizationEnabled", "true");
envs.put("superUserRoles", String.format("%s,%s", SUPER_USER_ROLE, PROXY_ROLE));
envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
envs.put("brokerClientAuthenticationParameters", String.format("token:%s", superUserAuthToken));
envs.put("authenticationRefreshCheckSeconds", "1");
envs.put("authenticateOriginalAuthData", "true");
envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
return envs;
}

private Map<String, String> getProxySettingsEnvs() {
Map<String, String> envs = new HashMap<>();
envs.put("authenticationEnabled", "true");
envs.put("authenticationProviders", AuthenticationProviderToken.class.getName());
envs.put("authorizationEnabled", "true");
envs.put("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName());
envs.put("brokerClientAuthenticationParameters", String.format("token:%s", proxyAuthToken));
envs.put("authenticationRefreshCheckSeconds", "1");
envs.put("forwardAuthorizationCredentials", "true");
envs.put("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
return envs;
}

protected void createKeysAndTokens(PulsarContainer container) throws Exception {
container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
"--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);

byte[] publicKeyBytes = DockerUtils
.runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
"/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
.getStdout();

publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
Files.write(publicKeyBytes, publicKeyFile);

clientAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", REGULAR_USER_ROLE)
.getStdout().trim();
log.info("Created client token: {}", clientAuthToken);

superUserAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", SUPER_USER_ROLE)
.getStdout().trim();
log.info("Created super-user token: {}", superUserAuthToken);

proxyAuthToken = container
.execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
"--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
"--subject", PROXY_ROLE)
.getStdout().trim();
log.info("Created proxy token: {}", proxyAuthToken);
}

@Test
public void testGetTopicsOfNamespaceOpsWithConsumePermission() throws Exception {
@Cleanup
PulsarAdmin superUserAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
.authentication(AuthenticationFactory.token(superUserAuthToken))
.build();

@Cleanup
PulsarAdmin clientAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
.authentication(AuthenticationFactory.token(clientAuthToken))
.build();

// do some operation without grant any permissions
try {
clientAdmin.namespaces().getTopics("public/default");
fail("list topics operation should fail because the client hasn't permission to do");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 401);
}

// grant consume permission to the role
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
REGULAR_USER_ROLE, Set.of(AuthAction.consume));

// then do some get topics operations again, it should success
List<String> topics = clientAdmin.namespaces().getTopics("public/default");
assertEquals(topics.size(), 0);
}
}

0 comments on commit 7e078aa

Please sign in to comment.