Skip to content

Commit

Permalink
Max unacked messages on subscription support cross multiple clusters (a…
Browse files Browse the repository at this point in the history
…pache#13549)

* Max unacked messages per subscription support cross multiple clusters
  • Loading branch information
315157973 authored Dec 29, 2021
1 parent b5fbeb4 commit a471896
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,9 @@ private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImp
});
}

protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription)
.orElseGet(() -> {
if (applied) {
Expand All @@ -907,16 +908,18 @@ protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription
}));
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum,
boolean isGlobal) {
if (maxUnackedNum != null && maxUnackedNum < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}

return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,12 @@ public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse a
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", ex, asyncResponse);
Expand All @@ -577,14 +578,15 @@ public void setMaxUnackedMessagesOnSubscription(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", ex, asyncResponse);
Expand All @@ -603,12 +605,13 @@ public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncRespons
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
preValidation(authoritative)
.thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,25 @@ public void testReplicatorInactiveTopicPolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
}

@Test
public void testReplicateMaxUnackedMsgPerSub() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
// set max unacked msgs per sub
admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100);
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
// remove max unacked msgs per sub
admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
}

private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
Expand Down Expand Up @@ -1033,6 +1040,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99 -g"));
verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -87,6 +86,10 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());

jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
jcommander.addCommand("remove-max-unacked-messages-per-subscription", new RemoveMaxUnackedMessagesPerSubscription());

jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies());
Expand Down Expand Up @@ -397,6 +400,59 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get max unacked messages policy per subscription for a topic")
private class GetMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnSubscription(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Remove max unacked messages policy per subscription for a topic")
private class RemoveMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnSubscription(persistentTopic);
}
}

@Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
private class SetMaxUnackedMessagesPerSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
private int maxNum;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
}
}

@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");

cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");

cmdUsageFormatter.addDeprecatedCommand("set-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");
Expand Down

0 comments on commit a471896

Please sign in to comment.