Skip to content

Commit

Permalink
[broker] Optimize TopicPolicies#subscriptionTypesEnabled with Hierarc…
Browse files Browse the repository at this point in the history
…hyTopicPolicies (apache#13121)
  • Loading branch information
Jason918 authored Dec 8, 2021
1 parent 3b5de44 commit c0958b3
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -37,6 +39,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -51,6 +54,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
Expand Down Expand Up @@ -139,7 +143,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.replicatorPrefix = config.getReplicatorPrefix();

topicPolicies = new HierarchyTopicPolicies();
updateTopicPolicyByBrokerConfig(topicPolicies, brokerService);
updateTopicPolicyByBrokerConfig();

this.lastActive = System.nanoTime();
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
Expand All @@ -151,6 +155,9 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
CollectionUtils.isEmpty(data.getSubscriptionTypesEnabled()) ? null :
EnumSet.copyOf(data.getSubscriptionTypesEnabled()));
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
Expand All @@ -168,18 +175,21 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
}

private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicies, BrokerService brokerService) {
private void updateTopicPolicyByBrokerConfig() {
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
config.getBrokerDeleteInactiveTopicsMode(),
config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
config.isBrokerDeleteInactiveTopicsEnabled()));

updateBrokerSubscriptionTypesEnabled();
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
Expand All @@ -194,6 +204,23 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class);
for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
try {
SubType subType = SubType.valueOf(subTypeStr);
subTypes.add(subType);
} catch (Throwable t) {
//ignore invalid SubType strings.
}
}
if (subTypes.isEmpty()) {
return null;
} else {
return subTypes;
}
}

protected boolean isProducersExceeded() {
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
Expand Down Expand Up @@ -989,4 +1016,10 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}

// subscriptionTypesEnabled is dynamic and can be updated online.
public void updateBrokerSubscriptionTypesEnabled() {
topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2149,6 +2149,9 @@ private void updateConfigurationAndRegisterListeners() {
});
}

// add listener to notify topic subscriptionTypesEnabled changed.
registerConfigurationListener("subscriptionTypesEnabled", this::updateBrokerSubscriptionTypesEnabled);

// add more listeners here
}

Expand Down Expand Up @@ -2191,6 +2194,17 @@ private void updateTopicMessageDispatchRate() {
});
}

private void updateBrokerSubscriptionTypesEnabled(Object subscriptionTypesEnabled) {
this.pulsar().getExecutor().execute(() -> {
// update subscriptionTypesEnabled
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerSubscriptionTypesEnabled();
}
});
});
}

private void updateSubscriptionMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -72,7 +73,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -3232,42 +3232,9 @@ private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
return false;
}

public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
TopicName topicName = TopicName.get(topic);
if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies =
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
} else {
if (CollectionUtils.isEmpty(topicPolicies.getSubscriptionTypesEnabled())) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
} else {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
}

private boolean checkNsAndBrokerSubscriptionTypesEnable(TopicName topicName, SubType subType) throws Exception {
Optional<Policies> policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPolicies(topicName.getNamespaceObject());
if (policies.isPresent()) {
if (policies.get().subscription_types_enabled.isEmpty()) {
return getBrokerService().getPulsar().getConfiguration()
.getSubscriptionTypesEnabled().contains(subType.name());
} else {
return policies.get().subscription_types_enabled.contains(subType.name());
}
} else {
return getBrokerService().getPulsar().getConfiguration()
.getSubscriptionTypesEnabled().contains(subType.name());
}
public boolean checkSubscriptionTypesEnable(SubType subType) {
EnumSet<SubType> subTypesEnabled = topicPolicies.getSubscriptionTypesEnabled().get();
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}

public TransactionBufferStats getTransactionBufferStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -78,6 +80,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -1701,9 +1704,12 @@ public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarCl
admin.namespaces().removeSubscriptionTypesEnabled(namespace);
assertEquals(admin.namespaces().getSubscriptionTypesEnabled(namespace), Sets.newHashSet());
consumerBuilder.subscriptionType(SubscriptionType.Shared);
HashSet<String> subscriptions = new HashSet<>();
subscriptions.add("Failover");
conf.setSubscriptionTypesEnabled(subscriptions);
admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled", "Failover");
Awaitility.await().untilAsserted(()->{
Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertTrue(((AbstractTopic) t).getHierarchyTopicPolicies().getSubscriptionTypesEnabled().getBrokerValue()
.contains(CommandSubscribe.SubType.Failover));
});
try {
consumerBuilder.subscribe().close();
fail();
Expand All @@ -1712,8 +1718,12 @@ public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarCl
}

// add shared to broker.conf and sub with shared will success
subscriptions.add("Shared");
conf.setSubscriptionTypesEnabled(subscriptions);
admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled", "Failover,Shared");
Awaitility.await().untilAsserted(()->{
Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertTrue(((AbstractTopic) t).getHierarchyTopicPolicies().getSubscriptionTypesEnabled().getBrokerValue()
.contains(CommandSubscribe.SubType.Failover));
});
consumerBuilder.subscribe().close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -2482,9 +2483,27 @@ public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
@Test(timeOut = 30000)
public void testSubscriptionTypesEnabled() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
// use broker.conf
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
pulsar.getConfiguration().setSubscriptionTypesEnabled(Sets.newHashSet("Exclusive"));
admin.topics().createNonPartitionedTopic(topic);
try {
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("test").subscribe();
fail();
} catch (PulsarClientException pulsarClientException) {
assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
}
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Exclusive).subscriptionName("test")
.subscribe().close();

//update broker level dynamic update config
admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled", "Shared");
Awaitility.await().untilAsserted(()->{
assertTrue(pulsar.getConfiguration().getSubscriptionTypesEnabled().contains("Shared"));
});
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("test").subscribe().close();

assertNull(admin.topicPolicies().getSubscriptionTypesEnabled(topic));
// set enable failover sub type
Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
Expand All @@ -2493,6 +2512,10 @@ public void testSubscriptionTypesEnabled() throws Exception {

Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().get()
.contains(CommandSubscribe.SubType.Failover));
});
subscriptionTypeSet = admin.topicPolicies().getSubscriptionTypesEnabled(topic);
assertTrue(subscriptionTypeSet.contains(SubscriptionType.Failover));
assertFalse(subscriptionTypeSet.contains(SubscriptionType.Shared));
Expand All @@ -2508,6 +2531,10 @@ public void testSubscriptionTypesEnabled() throws Exception {
// add shared type
subscriptionTypeSet.add(SubscriptionType.Shared);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().get()
.contains(CommandSubscribe.SubType.Shared));
});
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();

Expand All @@ -2518,6 +2545,10 @@ public void testSubscriptionTypesEnabled() throws Exception {
subscriptionTypeSet.clear();
subscriptionTypeSet.add(SubscriptionType.Failover);
admin.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertTrue(hierarchyTopicPolicies.getSubscriptionTypesEnabled().getTopicValue()
.contains(CommandSubscribe.SubType.Failover));
});

try {
pulsarClient.newConsumer().topic(topic)
Expand All @@ -2526,6 +2557,14 @@ public void testSubscriptionTypesEnabled() throws Exception {
} catch (PulsarClientException pulsarClientException) {
assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
}

//clear topic level setting, use ns setting only, which only contains shared.
admin.topicPolicies().setSubscriptionTypesEnabled(topic, Collections.emptySet());
waitTopicPoliciesApplied(topic, 0, hierarchyTopicPolicies -> {
assertNull(hierarchyTopicPolicies.getSubscriptionTypesEnabled().getTopicValue());
});
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();
}

@Test(timeOut = 20000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.pulsar.common.policies.data;

import com.google.common.collect.ImmutableMap;
import java.util.EnumSet;
import java.util.Map;
import lombok.Getter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;

/**
Expand All @@ -31,6 +33,7 @@
public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Boolean> deduplicationEnabled;
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<EnumSet<SubType>> subscriptionTypesEnabled;
final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
final PolicyHierarchyValue<Integer> maxProducersPerTopic;
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
Expand All @@ -39,6 +42,7 @@ public class HierarchyTopicPolicies {
public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
subscriptionTypesEnabled = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class PolicyHierarchyValue<T> {
private static final AtomicReferenceFieldUpdater<PolicyHierarchyValue, Object> VALUE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(PolicyHierarchyValue.class, Object.class, "value");

@Getter
private volatile T brokerValue;

@Getter
Expand Down

0 comments on commit c0958b3

Please sign in to comment.