Skip to content

Commit

Permalink
[Broker] Optimize TopicPolicies#maxConsumerPerTopic with HierarchyTop…
Browse files Browse the repository at this point in the history
…icPolicies (apache#13361)
  • Loading branch information
Jason918 authored Dec 20, 2021
1 parent 1a70ae5 commit 7e2c8c1
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {
protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
Expand All @@ -175,6 +176,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
Expand All @@ -194,6 +196,7 @@ private void updateTopicPolicyByBrokerConfig() {
updateBrokerSubscriptionTypesEnabled();
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
Expand Down Expand Up @@ -273,18 +276,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
}

protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {

// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
: brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,8 @@ public void testAddRemoveConsumerDurableCursor() throws Exception {

private void testMaxConsumersShared() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().join();
assertEquals((int) topic.getHierarchyTopicPolicies().getMaxConsumerPerTopic().get(), 3);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);

Expand Down Expand Up @@ -930,13 +932,18 @@ public void testMaxConsumersSharedForNamespace() throws Exception {
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));

testMaxConsumersShared();
}

private void testMaxConsumersFailover() throws Exception {

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().join();
assertEquals((int) topic.getHierarchyTopicPolicies().getMaxConsumerPerTopic().get(), 3);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false);

Expand Down Expand Up @@ -1029,6 +1036,9 @@ public void testMaxConsumersFailoverForNamespace() throws Exception {
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
testMaxConsumersFailover();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ public class HierarchyTopicPolicies {
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;

public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
subscriptionTypesEnabled = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
maxConsumerPerTopic = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
Expand Down

0 comments on commit 7e2c8c1

Please sign in to comment.