Skip to content

Commit

Permalink
maxSubscriptionsPerTopic support cross multiple clusters (apache#13623)
Browse files Browse the repository at this point in the history
  • Loading branch information
suiyuzeng authored Jan 12, 2022
1 parent 455470d commit 160ed8a
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3107,21 +3107,23 @@ protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers,

}

protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
}

protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic,
boolean isGlobal) {
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}

return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2152,11 +2152,12 @@ public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
.thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
.exceptionally(ex -> {
Expand All @@ -2178,16 +2179,17 @@ public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
+ ", maxSubscriptions={}"
, clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
+ ", maxSubscriptions={}, isGlobal={}"
, clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic, isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Expand All @@ -2208,11 +2210,12 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,26 @@ public void testReplicatorCompactionThresholdPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
}

@Test
public void testReplicateMaxSubscriptionsPerTopic() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, persistentTopicName);

//set max subscriptions per topic
admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024);

//get max subscriptions per topic
untilRemoteClustersAsserted(
admin -> assertEquals(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName),
Integer.valueOf(1024)));

//remove
admin1.topicPolicies(true).removeMaxSubscriptionsPerTopic(persistentTopicName);
untilRemoteClustersAsserted(
admin -> assertNull(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName)));
}

private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,14 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024"));
verify(mockTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");


// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g"));
Expand Down Expand Up @@ -1225,6 +1233,13 @@ public void topicPolicies() throws Exception {
verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
verify(mockGlobalTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());

jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic());
jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic());
jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic());

}

@Parameters(commandDescription = "Get max consumers per subscription for a topic")
Expand Down Expand Up @@ -1427,6 +1431,59 @@ void run() throws PulsarAdminException {

}

@Parameters(commandDescription = "Get max subscriptions for a topic")
private class GetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxSubscriptionsPerTopic(persistentTopic));
}
}

@Parameters(commandDescription = "Set max subscriptions for a topic")
private class SetMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--max-subscriptions-per-topic",
"-s"}, description = "max subscriptions for a topic (default -1 will be overwrite if not passed)",
required = true)
private int maxSubscriptionPerTopic;

@Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionPerTopic);
}
}

@Parameters(commandDescription = "Remove max subscriptions for a topic")
private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxSubscriptionsPerTopic(persistentTopic);
}
}

private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");

cmdUsageFormatter.addDeprecatedCommand("get-max-subscriptions-per-topic");
cmdUsageFormatter.addDeprecatedCommand("set-max-subscriptions-per-topic");
cmdUsageFormatter.addDeprecatedCommand("remove-max-subscriptions-per-topic");
}
}

Expand Down

0 comments on commit 160ed8a

Please sign in to comment.