Skip to content

Commit

Permalink
Do not register each DispatchRateLimiter for policies notifications (a…
Browse files Browse the repository at this point in the history
…pache#2699)

* Do not register each DispatchRateLimiter for policies notifications

* Fixed updates on per-subscriptions limits

* Fixed testClusterPolicyOverrideConfiguration
  • Loading branch information
merlimat authored Oct 3, 2018
1 parent e31c2d8 commit 5e9c35c
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;

Expand All @@ -45,14 +46,14 @@ public interface Dispatcher {

/**
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
*
*
* @return
*/
CompletableFuture<Void> close();

/**
* disconnect all consumers
*
*
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();
Expand All @@ -72,4 +73,7 @@ public interface Dispatcher {

RedeliveryTracker getRedeliveryTracker();

default DispatchRateLimiter getRateLimiter() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public DispatchRateLimiter(PersistentTopic topic, String subscriptionName) {
this.subscriptionName = subscriptionName;
this.brokerService = topic.getBrokerService();
updateDispatchRate();
registerLocalPoliciesListener();
}

public DispatchRateLimiter(PersistentTopic topic) {
Expand Down Expand Up @@ -119,40 +118,31 @@ public void updateDispatchRate() {
log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
}

/**
* Register listener on namespace policy change to update dispatch-rate if required
*
*/
private void registerLocalPoliciesListener() {
brokerService.pulsar().getConfigurationCache().policiesCache().registerListener((path, data, stat) -> {
final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
final String policiesPath = path(POLICIES, namespace.toString());
if (policiesPath.equals(path)) {
DispatchRate dispatchRate;
if (subscriptionName == null) {
dispatchRate = data.clusterDispatchRate.get(cluster);
} else {
dispatchRate = data.subscriptionDispatchRate.get(cluster);
}
// update dispatch-rate only if it's configured in policies else ignore
if (dispatchRate != null) {
int inMsg = (subscriptionName == null) ?
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
long inByte = (subscriptionName == null) ?
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
// if policy-throttling rate is disabled and cluster-throttling is enabled then apply
// cluster-throttling rate
if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
dispatchRate = newDispatchRate;
}
updateDispatchRate(dispatchRate);
}
public void onPoliciesUpdate(Policies data) {
String cluster = brokerService.pulsar().getConfiguration().getClusterName();

DispatchRate dispatchRate;
if (subscriptionName == null) {
dispatchRate = data.clusterDispatchRate.get(cluster);
} else {
dispatchRate = data.subscriptionDispatchRate.get(cluster);
}
// update dispatch-rate only if it's configured in policies else ignore
if (dispatchRate != null) {
int inMsg = (subscriptionName == null) ?
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
long inByte = (subscriptionName == null) ?
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
// if policy-throttling rate is disabled and cluster-throttling is enabled then apply
// cluster-throttling rate
if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
dispatchRate = newDispatchRate;
}
});
updateDispatchRate(dispatchRate);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,5 +636,10 @@ public RedeliveryTracker getRedeliveryTracker() {
return redeliveryTracker;
}

@Override
public DispatchRateLimiter getRateLimiter() {
return dispatchRateLimiter;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,5 +496,10 @@ public RedeliveryTracker getRedeliveryTracker() {
return redeliveryTracker;
}

@Override
public DispatchRateLimiter getRateLimiter() {
return dispatchRateLimiter;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
// doesn't support batch-message
private volatile boolean hasBatchMessagePublished = false;
private DispatchRateLimiter dispatchRateLimiter;
private final DispatchRateLimiter dispatchRateLimiter;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;

private final MessageDeduplication messageDeduplication;
Expand Down Expand Up @@ -1559,11 +1559,17 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
if (sub.getDispatcher().getRateLimiter() != null) {
sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
}
});
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
dispatchRateLimiter.onPoliciesUpdate(data);
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,15 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
int nsMessageRate = 500;
DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}

for (int i = 0; i < 5; i++) {
if (subRateLimiter.getDispatchRateOnMsg() != nsMessageRate) {
Thread.sleep(50 + (i * 10));
Expand Down

0 comments on commit 5e9c35c

Please sign in to comment.