From b1003d1281d690e3da37d44718d4f335f4cbd23f Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Thu, 2 Jun 2022 11:01:21 +0800 Subject: [PATCH] Add cli cmd for subscription level dispatch-rate-limiter (#15862) --- .../pulsar/admin/cli/PulsarAdminToolTest.java | 26 +++++++++++ .../pulsar/admin/cli/CmdTopicPolicies.java | 43 +++++++++++++++---- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index dd48ff0f317f4..2b46d2ab76042 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -995,6 +995,19 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics = new CmdTopicPolicies(() -> admin); + cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 3")); + verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub", + DispatchRate.builder() + .dispatchThrottlingRateInMsg(-1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(3) + .build()); + cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub")); + verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false); + cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub")); + verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub"); + cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0")); @@ -1293,6 +1306,19 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics = new CmdTopicPolicies(() -> admin); + cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 2 -g")); + verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub", + DispatchRate.builder() + .dispatchThrottlingRateInMsg(-1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(2) + .build()); + cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g")); + verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false); + cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g")); + verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1","sub"); + cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 1314f649c86d2..8a4821df1525e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.TopicPolicies; @@ -1461,10 +1462,18 @@ private class GetSubscriptionDispatchRate extends CliCommand { + "If set to true, broker returned global topic policies") private boolean isGlobal = false; + @Parameter(names = {"--subscription", "-s"}, + description = "Get message-dispatch-rate of a specific subscription") + private String subName; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied)); + if (StringUtils.isBlank(subName)) { + print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied)); + } else { + print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, subName, applied)); + } } } @@ -1495,16 +1504,24 @@ private class SetSubscriptionDispatchRate extends CliCommand { + "If set to true, the policy will be replicate to other clusters asynchronously") private boolean isGlobal = false; + @Parameter(names = {"--subscription", "-s"}, + description = "Set message-dispatch-rate for a specific subscription") + private String subName; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, - DispatchRate.builder() - .dispatchThrottlingRateInMsg(msgDispatchRate) - .dispatchThrottlingRateInByte(byteDispatchRate) - .ratePeriodInSecond(dispatchRatePeriodSec) - .relativeToPublishRate(relativeToPublishRate) - .build()); + DispatchRate rate = DispatchRate.builder() + .dispatchThrottlingRateInMsg(msgDispatchRate) + .dispatchThrottlingRateInByte(byteDispatchRate) + .ratePeriodInSecond(dispatchRatePeriodSec) + .relativeToPublishRate(relativeToPublishRate) + .build(); + if (StringUtils.isBlank(subName)) { + getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, rate); + } else { + getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, subName, rate); + } } } @@ -1517,10 +1534,18 @@ private class RemoveSubscriptionDispatchRate extends CliCommand { + "If set to true, the policy will be replicate to other clusters asynchronously") private boolean isGlobal = false; + @Parameter(names = {"--subscription", "-s"}, + description = "Remove message-dispatch-rate for a specific subscription") + private String subName; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic); + if (StringUtils.isBlank(subName)) { + getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic); + } else { + getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic, subName); + } } }