From a47189691d15852dfad31e83a863e846ff156162 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 29 Dec 2021 20:20:02 +0800 Subject: [PATCH] Max unacked messages on subscription support cross multiple clusters (#13549) * Max unacked messages per subscription support cross multiple clusters --- .../admin/impl/PersistentTopicsBase.java | 11 ++-- .../broker/admin/v2/PersistentTopics.java | 9 ++- .../service/ReplicatorTopicPoliciesTest.java | 19 ++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 14 +++++ .../pulsar/admin/cli/CmdTopicPolicies.java | 58 ++++++++++++++++++- .../apache/pulsar/admin/cli/CmdTopics.java | 4 ++ 6 files changed, 107 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 28b8321b4bf03..2bce3114df1b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -894,8 +894,9 @@ private CompletableFuture internalUpdateOffloadPolicies(OffloadPoliciesImp }); } - protected CompletableFuture internalGetMaxUnackedMessagesOnSubscription(boolean applied) { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture internalGetMaxUnackedMessagesOnSubscription(boolean applied, + boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription) .orElseGet(() -> { if (applied) { @@ -907,16 +908,18 @@ protected CompletableFuture internalGetMaxUnackedMessagesOnSubscription })); } - protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) { + protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum, + boolean isGlobal) { if (maxUnackedNum != null && maxUnackedNum < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName) + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum); + topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index fd8a3a4d264d9..938da358e21c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -555,11 +555,12 @@ public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse a @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("applied") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied)) + .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal)) .thenApply(asyncResponse::resume) .exceptionally(ex -> { handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", ex, asyncResponse); @@ -577,6 +578,7 @@ public void setMaxUnackedMessagesOnSubscription( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Max unacked messages on subscription policies for the specified topic") @@ -584,7 +586,7 @@ public void setMaxUnackedMessagesOnSubscription( validateTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); preValidation(authoritative) - .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum)) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", ex, asyncResponse); @@ -603,12 +605,13 @@ public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncRespons @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); preValidation(authoritative) - .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null)) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index 1a6abbddfd32f..9737c46aa1388 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -352,6 +352,25 @@ public void testReplicatorInactiveTopicPolicies() throws Exception { assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName))); } + @Test + public void testReplicateMaxUnackedMsgPerSub() throws Exception { + final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); + final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); + init(namespace, topic); + // set max unacked msgs per sub + admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100); + Awaitility.await().ignoreExceptions().untilAsserted(() -> + assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100)); + Awaitility.await().ignoreExceptions().untilAsserted(() -> + assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100)); + // remove max unacked msgs per sub + admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic))); + } + private void init(String namespace, String topic) throws PulsarAdminException, PulsarClientException, PulsarServerException { final String cluster2 = pulsar2.getConfig().getClusterName(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a27147ab97dc8..812b27ed032c5 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -966,6 +966,13 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99")); + verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable")); @@ -1033,6 +1040,13 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99 -g")); + verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false); cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index dd7a79f6eb9e7..bcecb590fd2b4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -33,7 +33,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -87,6 +86,10 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("set-dispatch-rate", new SetDispatchRate()); jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate()); + jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription()); + jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription()); + jcommander.addCommand("remove-max-unacked-messages-per-subscription", new RemoveMaxUnackedMessagesPerSubscription()); + jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies()); jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies()); jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies()); @@ -397,6 +400,59 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get max unacked messages policy per subscription for a topic") + private class GetMaxUnackedMessagesPerSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, the removing operation will be replicate to other clusters asynchronously") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnSubscription(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy per subscription for a topic") + private class RemoveMaxUnackedMessagesPerSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. " + + "If set to true, the removing operation will be replicate to other clusters asynchronously") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnSubscription(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic") + private class SetMaxUnackedMessagesPerSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true) + private int maxNum; + + @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. " + + "If set to true, the removing operation will be replicate to other clusters asynchronously") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum); + } + } @Parameters(commandDescription = "Get max number of producers for a topic") private class GetMaxProducers extends CliCommand { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 69b976cb4a3fe..dcbdc9b6c7f71 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -306,6 +306,10 @@ private void initDeprecatedCommands() { cmdUsageFormatter.addDeprecatedCommand("set-deduplication"); cmdUsageFormatter.addDeprecatedCommand("remove-deduplication"); + cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription"); + cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription"); + cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription"); + cmdUsageFormatter.addDeprecatedCommand("set-subscription-types-enabled"); cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled"); cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");