Skip to content

Commit

Permalink
Introduce precise topic publish rate limiting (apache#7078)
Browse files Browse the repository at this point in the history
Fixes apache#6975 
### Motivation

  For now,  pulsar limits  publish rate of topic by a period task runs every `topicPublisherThrottlingTickTimeMillis`. That means in the time `topicPublisherThrottlingTickTimeMillis`, the limit can't take effect. 
  This PR enable precise topic publish rate limit on broker.
  • Loading branch information
aloyszhang authored Jun 2, 2020
1 parent 7c8dc32 commit c64b22a
Show file tree
Hide file tree
Showing 11 changed files with 488 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
)
private int topicPublisherThrottlingTickTimeMillis = 5;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable precise rate limit for topic publish"
)
private boolean preciseTopicPublishRateLimiterEnable = false;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile PublishRateLimiter topicPublishRateLimiter;

protected boolean preciseTopicPublishRateLimitingEnable;

private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();

Expand All @@ -102,6 +104,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
}

Expand All @@ -123,6 +127,18 @@ protected boolean isProducersExceeded() {
return false;
}

public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}

public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.values().forEach(producer -> {
Expand Down Expand Up @@ -373,6 +389,18 @@ public boolean isPublishRateExceeded() {
getBrokerPublishRateLimiter().isPublishRateExceeded();
}

@Override
public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
// whether topic publish rate exceed if precise rate limit is enable
return preciseTopicPublishRateLimitingEnable && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
}

@Override
public boolean isBrokerPublishRateExceeded() {
// whether broker publish rate exceed
return getBrokerPublishRateLimiter().isPublishRateExceeded();
}

public PublishRateLimiter getTopicPublishRateLimiter() {
return topicPublishRateLimiter;
}
Expand All @@ -393,12 +421,21 @@ private void updatePublishDispatcher(Policies policies) {
if (publishRate != null
&& (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
// lazy init Publish-rateLimiting monitoring if not initialized yet
this.brokerService.setupTopicPublishRateLimiterMonitor();

// if not precise mode, lazy init Publish-rateLimiting monitoring if not initialized yet
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(policies, clusterName,
() -> AbstractTopic.this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
}
} else {
this.topicPublishRateLimiter.update(policies, clusterName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannel;
private Channel listenChannelTls;

private boolean preciseTopicPublishRateLimitingEnable;
private final long maxMessagePublishBufferBytes;
private final long resumeProducerReadMessagePublishBufferBytes;
private volatile boolean reachMessagePublishBufferThreshold;
Expand All @@ -242,6 +243,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0 ?
pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
Expand Down Expand Up @@ -1612,9 +1614,11 @@ private void updateConfigurationAndRegisterListeners() {
updateBrokerPublisherThrottlingMaxRate());

// add listener to notify topic publish-rate monitoring
registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> {
setupTopicPublishRateLimiterMonitor();
});
if (!preciseTopicPublishRateLimitingEnable) {
registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> {
setupTopicPublishRateLimiterMonitor();
});
}

// add more listeners here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;

public interface PublishRateLimiter {

Expand Down Expand Up @@ -66,6 +69,83 @@ public interface PublishRateLimiter {
* @param clusterName
*/
void update(PublishRate maxPublishRate);

/**
* try to acquire permit
* @param numbers
* @param bytes
* */
boolean tryAcquire(int numbers, long bytes);
}

class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
}

@Override
public void checkPublishRate() {
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}

@Override
public boolean resetPublishCount() {
return true;
}

@Override
public boolean isPublishRateExceeded() {
return false;
}


@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
this.update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
(topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
}
}

class PublishRateLimiterImpl implements PublishRateLimiter {
Expand Down Expand Up @@ -153,6 +233,11 @@ public void update(PublishRate maxPublishRate) {
resetPublishCount();
}
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return false;
}
}

class PublishRateLimiterDisable implements PublishRateLimiter {
Expand Down Expand Up @@ -189,4 +274,11 @@ public void update(Policies policies, String clusterName) {
public void update(PublishRate maxPublishRate) {
// No-op
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public class ServerCnx extends PulsarHandler {
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;

private boolean preciseTopicPublishRateLimitingEnable;

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
private FeatureFlags features;
Expand Down Expand Up @@ -193,6 +195,7 @@ public ServerCnx(PulsarService pulsar) {
this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
}

@Override
Expand Down Expand Up @@ -1206,7 +1209,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
}
}

startSendOperation(producer, headersAndPayload.readableBytes());
startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
Expand Down Expand Up @@ -1792,9 +1795,21 @@ public boolean isWritable() {
return ctx.channel().isWritable();
}

public void startSendOperation(Producer producer, int msgSize) {

public void startSendOperation(Producer producer, int msgSize, int numMessages) {
MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
boolean isPublishRateExceeded = false;
if (preciseTopicPublishRateLimitingEnable) {
boolean isPreciseTopicPublishRateExceeded = producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
if (isPreciseTopicPublishRateExceeded) {
producer.getTopic().disableCnxAutoRead();
return;
}
isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
} else {
isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
}

if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

boolean isPublishRateExceeded();

boolean isTopicPublishRateExceeded(int msgSize, int numMessages);

boolean isBrokerPublishRateExceeded();

void disableCnxAutoRead();

void enableCnxAutoRead();

CompletableFuture<Void> onPoliciesUpdate(Policies data);

boolean isBacklogQuotaExceeded(String producerName);
Expand Down
Loading

0 comments on commit c64b22a

Please sign in to comment.