Skip to content

Commit

Permalink
Support limit max subscriptions per topic (apache#8289)
Browse files Browse the repository at this point in the history
* ssupport limit max subscriptions per topic

* fix a bug
  • Loading branch information
hangc0276 authored Oct 21, 2020
1 parent 8ddd6ba commit 9951ddc
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 9 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,11 @@ maxProducersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
maxConsumersPerTopic=0

# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject
# new subscription until the number of subscribed subscriptions decrease.
# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check.
maxSubscriptionsPerTopic=0

# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ maxProducersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
maxConsumersPerTopic=0

# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject
# new subscription until the number of subscribed subscriptions decrease.
# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check.
maxSubscriptionsPerTopic=0

# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
Expand Down
5 changes: 5 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ maxProducersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
maxConsumersPerTopic=0

# Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject
# new subscription until the number of subscribed subscriptions decrease.
# Using a value of 0, is disabling maxSubscriptionsPerTopic limit check.
maxSubscriptionsPerTopic=0

# Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
# until the number of connected consumers decrease.
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " Using a value of 0, is disabling maxConsumersPerTopic-limit check.")
private int maxConsumersPerTopic = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Max number of subscriptions allowed to subscribe to topic. \n\nOnce this limit reaches, "
+ " broker will reject new subscription until the number of subscribed subscriptions decrease.\n"
+ " Using a value of 0, is disabling maxSubscriptionsPerTopic limit check."
)
private int maxSubscriptionsPerTopic = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import javax.ws.rs.core.Response;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

Expand All @@ -45,9 +46,6 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.StreamingOutput;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -87,10 +85,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.PrecisPublishLimiter;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
Expand Down Expand Up @@ -120,7 +115,6 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand All @@ -133,7 +127,7 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
Expand Down Expand Up @@ -686,6 +680,12 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();

if (checkMaxSubscriptionsPerTopicExceed()) {
subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED,
"Exceed the maximum number of subscriptions of the topic: " + topic));
return subscriptionFuture;
}

Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);

ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
Expand Down Expand Up @@ -723,6 +723,13 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) {
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);

CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed()) {
subscriptionFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED,
"Exceed the maximum number of subscriptions of the topic: " + topic));
return subscriptionFuture;
}

synchronized (ledger) {
// Create a new non-durable cursor only for the first consumer that connects
PersistentSubscription subscription = subscriptions.get(subscriptionName);
Expand Down Expand Up @@ -758,7 +765,6 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
if (startMessageRollbackDurationSec > 0) {
long timestamp = System.currentTimeMillis()
- TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
final Subscription finalSubscription = subscription;
subscription.resetCursor(timestamp).handle((s, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -2511,4 +2517,15 @@ private void registerTopicPolicyListener() {
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
}

private boolean checkMaxSubscriptionsPerTopicExceed() {
final int maxSubscriptionsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
if (maxSubscriptionsPerTopic > 0) {
if (subscriptions != null && subscriptions.size() >= maxSubscriptionsPerTopic) {
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.admin;

import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.client.api.MessageId;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class AdminApiTest2 extends MockedPulsarServiceBaseTest {

private MockedPulsarService mockPulsarSetup;
Expand Down Expand Up @@ -1323,4 +1326,89 @@ public void testMaxNamespacesPerTenant() throws Exception {
}

}

@Test
public void testMaxSubscriptionsPerTopic() throws Exception {
super.internalCleanup();
conf.setMaxSubscriptionsPerTopic(2);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic";

admin.topics().createPartitionedTopic(topic, 3);
Producer producer = pulsarClient.newProducer().topic(topic).create();
producer.close();

// create subscription
admin.topics().createSubscription(topic, "test-sub1", MessageId.earliest);
admin.topics().createSubscription(topic, "test-sub2", MessageId.earliest);
try {
admin.topics().createSubscription(topic, "test-sub3", MessageId.earliest);
Assert.fail();
} catch (PulsarAdminException e) {
log.info("create subscription failed. Exception: ", e);
}

super.internalCleanup();
conf.setMaxSubscriptionsPerTopic(0);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

admin.topics().createPartitionedTopic(topic, 3);
producer = pulsarClient.newProducer().topic(topic).create();
producer.close();

for (int i = 0; i < 10; ++i) {
admin.topics().createSubscription(topic, "test-sub" + i, MessageId.earliest);
}

super.internalCleanup();
conf.setMaxSubscriptionsPerTopic(2);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));

admin.topics().createPartitionedTopic(topic, 3);
producer = pulsarClient.newProducer().topic(topic).create();
producer.close();

Consumer consumer1 = null;
Consumer consumer2 = null;
Consumer consumer3 = null;

try {
consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe();
Assert.assertNotNull(consumer1);
} catch (PulsarClientException e) {
Assert.fail();
}

try {
consumer2 = pulsarClient.newConsumer().subscriptionName("test-sub2").topic(topic).subscribe();
Assert.assertNotNull(consumer2);
} catch (PulsarClientException e) {
Assert.fail();
}

try {
consumer3 = pulsarClient.newConsumer().subscriptionName("test-sub3").topic(topic).subscribe();
Assert.fail();
} catch (PulsarClientException e) {
log.info("subscription reached max subscriptions per topic");
}

consumer1.close();
consumer2.close();
admin.topics().deletePartitionedTopic(topic);
}
}

0 comments on commit 9951ddc

Please sign in to comment.