Skip to content

Commit

Permalink
Support limit topic publish rate at the broker level (apache#8235)
Browse files Browse the repository at this point in the history
Fixes apache#8222 

### Motivation
Currently, we can set up the publish rate limitation of the topic by user pulsar-admin. It's better to introduce a broker level setting as the default setting. The namespace level setting can rewrite the broker level setting.

### Modifications
1)Add `maxPublishRatePerTopicInMessages` and `maxPublishRatePerTopicInBytes` in the broker.conf
2)Add `RemovePublishRate` API for namespace
3)The priority of the policies is modified to topic-level > namespace-level > broker-level

### Verifying this change
TopicPoliciesTest#testPublishRateInDifferentLevelPolicy
  • Loading branch information
315157973 authored Oct 14, 2020
1 parent 3539992 commit c144e04
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 78 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ brokerPublisherThrottlingMaxMessageRate=0
# (Disable byte rate limit with value 0)
brokerPublisherThrottlingMaxByteRate=0

# Max Rate(in 1 seconds) of Message allowed to publish for a topic if topic publish rate limiting enabled
# (Disable byte rate limit with value 0)
maxPublishRatePerTopicInMessages=0

#Max Rate(in 1 seconds) of Byte allowed to publish for a topic if topic publish rate limiting enabled.
# (Disable byte rate limit with value 0)
maxPublishRatePerTopicInBytes=0

# Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies,
# hence causing high network bandwidth usage
# When the positive value is set, broker will throttle the subscribe requests for one consumer.
Expand Down
8 changes: 8 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ brokerPublisherThrottlingMaxMessageRate=0
# (Disable byte rate limit with value 0)
brokerPublisherThrottlingMaxByteRate=0

# Max Rate(in 1 seconds) of Message allowed to publish for a topic if topic publish rate limiting enabled
# (Disable byte rate limit with value 0)
maxPublishRatePerTopicInMessages=0

#Max Rate(in 1 seconds) of Byte allowed to publish for a topic if topic publish rate limiting enabled.
# (Disable byte rate limit with value 0)
maxPublishRatePerTopicInBytes=0

# Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies,
# hence causing high network bandwidth usage
# When the positive value is set, broker will throttle the subscribe requests for one consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "when broker publish rate limiting enabled. (Disable byte rate limit with value 0)"
)
private long brokerPublisherThrottlingMaxByteRate = 0;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Max Rate(in 1 seconds) of Message allowed to publish for a topic "
+ "when topic publish rate limiting enabled. (Disable byte rate limit with value 0)"
)
private int maxPublishRatePerTopicInMessages = 0;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Max Rate(in 1 seconds) of Byte allowed to publish for a topic "
+ "when topic publish rate limiting enabled. (Disable byte rate limit with value 0)"
)
private long maxPublishRatePerTopicInBytes = 0;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,41 @@ protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
}
}

protected void internalRemovePublishRate() {
validateSuperUserAccess();
log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName);
Entry<Policies, Stat> policiesNode = null;
try {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().publishMaxMessageRate.remove(pulsar().getConfiguration().getClusterName());

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);

log.info("[{}] Successfully remove the publish_max_message_rate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to remove the publish_max_message_rate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to remove the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to remove the publish_max_message_rate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected PublishRate internalGetPublishRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,15 @@ public void setPublishRate(@PathParam("property") String property, @PathParam("n
internalSetPublishRate(publishRate);
}

@DELETE
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void removePublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
internalRemovePublishRate();
}

@GET
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,8 +98,6 @@ public abstract class AbstractTopic implements Topic {

protected boolean preciseTopicPublishRateLimitingEnable;

protected volatile PublishRate topicPolicyPublishRate = null;

private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();

Expand All @@ -115,28 +114,17 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
ServiceConfiguration brokerConfig = brokerService.pulsar().getConfiguration();
if (brokerConfig.isSystemTopicEnabled() && brokerConfig.isSystemTopicEnabled()) {
topicPolicyPublishRate = Optional.ofNullable(getTopicPolicies(TopicName.get(topic)))
.map(TopicPolicies::getPublishRate)
.orElse(null);
}
if (topicPolicyPublishRate != null) {
// update topic level publish dispatcher
updateTopicPublishDispatcher();
} else {
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
}

protected boolean isProducersExceeded() {
Expand Down Expand Up @@ -499,42 +487,30 @@ public void updateMaxPublishRate(Policies policies) {
}

private void updatePublishDispatcher(Policies policies) {
// if topic level publish rate policy is set, skip update publish rate on namespace level
if (topicPolicyPublishRate != null) {
//if topic-level policy exists, try to use topic-level publish rate policy
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isPublishRateSet()) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}", this.topic);
updatePublishDispatcher(topicPolicies.getPublishRate());
return;
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
if (publishRate != null
&& (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);

// if not precise mode, lazy init Publish-rateLimiting monitoring if not initialized yet
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(policies, clusterName,
() -> AbstractTopic.this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
}
} else {
this.topicPublishRateLimiter.update(policies, clusterName);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
//both namespace-level and topic-level policy are not set, try to use broker-level policy
ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (publishRate == null) {
PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
, serviceConfiguration.getMaxPublishRatePerTopicInBytes());
updatePublishDispatcher(brokerPublishRate);
return;
}
//publishRate is not null , use namespace-level policy
updatePublishDispatcher(publishRate);
}

public long getMsgInCounter() { return this.msgInCounter.longValue(); }
Expand Down Expand Up @@ -597,7 +573,29 @@ public TopicPolicies getTopicPolicies(TopicName topicName) {
/**
* update topic publish dispatcher for this topic.
*/
protected void updateTopicPublishDispatcher() {
// noop
protected void updatePublishDispatcher(PublishRate publishRate) {
if (publishRate != null && (publishRate.publishThrottlingRateInByte > 0
|| publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} ", publishRate);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, ()-> this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
} else {
this.topicPublishRateLimiter.update(publishRate);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand All @@ -132,6 +133,7 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
Expand Down Expand Up @@ -2433,8 +2435,9 @@ public void onUpdate(TopicPolicies policies) {
});

if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
updatePublishDispatcher(policies.getPublishRate());
} else {
updateMaxPublishRate(namespacePolicies.orElse(null));
}

if (policies.isInactiveTopicPoliciesSet()) {
Expand Down Expand Up @@ -2481,33 +2484,6 @@ private PersistentTopic getPersistentTopic() {
return this;
}

@Override
protected void updateTopicPublishDispatcher() {
if (topicPolicyPublishRate != null && (topicPolicyPublishRate.publishThrottlingRateInByte > 0
|| topicPolicyPublishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling topic policy publish rate limiting {} on topic {}", topicPolicyPublishRate, this.topic);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupBrokerPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(topicPolicyPublishRate, ()-> this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(topicPolicyPublishRate);
}
} else {
this.topicPublishRateLimiter.update(topicPolicyPublishRate);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
}

private void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled() &&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
Expand Down
Loading

0 comments on commit c144e04

Please sign in to comment.