Skip to content

Commit

Permalink
Restore clusterDispatchRate policy for compatibility (apache#6176)
Browse files Browse the repository at this point in the history
Co-authored-by: Sijie Guo <[email protected]>
  • Loading branch information
Masahiro Sakamoto and sijie authored Feb 10, 2020
1 parent 05c3218 commit 9b46930
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ protected PublishRate internalGetPublishRate() {
}
}

@SuppressWarnings("deprecation")
protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();
Expand All @@ -867,6 +868,7 @@ protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
policiesNode.getKey().clusterDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
Expand All @@ -892,11 +894,15 @@ protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) {
}
}

@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
validateAdminAccessForTenant(namespaceName.getTenant());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate == null) {
dispatchRate = policies.clusterDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
if (dispatchRate != null) {
return dispatchRate;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceCon
return true;
}

@SuppressWarnings("deprecation")
public void onPoliciesUpdate(Policies data) {
String cluster = brokerService.pulsar().getConfiguration().getClusterName();

Expand All @@ -194,6 +195,9 @@ public void onPoliciesUpdate(Policies data) {
switch (type) {
case TOPIC:
dispatchRate = data.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = data.clusterDispatchRate.get(cluster);
}
break;
case SUBSCRIPTION:
dispatchRate = data.subscriptionDispatchRate.get(cluster);
Expand All @@ -219,13 +223,17 @@ public void onPoliciesUpdate(Policies data) {
}
}

@SuppressWarnings("deprecation")
public static DispatchRate getPoliciesDispatchRate(final String cluster, Optional<Policies> policies, Type type) {
// return policy-dispatch rate only if it's enabled in policies
return policies.map(p -> {
DispatchRate dispatchRate;
switch (type) {
case TOPIC:
dispatchRate = p.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = p.clusterDispatchRate.get(cluster);
}
break;
case SUBSCRIPTION:
dispatchRate = p.subscriptionDispatchRate.get(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;

import static org.testng.Assert.assertNotNull;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -37,6 +40,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -101,6 +105,7 @@ enum DispatchRateType {
*
* @throws Exception
*/
@SuppressWarnings("deprecation")
@Test
public void testMessageRateDynamicallyChange() throws Exception {

Expand All @@ -116,7 +121,7 @@ public void testMessageRateDynamicallyChange() throws Exception {
// (1) verify message-rate is -1 initially
Assert.assertFalse(topic.getDispatchRateLimiter().isPresent());

// (1) change to 100
// (2) change to 100
int messageRate = 100;
DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 360);
admin.namespaces().setDispatchRate(namespace, dispatchRate);
Expand All @@ -134,8 +139,13 @@ public void testMessageRateDynamicallyChange() throws Exception {
}
Assert.assertTrue(isDispatchRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
Policies policies = admin.namespaces().getPolicies(namespace);
Map<String, DispatchRate> dispatchRateMap = Maps.newHashMap();
dispatchRateMap.put("test", dispatchRate);
Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);

// (1) change to 500
// (3) change to 500
messageRate = 500;
dispatchRate = new DispatchRate(-1, messageRate, 360);
admin.namespaces().setDispatchRate(namespace, dispatchRate);
Expand All @@ -152,6 +162,10 @@ public void testMessageRateDynamicallyChange() throws Exception {
}
Assert.assertTrue(isDispatchRateUpdate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
policies = admin.namespaces().getPolicies(namespace);
dispatchRateMap.put("test", dispatchRate);
Assert.assertEquals(policies.clusterDispatchRate, dispatchRateMap);
Assert.assertEquals(policies.topicDispatchRate, dispatchRateMap);

producer.close();
}
Expand Down Expand Up @@ -896,6 +910,67 @@ public void testClosingRateLimiter(SubscriptionType subscription) throws Excepti
log.info("-- Exiting {} test --", methodName);
}

@SuppressWarnings("deprecation")
@Test
public void testDispatchRateCompatibility1() throws Exception {
final String cluster = "test";

Optional<Policies> policies = Optional.of(new Policies());
DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1);
DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1);

// (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
DispatchRate dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies,
DispatchRateLimiter.Type.TOPIC);
Assert.assertNull(dispatchRate);

// (2) If topicDispatchRate is empty, clusterDispatchRate is effective
policies.get().clusterDispatchRate.put(cluster, clusterDispatchRate);
dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
Assert.assertEquals(dispatchRate, clusterDispatchRate);

// (3) If topicDispatchRate is not empty, topicDispatchRate is effective
policies.get().topicDispatchRate.put(cluster, topicDispatchRate);
dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
Assert.assertEquals(dispatchRate, topicDispatchRate);
}

@SuppressWarnings("deprecation")
@Test
public void testDispatchRateCompatibility2() throws Exception {
final String namespace = "my-property/dispatch-rate-compatibility";
final String topicName = "persistent://" + namespace + "/t1";
final String cluster = "test";
admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
DispatchRateLimiter dispatchRateLimiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC);

Policies policies = new Policies();
DispatchRate clusterDispatchRate = new DispatchRate(100, 512, 1);
DispatchRate topicDispatchRate = new DispatchRate(200, 1024, 1);

// (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
dispatchRateLimiter.onPoliciesUpdate(policies);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);

// (2) If topicDispatchRate is empty, clusterDispatchRate is effective
policies.clusterDispatchRate.put(cluster, clusterDispatchRate);
dispatchRateLimiter.onPoliciesUpdate(policies);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 100);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 512);

// (3) If topicDispatchRate is not empty, topicDispatchRate is effective
policies.topicDispatchRate.put(cluster, topicDispatchRate);
dispatchRateLimiter.onPoliciesUpdate(policies);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 200);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 1024);

producer.close();
topic.close().get();
}

protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
statsUpdaterField.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Policies {
public BundlesData bundles;
@SuppressWarnings("checkstyle:MemberName")
public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlog_quota_map = Maps.newHashMap();
@Deprecated
public Map<String, DispatchRate> clusterDispatchRate = Maps.newHashMap();
public Map<String, DispatchRate> topicDispatchRate = Maps.newHashMap();
public Map<String, DispatchRate> subscriptionDispatchRate = Maps.newHashMap();
public Map<String, DispatchRate> replicatorDispatchRate = Maps.newHashMap();
Expand Down Expand Up @@ -103,7 +105,7 @@ public class Policies {
@Override
public int hashCode() {
return Objects.hash(auth_policies, replication_clusters,
backlog_quota_map, publishMaxMessageRate,
backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
clusterSubscribeRate, deduplicationEnabled, persistence,
bundles, latency_stats_sample_rate,
Expand All @@ -128,6 +130,7 @@ public boolean equals(Object obj) {
return Objects.equals(auth_policies, other.auth_policies)
&& Objects.equals(replication_clusters, other.replication_clusters)
&& Objects.equals(backlog_quota_map, other.backlog_quota_map)
&& Objects.equals(clusterDispatchRate, other.clusterDispatchRate)
&& Objects.equals(topicDispatchRate, other.topicDispatchRate)
&& Objects.equals(subscriptionDispatchRate, other.subscriptionDispatchRate)
&& Objects.equals(replicatorDispatchRate, other.replicatorDispatchRate)
Expand Down Expand Up @@ -182,6 +185,7 @@ public String toString() {
.add("replication_clusters", replication_clusters).add("bundles", bundles)
.add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
.add("deduplicationEnabled", deduplicationEnabled)
.add("clusterDispatchRate", clusterDispatchRate)
.add("topicDispatchRate", topicDispatchRate)
.add("subscriptionDispatchRate", subscriptionDispatchRate)
.add("replicatorDispatchRate", replicatorDispatchRate)
Expand Down

0 comments on commit 9b46930

Please sign in to comment.