Skip to content

Commit

Permalink
Fix-2876 Limit the client reconnect behavior (apache#2977)
Browse files Browse the repository at this point in the history
### Motivation
This PR fixes issue apache#2876.
Clients that reconnect too frequently make the broker load data from bookies over and over, which causes high network bandwidth usage.
Add a mechanism to limit the subscribe rate per consumer to avoid this high network bandwidth usage.
By default, the limit is disabled. It can be enabled in broker.conf or through namespace policies.

### Modifications

Add a mechanism to limit how many times a consumer may subscribe within a period.

### Result

If the Subscribe Rate Limiter is enabled, the broker will refuse a subscribe request from a consumer that has already subscribed more than the maximum number of times allowed in a period.
  • Loading branch information
codelipenghui authored and sijie committed Nov 20, 2018
1 parent f8594e0 commit 3008a4b
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 1 deletion.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Too many subscribe requests from a consumer can cause the broker to rewind consumer cursors and load data from bookies,
# resulting in high network bandwidth usage.
# When a positive value is set, the broker will throttle the subscribe requests of each consumer.
# Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled.
subscribeThrottlingRatePerConsumer=0

# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
subscribeRatePeriodPerConsumerInSecond=30

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
// than this percentage limit and subscription will not receive any new messages until that subscription acks back
// limit/2 messages
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
// Too many subscribe requests from a consumer can cause the broker to rewind consumer cursors and load data from
// bookies, resulting in high network bandwidth usage.
// When a positive value is set, the broker will throttle the subscribe requests of each consumer.
// Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled.
@FieldContext(dynamic = true)
private int subscribeThrottlingRatePerConsumer = 0;
// Rate period, in seconds, for {subscribeThrottlingRatePerConsumer}. Default is 30s.
@FieldContext(minValue = 1, dynamic = true)
private int subscribeRatePeriodPerConsumerInSecond = 30;
// Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
// message dispatch-throttling
@FieldContext(dynamic = true)
Expand Down Expand Up @@ -1796,4 +1805,20 @@ public BacklogQuota.RetentionPolicy getBacklogQuotaDefaultRetentionPolicy() {
/**
 * Replaces the stored {@code backlogQuotaDefaultRetentionPolicy} setting.
 *
 * @param backlogQuotaDefaultRetentionPolicy the retention policy to store
 */
public void setBacklogQuotaDefaultRetentionPolicy(
        BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy) {
    this.backlogQuotaDefaultRetentionPolicy = backlogQuotaDefaultRetentionPolicy;
}

/**
 * Returns the {@code subscribeThrottlingRatePerConsumer} setting.
 *
 * @return the currently stored value of the setting
 */
public int getSubscribeThrottlingRatePerConsumer() {
    return this.subscribeThrottlingRatePerConsumer;
}

/**
 * Replaces the {@code subscribeThrottlingRatePerConsumer} setting.
 *
 * @param subscribeThrottlingRatePerConsumer the new value to store; no validation is performed here
 */
public void setSubscribeThrottlingRatePerConsumer(
        int subscribeThrottlingRatePerConsumer) {
    this.subscribeThrottlingRatePerConsumer = subscribeThrottlingRatePerConsumer;
}

/**
 * Returns the {@code subscribeRatePeriodPerConsumerInSecond} setting.
 *
 * @return the currently stored value of the setting
 */
public int getSubscribeRatePeriodPerConsumerInSecond() {
    return this.subscribeRatePeriodPerConsumerInSecond;
}

/**
 * Replaces the {@code subscribeRatePeriodPerConsumerInSecond} setting.
 *
 * @param subscribeRatePeriodPerConsumerInSecond the new value to store; no validation is performed here
 */
public void setSubscribeRatePeriodPerConsumerInSecond(
        int subscribeRatePeriodPerConsumerInSecond) {
    this.subscribeRatePeriodPerConsumerInSecond = subscribeRatePeriodPerConsumerInSecond;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -696,6 +697,55 @@ protected DispatchRate internalGetSubscriptionDispatchRate() {
}
}

/**
 * Stores {@code subscribeRate} under the local cluster name in this namespace's policies.
 *
 * <p>After the super-user check, the policies entry is read together with its stat, the rate is put into
 * {@code clusterSubscribeRate}, and the policies are written back with the version that was read. The cached
 * policies entry is invalidated once the write succeeds.
 *
 * @param subscribeRate the subscribe-rate to register for the local cluster
 * @throws RestException with {@code NOT_FOUND} on {@code KeeperException.NoNodeException}, with
 *         {@code CONFLICT} on {@code KeeperException.BadVersionException}; any other exception raised inside
 *         the try block (including the one thrown when the cache returns no entry) is logged and rethrown
 *         wrapped in a {@code RestException}
 */
protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
    log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
    validateSuperUserAccess();

    // Kept outside the try block: the BadVersionException handler reports the version that was expected.
    Entry<Policies, Stat> policiesEntry = null;

    try {
        final String policiesPath = path(POLICIES, namespaceName.toString());
        // Force to read the data s.t. the watch to the cache content is setup.
        policiesEntry = policiesCache().getWithStat(policiesPath).orElseThrow(
                () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));

        final Policies currentPolicies = policiesEntry.getKey();
        currentPolicies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);

        // Write the updated policies back, passing the version read above to setData
        globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(currentPolicies),
                policiesEntry.getValue().getVersion());
        policiesCache().invalidate(policiesPath);

        log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
                namespaceName);
    } catch (KeeperException.NoNodeException noNode) {
        log.warn("[{}] Failed to update the subscribeRate for cluster on namespace {}: does not exist",
                clientAppId(), namespaceName);
        throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
    } catch (KeeperException.BadVersionException badVersion) {
        log.warn(
                "[{}] Failed to update the subscribeRate for cluster on namespace {} expected policy node version={} : concurrent modification",
                clientAppId(), namespaceName, policiesEntry.getValue().getVersion());

        throw new RestException(Status.CONFLICT, "Concurrent modification");
    } catch (Exception failure) {
        log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),
                namespaceName, failure);
        throw new RestException(failure);
    }
}

/**
 * Looks up the subscribe-rate registered for the local cluster in this namespace's policies.
 *
 * <p>The caller must pass the tenant admin access check for the namespace's tenant.
 *
 * @return the subscribe-rate stored under the local cluster name
 * @throws RestException with {@code NOT_FOUND} when no subscribe-rate is stored for the local cluster
 */
protected SubscribeRate internalGetSubscribeRate() {
    validateAdminAccessForTenant(namespaceName.getTenant());

    final Policies namespacePolicies = getNamespacePolicies(namespaceName);
    final SubscribeRate configuredRate =
            namespacePolicies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());

    // Guard clause: an absent entry is reported to the caller as 404.
    if (configuredRate == null) {
        throw new RestException(Status.NOT_FOUND,
                "Subscribe-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
    }
    return configuredRate;
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.slf4j.Logger;
Expand Down Expand Up @@ -345,6 +346,27 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tena
return internalGetSubscriptionDispatchRate();
}

/**
 * REST endpoint ({@code POST /{tenant}/{namespace}/subscribeRate}) that sets the subscribe-rate of a namespace.
 *
 * <p>Validates the namespace name built from the two path parameters, then delegates to
 * {@code internalSetSubscribeRate}.
 *
 * @param tenant        tenant path parameter
 * @param namespace     namespace path parameter
 * @param subscribeRate subscribe-rate taken from the request entity
 */
@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setSubscribeRate(
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        SubscribeRate subscribeRate) {
    validateNamespaceName(tenant, namespace);
    internalSetSubscribeRate(subscribeRate);
}

/**
 * REST endpoint ({@code GET /{tenant}/{namespace}/subscribeRate}) that returns the subscribe-rate of a namespace.
 *
 * <p>Validates the namespace name built from the two path parameters, then delegates to
 * {@code internalGetSubscribeRate}.
 *
 * @param tenant    tenant path parameter
 * @param namespace namespace path parameter
 * @return the value produced by {@code internalGetSubscribeRate}
 */
@GET
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Get subscribe-rate configured for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Namespace does not exist") })
public SubscribeRate getSubscribeRate(
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace) {
    validateNamespaceName(tenant, namespace);
    final SubscribeRate configuredRate = internalGetSubscribeRate();
    return configuredRate;
}

@GET
@Path("/{tenant}/{namespace}/backlogQuotaMap")
@ApiOperation(value = "Get backlog quota map on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,10 @@ public State getState() {
return state;
}

/**
 * Returns the {@code remoteAddress} held by this connection object.
 *
 * @return the stored remote address; callers in this codebase null-check the result before use
 */
public SocketAddress getRemoteAddress() {
    return this.remoteAddress;
}

/**
 * Returns the {@code BrokerService} held in the {@code service} field.
 *
 * @return the stored broker service reference
 */
public BrokerService getBrokerService() {
    return this.service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// doesn't support batch-message
private volatile boolean hasBatchMessagePublished = false;
private final DispatchRateLimiter dispatchRateLimiter;
private final SubscribeRateLimiter subscribeRateLimiter;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;

private final MessageDeduplication messageDeduplication;
Expand Down Expand Up @@ -226,6 +227,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
USAGE_COUNT_UPDATER.set(this, 0);

this.dispatchRateLimiter = new DispatchRateLimiter(this);
this.subscribeRateLimiter = new SubscribeRateLimiter(this);

this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());

Expand Down Expand Up @@ -494,6 +496,18 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
return future;
}

if (cnx.getRemoteAddress() != null && cnx.getRemoteAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
cnx.getRemoteAddress().toString().split(":")[0], consumerName, consumerId);
if (!subscribeRateLimiter.subscribeAvailable(consumer) || !subscribeRateLimiter.tryAcquire(consumer)) {
log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
topic, subscriptionName, consumer, subscribeRateLimiter.getSubscribeRate(),
subscribeRateLimiter.getAvailableSubscribeRateLimit(consumer));
future.completeExceptionally(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
return future;
}
}

lock.readLock().lock();
try {
if (isFenced) {
Expand Down Expand Up @@ -840,6 +854,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}, null);

dispatchRateLimiter.close();
subscribeRateLimiter.close();

}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
Expand Down Expand Up @@ -1580,6 +1595,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
dispatchRateLimiter.onPoliciesUpdate(data);
subscribeRateLimiter.onPoliciesUpdate(data);
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}

Expand Down Expand Up @@ -1727,6 +1743,10 @@ public DispatchRateLimiter getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}

/**
 * Returns the {@code SubscribeRateLimiter} held by this topic.
 *
 * @return the stored subscribe rate limiter
 */
public SubscribeRateLimiter getSubscribeRateLimiter() {
    return subscribeRateLimiter;
}

/**
 * Returns the last published sequence id for a producer, as reported by {@code messageDeduplication}.
 *
 * @param producerName name of the producer to look up
 * @return the value returned by {@code messageDeduplication.getLastPublishedSequenceId(producerName)}
 */
public long getLastPublishedSequenceId(String producerName) {
    final long lastSequenceId = this.messageDeduplication.getLastPublishedSequenceId(producerName);
    return lastSequenceId;
}
Expand Down
Loading

0 comments on commit 3008a4b

Please sign in to comment.