From a07e7a1cb6cda7a4dc04ac648d71222e9d8147f4 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Thu, 30 Dec 2021 16:06:46 +0800 Subject: [PATCH] Max unacked messages per consumer support cross multiple clusters (#13547) Max unacked messages per consumer support cross multiple clusters --- .../admin/impl/PersistentTopicsBase.java | 8 ++- .../broker/admin/v2/PersistentTopics.java | 9 ++- .../service/ReplicatorTopicPoliciesTest.java | 19 ++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 19 +++++- .../pulsar/admin/cli/CmdTopicPolicies.java | 59 +++++++++++++++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 4 ++ 6 files changed, 110 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c47c4a00f4950..694f554d85cf7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -924,8 +924,8 @@ protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(In }); } - protected CompletableFuture internalGetMaxUnackedMessagesOnConsumer(boolean applied) { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer) .orElseGet(() -> { if (applied) { @@ -936,7 +936,8 @@ protected CompletableFuture internalGetMaxUnackedMessagesOnConsumer(boo })); } - protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) { + protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum, + boolean isGlobal) { if (maxUnackedNum != null && maxUnackedNum < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more"); @@ -946,6 +947,7 @@ protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Intege .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum); + topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 9c8cb2584fdcf..e2132784c76e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -349,11 +349,12 @@ public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse async @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("applied") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied)) + .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse); return null; @@ -370,13 +371,14 @@ public void setMaxUnackedMessagesOnConsumer( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Max unacked messages on consumer policies for the specified topic") Integer maxUnackedNum) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum)) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", ex, asyncResponse); @@ -393,11 +395,12 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null)) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", ex, asyncResponse); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index 4bc7543c8047a..365fe007bb3b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -230,6 +230,25 @@ public void testReplicatorMaxConsumerPerSubPolicies() throws Exception { assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic))); } + @Test + public void testReplicateMaxUnackedMsgPerConsumer() throws Exception { + final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); + final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); + init(namespace, topic); + // set max unacked msgs per consumers + admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100); + Awaitility.await().ignoreExceptions().untilAsserted(() -> + assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100)); + Awaitility.await().ignoreExceptions().untilAsserted(() -> + assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100)); + // remove max unacked msgs per consumers + admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic))); + } + @Test public void testReplicatorTopicPolicies() throws Exception { final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 7502860b7b1b9..f6a68e9f2fc90 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -964,6 +964,14 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99")); verify(mockTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies, times(1)) + .getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999")); + verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999); + cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10")); @@ -1037,6 +1045,15 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("set-max-producers persistent://myprop/clust/ns1/ds1 -p 99 -g")); verify(mockGlobalTopicsPolicies).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies, times(1)) + .removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies, times(1)) + .getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999 -g")); + verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999); + cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g")); @@ -1068,8 +1085,6 @@ public void topicPolicies() throws Exception { verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1" , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); - - cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g")); verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("remove-dispatch-rate persistent://myprop/clust/ns1/ds1 -g")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 0003d83690234..aa906ab6fea53 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -50,6 +50,11 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("get-message-ttl", new GetMessageTTL()); jcommander.addCommand("set-message-ttl", new SetMessageTTL()); jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL()); + + jcommander.addCommand("get-max-unacked-messages-per-consumer", new GetMaxUnackedMessagesPerConsumer()); + jcommander.addCommand("set-max-unacked-messages-per-consumer", new SetMaxUnackedMessagesPerConsumer()); + jcommander.addCommand("remove-max-unacked-messages-per-consumer", new RemoveMaxUnackedMessagesPerConsumer()); + jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription()); jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription()); jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription()); @@ -152,6 +157,60 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get max unacked messages policy per consumer for a topic") + private class GetMaxUnackedMessagesPerConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnConsumer(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy per consumer for a topic") + private class RemoveMaxUnackedMessagesPerConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnConsumer(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy per consumer for a topic") + private class SetMaxUnackedMessagesPerConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true) + private int maxNum; + + @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum); + } + } + @Parameters(commandDescription = "Get the message TTL for a topic") private class GetMessageTTL extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9e25332d52fa9..58761656aef19 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -258,6 +258,10 @@ private void initDeprecatedCommands() { cmdUsageFormatter.addDeprecatedCommand("set-max-consumers"); cmdUsageFormatter.addDeprecatedCommand("remove-max-consumers"); + cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-per-consumer"); + cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-per-consumer"); + cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-per-consumer"); + cmdUsageFormatter.addDeprecatedCommand("get-message-ttl"); cmdUsageFormatter.addDeprecatedCommand("set-message-ttl"); cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");