diff --git a/conf/broker.conf b/conf/broker.conf index bec34fded0e75..6e5e83f28b5f8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -361,6 +361,11 @@ maxProducersPerTopic=0 # Using a value of 0, is disabling maxConsumersPerTopic-limit check. maxConsumersPerTopic=0 +# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject +# new subscription until the number of subscribed subscriptions decrease. +# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check. +maxSubscriptionsPerTopic=0 + # Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers # until the number of connected consumers decrease. # Using a value of 0, is disabling maxConsumersPerSubscription-limit check. diff --git a/conf/standalone.conf b/conf/standalone.conf index af88965c37099..38a4f62a3e87f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -242,6 +242,11 @@ maxProducersPerTopic=0 # Using a value of 0, is disabling maxConsumersPerTopic-limit check. maxConsumersPerTopic=0 +# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject +# new subscription until the number of subscribed subscriptions decrease. +# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check. +maxSubscriptionsPerTopic=0 + # Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers # until the number of connected consumers decrease. # Using a value of 0, is disabling maxConsumersPerSubscription-limit check. diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 7568fe00b1c57..8c6d60972b763 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -346,6 +346,11 @@ maxProducersPerTopic=0 # Using a value of 0, is disabling maxConsumersPerTopic-limit check. maxConsumersPerTopic=0 +# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject +# new subscription until the number of subscribed subscriptions decrease. +# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check. +maxSubscriptionsPerTopic=0 + # Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers # until the number of connected consumers decrease. # Using a value of 0, is disabling maxConsumersPerSubscription-limit check. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3623785b91e08..c0fa3f9be1516 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -697,6 +697,14 @@ public class ServiceConfiguration implements PulsarConfiguration { + " Using a value of 0, is disabling maxConsumersPerTopic-limit check.") private int maxConsumersPerTopic = 0; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Max number of subscriptions allowed to subscribe to topic. \n\nOnce this limit reaches, " + + " broker will reject new subscription until the number of subscribed subscriptions decrease.\n" + + " Using a value of 0, is disabling maxSubscriptionsPerTopic limit check." + ) + private int maxSubscriptionsPerTopic = 0; + @FieldContext( category = CATEGORY_SERVER, doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches," diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f3591bb81b546..3ffc0ad0f1bc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import javax.ws.rs.core.Response; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @@ -45,9 +46,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.BiFunction; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.StreamingOutput; - import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -87,10 +85,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.service.PrecisPublishLimiter; import org.apache.pulsar.broker.service.Producer; -import org.apache.pulsar.broker.service.PublishRateLimiter; -import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.StreamingStats; @@ -120,7 +115,6 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -133,7 +127,7 @@ import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; @@ -686,6 +680,12 @@ private CompletableFuture getDurableSubscription(String subscripti InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); + if (checkMaxSubscriptionsPerTopicExceed()) { + subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, + "Exceed the maximum number of subscriptions of the topic: " + topic)); + return subscriptionFuture; + } + Map properties = PersistentSubscription.getBaseCursorProperties(replicated); ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() { @@ -723,6 +723,13 @@ private CompletableFuture getNonDurableSubscription(Stri MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) { log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId); + CompletableFuture subscriptionFuture = new CompletableFuture<>(); + if (checkMaxSubscriptionsPerTopicExceed()) { + subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, + "Exceed the maximum number of subscriptions of the topic: " + topic)); + return subscriptionFuture; + } + synchronized (ledger) { // Create a new non-durable cursor only for the first consumer that connects PersistentSubscription subscription = subscriptions.get(subscriptionName); @@ -758,7 +765,6 @@ private CompletableFuture getNonDurableSubscription(Stri if (startMessageRollbackDurationSec > 0) { long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec); - CompletableFuture subscriptionFuture = new CompletableFuture<>(); final Subscription finalSubscription = subscription; subscription.resetCursor(timestamp).handle((s, ex) -> { if (ex != null) { @@ -2511,4 +2517,15 @@ private void registerTopicPolicyListener() { public MessageDeduplication getMessageDeduplication() { return messageDeduplication; } + + private boolean checkMaxSubscriptionsPerTopicExceed() { + final int maxSubscriptionsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic(); + if (maxSubscriptionsPerTopic > 0) { + if (subscriptions != null && subscriptions.size() >= maxSubscriptionsPerTopic) { + return true; + } + } + + return false; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 8974cd641c2d7..ec3d0cf3fd0d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.admin; +import lombok.extern.slf4j.Slf4j; import static org.apache.commons.lang3.StringUtils.isBlank; +import org.apache.pulsar.client.api.MessageId; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -91,6 +93,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class AdminApiTest2 extends MockedPulsarServiceBaseTest { private MockedPulsarService mockPulsarSetup; @@ -1323,4 +1326,89 @@ public void testMaxNamespacesPerTenant() throws Exception { } } + + @Test + public void testMaxSubscriptionsPerTopic() throws Exception { + super.internalCleanup(); + conf.setMaxSubscriptionsPerTopic(2); + super.internalSetup(); + + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic"; + + admin.topics().createPartitionedTopic(topic, 3); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + + // create subscription + admin.topics().createSubscription(topic, "test-sub1", MessageId.earliest); + admin.topics().createSubscription(topic, "test-sub2", MessageId.earliest); + try { + admin.topics().createSubscription(topic, "test-sub3", MessageId.earliest); + Assert.fail(); + } catch (PulsarAdminException e) { + log.info("create subscription failed. Exception: ", e); + } + + super.internalCleanup(); + conf.setMaxSubscriptionsPerTopic(0); + super.internalSetup(); + + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + admin.topics().createPartitionedTopic(topic, 3); + producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + + for (int i = 0; i < 10; ++i) { + admin.topics().createSubscription(topic, "test-sub" + i, MessageId.earliest); + } + + super.internalCleanup(); + conf.setMaxSubscriptionsPerTopic(2); + super.internalSetup(); + + admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); + + admin.topics().createPartitionedTopic(topic, 3); + producer = pulsarClient.newProducer().topic(topic).create(); + producer.close(); + + Consumer consumer1 = null; + Consumer consumer2 = null; + Consumer consumer3 = null; + + try { + consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe(); + Assert.assertNotNull(consumer1); + } catch (PulsarClientException e) { + Assert.fail(); + } + + try { + consumer2 = pulsarClient.newConsumer().subscriptionName("test-sub2").topic(topic).subscribe(); + Assert.assertNotNull(consumer2); + } catch (PulsarClientException e) { + Assert.fail(); + } + + try { + consumer3 = pulsarClient.newConsumer().subscriptionName("test-sub3").topic(topic).subscribe(); + Assert.fail(); + } catch (PulsarClientException e) { + log.info("subscription reached max subscriptions per topic"); + } + + consumer1.close(); + consumer2.close(); + admin.topics().deletePartitionedTopic(topic); + } }