Skip to content

Commit

Permalink
Add rate limit support for Replicator between clusters (apache#4273)
Browse files Browse the repository at this point in the history
Currently the rate limit between replication clusters is not able to control.  
In Geo-replication, once a cluster is offline, and after a long time, if it comes back, it may get a lot of messages from other clusters, and may use all of the network bandwidth.
This PR tries to provided a way to control the rate limit between cluster replications. Add rate limit support for Replicator between clusters.

changes:
- change DispatchRateLimiter.java to support 3 kind type: Topic, subscription, replicator.
- add  DispatchRateLimiter support in PersistentReplicator.
- add test and docs
  • Loading branch information
jiazhai authored and sijie committed May 20, 2019
1 parent cacb16a commit 84d02ac
Show file tree
Hide file tree
Showing 23 changed files with 910 additions and 115 deletions.
10 changes: 9 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ dispatchThrottlingRatePerSubscriptionInMsg=0

# Default number of message-bytes dispatching throttling-limit for a subscription.
# Using a value of 0, is disabling default message-byte dispatch-throttling.
dispatchThrottlingRatePerSubscribeInByte=0
dispatchThrottlingRatePerSubscriptionInByte=0

# Default messages per second dispatch throttling-limit for every replicator in replication.
# Using a value of 0, is disabling replication message dispatch-throttling
dispatchThrottlingRatePerReplicatorInMsg=0

# Default bytes per second dispatch throttling-limit for every replicator in replication.
# Using a value of 0, is disabling replication message-byte dispatch-throttling
dispatchThrottlingRatePerReplicatorInByte=0

# By default we enable dispatch-throttling for both caught up consumers as well as consumers who have
# backlog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
Expand All @@ -391,7 +392,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_POLICIES,
doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling.")
private long dispatchThrottlingRatePerSubscribeInByte = 0;
private long dispatchThrottlingRatePerSubscriptionInByte = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default number of message dispatching throttling-limit for every replicator in replication. \n\n"
+ "Using a value of 0, is disabling replication message dispatch-throttling")
private int dispatchThrottlingRatePerReplicatorInMsg = 0;
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n"
+ "Using a value of 0, is disabling replication message-byte dispatch-throttling")
private long dispatchThrottlingRatePerReplicatorInByte = 0;

@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S

final String cluster = config.getClusterName();
// attach default dispatch rate polices
if (policies.clusterDispatchRate.isEmpty()) {
policies.clusterDispatchRate.put(cluster, dispatchRate());
if (policies.topicDispatchRate.isEmpty()) {
policies.topicDispatchRate.put(cluster, dispatchRate());
}

if (policies.subscriptionDispatchRate.isEmpty()) {
Expand All @@ -400,7 +400,7 @@ protected DispatchRate dispatchRate() {
protected DispatchRate subscriptionDispatchRate() {
return new DispatchRate(
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(),
pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
1
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e);
}
}

// validate namespace ownership only if namespace is not owned by local-cluster (it happens when broker doesn't
// receive replication-cluster change watch and still owning bundle
if (!isOwnedByLocalCluster) {
Expand Down Expand Up @@ -657,7 +657,7 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit
}
}

protected void internalSetDispatchRate(DispatchRate dispatchRate) {
protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();

Expand All @@ -668,7 +668,7 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
Expand All @@ -694,11 +694,11 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) {
}
}

protected DispatchRate internalGetDispatchRate() {
protected DispatchRate internalGetTopicDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName());
DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
Expand Down Expand Up @@ -806,6 +806,56 @@ protected SubscribeRate internalGetSubscribeRate() {
}
}

protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);

log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected DispatchRate internalGetReplicatorDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
}
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, DispatchRate dispatchRate) {
validateNamespaceName(property, cluster, namespace);
internalSetDispatchRate(dispatchRate);
internalSetTopicDispatchRate(dispatchRate);
}

@GET
Expand All @@ -446,7 +446,7 @@ public void setDispatchRate(@PathParam("property") String property, @PathParam("
public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetDispatchRate();
return internalGetTopicDispatchRate();
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void grantPermissionOnSubscription(@PathParam("property") String property
validateNamespaceName(property, namespace);
internalGrantPermissionOnSubscription(subscription, roles);
}

@DELETE
@Path("/{tenant}/{namespace}/permissions/{role}")
@ApiOperation(value = "Revoke all permissions to a role on a namespace.")
Expand All @@ -208,7 +208,7 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
validateNamespaceName(property, namespace);
internalRevokePermissionsOnSubscription(subscription, role);
}

@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.", response = String.class, responseContainer = "List")
Expand Down Expand Up @@ -336,7 +336,7 @@ public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam(
public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetDispatchRate(dispatchRate);
internalSetTopicDispatchRate(dispatchRate);
}

@GET
Expand All @@ -347,7 +347,7 @@ public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("name
public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDispatchRate();
return internalGetTopicDispatchRate();
}

@POST
Expand Down Expand Up @@ -393,6 +393,28 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
return internalGetSubscribeRate();
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
}

@GET
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetReplicatorDispatchRate();
}

@GET
@Path("/{tenant}/{namespace}/backlogQuotaMap")
@ApiOperation(value = "Get backlog quota map on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,26 +1195,34 @@ private void updateConfigurationAndRegisterListeners() {
log.warn("Failed to change load manager due to {}", ex);
}
});
// add listener to update message-dispatch-rate in msg
// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
// add listener to update message-dispatch-rate in byte for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> {
updateTopicMessageDispatchRate();
});
// add listener to update managed-ledger config to skipNonRecoverableLedgers
registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> {
updateManagedLedgerConfig();
});
// add listener to update message-dispatch-rate in msg
// add listener to update message-dispatch-rate in msg for subscription
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> {
updateSubscriptionMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
registerConfigurationListener("dispatchThrottlingRatePerSubscribeInByte", (dispatchRatePerTopicInByte) -> {
// add listener to update message-dispatch-rate in byte for subscription
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> {
updateSubscriptionMessageDispatchRate();
});
// add listener to update message-dispatch-rate in msg for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> {
updateReplicatorMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> {
updateReplicatorMessageDispatchRate();
});
// add more listeners here
}

Expand Down Expand Up @@ -1243,6 +1251,18 @@ private void updateSubscriptionMessageDispatchRate() {
});
}

private void updateReplicatorMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic Replicator in Geo-replication
forEachTopic(topic ->
topic.getReplicators().forEach((name, persistentReplicator) -> {
if (persistentReplicator.getRateLimiter().isPresent()) {
persistentReplicator.getRateLimiter().get().updateDispatchRate();
}
}));
});
}

private void updateManagedLedgerConfig() {
this.pulsar().getExecutor().execute(() -> {
// update managed-ledger config of each topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ReplicatorStats;

public interface Replicator {

void startProducer();

ReplicatorStats getStats();

CompletableFuture<Void> disconnect();
Expand All @@ -36,4 +39,11 @@ public interface Replicator {

String getRemoteCluster();

default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
//No-op
}

default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}
}
Loading

0 comments on commit 84d02ac

Please sign in to comment.