diff --git a/conf/broker.conf b/conf/broker.conf index 5e22f71173be1..06cffb76d6856 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -226,6 +226,12 @@ defaultNumberOfNamespaceBundles=4 # This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded maxNamespacesPerTenant=0 +# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace, +# the broker should reject the new topic request(include topic auto-created by the producer or consumer) +# until the number of connected consumers decrease. +# Using a value of 0, is disabling maxTopicsPerNamespace-limit check. +maxTopicsPerNamespace=0 + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index b54f40cfe2ea3..e0f20eecd472a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -153,6 +153,12 @@ brokerDeduplicationProducerInactivityTimeoutMinutes=360 # value will be used as the default defaultNumberOfNamespaceBundles=4 +# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace, +# the broker should reject the new topic request(include topic auto-created by the producer or consumer) +# until the number of connected consumers decrease. +# Using a value of 0, is disabling maxTopicsPerNamespace-limit check. +maxTopicsPerNamespace=0 + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 638c889d38c50..cb975051130a2 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -194,6 +194,12 @@ defaultNumberOfNamespaceBundles=4 # This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded maxNamespacesPerTenant=0 +# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace, +# the broker should reject the new topic request(include topic auto-created by the producer or consumer) +# until the number of connected consumers decrease. +# Using a value of 0, is disabling maxTopicsPerNamespace-limit check. +maxTopicsPerNamespace=0 + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index dcfe0c8742369..97799cde6aa9a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -468,6 +468,17 @@ public class ServiceConfiguration implements PulsarConfiguration { + "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded") private int maxNamespacesPerTenant = 0; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Max number of topics allowed to be created in the namespace. " + + "When the topics reach the max topics of the namespace, the broker should reject " + + "the new topic request(include topic auto-created by the producer or consumer) until " + + "the number of connected consumers decrease. " + + " Using a value of 0, is disabling maxTopicsPerNamespace-limit check." + ) + private int maxTopicsPerNamespace = 0; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index d7880c5343ba4..58c72e90627aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -90,6 +90,7 @@ public abstract class AdminResource extends PulsarWebResource { private static final Logger log = LoggerFactory.getLogger(AdminResource.class); private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; + private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; protected ZooKeeper globalZk() { return pulsar().getGlobalZkCache().getZooKeeper(); @@ -787,7 +788,47 @@ protected List getPartitionedTopicList(TopicDomain topicDomain) { return partitionedTopics; } + protected List getTopicPartitionList(TopicDomain topicDomain) { + List topicPartitions = Lists.newArrayList(); + + try { + String topicPartitionPath = joinPath(MANAGED_LEDGER_PATH_ZNODE, + namespaceName.toString(), topicDomain.value()); + List topics = globalZk().getChildren(topicPartitionPath, false); + topicPartitions = topics.stream() + .map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s))) + .collect(Collectors.toList()); + } catch (KeeperException.NoNodeException e) { + // NoNode means there are no topics in this domain for this namespace + } catch (Exception e) { + log.error("[{}] Failed to get topic partition list for namespace {}", clientAppId(), + namespaceName.toString(), e); + throw new RestException(e); + } + + topicPartitions.sort(null); + return topicPartitions; + } + protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) { + final int maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace(); + if (maxTopicsPerNamespace > 0) { + try { + List partitionedTopics = getTopicPartitionList(TopicDomain.persistent); + if (partitionedTopics.size() + numPartitions > maxTopicsPerNamespace) { + log.error("[{}] Failed to create partitioned topic {}, " + + "exceed maximum number of topics in namespace", clientAppId(), topicName); + resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.PRECONDITION_FAILED, + "Exceed maximum number of topics in namespace.")); + return; + } + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); try { validateAdminAccessForTenant(topicName.getTenant()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 68523e2a32094..5d614af5bc6eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -232,6 +232,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener> dynamicConfigurationCache; private static final LongAdder totalUnackedMessages = new LongAdder(); @@ -1085,6 +1087,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, return; } + if (!checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) { + return; + } + getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { managedLedgerConfig.setCreateIfMissing(createIfMissing); @@ -2155,6 +2161,10 @@ private CompletableFuture createDefaultPartitionedTopi PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); CompletableFuture partitionedTopicFuture = futureWithDeadline(); + if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, partitionedTopicFuture)) { + return partitionedTopicFuture; + } + try { byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata); @@ -2504,6 +2514,35 @@ private void checkTopicLevelPolicyEnable() { } } + + private boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions, + CompletableFuture topicFuture) { + final int maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace(); + if (maxTopicsPerNamespace > 0) { + try { + String partitionedTopicPath = PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE, + topicName.getNamespace(), topicName.getDomain().value()); + List topics = pulsar().getGlobalZkCache().getZooKeeper() + .getChildren(partitionedTopicPath, false); + if (topics.size() + numPartitions > maxTopicsPerNamespace) { + log.error("Failed to create persistent topic {}, " + + "exceed maximum number of topics in namespace", topicName); + topicFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, + "Exceed maximum number of topics in namespace.")); + return false; + } + } catch (KeeperException.NoNodeException e) { + // NoNode means there are no partitioned topics in this domain for this namespace + } catch (Exception e) { + log.error("Failed to create partitioned topic {}", topicName, e); + topicFuture.completeExceptionally(new RestException(e)); + return false; + } + } + + return true; + } + public void setInterceptor(BrokerInterceptor interceptor) { this.interceptor = interceptor; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 209a996c9272e..5e3d9d6b05f62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -1262,6 +1262,9 @@ public void testMaxNumPartitionsPerPartitionedTopicSuccess() { } catch (Exception e) { fail("should not throw any exceptions"); } + + // reset configuration + pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0); } @Test @@ -1275,6 +1278,9 @@ public void testMaxNumPartitionsPerPartitionedTopicFailure() { } catch (Exception e) { assertTrue(e instanceof PulsarAdminException); } + + // reset configuration + pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0); } @Test @@ -1331,6 +1337,86 @@ public void testMaxNamespacesPerTenant() throws Exception { } + @Test + public void testMaxTopicsPerNamespace() throws Exception { + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(10); + super.internalSetup(); + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + // check create partitioned/non-partitioned topics + String topic = "persistent://testTenant/ns1/test_create_topic_v"; + admin.topics().createPartitionedTopic(topic + "1", 2); + admin.topics().createPartitionedTopic(topic + "2", 3); + admin.topics().createPartitionedTopic(topic + "3", 4); + admin.topics().createNonPartitionedTopic(topic + "4"); + try { + admin.topics().createPartitionedTopic(topic + "5", 2); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + Assert.assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace."); + } + + //unlimited + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(0); + super.internalSetup(); + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + for (int i = 0; i < 10; ++i) { + admin.topics().createPartitionedTopic(topic + i, 2); + admin.topics().createNonPartitionedTopic(topic + i + i); + } + + // check producer/consumer auto create partitioned topic + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(10); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreationType("partitioned"); + super.internalSetup(); + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + pulsarClient.newProducer().topic(topic + "1").create(); + pulsarClient.newProducer().topic(topic + "2").create(); + pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe(); + try { + pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe(); + Assert.fail(); + } catch (PulsarClientException e) { + log.info("Exception: ", e); + } + + // check producer/consumer auto create non-partitioned topic + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(3); + conf.setAllowAutoTopicCreationType("non-partitioned"); + super.internalSetup(); + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + pulsarClient.newProducer().topic(topic + "1").create(); + pulsarClient.newProducer().topic(topic + "2").create(); + pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe(); + try { + pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe(); + Assert.fail(); + } catch (PulsarClientException e) { + log.info("Exception: ", e); + } + + // reset configuration + conf.setMaxTopicsPerNamespace(0); + conf.setDefaultNumPartitions(1); + } + @Test public void testInvalidBundleErrorResponse() throws Exception { try {