From 84d02ac4ad42497c052d627e834c5e07c0e3449a Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Mon, 20 May 2019 10:58:57 +0800 Subject: [PATCH] Add rate limit support for Replicator between clusters (#4273) Currently the rate limit between replication clusters is not able to control. In Geo-replication, once a cluster is offline, and after a long time, if it comes back, it may get a lot of messages from other clusters, and may use all of the network bandwidth. This PR tries to provided a way to control the rate limit between cluster replications. Add rate limit support for Replicator between clusters. changes: - change DispatchRateLimiter.java to support 3 kind type: Topic, subscription, replicator. - add DispatchRateLimiter support in PersistentReplicator. - add test and docs --- conf/broker.conf | 10 +- .../pulsar/broker/ServiceConfiguration.java | 16 +- .../pulsar/broker/admin/AdminResource.java | 6 +- .../broker/admin/impl/NamespacesBase.java | 60 +++- .../pulsar/broker/admin/v1/Namespaces.java | 4 +- .../pulsar/broker/admin/v2/Namespaces.java | 30 +- .../pulsar/broker/service/BrokerService.java | 30 +- .../pulsar/broker/service/Replicator.java | 12 +- .../persistent/DispatchRateLimiter.java | 160 ++++++--- ...PersistentDispatcherMultipleConsumers.java | 7 +- ...sistentDispatcherSingleActiveConsumer.java | 7 +- .../persistent/PersistentReplicator.java | 80 ++++- .../service/persistent/PersistentTopic.java | 20 +- .../apache/pulsar/broker/ConfigHelper.java | 12 +- .../pulsar/broker/admin/AdminApiTest.java | 2 +- .../broker/admin/v1/V1_AdminApiTest.java | 2 +- .../service/ReplicatorRateLimiterTest.java | 325 ++++++++++++++++++ .../pulsar/broker/service/ServerCnxTest.java | 10 +- .../pulsar/client/admin/Namespaces.java | 24 +- .../client/admin/internal/NamespacesImpl.java | 26 +- .../pulsar/admin/cli/CmdNamespaces.java | 50 ++- .../pulsar/common/policies/data/Policies.java | 14 +- site2/docs/admin-api-namespaces.md | 118 ++++++- 23 files changed, 910 insertions(+), 115 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java diff --git a/conf/broker.conf b/conf/broker.conf index 37b1281c0cdee..4d217a3e1981f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -189,7 +189,15 @@ dispatchThrottlingRatePerSubscriptionInMsg=0 # Default number of message-bytes dispatching throttling-limit for a subscription. # Using a value of 0, is disabling default message-byte dispatch-throttling. -dispatchThrottlingRatePerSubscribeInByte=0 +dispatchThrottlingRatePerSubscriptionInByte=0 + +# Default messages per second dispatch throttling-limit for every replicator in replication. +# Using a value of 0, is disabling replication message dispatch-throttling +dispatchThrottlingRatePerReplicatorInMsg=0 + +# Default bytes per second dispatch throttling-limit for every replicator in replication. +# Using a value of 0, is disabling replication message-byte dispatch-throttling +dispatchThrottlingRatePerReplicatorInByte=0 # By default we enable dispatch-throttling for both caught up consumers as well as consumers who have # backlog. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e59420841a1ca..f047703942258 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -380,6 +380,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n" + "Using a value of 0, is disabling default message-byte dispatch-throttling") private long dispatchThrottlingRatePerTopicInByte = 0; + @FieldContext( dynamic = true, category = CATEGORY_POLICIES, @@ -391,7 +392,20 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_POLICIES, doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n" + "Using a value of 0, is disabling default message-byte dispatch-throttling.") - private long dispatchThrottlingRatePerSubscribeInByte = 0; + private long dispatchThrottlingRatePerSubscriptionInByte = 0; + + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message dispatching throttling-limit for every replicator in replication. \n\n" + + "Using a value of 0, is disabling replication message dispatch-throttling") + private int dispatchThrottlingRatePerReplicatorInMsg = 0; + @FieldContext( + dynamic = true, + category = CATEGORY_POLICIES, + doc = "Default number of message-bytes dispatching throttling-limit for every replicator in replication. \n\n" + + "Using a value of 0, is disabling replication message-byte dispatch-throttling") + private long dispatchThrottlingRatePerReplicatorInByte = 0; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 35d22861e4982..2c870626f4ed0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -372,8 +372,8 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S final String cluster = config.getClusterName(); // attach default dispatch rate polices - if (policies.clusterDispatchRate.isEmpty()) { - policies.clusterDispatchRate.put(cluster, dispatchRate()); + if (policies.topicDispatchRate.isEmpty()) { + policies.topicDispatchRate.put(cluster, dispatchRate()); } if (policies.subscriptionDispatchRate.isEmpty()) { @@ -400,7 +400,7 @@ protected DispatchRate dispatchRate() { protected DispatchRate subscriptionDispatchRate() { return new DispatchRate( pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), - pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(), + pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(), 1 ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 98fb11b3b9f80..eab5cd9d9c3eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -593,7 +593,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e); } } - + // validate namespace ownership only if namespace is not owned by local-cluster (it happens when broker doesn't // receive replication-cluster change watch and still owning bundle if (!isOwnedByLocalCluster) { @@ -657,7 +657,7 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit } } - protected void internalSetDispatchRate(DispatchRate dispatchRate) { + protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) { log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); validateSuperUserAccess(); @@ -668,7 +668,7 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); - policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); + policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); // Write back the new policies into zookeeper globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()), @@ -694,11 +694,11 @@ protected void internalSetDispatchRate(DispatchRate dispatchRate) { } } - protected DispatchRate internalGetDispatchRate() { + protected DispatchRate internalGetTopicDispatchRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); - DispatchRate dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName()); + DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()); if (dispatchRate != null) { return dispatchRate; } else { @@ -806,6 +806,56 @@ protected SubscribeRate internalGetSubscribeRate() { } } + protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) { + log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); + validateSuperUserAccess(); + + Entry policiesNode = null; + + try { + final String path = path(POLICIES, namespaceName.toString()); + // Force to read the data s.t. the watch to the cache content is setup. + policiesNode = policiesCache().getWithStat(path).orElseThrow( + () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); + + // Write back the new policies into zookeeper + globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()), + policiesNode.getValue().getVersion()); + policiesCache().invalidate(path); + + log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(), + namespaceName); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist", + clientAppId(), namespaceName); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn( + "[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification", + clientAppId(), namespaceName, policiesNode.getValue().getVersion()); + + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (Exception e) { + log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(), + namespaceName, e); + throw new RestException(e); + } + } + + protected DispatchRate internalGetReplicatorDispatchRate() { + validateAdminAccessForTenant(namespaceName.getTenant()); + + Policies policies = getNamespacePolicies(namespaceName); + DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()); + if (dispatchRate != null) { + return dispatchRate; + } else { + throw new RestException(Status.NOT_FOUND, + "replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName()); + } + } + protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) { validateAdminAccessForTenant(namespaceName.getTenant()); validatePoliciesReadOnlyAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 1f2c8c93c131b..35b57135c1d4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -435,7 +435,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa public void setDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, DispatchRate dispatchRate) { validateNamespaceName(property, cluster, namespace); - internalSetDispatchRate(dispatchRate); + internalSetTopicDispatchRate(dispatchRate); } @GET @@ -446,7 +446,7 @@ public void setDispatchRate(@PathParam("property") String property, @PathParam(" public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - return internalGetDispatchRate(); + return internalGetTopicDispatchRate(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 860ab84076756..b0715ff4bd958 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -185,7 +185,7 @@ public void grantPermissionOnSubscription(@PathParam("property") String property validateNamespaceName(property, namespace); internalGrantPermissionOnSubscription(subscription, roles); } - + @DELETE @Path("/{tenant}/{namespace}/permissions/{role}") @ApiOperation(value = "Revoke all permissions to a role on a namespace.") @@ -208,7 +208,7 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert validateNamespaceName(property, namespace); internalRevokePermissionsOnSubscription(subscription, role); } - + @GET @Path("/{tenant}/{namespace}/replication") @ApiOperation(value = "Get the replication clusters for a namespace.", response = String.class, responseContainer = "List") @@ -336,7 +336,7 @@ public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam( public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, DispatchRate dispatchRate) { validateNamespaceName(tenant, namespace); - internalSetDispatchRate(dispatchRate); + internalSetTopicDispatchRate(dispatchRate); } @GET @@ -347,7 +347,7 @@ public void setDispatchRate(@PathParam("tenant") String tenant, @PathParam("name public DispatchRate getDispatchRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetDispatchRate(); + return internalGetTopicDispatchRate(); } @POST @@ -393,6 +393,28 @@ public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant, return internalGetSubscribeRate(); } + @POST + @Path("/{tenant}/{namespace}/replicatorDispatchRate") + @ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) + public void setReplicatorDispatchRate(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + DispatchRate dispatchRate) { + validateNamespaceName(tenant, namespace); + internalSetReplicatorDispatchRate(dispatchRate); + } + + @GET + @Path("/{tenant}/{namespace}/replicatorDispatchRate") + @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + return internalGetReplicatorDispatchRate(); + } + @GET @Path("/{tenant}/{namespace}/backlogQuotaMap") @ApiOperation(value = "Get backlog quota map on a namespace.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fddda96351f72..b26a6c1861290 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1195,11 +1195,11 @@ private void updateConfigurationAndRegisterListeners() { log.warn("Failed to change load manager due to {}", ex); } }); - // add listener to update message-dispatch-rate in msg + // add listener to update message-dispatch-rate in msg for topic registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { updateTopicMessageDispatchRate(); }); - // add listener to update message-dispatch-rate in byte + // add listener to update message-dispatch-rate in byte for topic registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> { updateTopicMessageDispatchRate(); }); @@ -1207,14 +1207,22 @@ private void updateConfigurationAndRegisterListeners() { registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> { updateManagedLedgerConfig(); }); - // add listener to update message-dispatch-rate in msg + // add listener to update message-dispatch-rate in msg for subscription registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> { updateSubscriptionMessageDispatchRate(); }); - // add listener to update message-dispatch-rate in byte - registerConfigurationListener("dispatchThrottlingRatePerSubscribeInByte", (dispatchRatePerTopicInByte) -> { + // add listener to update message-dispatch-rate in byte for subscription + registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> { updateSubscriptionMessageDispatchRate(); }); + // add listener to update message-dispatch-rate in msg for replicator + registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> { + updateReplicatorMessageDispatchRate(); + }); + // add listener to update message-dispatch-rate in byte for replicator + registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> { + updateReplicatorMessageDispatchRate(); + }); // add more listeners here } @@ -1243,6 +1251,18 @@ private void updateSubscriptionMessageDispatchRate() { }); } + private void updateReplicatorMessageDispatchRate() { + this.pulsar().getExecutor().submit(() -> { + // update message-rate for each topic Replicator in Geo-replication + forEachTopic(topic -> + topic.getReplicators().forEach((name, persistentReplicator) -> { + if (persistentReplicator.getRateLimiter().isPresent()) { + persistentReplicator.getRateLimiter().get().updateDispatchRate(); + } + })); + }); + } + private void updateManagedLedgerConfig() { this.pulsar().getExecutor().execute(() -> { // update managed-ledger config of each topic diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index f8a23f643c6a2..7412b228b5893 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -18,14 +18,17 @@ */ package org.apache.pulsar.broker.service; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.ReplicatorStats; public interface Replicator { void startProducer(); - + ReplicatorStats getStats(); CompletableFuture disconnect(); @@ -36,4 +39,11 @@ public interface Replicator { String getRemoteCluster(); + default void initializeDispatchRateLimiterIfNeeded(Optional policies) { + //No-op + } + + default Optional getRateLimiter() { + return Optional.empty(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 7e9052ada589b..d7d0ade129776 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -38,23 +38,26 @@ public class DispatchRateLimiter { + public enum Type { + TOPIC, + SUBSCRIPTION, + REPLICATOR + } + private final String topicName; - private final String subscriptionName; + private final Type type; + private final BrokerService brokerService; private RateLimiter dispatchRateLimiterOnMessage; private RateLimiter dispatchRateLimiterOnByte; - public DispatchRateLimiter(PersistentTopic topic, String subscriptionName) { + public DispatchRateLimiter(PersistentTopic topic, Type type) { this.topicName = topic.getName(); - this.subscriptionName = subscriptionName; this.brokerService = topic.getBrokerService(); + this.type = type; updateDispatchRate(); } - public DispatchRateLimiter(PersistentTopic topic) { - this(topic, null); - } - /** * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1 * @@ -100,67 +103,105 @@ public boolean isDispatchRateLimitingEnabled() { return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null; } + /** + * createDispatchRate according to broker service config. + * + * @return + */ + private DispatchRate createDispatchRate() { + int dispatchThrottlingRateInMsg; + long dispatchThrottlingRateInByte; + ServiceConfiguration config = brokerService.pulsar().getConfiguration(); + + switch (type) { + case TOPIC: + dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg(); + dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte(); + break; + case SUBSCRIPTION: + dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg(); + dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte(); + break; + case REPLICATOR: + dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg(); + dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte(); + break; + default: + dispatchThrottlingRateInMsg = -1; + dispatchThrottlingRateInByte = -1; + } + + return new DispatchRate(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, 1); + } + /** * Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies * default broker dispatch-throttling-rate */ public void updateDispatchRate() { DispatchRate dispatchRate = getPoliciesDispatchRate(brokerService); + if (dispatchRate == null) { - if (subscriptionName == null) { - dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte(), 1); - } else { - dispatchRate = new DispatchRate(brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(), 1); - } + dispatchRate = createDispatchRate(); } + updateDispatchRate(dispatchRate); - log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate); + log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate); } - public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional policies, - String topicName, String subscriptionName) { + String topicName, Type type) { final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration(); policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); - return isDispatchRateNeeded(serviceConfig, policies, topicName, subscriptionName); + return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig, - final Optional policies, final String topicName, final String subscriptionName) { - DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, topicName, - subscriptionName); + final Optional policies, final String topicName, final Type type) { + DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type); if (dispatchRate == null) { - if (subscriptionName == null) { - return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0 + switch (type) { + case TOPIC: + return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0; - } else { - return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0 - || serviceConfig.getDispatchThrottlingRatePerSubscribeInByte() > 0; + case SUBSCRIPTION: + return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0 + || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0; + case REPLICATOR: + return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0 + || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0; + default: + log.error("error DispatchRateLimiter type: {} ", type); + return false; } } return true; } - + public void onPoliciesUpdate(Policies data) { String cluster = brokerService.pulsar().getConfiguration().getClusterName(); DispatchRate dispatchRate; - if (subscriptionName == null) { - dispatchRate = data.clusterDispatchRate.get(cluster); - } else { - dispatchRate = data.subscriptionDispatchRate.get(cluster); + + switch (type) { + case TOPIC: + dispatchRate = data.topicDispatchRate.get(cluster); + break; + case SUBSCRIPTION: + dispatchRate = data.subscriptionDispatchRate.get(cluster); + break; + case REPLICATOR: + dispatchRate = data.replicatorDispatchRate.get(cluster); + break; + default: + log.error("error DispatchRateLimiter type: {} ", type); + dispatchRate = null; } + // update dispatch-rate only if it's configured in policies else ignore if (dispatchRate != null) { - int inMsg = (subscriptionName == null) ? - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() : - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(); - long inByte = (subscriptionName == null) ? - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() : - brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte(); - final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1); + final DispatchRate newDispatchRate = createDispatchRate(); + // if policy-throttling rate is disabled and cluster-throttling is enabled then apply // cluster-throttling rate if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) { @@ -170,6 +211,29 @@ public void onPoliciesUpdate(Policies data) { } } + public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional policies, Type type) { + // return policy-dispatch rate only if it's enabled in policies + return policies.map(p -> { + DispatchRate dispatchRate; + switch (type) { + case TOPIC: + dispatchRate = p.topicDispatchRate.get(cluster); + break; + case SUBSCRIPTION: + dispatchRate = p.subscriptionDispatchRate.get(cluster); + break; + case REPLICATOR: + dispatchRate = p.replicatorDispatchRate.get(cluster); + break; + default: + log.error("error DispatchRateLimiter type: {} ", type); + return null; + } + return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null; + }).orElse(null); + } + + /** * Gets configured dispatch-rate from namespace policies. Returns null if dispatch-rate is not configured * @@ -178,10 +242,9 @@ public void onPoliciesUpdate(Policies data) { public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); final Optional policies = getPolicies(brokerService, topicName); - return getPoliciesDispatchRate(cluster, policies, topicName, subscriptionName); + return getPoliciesDispatchRate(cluster, policies, type); } - - + public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); final String path = path(POLICIES, namespace.toString()); @@ -195,19 +258,6 @@ public static Optional getPolicies(BrokerService brokerService, String return policies; } - public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional policies, final String topicName, final String subscriptionName) { - // return policy-dispatch rate only if it's enabled in policies - return policies.map(p -> { - DispatchRate dispatchRate; - if (subscriptionName == null) { - dispatchRate = p.clusterDispatchRate.get(cluster); - } else { - dispatchRate = p.subscriptionDispatchRate.get(cluster); - } - return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null; - }).orElse(null); - } - /** * Update dispatch rate by updating msg and byte rate-limiter. If dispatch-rate is configured < 0 then it closes * the rate-limiter and disables appropriate rate-limiter. @@ -216,7 +266,7 @@ public static DispatchRate getPoliciesDispatchRate(final String cluster, Optiona */ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { // synchronized to prevent race condition from concurrent zk-watch - log.info("[{}] [{}] setting message-dispatch-rate {}", topicName, subscriptionName, dispatchRate); + log.info("setting message-dispatch-rate {}", dispatchRate); long msgRate = dispatchRate.dispatchThrottlingRateInMsg; long byteRate = dispatchRate.dispatchThrottlingRateInByte; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index b9351c2578a19..0bb49652ac987 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -643,10 +644,10 @@ public Optional getRateLimiter() { @Override public void initializeDispatchRateLimiterIfNeeded(Optional policies) { if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter - .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) { - this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name)); + .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) { + this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION)); } } - + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 4356fbe0a5a0f..e1e17a3650dd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -495,10 +496,10 @@ public Optional getRateLimiter() { @Override public void initializeDispatchRateLimiterIfNeeded(Optional policies) { if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter - .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), name)) { - this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, name)); + .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) { + this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION)); } } - + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index cda58e3cfe79f..225310487c6dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -18,7 +18,10 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; + import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -40,11 +43,13 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.Replicator; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.SendCallback; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; @@ -59,6 +64,8 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private final PersistentTopic topic; private final ManagedCursor cursor; + private Optional dispatchRateLimiter = Optional.empty(); + private int readBatchSize; private final int producerQueueThreshold; @@ -101,6 +108,8 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); producerQueueThreshold = (int) (producerQueueSize * 0.9); + this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); + startProducer(); } @@ -147,9 +156,53 @@ protected void disableReplicatorRead() { } - protected void readMoreEntries() { + /** + * Calculate available permits for read entries. + * + * @return + * 0: Producer queue is full, no permits. + * -1: Rate Limiter reaches limit. + * >0: available permits for read entries. + */ + private int getAvailablePermits() { int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this); + // return 0, if Producer queue is full, it will pause read entries. + if (availablePermits <= 0) { + if (log.isDebugEnabled()) { + log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading", + topicName, localCluster, remoteCluster, availablePermits); + } + return 0; + } + + // handle rate limit + if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { + DispatchRateLimiter rateLimiter = dispatchRateLimiter.get(); + // no permits from rate limit + if (!rateLimiter.hasMessageDispatchPermit()) { + if (log.isDebugEnabled()) { + log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}, schedule after a {}", + topicName, localCluster, remoteCluster, + rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), + MESSAGE_RATE_BACKOFF_MS); + } + return -1; + } + + // if dispatch-rate is in msg then read only msg according to available permit + long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + if (availablePermitsOnMsg > 0) { + availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + } + } + + return availablePermits; + } + + protected void readMoreEntries() { + int availablePermits = getAvailablePermits(); + if (availablePermits > 0) { int messagesToRead = Math.min(availablePermits, readBatchSize); if (!isWritable()) { @@ -174,10 +227,14 @@ protected void readMoreEntries() { localCluster, remoteCluster, messagesToRead); } } + } else if (availablePermits == -1) { + // no permits from rate limit + topic.getBrokerService().executor().schedule( + () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Producer queue is full, pause reading", topicName, localCluster, - remoteCluster); + log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}", + topicName, localCluster, remoteCluster, availablePermits); } } } @@ -267,6 +324,10 @@ public void readEntriesComplete(List entries, Object ctx) { continue; } + if (dispatchRateLimiter.isPresent()) { + dispatchRateLimiter.get().tryDispatchPermit(1, entry.getLength()); + } + // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); @@ -576,5 +637,18 @@ public void expireMessages(int messageTTLInSeconds) { expiryMonitor.expireMessages(messageTTLInSeconds); } + @Override + public Optional getRateLimiter() { + return dispatchRateLimiter; + } + + @Override + public void initializeDispatchRateLimiterIfNeeded(Optional policies) { + if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter + .isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) { + this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR)); + } + } + private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4cbcf1113a9c8..b0392f9106af5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -81,6 +81,7 @@ import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; @@ -277,17 +278,23 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS private void initializeDispatchRateLimiterIfNeeded(Optional policies) { synchronized (dispatchRateLimiter) { + // dispatch rate limiter for topic if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter - .isDispatchRateNeeded(brokerService, policies, topic, null)) { - this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this)); + .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) { + this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC)); } if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter .isDispatchRateNeeded(brokerService, policies, topic)) { this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this)); } - subscriptions.forEach((name, subscription) -> { - subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies); - }); + + // dispatch rate limiter for each subscription + subscriptions.forEach((name, subscription) -> + subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies)); + + // dispatch rate limiter for each replicator + replicators.forEach((name, replicator) -> + replicator.initializeDispatchRateLimiterIfNeeded(policies)); } } @@ -1656,6 +1663,9 @@ public CompletableFuture onPoliciesUpdate(Policies data) { sub.getDispatcher().getRateLimiter().get().onPoliciesUpdate(data); } }); + replicators.forEach((name, replicator) -> + replicator.getRateLimiter().get().onPoliciesUpdate(data) + ); checkMessageExpiry(); CompletableFuture replicationFuture = checkReplicationAndRetryOnFailure(); CompletableFuture dedupFuture = checkDeduplicationStatus(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java index 020d7b3e95c13..108c7ac7a37bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java @@ -41,7 +41,7 @@ public static BacklogQuota backlogQuota(ServiceConfiguration configuration) { ); } - public static DispatchRate dispatchRate(ServiceConfiguration configuration) { + public static DispatchRate topicDispatchRate(ServiceConfiguration configuration) { return new DispatchRate( configuration.getDispatchThrottlingRatePerTopicInMsg(), configuration.getDispatchThrottlingRatePerTopicInByte(), @@ -52,11 +52,19 @@ public static DispatchRate dispatchRate(ServiceConfiguration configuration) { public static DispatchRate subscriptionDispatchRate(ServiceConfiguration configuration) { return new DispatchRate( configuration.getDispatchThrottlingRatePerSubscriptionInMsg(), - configuration.getDispatchThrottlingRatePerSubscribeInByte(), + configuration.getDispatchThrottlingRatePerSubscriptionInByte(), 1 ); } + public static DispatchRate replicatorDispatchRate(ServiceConfiguration configuration) { + return new DispatchRate( + configuration.getDispatchThrottlingRatePerReplicatorInMsg(), + configuration.getDispatchThrottlingRatePerReplicatorInByte(), + 1 + ); + } + public static SubscribeRate subscribeRate(ServiceConfiguration configuration) { return new SubscribeRate( configuration.getSubscribeThrottlingRatePerConsumer(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 66e9067a13cab..c8ee9caaf9722 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -637,7 +637,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc // set default quotas on namespace Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf)); - policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf)); + policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf)); policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf)); policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index eda4409edca09..42ea2ce995dee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -632,7 +632,7 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc // set default quotas on namespace Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf)); - policies.clusterDispatchRate.put("test", ConfigHelper.dispatchRate(conf)); + policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf)); policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf)); policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java new file mode 100644 index 0000000000000..4631b07de5b8b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Sets; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.Cleanup; + +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Starts 3 brokers that are in 3 different clusters + */ +public class ReplicatorRateLimiterTest extends ReplicatorTestBase { + + protected String methodName; + + @BeforeMethod + public void beforeMethod(Method m) throws Exception { + methodName = m.getName(); + } + + @Override + @BeforeClass(timeOut = 30000) + void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(timeOut = 30000) + void shutdown() throws Exception { + super.shutdown(); + } + + enum DispatchRateType { + messageRate, byteRate + } + + @DataProvider(name = "dispatchRateType") + public Object[][] dispatchRateProvider() { + return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } }; + } + + /** + * verifies dispatch rate for replicators get changed once namespace policies changed. + * + * 1. verify default replicator not configured. + * 2. change namespace setting of replicator dispatchRateMsg, verify topic changed. + * 3. change namespace setting of replicator dispatchRateByte, verify topic changed. + * + * @throws Exception + */ + @Test + public void testReplicatorRateLimiterDynamicallyChange() throws Exception { + log.info("--- Starting ReplicatorTest::{} --- ", methodName); + + final String namespace = "pulsar/replicatorchange"; + final String topicName = "persistent://" + namespace + "/ratechange"; + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.close(); + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); + + // 1. default replicator throttling not configured + Assert.assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + + // 2. change namespace setting of replicator dispatchRateMsg, verify topic changed. + int messageRate = 100; + DispatchRate dispatchRateMsg = new DispatchRate(messageRate, -1, 360); + admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateMsg); + + boolean replicatorUpdated = false; + int retry = 5; + for (int i = 0; i < retry; i++) { + if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + replicatorUpdated = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(replicatorUpdated); + Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + + // 3. change namespace setting of replicator dispatchRateByte, verify topic changed. + messageRate = 500; + DispatchRate dispatchRateByte = new DispatchRate(-1, messageRate, 360); + admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRateByte); + replicatorUpdated = false; + for (int i = 0; i < retry; i++) { + if (topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte() == messageRate) { + replicatorUpdated = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(replicatorUpdated); + Assert.assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), dispatchRateByte); + } + + /** + * verifies dispatch rate for replicators works well for both Message limit and Byte limit . + * + * 1. verify topic replicator get configured. + * 2. namespace setting of replicator dispatchRate, verify consumer in other cluster could not receive all messages. + * + * @throws Exception + */ + @Test(dataProvider = "dispatchRateType", timeOut = 5000) + public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType) throws Exception { + log.info("--- Starting ReplicatorTest::{} --- ", methodName); + + final String namespace = "pulsar/replicatorbyteandmsg" + dispatchRateType.toString(); + final String topicName = "persistent://" + namespace + "/notReceivedAll"; + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + final int messageRate = 100; + DispatchRate dispatchRate; + if (DispatchRateType.messageRate.equals(dispatchRateType)) { + dispatchRate = new DispatchRate(messageRate, -1, 360); + } else { + dispatchRate = new DispatchRate(-1, messageRate, 360); + } + admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); + + boolean replicatorUpdated = false; + int retry = 5; + for (int i = 0; i < retry; i++) { + if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + replicatorUpdated = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(replicatorUpdated); + if (DispatchRateType.messageRate.equals(dispatchRateType)) { + Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + } else { + Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), messageRate); + } + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + final AtomicInteger totalReceived = new AtomicInteger(0); + + Consumer consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }).subscribe(); + + int numMessages = 500; + // Asynchronously produce messages + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + + log.info("Received message number: [{}]", totalReceived.get()); + + Assert.assertTrue(totalReceived.get() < messageRate * 2); + + consumer.close(); + producer.close(); + } + + /** + * verifies dispatch rate for replicators works well for both Message limit. + * + * 1. verify topic replicator get configured. + * 2. namespace setting of replicator dispatchRate, + * verify consumer in other cluster could receive all messages < message limit. + * 3. verify consumer in other cluster could not receive all messages > message limit. + * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception { + log.info("--- Starting ReplicatorTest::{} --- ", methodName); + + final String namespace = "pulsar/replicatormsg"; + final String topicName = "persistent://" + namespace + "/notReceivedAll"; + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + final int messageRate = 100; + DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360); + admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); + + boolean replicatorUpdated = false; + int retry = 5; + for (int i = 0; i < retry; i++) { + if (topic.getReplicators().values().get(0).getRateLimiter().isPresent()) { + replicatorUpdated = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(replicatorUpdated); + Assert.assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), messageRate); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + final AtomicInteger totalReceived = new AtomicInteger(0); + + Consumer consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }).subscribe(); + + int numMessages = 50; + // Asynchronously produce messages + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + + Thread.sleep(1000); + log.info("Received message number: [{}]", totalReceived.get()); + + Assert.assertEquals(totalReceived.get(), numMessages); + + + numMessages = 200; + // Asynchronously produce messages + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + Thread.sleep(1000); + log.info("Received message number: [{}]", totalReceived.get()); + + Assert.assertEquals(totalReceived.get(), messageRate); + + consumer.close(); + producer.close(); + } + + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1fe2f9797113b..82f61c22eeff1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -51,8 +51,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import javax.naming.AuthenticationException; - import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -1275,7 +1273,7 @@ public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception { ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); Policies policies = mock(Policies.class); policies.encryption_required = true; - policies.clusterDispatchRate = Maps.newHashMap(); + policies.topicDispatchRate = Maps.newHashMap(); doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(zkDataCache).when(configCacheService).policiesCache(); @@ -1303,7 +1301,7 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception { ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); Policies policies = mock(Policies.class); policies.encryption_required = true; - policies.clusterDispatchRate = Maps.newHashMap(); + policies.topicDispatchRate = Maps.newHashMap(); doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(zkDataCache).when(configCacheService).policiesCache(); @@ -1333,7 +1331,7 @@ public void testSendSuccessOnEncryptionRequiredTopic() throws Exception { ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); Policies policies = mock(Policies.class); policies.encryption_required = true; - policies.clusterDispatchRate = Maps.newHashMap(); + policies.topicDispatchRate = Maps.newHashMap(); doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(zkDataCache).when(configCacheService).policiesCache(); @@ -1368,7 +1366,7 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception { ZooKeeperDataCache zkDataCache = mock(ZooKeeperDataCache.class); Policies policies = mock(Policies.class); policies.encryption_required = true; - policies.clusterDispatchRate = Maps.newHashMap(); + policies.topicDispatchRate = Maps.newHashMap(); doReturn(Optional.of(policies)).when(zkDataCache).get(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(zkDataCache).getAsync(AdminResource.path(POLICIES, TopicName.get(encryptionRequiredTopicName).getNamespace())); doReturn(zkDataCache).when(configCacheService).policiesCache(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 6b8c22ca194b9..29f509bb8e56b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -384,7 +384,7 @@ public interface Namespaces { * @throws PulsarAdminException */ void grantPermissionOnSubscription(String namespace, String subscription, Set roles) throws PulsarAdminException; - + /** * Revoke permissions on a subscription's admin-api access. * @param namespace @@ -393,7 +393,7 @@ public interface Namespaces { * @throws PulsarAdminException */ void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException; - + /** * Get the replication clusters for a namespace. *

@@ -932,6 +932,26 @@ List getAntiAffinityNamespaces(String tenant, String cluster, String nam */ DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException; + /** + * Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) + * + * @param namespace + * @param dispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException; + + /** Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) + * + * @param namespace + * @returns DispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException; /** * Clear backlog for all topics on a namespace diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index b80d23816a036..42cdfbd338ab1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -233,7 +233,7 @@ public void revokePermissionsOnNamespace(String namespace, String role) throws P } } - + @Override public void grantPermissionOnSubscription(String namespace, String subscription, Set roles) throws PulsarAdminException { @@ -256,7 +256,7 @@ public void revokePermissionOnSubscription(String namespace, String subscription throw getApiException(e); } } - + @Override public List getNamespaceReplicationClusters(String namespace) throws PulsarAdminException { try { @@ -549,6 +549,28 @@ public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarA } } + @Override + public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicatorDispatchRate"); + request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "replicatorDispatchRate"); + return request(path).get(DispatchRate.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void clearNamespaceBacklog(String namespace) throws PulsarAdminException { try { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index d45df9ca06894..7b7043fa547ad 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -224,7 +224,7 @@ private class RevokeSubscriptionPermissions extends CliCommand { @Parameter(names = "--subscription", description = "Subscription name for which permission will be revoked to roles", required = true) private String subscription; - + @Parameter(names = "--role", description = "Client role to which revoke permissions", required = true) private String role; @@ -234,7 +234,7 @@ void run() throws PulsarAdminException { admin.namespaces().revokePermissionOnSubscription(namespace, subscription, role); } } - + @Parameters(commandDescription = "Get the permissions on a namespace") private class Permissions extends CliCommand { @Parameter(description = "tenant/namespace\n", required = true) @@ -548,7 +548,7 @@ private class SetSubscriptionDispatchRate extends CliCommand { @Parameter(description = "tenant/namespace\n", required = true) private java.util.List params; - @Parameter(names = { "--sub-msg-dispatch-rate", + @Parameter(names = { "--msg-dispatch-rate", "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) private int msgDispatchRate = -1; @@ -580,6 +580,43 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set replicator message-dispatch-rate for all topics of the namespace") + private class SetReplicatorDispatchRate extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Parameter(names = { "--msg-dispatch-rate", + "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private int msgDispatchRate = -1; + + @Parameter(names = { "--byte-dispatch-rate", + "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false) + private long byteDispatchRate = -1; + + @Parameter(names = { "--dispatch-rate-period", + "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false) + private int dispatchRatePeriodSec = 1; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().setReplicatorDispatchRate(namespace, + new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec)); + } + } + + @Parameters(commandDescription = "Get replicator configured message-dispatch-rate for all topics of the namespace (Disabled if value < 0)") + private class GetReplicatorDispatchRate extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getReplicatorDispatchRate(namespace)); + } + } + @Parameters(commandDescription = "Get the backlog quota policies for a namespace") private class GetBacklogQuotaMap extends CliCommand { @Parameter(description = "tenant/namespace\n", required = true) @@ -1062,10 +1099,10 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("permissions", new Permissions()); jcommander.addCommand("grant-permission", new GrantPermissions()); jcommander.addCommand("revoke-permission", new RevokePermissions()); - + jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions()); jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions()); - + jcommander.addCommand("set-clusters", new SetReplicationClusters()); jcommander.addCommand("get-clusters", new GetReplicationClusters()); @@ -1102,6 +1139,9 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate()); jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate()); + jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate()); + jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate()); + jcommander.addCommand("clear-backlog", new ClearBacklog()); jcommander.addCommand("unsubscribe", new Unsubscribe()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 23728ddfa82f8..efa18e4b50adf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -34,8 +34,9 @@ public class Policies { public Set replication_clusters = Sets.newHashSet(); public BundlesData bundles; public Map backlog_quota_map = Maps.newHashMap(); - public Map clusterDispatchRate = Maps.newHashMap(); + public Map topicDispatchRate = Maps.newHashMap(); public Map subscriptionDispatchRate = Maps.newHashMap(); + public Map replicatorDispatchRate = Maps.newHashMap(); public Map clusterSubscribeRate = Maps.newHashMap(); public PersistencePolicies persistence = null; @@ -70,7 +71,8 @@ public class Policies { @Override public int hashCode() { return Objects.hash(auth_policies, replication_clusters, - backlog_quota_map, clusterDispatchRate, + backlog_quota_map, + topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate, clusterSubscribeRate, deduplicationEnabled, persistence, bundles, latency_stats_sample_rate, message_ttl_in_seconds, retention_policies, @@ -90,7 +92,9 @@ public boolean equals(Object obj) { return Objects.equals(auth_policies, other.auth_policies) && Objects.equals(replication_clusters, other.replication_clusters) && Objects.equals(backlog_quota_map, other.backlog_quota_map) - && Objects.equals(clusterDispatchRate, other.clusterDispatchRate) + && Objects.equals(topicDispatchRate, other.topicDispatchRate) + && Objects.equals(subscriptionDispatchRate, other.subscriptionDispatchRate) + && Objects.equals(replicatorDispatchRate, other.replicatorDispatchRate) && Objects.equals(clusterSubscribeRate, other.clusterSubscribeRate) && Objects.equals(deduplicationEnabled, other.deduplicationEnabled) && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles) @@ -136,7 +140,9 @@ public String toString() { .add("replication_clusters", replication_clusters).add("bundles", bundles) .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence) .add("deduplicationEnabled", deduplicationEnabled) - .add("clusterDispatchRate", clusterDispatchRate) + .add("topicDispatchRate", topicDispatchRate) + .add("subscriptionDispatchRate", subscriptionDispatchRate) + .add("replicatorDispatchRate", replicatorDispatchRate) .add("clusterSubscribeRate", clusterSubscribeRate) .add("latency_stats_sample_rate", latency_stats_sample_rate) .add("antiAffinityGroup", antiAffinityGroup) diff --git a/site2/docs/admin-api-namespaces.md b/site2/docs/admin-api-namespaces.md index f2cd9292b2af9..7cd0c1d9d72ce 100644 --- a/site2/docs/admin-api-namespaces.md +++ b/site2/docs/admin-api-namespaces.md @@ -577,7 +577,7 @@ $ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \ ###### Java ```java -admin.namespaces().setDispatchRate(namespace, 1000, 1048576, 1) +admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1)) ``` #### get configured message-rate @@ -611,6 +611,122 @@ admin.namespaces().getDispatchRate(namespace) ``` +#### set dispatch throttling for subscription + +It sets message dispatch rate for all the subscription of topics under a given namespace. +Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`). +dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which +disables the throttling. + +###### CLI + +``` +$ pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \ + --msg-dispatch-rate 1000 \ + --byte-dispatch-rate 1048576 \ + --dispatch-rate-period 1 +``` + +###### REST + +``` +{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/setDispatchRate} +``` + +###### Java + +```java +admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1)) +``` + +#### get configured message-rate + +It shows configured message-rate for the namespace (topics under this namespace can dispatch this many messages per second) + +###### CLI + +``` +$ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1 +``` + +```json +{ + "dispatchThrottlingRatePerTopicInMsg" : 1000, + "dispatchThrottlingRatePerTopicInByte" : 1048576, + "ratePeriodInSecond" : 1 +} +``` + +###### REST + +``` +{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/getDispatchRate} +``` + +###### Java + +```java +admin.namespaces().getSubscriptionDispatchRate(namespace) +``` + +#### set dispatch throttling for subscription + +It sets message dispatch rate for all the replicator between replication clusters under a given namespace. +Dispatch rate can be restricted by number of message per X seconds (`msg-dispatch-rate`) or by number of message-bytes per X second (`byte-dispatch-rate`). +dispatch rate is in second and it can be configured with `dispatch-rate-period`. Default value of `msg-dispatch-rate` and `byte-dispatch-rate` is -1 which +disables the throttling. + +###### CLI + +``` +$ pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \ + --msg-dispatch-rate 1000 \ + --byte-dispatch-rate 1048576 \ + --dispatch-rate-period 1 +``` + +###### REST + +``` +{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/setDispatchRate} +``` + +###### Java + +```java +admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1)) +``` + +#### get configured message-rate + +It shows configured message-rate for the namespace (topics under this namespace can dispatch this many messages per second) + +###### CLI + +``` +$ pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1 +``` + +```json +{ + "dispatchThrottlingRatePerTopicInMsg" : 1000, + "dispatchThrottlingRatePerTopicInByte" : 1048576, + "ratePeriodInSecond" : 1 +} +``` + +###### REST + +``` +{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/getDispatchRate} +``` + +###### Java + +```java +admin.namespaces().getReplicatorDispatchRate(namespace) +``` + ### Namespace isolation Coming soon.