Skip to content

Commit

Permalink
support limit max topics per namespace (apache#8942)
Browse files Browse the repository at this point in the history
Fix apache#8225 

### Changes
1. Add topic partition number limit, which only take persistent topic into account, for each namespace
2. The  max topic number of each namespace can only be configured by broker.conf, i will make it configurable dynamic soon
3. Add the tests for this feature.
  • Loading branch information
hangc0276 authored Dec 15, 2020
1 parent 911a4f9 commit 37f3e65
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 0 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ defaultNumberOfNamespaceBundles=4
# This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded
maxNamespacesPerTenant=0

# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace,
# the broker should reject the new topic request(include topic auto-created by the producer or consumer)
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ brokerDeduplicationProducerInactivityTimeoutMinutes=360
# value will be used as the default
defaultNumberOfNamespaceBundles=4

# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace,
# the broker should reject the new topic request(include topic auto-created by the producer or consumer)
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
6 changes: 6 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ defaultNumberOfNamespaceBundles=4
# This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded
maxNamespacesPerTenant=0

# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace,
# the broker should reject the new topic request(include topic auto-created by the producer or consumer)
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded")
private int maxNamespacesPerTenant = 0;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Max number of topics allowed to be created in the namespace. "
+ "When the topics reach the max topics of the namespace, the broker should reject "
+ "the new topic request(include topic auto-created by the producer or consumer) until "
+ "the number of connected consumers decrease. "
+ " Using a value of 0, is disabling maxTopicsPerNamespace-limit check."
)
private int maxTopicsPerNamespace = 0;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";

protected ZooKeeper globalZk() {
return pulsar().getGlobalZkCache().getZooKeeper();
Expand Down Expand Up @@ -787,7 +788,47 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
return partitionedTopics;
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
List<String> topicPartitions = Lists.newArrayList();

try {
String topicPartitionPath = joinPath(MANAGED_LEDGER_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(topicPartitionPath, false);
topicPartitions = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get topic partition list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
}

topicPartitions.sort(null);
return topicPartitions;
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
final int maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace();
if (maxTopicsPerNamespace > 0) {
try {
List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
if (partitionedTopics.size() + numPartitions > maxTopicsPerNamespace) {
log.error("[{}] Failed to create partitioned topic {}, "
+ "exceed maximum number of topics in namespace", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.PRECONDITION_FAILED,
"Exceed maximum number of topics in namespace."));
return;
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
validateAdminAccessForTenant(topicName.getTenant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final AuthenticationService authenticationService;

public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
public static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";

private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;

private static final LongAdder totalUnackedMessages = new LongAdder();
Expand Down Expand Up @@ -1085,6 +1087,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
return;
}

if (!checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) {
return;
}

getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> {
managedLedgerConfig.setCreateIfMissing(createIfMissing);

Expand Down Expand Up @@ -2155,6 +2161,10 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();

if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, partitionedTopicFuture)) {
return partitionedTopicFuture;
}

try {
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata);

Expand Down Expand Up @@ -2504,6 +2514,35 @@ private void checkTopicLevelPolicyEnable() {
}
}


private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions,
CompletableFuture<T> topicFuture) {
final int maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace();
if (maxTopicsPerNamespace > 0) {
try {
String partitionedTopicPath = PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE,
topicName.getNamespace(), topicName.getDomain().value());
List<String> topics = pulsar().getGlobalZkCache().getZooKeeper()
.getChildren(partitionedTopicPath, false);
if (topics.size() + numPartitions > maxTopicsPerNamespace) {
log.error("Failed to create persistent topic {}, "
+ "exceed maximum number of topics in namespace", topicName);
topicFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED,
"Exceed maximum number of topics in namespace."));
return false;
}
} catch (KeeperException.NoNodeException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("Failed to create partitioned topic {}", topicName, e);
topicFuture.completeExceptionally(new RestException(e));
return false;
}
}

return true;
}

public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,9 @@ public void testMaxNumPartitionsPerPartitionedTopicSuccess() {
} catch (Exception e) {
fail("should not throw any exceptions");
}

// reset configuration
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
}

@Test
Expand All @@ -1275,6 +1278,9 @@ public void testMaxNumPartitionsPerPartitionedTopicFailure() {
} catch (Exception e) {
assertTrue(e instanceof PulsarAdminException);
}

// reset configuration
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
}

@Test
Expand Down Expand Up @@ -1331,6 +1337,86 @@ public void testMaxNamespacesPerTenant() throws Exception {

}

@Test
public void testMaxTopicsPerNamespace() throws Exception {
super.internalCleanup();
conf.setMaxTopicsPerNamespace(10);
super.internalSetup();
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

// check create partitioned/non-partitioned topics
String topic = "persistent://testTenant/ns1/test_create_topic_v";
admin.topics().createPartitionedTopic(topic + "1", 2);
admin.topics().createPartitionedTopic(topic + "2", 3);
admin.topics().createPartitionedTopic(topic + "3", 4);
admin.topics().createNonPartitionedTopic(topic + "4");
try {
admin.topics().createPartitionedTopic(topic + "5", 2);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
Assert.assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace.");
}

//unlimited
super.internalCleanup();
conf.setMaxTopicsPerNamespace(0);
super.internalSetup();
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
for (int i = 0; i < 10; ++i) {
admin.topics().createPartitionedTopic(topic + i, 2);
admin.topics().createNonPartitionedTopic(topic + i + i);
}

// check producer/consumer auto create partitioned topic
super.internalCleanup();
conf.setMaxTopicsPerNamespace(10);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreationType("partitioned");
super.internalSetup();
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

pulsarClient.newProducer().topic(topic + "1").create();
pulsarClient.newProducer().topic(topic + "2").create();
pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe();
try {
pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
}

// check producer/consumer auto create non-partitioned topic
super.internalCleanup();
conf.setMaxTopicsPerNamespace(3);
conf.setAllowAutoTopicCreationType("non-partitioned");
super.internalSetup();
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

pulsarClient.newProducer().topic(topic + "1").create();
pulsarClient.newProducer().topic(topic + "2").create();
pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe();
try {
pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Exception: ", e);
}

// reset configuration
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(1);
}

@Test
public void testInvalidBundleErrorResponse() throws Exception {
try {
Expand Down

0 comments on commit 37f3e65

Please sign in to comment.