Skip to content

Commit

Permalink
[Issue 13756][Broker]Optimize topic policy with HierarchyTopicPolicie…
Browse files Browse the repository at this point in the history
…s about subscribeRate (apache#14342)
  • Loading branch information
AnonHxy authored Feb 28, 2022
1 parent afae29f commit 989fc35
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand Down Expand Up @@ -151,6 +152,10 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
}

public SubscribeRate getSubscribeRate() {
return this.topicPolicies.getSubscribeRate().get();
}

public DispatchRateImpl getSubscriptionDispatchRate() {
return this.topicPolicies.getSubscriptionDispatchRate().get();
}
Expand Down Expand Up @@ -202,6 +207,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
Expand Down Expand Up @@ -246,6 +252,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
updateNamespaceSubscribeRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
Expand All @@ -260,6 +267,11 @@ private void updateNamespaceDispatchRate(Policies namespacePolicies, String clus
topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
}

private void updateNamespaceSubscribeRate(Policies namespacePolicies, String cluster) {
topicPolicies.getSubscribeRate()
.updateNamespaceValue(SubscribeRate.normalize(namespacePolicies.clusterSubscribeRate.get(cluster)));
}

private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
topicPolicies.getSubscriptionDispatchRate()
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
Expand Down Expand Up @@ -343,6 +355,7 @@ private void updateTopicPolicyByBrokerConfig() {
if (isSystemTopic()) {
schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
}
topicPolicies.getSubscribeRate().updateBrokerValue(subscribeRateInBroker(config));
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
Expand All @@ -357,6 +370,13 @@ private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
.build();
}

private SubscribeRate subscribeRateInBroker(ServiceConfiguration config) {
return new SubscribeRate(
config.getSubscribeThrottlingRatePerConsumer(),
config.getSubscribeRatePeriodPerConsumerInSecond()
);
}

private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerSubscriptionInMsg())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,10 @@ private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
&& DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
if (!subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
} else if (!isDispatchRateNeeded) {

if (SubscribeRateLimiter.isSubscribeRateEnabled(getSubscribeRate())) {
this.subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
} else {
this.subscribeRateLimiter = Optional.empty();
}

Expand Down Expand Up @@ -2430,7 +2430,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
// update rate-limiter if policies updated
dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
}

return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
Expand Down Expand Up @@ -3041,7 +3041,7 @@ public void onUpdate(TopicPolicies policies) {
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
}
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,22 +49,7 @@ public SubscribeRateLimiter(PersistentTopic topic) {
subscribeRateLimiter = new ConcurrentHashMap<>();
this.executorService = brokerService.pulsar().getExecutor();
// get subscribeRate from topic level policies
this.subscribeRate = topic.getTopicPolicies()
.map(TopicPolicies::getSubscribeRate)
.orElse(null);

// subscribeRate of topic level policies not set, get from zookeeper
if (this.subscribeRate == null) {
this.subscribeRate = getPoliciesSubscribeRate();
}

// get subscribeRate from broker.conf
if (this.subscribeRate == null) {
this.subscribeRate = new SubscribeRate(brokerService.pulsar()
.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());

}
this.subscribeRate = topic.getSubscribeRate();
if (isSubscribeRateEnabled(this.subscribeRate)) {
resetTask = createTask();
log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
Expand Down Expand Up @@ -157,41 +139,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
}
}

public void onPoliciesUpdate(Policies data) {
// if subscribe rate is set on topic policy, skip subscribe rate update
SubscribeRate subscribeRate = brokerService.getTopicPolicies(TopicName.get(topicName))
.map(TopicPolicies::getSubscribeRate)
.orElse(null);
if (subscribeRate != null) {
return;
}

String cluster = brokerService.pulsar().getConfiguration().getClusterName();

subscribeRate = data.clusterSubscribeRate.get(cluster);

onSubscribeRateUpdate(subscribeRate);

}

public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
final SubscribeRate namespacePolicySubscribeRate = getPoliciesSubscribeRate();
final SubscribeRate newSubscribeRate = new SubscribeRate(
brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
);

// if policy-throttling rate is disabled and cluster-throttling is enabled then apply
// cluster-throttling rate
// if topic policy-throttling rate is disabled
if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(namespacePolicySubscribeRate)) {
subscribeRate = namespacePolicySubscribeRate;
}

if (!isSubscribeRateEnabled(subscribeRate) && !isSubscribeRateEnabled(namespacePolicySubscribeRate)
&& isSubscribeRateEnabled(newSubscribeRate)) {
subscribeRate = newSubscribeRate;
}
this.subscribeRate = subscribeRate;
stopResetTask();
for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
Expand All @@ -216,23 +164,6 @@ public SubscribeRate getPoliciesSubscribeRate() {
return getPoliciesSubscribeRate(brokerService, topicName);
}

public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName) {
ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
policies = policies.isPresent() ? policies : DispatchRateLimiter.getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName);
}

private static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
final Optional<Policies> policies, final String topicName) {
SubscribeRate subscribeRate = getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
if (subscribeRate == null) {
return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
&& serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
}
return true;
}

public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) {
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies = DispatchRateLimiter.getPolicies(brokerService, topicName);
Expand Down Expand Up @@ -262,8 +193,8 @@ public long getSubscribeRatePerConsumer(ConsumerIdentifier consumerIdentifier) {
!= null ? subscribeRateLimiter.get(consumerIdentifier).getRate() : -1;
}

private static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
return subscribeRate != null && (subscribeRate.subscribeThrottlingRatePerConsumer > 0);
public static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
return subscribeRate.subscribeThrottlingRatePerConsumer > 0 && subscribeRate.ratePeriodInSecond > 0;
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
Expand Down Expand Up @@ -1417,6 +1418,7 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
Expand Down Expand Up @@ -1457,6 +1459,7 @@ public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception {
// Namespace policy doesn't require encryption
policies.encryption_required = false;
policies.topicDispatchRate = Maps.newHashMap();
policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
Expand Down Expand Up @@ -1493,6 +1496,7 @@ public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
Expand Down Expand Up @@ -1536,6 +1540,7 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
Policies policies = mock(Policies.class);
policies.encryption_required = true;
policies.topicDispatchRate = Maps.newHashMap();
policies.clusterSubscribeRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public SubscribeRate(int subscribeThrottlingRatePerConsumer, int ratePeriodInSec
this.ratePeriodInSecond = ratePeriodInSecond;
}

public static SubscribeRate normalize(SubscribeRate subscribeRate) {
if (subscribeRate != null
&& subscribeRate.subscribeThrottlingRatePerConsumer > 0
&& subscribeRate.ratePeriodInSecond > 0) {
return subscribeRate;
} else {
return null;
}
}

@Override
public int hashCode() {
return Objects.hash(subscribeThrottlingRatePerConsumer, ratePeriodInSecond);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<SubscribeRate> subscribeRate;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
Expand Down Expand Up @@ -81,6 +82,7 @@ public HierarchyTopicPolicies() {
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
replicatorDispatchRate = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
subscribeRate = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
dispatchRate = new PolicyHierarchyValue<>();
Expand Down

0 comments on commit 989fc35

Please sign in to comment.