Skip to content

Commit

Permalink
[Issue 5999] Support create/update tenant with empty cluster (apache#…
Browse files Browse the repository at this point in the history
…6027)

### Motivation

Fixes apache#5999

### Modifications

Add the logic to handle the blank cluster name.
  • Loading branch information
murong00 authored Feb 16, 2020
1 parent bf7302a commit e9083f5
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import io.swagger.annotations.ApiParam;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -31,6 +30,7 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response.Status;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.Constants;
Expand All @@ -42,7 +42,9 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
Expand Down Expand Up @@ -91,6 +93,7 @@ public TenantInfo getTenantAdmin(
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Tenant name is not valid"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
public void createTenant(
@ApiParam(value = "The tenant name")
Expand All @@ -102,9 +105,6 @@ public void createTenant(

try {
NamedEntity.checkName(tenant);
if (config == null) {
config = new TenantInfo();
}
zkCreate(path(POLICIES, tenant), jsonMapper().writeValueAsBytes(config));
log.info("[{}] Created tenant {}", clientAppId(), tenant);
} catch (KeeperException.NodeExistsException e) {
Expand All @@ -125,6 +125,7 @@ public void createTenant(
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
public void updateTenant(
@ApiParam(value = "The tenant name")
Expand Down Expand Up @@ -225,11 +226,15 @@ public void deleteTenant(
}

private void validateClusters(TenantInfo info) {
// empty cluster shouldn't be allowed
if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
|| info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
}

List<String> nonexistentClusters;
try {
if (info == null) {
info = new TenantInfo();
}
Set<String> availableClusters = clustersListCache().get();
Set<String> allowedClusters = info.getAllowedClusters();
nonexistentClusters = allowedClusters.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,11 @@ void properties() throws Exception {
assertEquals(properties.getTenants(), Lists.newArrayList());
verify(properties, times(1)).validateSuperUserAccess();

// create local cluster
clusters.createCluster(configClusterName, new ClusterData());

Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters);
properties.createTenant("test-property", tenantInfo);
verify(properties, times(2)).validateSuperUserAccess();
Expand Down Expand Up @@ -489,7 +493,6 @@ void properties() throws Exception {
assertEquals(properties.getTenants(), Lists.newArrayList());

// Create a namespace to test deleting a non-empty property
clusters.createCluster("use", new ClusterData());
newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", "other-role"), Sets.newHashSet("use"));
properties.createTenant("my-tenant", newPropertyAdmin);

Expand All @@ -511,17 +514,41 @@ void properties() throws Exception {
}

// Check tenantInfo is null
TenantInfo nullTenantInfo = new TenantInfo();
properties.createTenant("tenant-config-is-null", null);
assertEquals(properties.getTenantAdmin("tenant-config-is-null"), nullTenantInfo);
try {
properties.createTenant("tenant-config-is-null", null);
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

// Check tenantInfo with empty cluster
String blankCluster = "";
Set<String> blankClusters = Sets.newHashSet(blankCluster);
TenantInfo tenantWithEmptyCluster = new TenantInfo(Sets.newHashSet("role1", "role2"), blankClusters);
try {
properties.createTenant("tenant-config-is-empty", tenantWithEmptyCluster);
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

// Check tenantInfo contains empty cluster
Set<String> containBlankClusters = Sets.newHashSet(blankCluster);
containBlankClusters.add(configClusterName);
TenantInfo tenantContainEmptyCluster = new TenantInfo(Sets.newHashSet(), containBlankClusters);
try {
properties.createTenant("tenant-config-contain-empty", tenantContainEmptyCluster);
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

AsyncResponse response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false);
ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
properties.deleteTenant("my-tenant");
properties.deleteTenant("tenant-config-is-null");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -48,8 +50,11 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
Expand Down Expand Up @@ -231,6 +236,17 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
}

public TenantInfo createDefaultTenantInfo() throws PulsarAdminException {
// create local cluster if not exist
if (!admin.clusters().getClusters().contains(configClusterName)) {
admin.clusters().createCluster(configClusterName, new ClusterData());
}
Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet(), allowedClusters);
return tenantInfo;
}

public static MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
List<ACL> dummyAclList = new ArrayList<>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ public void testExclusiveSingleAckedPartitionedTopic() throws Exception {
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 10;
final int numberOfPartitions = 4;
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
// Special step to create partitioned topic

Expand Down Expand Up @@ -476,7 +477,8 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception {
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 10;
final int numberOfPartitions = 3;
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
Random rn = new Random();
// Special step to create partitioned topic
Expand Down Expand Up @@ -576,7 +578,8 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception {
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 10;
final int numberOfPartitions = 3;
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
Random rn = new Random();
// Special step to create partitioned topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import com.google.common.io.Closeables;

Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -228,8 +230,14 @@ public void testMaxRequestSize() throws Exception {
// This should have failed
assertEquals(response.getStatusLine().getStatusCode(), 400);

// Create local cluster
String localCluster = "test";
String clusterPath = PulsarWebResource.path("clusters", localCluster);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new ClusterData());
pulsar.getGlobalZkCache().getZooKeeper().create(clusterPath, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
TenantInfo info2 = new TenantInfo();
info2.setAdminRoles(Collections.singleton(StringUtils.repeat("*", 1 * 1024)));
info2.setAllowedClusters(Sets.newHashSet(localCluster));
httpPut.setEntity(new ByteArrayEntity(ObjectMapperFactory.getThreadLocal().writeValueAsBytes(info2)));

response = client.execute(httpPut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
final String patternString = "persistent://my-property/my-ns/pattern-topic.*";
Pattern pattern = Pattern.compile(patternString);

admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -143,7 +144,8 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -234,7 +236,8 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
Pattern pattern = Pattern.compile("my-property/my-ns/np-pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -324,7 +327,8 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -473,7 +477,8 @@ public void testStartEmptyPatternConsumer() throws Exception {
Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -560,7 +565,8 @@ public void testAutoSubscribePatternConsumer() throws Exception {
Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down Expand Up @@ -668,7 +674,8 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {
Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

// 1. create partition
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ public void testSharedAckedPartitionedTopic() throws Exception {
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
final int numberOfPartitions = 3;
admin.tenants().createTenant("prop", new TenantInfo());
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);

// 1. producer connect
Expand Down
Loading

0 comments on commit e9083f5

Please sign in to comment.