Skip to content

Commit

Permalink
Optimize TopicPolicies#maxConsumersPerSubscription with HierarchyTopi…
Browse files Browse the repository at this point in the history
…cPolicies (apache#13548)

This is one of the serial topic policy optimization with HierarchyTopicPolicies.

Update topic policy with HierarchyTopicPolicies comes with these benefits:

All topic policy related settings will goes into AbstractTopic#topicPolicies, easier to understand, check, review, and modify.
Unify policy update to AbstractTopic. And easier to find which policy is not applied to non-persistent topic yet. And we can easily add support for it.
Unify policy value with 3 level settings (topic/namespace/broker), and priority with topic > namespace > broker. And it's easier to find which level settings is missing.
All values are updated at creation or by trigger. We can save some resource to update it or recalculate each time we use it.
etc.
  • Loading branch information
Jason918 authored Jan 11, 2022
1 parent 4976c3e commit 5faae1c
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;

Expand Down Expand Up @@ -251,32 +248,8 @@ private void fillContext(FilterContext context, MessageMetadata msgMetadata,
*/
protected abstract boolean isConsumersExceededOnSubscription();

protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
String topic, int consumerSize) {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = brokerService
.getTopicPolicies(TopicName.get(topic))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()).orElse(null);
}
} catch (Exception e) {
log.debug("Get topic or namespace policies fail", e);
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies != null
&& policies.max_consumers_per_subscription != null
&& policies.max_consumers_per_subscription >= 0
? policies.max_consumers_per_subscription :
brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
}

protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
Expand All @@ -184,6 +185,8 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
topicPolicies.getMaxConsumersPerSubscription()
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
Expand All @@ -204,6 +207,7 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
return isConsumersExceededOnSubscription(topic, consumerList.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void sendMessages(List<Entry> entries) {

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
return isConsumersExceededOnSubscription(topic, consumers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
return isConsumersExceededOnSubscription(topic, consumerList.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected void scheduleReadOnActiveConsumer() {

@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
return isConsumersExceededOnSubscription(topic, consumers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,27 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -33,6 +51,7 @@
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
Expand All @@ -42,26 +61,6 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

@PrepareForTest({ DispatchRateLimiter.class })
@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
Expand Down Expand Up @@ -95,9 +94,13 @@ public void setup() throws Exception {
brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();

HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);

topicMock = mock(NonPersistentTopic.class);
doReturn(brokerMock).when(topicMock).getBrokerService();
doReturn(topicName).when(topicMock).getName();
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();

subscriptionMock = mock(NonPersistentSubscription.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,38 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
Expand All @@ -37,6 +66,7 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentCaptor;
Expand All @@ -49,37 +79,6 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

@PrepareForTest({ DispatchRateLimiter.class })
@PowerMockIgnore({"org.apache.logging.log4j.*"})
@Test(groups = "broker")
Expand Down Expand Up @@ -118,9 +117,13 @@ public void setup() throws Exception {
brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();

HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);

topicMock = mock(PersistentTopic.class);
doReturn(brokerMock).when(topicMock).getBrokerService();
doReturn(topicName).when(topicMock).getName();
doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();

cursorMock = mock(ManagedCursorImpl.class);
doReturn(null).when(cursorMock).getLastIndividualDeletedRange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand All @@ -51,6 +52,7 @@ public HierarchyTopicPolicies() {
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
maxConsumerPerTopic = new PolicyHierarchyValue<>();
maxConsumersPerSubscription = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
Expand Down

0 comments on commit 5faae1c

Please sign in to comment.