diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 980ecd594f46e..449794bf2bcde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -838,8 +838,9 @@ protected CompletableFuture internalSetOffloadPolicies(OffloadPoliciesImpl }); } - protected CompletableFuture internalGetInactiveTopicPolicies(boolean applied) { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture internalGetInactiveTopicPolicies + (boolean applied, boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies) .orElseGet(() -> { if (applied) { @@ -853,10 +854,12 @@ protected CompletableFuture internalGetInactiveTopicPolic })); } - protected CompletableFuture internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) { - return getTopicPoliciesAsyncWithRetry(topicName) + protected CompletableFuture internalSetInactiveTopicPolicies + (InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setIsGlobal(isGlobal); topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 73b1d20b8c5b4..f031841653e61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -486,11 +486,12 @@ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("applied") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalGetInactiveTopicPolicies(applied)) + .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal)) .thenApply(asyncResponse::resume).exceptionally(ex -> { handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse); return null; @@ -508,11 +509,12 @@ public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "inactive topic policies for the specified topic") InactiveTopicPolicies inactiveTopicPolicies) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies)) + .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse); @@ -529,11 +531,12 @@ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResp @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); preValidation(authoritative) - .thenCompose(__ -> internalSetInactiveTopicPolicies(null)) + .thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index c33c3d8e4878d..5e5d5f1bea5c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -296,6 +298,30 @@ public void testReplicatorMessageDispatchRatePolicies() throws Exception { assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName))); } + @Test + public void testReplicatorInactiveTopicPolicies() throws Exception { + final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); + final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); + init(namespace, persistentTopicName); + + // set InactiveTopicPolicies + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true); + admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies); + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.topicPolicies(true) + .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies)); + Awaitility.await().untilAsserted(() -> + assertEquals(admin3.topicPolicies(true) + .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies)); + // remove InactiveTopicPolicies + admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName); + Awaitility.await().untilAsserted(() -> + assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName))); + Awaitility.await().untilAsserted(() -> + assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName))); + } + private void init(String namespace, String topic) throws PulsarAdminException, PulsarClientException, PulsarServerException { final String cluster2 = pulsar2.getConfig().getClusterName(); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 40086eef3ae93..c4b4fcc4707d3 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -908,6 +908,14 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false); + cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1")); + verify(mockTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1" + + " -e -t 1s -m delete_when_no_subscriptions")); + verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1" + , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1")); verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1")); @@ -1022,6 +1030,16 @@ public void topicPolicies() throws Exception { cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g")); verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false); + cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g")); + verify(mockGlobalTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1" + + " -e -t 1s -m delete_when_no_subscriptions -g")); + verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1" + , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); + + cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g")); verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 8be0fb35e9ac3..bcd700167fb09 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -33,6 +33,9 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.RelativeTimeUtil; @@ -78,6 +81,11 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("get-dispatch-rate", new GetDispatchRate()); jcommander.addCommand("set-dispatch-rate", new SetDispatchRate()); jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate()); + + jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies()); + jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies()); + jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies()); + } @Parameters(commandDescription = "Get max consumers per subscription for a topic") @@ -733,6 +741,87 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get the inactive topic policies on a topic") + private class GetInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getInactiveTopicPolicies(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Set the inactive topic policies on a topic") + private class SetInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive") + private boolean enableDeleteWhileInactive = false; + + @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive") + private boolean disableDeleteWhileInactive = false; + + @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" + + ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true) + private String deleteInactiveTopicsMaxInactiveDuration; + + @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" + + ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true) + private String inactiveTopicDeleteMode; + + @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. " + + "If set to true, the policy will be replicate to other clusters asynchronously") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + long maxInactiveDurationInSeconds; + try { + maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds( + RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration)); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } + + if (enableDeleteWhileInactive == disableDeleteWhileInactive) { + throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive"); + } + InactiveTopicDeleteMode deleteMode; + try { + deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode); + } catch (IllegalArgumentException e) { + throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up"); + } + getTopicPolicies(isGlobal).setInactiveTopicPolicies(persistentTopic, + new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive)); + } + } + + @Parameters(commandDescription = "Remove inactive topic policies from a topic") + private class RemoveInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. " + + "If set to true, the removing operation will be replicate to other clusters asynchronously") + private boolean isGlobal = false; + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).removeInactiveTopicPolicies(persistentTopic); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 53b45eaae1207..99ab4850a3a99 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -290,6 +290,10 @@ private void initDeprecatedCommands() { cmdUsageFormatter.addDeprecatedCommand("set-persistence"); cmdUsageFormatter.addDeprecatedCommand("remove-persistence"); + cmdUsageFormatter.addDeprecatedCommand("get-inactive-topic-policies"); + cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies"); + cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies"); + cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate"); cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate"); cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");