Skip to content

Commit

Permalink
Subscription dispatch rate support cross multiple clusters (apache#13511
Browse files Browse the repository at this point in the history
)

* Subscription dispatch rate support cross multiple clusters
  • Loading branch information
mattisonchao authored Dec 30, 2021
1 parent 33c9dea commit 24e79d0
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4147,8 +4147,8 @@ protected CompletableFuture<Void> internalRemoveDispatchRate(boolean isGlobal) {
});
}

protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate)
.orElseGet(() -> {
if (applied) {
Expand All @@ -4160,25 +4160,29 @@ protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(bo
}));
}

protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
protected CompletableFuture<Void> internalSetSubscriptionDispatchRate
(DispatchRateImpl dispatchRate, boolean isGlobal) {
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setSubscriptionDispatchRate(dispatchRate);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setSubscriptionDispatchRate(null);
TopicPolicies topicPolicies = op.get();
topicPolicies.setSubscriptionDispatchRate(null);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2885,11 +2885,12 @@ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResp
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetSubscriptionDispatchRate(applied))
.thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse);
Expand All @@ -2912,11 +2913,12 @@ public void setSubscriptionDispatchRate(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Subscription message dispatch rate for the specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate))
.thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic subscription dispatch rate:"
Expand Down Expand Up @@ -2947,11 +2949,12 @@ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncR
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveSubscriptionDispatchRate())
.thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
clientAppId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,35 @@ public void testReplicatorInactiveTopicPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
}

@Test
public void testReplicatorSubscriptionDispatchRatePolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

init(namespace, persistentTopicName);
// set subscription dispatch rate
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1)
.ratePeriodInSecond(1)
.dispatchThrottlingRateInByte(1)
.relativeToPublishRate(true)
.build();
admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate);
// get subscription dispatch rate
Awaitility.await().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true)
.getSubscriptionDispatchRate(persistentTopicName), dispatchRate));
Awaitility.await().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true)
.getSubscriptionDispatchRate(persistentTopicName), dispatchRate));

//remove subscription dispatch rate
admin1.topicPolicies(true).removeSubscriptionDispatchRate(persistentTopicName);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
}
@Test
public void testReplicateMaxUnackedMsgPerSub() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,18 @@ public void topicPolicies() throws Exception {
.ratePeriodInSecond(2)
.build());

cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(2)
.build());
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
Expand Down Expand Up @@ -1104,6 +1116,18 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).removeMaxConsumers("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99 -g"));
verify(mockGlobalTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(2)
.build());
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.admin.TopicPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
Expand Down Expand Up @@ -74,6 +75,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());

jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());

jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
Expand Down Expand Up @@ -938,6 +943,80 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
private class SetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--msg-dispatch-rate",
"-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)")
private int msgDispatchRate = -1;

@Parameter(names = { "--byte-dispatch-rate",
"-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)")
private long byteDispatchRate = -1;

@Parameter(names = { "--dispatch-rate-period",
"-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)")
private int dispatchRatePeriodSec = 1;

@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))")
private boolean relativeToPublishRate = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
DispatchRate.builder()
.dispatchThrottlingRateInMsg(msgDispatchRate)
.dispatchThrottlingRateInByte(byteDispatchRate)
.ratePeriodInSecond(dispatchRatePeriodSec)
.relativeToPublishRate(relativeToPublishRate)
.build());
}
}

@Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
private class RemoveSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
}

}

private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("get-max-producers");
cmdUsageFormatter.addDeprecatedCommand("set-max-producers");
cmdUsageFormatter.addDeprecatedCommand("remove-max-producers");

cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");
}
}

Expand Down

0 comments on commit 24e79d0

Please sign in to comment.