Skip to content

Commit

Permalink
[Issue 13756][Broker]Optimize topic policy with HierarchyTopicPolicie…
Browse files Browse the repository at this point in the history
…s about publishRate (apache#14267)
  • Loading branch information
AnonHxy authored Feb 16, 2022
1 parent cd172e7 commit b87aa79
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public AbstractTopic(String topic, BrokerService brokerService) {

this.lastActive = System.nanoTime();
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(Optional.empty());
}

public DispatchRateImpl getSubscriptionDispatchRate() {
Expand Down Expand Up @@ -191,6 +190,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
Expand Down Expand Up @@ -222,6 +222,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
namespacePolicies.deduplicationSnapshotIntervalSeconds);
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::isActive).orElse(null));
Expand Down Expand Up @@ -267,6 +268,14 @@ private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespaceP
.updateNamespaceValue(formatSchemaCompatibilityStrategy(strategy));
}

private void updateNamespacePublishRate(Policies namespacePolicies, String cluster) {
topicPolicies.getPublishRate().updateNamespaceValue(
PublishRate.normalize(
namespacePolicies.publishMaxMessageRate != null
? namespacePolicies.publishMaxMessageRate.get(cluster)
: null));
}

private void updateTopicPolicyByBrokerConfig() {
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
Expand Down Expand Up @@ -298,6 +307,7 @@ private void updateTopicPolicyByBrokerConfig() {

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
Expand Down Expand Up @@ -336,6 +346,10 @@ private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionType
}
}

private PublishRate publishRateInBroker(ServiceConfiguration config) {
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
}

protected boolean isProducersExceeded() {
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
Expand Down Expand Up @@ -954,51 +968,20 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(Optional.of(policies));
}

private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}",
this.topic);
updatePublishDispatcher(topicPublishRate.get());
return;
}

public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
if (optPolicies.isPresent()) {
policies = optPolicies.get();
} else {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
}
policies = optPolicies.orElseGet(() ->
brokerService.pulsar()
.getPulsarResources()
.getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(Policies::new));
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

//both namespace-level and topic-level policy are not set, try to use broker-level policy
ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (publishRate != null) {
//publishRate is not null, use namespace-level policy
updatePublishDispatcher(publishRate);
} else {
PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
, serviceConfiguration.getMaxPublishRatePerTopicInBytes());
updatePublishDispatcher(brokerPublishRate);
}

// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name != null
? policies.resource_group_name
Expand Down Expand Up @@ -1093,9 +1076,9 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
/**
* update topic publish dispatcher for this topic.
*/
protected void updatePublishDispatcher(PublishRate publishRate) {
if (publishRate != null && (publishRate.publishThrottlingRateInByte > 0
|| publishRate.publishThrottlingRateInMsg > 0)) {
public void updatePublishDispatcher() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} ", publishRate);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
Expand All @@ -1120,15 +1103,15 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
}
}

@Override
public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}

// subscriptionTypesEnabled is dynamic and can be updated online.
public void updateBrokerSubscriptionTypesEnabled() {
topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}

@Override
public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}

public void updateBrokerSubscriptionDispatchRate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public CompletableFuture<Void> initialize() {
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
}
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,19 @@ public CompletableFuture<Void> initialize() {
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
return;
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

updatePublishDispatcher();

updateResourceGroupLimiter(optPolicies);

this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
Expand Down Expand Up @@ -2386,7 +2392,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

initializeRateLimiterIfNeeded(Optional.ofNullable(data));

this.updateMaxPublishRate(data);
updatePublishDispatcher();

this.updateResourceGroupLimiter(Optional.of(data));

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
Expand Down Expand Up @@ -3037,12 +3045,7 @@ public void onUpdate(TopicPolicies policies) {
}));

FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
if (policies.getPublishRate() != null) {
updatePublishDispatcher(policies.getPublishRate());
} else {
updateMaxPublishRate(namespacePolicies.orElse(null));
}

updatePublishDispatcher();
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -48,18 +46,15 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {
conf.setPreciseTopicPublishRateLimiterEnable(false);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
Policies policies = new Policies();
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put("test", publishRate);

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
MessageId messageId = null;
try {
// first will be success
Expand Down Expand Up @@ -87,18 +82,15 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
Policies policies = new Policies();
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put("test", publishRate);

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
MessageId messageId = null;
try {
// first will be success, and will set auto read to false
Expand All @@ -119,18 +111,15 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = "persistent://prop/ns-abc/testPrecisTopicPublishRateLimiting";
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
Policies policies = new Policies();
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put("test", publishRate);

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
MessageId messageId = null;
try {
// first will be success, and will set auto read to false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public PublishRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateI
this.publishThrottlingRateInByte = dispatchThrottlingRateInByte;
}

public static PublishRate normalize(PublishRate publishRate) {
if (publishRate != null
&& (publishRate.publishThrottlingRateInMsg > 0
|| publishRate.publishThrottlingRateInByte > 0)) {
return publishRate;
} else {
return null;
}
}

@Override
public int hashCode() {
return Objects.hash(publishThrottlingRateInMsg, publishThrottlingRateInByte);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Long> compactionThreshold;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<PublishRate> publishRate;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
Expand All @@ -73,6 +74,7 @@ public HierarchyTopicPolicies() {
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
publishRate = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
Expand Down

0 comments on commit b87aa79

Please sign in to comment.