Skip to content

Commit

Permalink
Subscribe rate support cross multiple clusters (apache#13561)
Browse files Browse the repository at this point in the history
Subscribe rate support cross multiple clusters
  • Loading branch information
Jason918 authored Dec 30, 2021
1 parent a07e7a1 commit 6c6b913
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4320,8 +4320,8 @@ protected CompletableFuture<Void> internalRemovePublishRate(boolean isGlobal) {
});
}

protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getSubscribeRate)
.orElseGet(() -> {
if (applied) {
Expand All @@ -4333,25 +4333,27 @@ protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean appl
}));
}

protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate, boolean isGlobal) {
if (subscribeRate == null) {
return CompletableFuture.completedFuture(null);
}
return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setSubscribeRate(subscribeRate);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}

protected CompletableFuture<Void> internalRemoveSubscribeRate() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setSubscribeRate(null);
op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3368,11 +3368,12 @@ public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetSubscribeRate(applied))
.thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
return null;
Expand All @@ -3392,20 +3393,22 @@ public void setSubscribeRate(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetSubscribeRate(subscribeRate))
.thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic subscribe rate:"
+ " tenant={}, namespace={}, topic={}, subscribeRate={}",
+ " tenant={}, namespace={}, topic={}, isGlobal={} subscribeRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
isGlobal,
jsonMapper().writeValueAsString(subscribeRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
Expand All @@ -3428,17 +3431,20 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveSubscribeRate())
.thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
log.info(
"[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
topicName.getLocalName(),
isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -114,6 +115,24 @@ public void testReplicateMessageTTLPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getMessageTTL(topic)));
}

@Test
public void testReplicateSubscribeRatePolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
// set global topic policy
SubscribeRate subscribeRate = new SubscribeRate(100, 10000);
admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate);

// get global topic policy
untilRemoteClustersAsserted(
admin -> assertEquals(admin.topicPolicies(true).getSubscribeRate(topic), subscribeRate));

// remove global topic policy
admin1.topicPolicies(true).removeSubscribeRate(topic);
untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getSubscribeRate(topic)));
}

@Test
public void testReplicatePublishRatePolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopicsPolicies).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 10 -st 100"));
verify(mockTopicsPolicies).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(10, 100));
cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxConsumers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-consumers persistent://myprop/clust/ns1/ds1"));
Expand Down Expand Up @@ -1103,6 +1110,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
verify(mockGlobalTopicsPolicies).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 10 -st 100 -g"));
verify(mockGlobalTopicsPolicies).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(10, 100));
cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;

@Parameters(commandDescription = "Operations on persistent topics")
Expand Down Expand Up @@ -88,6 +89,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());

jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());

jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
Expand Down Expand Up @@ -781,6 +786,64 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get consumer subscribe rate for a topic")
private class GetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returns global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getSubscribeRate(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Set consumer subscribe rate for a topic")
private class SetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--subscribe-rate",
"-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)", required = false)
private int subscribeRate = -1;

@Parameter(names = { "--subscribe-rate-period",
"-st" }, description = "subscribe-rate-period in second type (default 30 second will be overwrite if not passed)", required = false)
private int subscribeRatePeriodSec = 30;

@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setSubscribeRate(persistentTopic,
new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
}
}

@Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
private class RemoveSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeSubscribeRate(persistentTopic);
}
}

@Parameters(commandDescription = "Get the persistence policies for a topic")
private class GetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-publish-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-publish-rate");

cmdUsageFormatter.addDeprecatedCommand("get-subscribe-rate");
cmdUsageFormatter.addDeprecatedCommand("set-subscribe-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-subscribe-rate");

cmdUsageFormatter.addDeprecatedCommand("get-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("set-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("remove-maxProducers");
Expand Down

0 comments on commit 6c6b913

Please sign in to comment.