Skip to content

Commit

Permalink
Add cli cmd for subscription level dispatch-rate-limiter (apache#15862)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Jun 2, 2022
1 parent 8bcb921 commit b1003d1
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,19 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 3"));
verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(3)
.build());
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub");

cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
Expand Down Expand Up @@ -1293,6 +1306,19 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 2 -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(2)
.build());
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1","sub");

cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
Expand Down Expand Up @@ -1461,10 +1462,18 @@ private class GetSubscriptionDispatchRate extends CliCommand {
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Parameter(names = {"--subscription", "-s"},
description = "Get message-dispatch-rate of a specific subscription")
private String subName;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
if (StringUtils.isBlank(subName)) {
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
} else {
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, subName, applied));
}
}
}

Expand Down Expand Up @@ -1495,16 +1504,24 @@ private class SetSubscriptionDispatchRate extends CliCommand {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Parameter(names = {"--subscription", "-s"},
description = "Set message-dispatch-rate for a specific subscription")
private String subName;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build());
DispatchRate rate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build();
if (StringUtils.isBlank(subName)) {
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, rate);
} else {
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, subName, rate);
}
}
}

Expand All @@ -1517,10 +1534,18 @@ private class RemoveSubscriptionDispatchRate extends CliCommand {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Parameter(names = {"--subscription", "-s"},
description = "Remove message-dispatch-rate for a specific subscription")
private String subName;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
if (StringUtils.isBlank(subName)) {
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
} else {
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic, subName);
}
}

}
Expand Down

0 comments on commit b1003d1

Please sign in to comment.