Skip to content

Commit

Permalink
[pulsar-broker] Introduce publish rate-limiting on topic (apache#3986)
Browse files Browse the repository at this point in the history
* [pulsar-broker] Introduce publish rate-limiting on topic

disable auto-read on cnx for publish throttling

Clean up

* fix documentation

* clean up conflict class
  • Loading branch information
rdhabalia authored Oct 25, 2019
1 parent 13e35f7 commit 7686542
Show file tree
Hide file tree
Showing 22 changed files with 844 additions and 45 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Tick time to schedule task that checks publish rate limiting across all topics
# (Disable publish throttling with value 0)
publisherThrottlingTickTimeMillis=10

# Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies,
# hence causing high network bandwidth usage
# When the positive value is set, broker will throttle the subscribe requests for one consumer.
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Tick time to schedule task that checks publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
publisherThrottlingTickTimeMillis=2

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
+ " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Tick time to schedule task that checks publish rate limiting across all topics "
+ "Reducing to lower value can give more accuracy while throttling publish but "
+ "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
)
private int publisherThrottlingTickTimeMillis = 5;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -801,6 +802,56 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit
}
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
validateSuperUserAccess();

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);

log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the publish_max_message_rate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the publish_max_message_rate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected PublishRate internalGetPublishRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
if (publishRate != null) {
return publishRate;
} else {
throw new RestException(Status.NOT_FOUND,
"Publish-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
}
}

protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
Expand Down Expand Up @@ -448,6 +449,27 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
}

@POST
@Path("/{property}/{cluster}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, PublishRate publishRate) {
validateNamespaceName(property, cluster, namespace);
internalSetPublishRate(publishRate);
}

@GET
@Path("/{property}/{cluster}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetPublishRate();
}

@POST
@Path("/{property}/{cluster}/{namespace}/dispatchRate")
@ApiOperation(hidden = true, value = "Set dispatch-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -351,6 +352,26 @@ public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam(
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
}

@POST
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setPublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace,
PublishRate publishRate) {
validateNamespaceName(property, namespace);
internalSetPublishRate(publishRate);
}

@GET
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
return internalGetPublishRate();
}

@POST
@Path("/{tenant}/{namespace}/dispatchRate")
@ApiOperation(value = "Set dispatch-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,33 @@
*/
package org.apache.pulsar.broker.service;

import com.google.common.base.MoreObjects;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;

public abstract class AbstractTopic implements Topic {
private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

Expand Down Expand Up @@ -75,6 +77,8 @@ public abstract class AbstractTopic implements Topic {
SchemaCompatibilityStrategy.FULL;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

protected volatile PublishRateLimiter publishRateLimiter;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
Expand All @@ -83,6 +87,17 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
updatePublishDispatcher(policies);
}

protected boolean isProducersExceeded() {
Expand Down Expand Up @@ -214,4 +229,69 @@ public void recordAddLatency(long latency, TimeUnit unit) {
.quantile(0.9999)
.quantile(1.0)
.register();

@Override
public void checkPublishThrottlingRate() {
this.publishRateLimiter.checkPublishRate();
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
this.publishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes);
}

@Override
public void resetPublishCountAndEnableReadIfRequired() {
if (this.publishRateLimiter.resetPublishCount()) {
enableProduerRead();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
protected void enableProduerRead() {
if (producers != null) {
producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

@Override
public boolean isPublishRateExceeded() {
return this.publishRateLimiter.isPublishRateExceeded();
}

public PublishRateLimiter getPublishRateLimiter() {
return publishRateLimiter;
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
}

private void updatePublishDispatcher(Policies policies) {
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
if (publishRate != null
&& (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
// lazy init Publish-rateLimiting monitoring if not initialized yet
this.brokerService.setupPublishRateLimiterMonitor();
if (this.publishRateLimiter == null
|| this.publishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
this.publishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
} else {
this.publishRateLimiter.update(policies, clusterName);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.publishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProduerRead();
}
}

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private ScheduledExecutorService publishRateLimiterMonitor;

private DistributedIdGenerator producerNameGenerator;

Expand Down Expand Up @@ -448,6 +449,40 @@ protected void startBacklogQuotaChecker() {

}

/**
* Schedules and monitors publish-throttling for all owned topics that has publish-throttling configured. It also
* disables and shutdowns publish-rate-limiter monitor task if broker disables it.
*/
public synchronized void setupPublishRateLimiterMonitor() {
long tickTimeMs = pulsar().getConfiguration().getPublisherThrottlingTickTimeMillis();
if (tickTimeMs > 0) {
if (this.publishRateLimiterMonitor == null) {
this.publishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-publish-rate-limiter-monitor"));
if (tickTimeMs > 0) {
// schedule task that sums up publish-rate across all cnx on a topic
publishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> checkPublishThrottlingRate()),
tickTimeMs, tickTimeMs, TimeUnit.MILLISECONDS);
// schedule task that refreshes rate-limitting bucket
publishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> refreshPublishRate()), 1, 1,
TimeUnit.SECONDS);
}
}
} else {
// disable publish-throttling for all topics
if (this.publishRateLimiterMonitor != null) {
try {
this.publishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("failed to shutdown publishRateLimiterMonitor", e);
}
// make sure topics are not being throttled
refreshPublishRate();
this.publishRateLimiterMonitor = null;
}
}
}

@Override
public void close() throws IOException {
log.info("Shutting down Pulsar Broker service");
Expand Down Expand Up @@ -1023,6 +1058,14 @@ public void checkInactiveSubscriptions() {
forEachTopic(Topic::checkInactiveSubscriptions);
}

public void checkPublishThrottlingRate() {
forEachTopic(Topic::checkPublishThrottlingRate);
}

private void refreshPublishRate() {
forEachTopic(Topic::resetPublishCountAndEnableReadIfRequired);
}

/**
* Iterates over all loaded topics in the broker
*/
Expand Down Expand Up @@ -1353,6 +1396,7 @@ private void updateConfigurationAndRegisterListeners() {
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> {
updateSubscriptionMessageDispatchRate();
});

// add listener to update message-dispatch-rate in msg for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> {
updateReplicatorMessageDispatchRate();
Expand All @@ -1361,6 +1405,11 @@ private void updateConfigurationAndRegisterListeners() {
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> {
updateReplicatorMessageDispatchRate();
});

// add listener to notify publish-rate monitoring
registerConfigurationListener("publisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> {
setupPublishRateLimiterMonitor();
});
// add more listeners here
}

Expand Down
Loading

0 comments on commit 7686542

Please sign in to comment.