Skip to content

Commit

Permalink
Inactive topic policies support cross multiple clusters (apache#13509)
Browse files Browse the repository at this point in the history
* inactiveTopicPolicies support cross multiple clusters
  • Loading branch information
mattisonchao authored Dec 29, 2021
1 parent 862e120 commit 6ec66ae
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,9 @@ protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl
});
}

protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies
(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies)
.orElseGet(() -> {
if (applied) {
Expand All @@ -853,10 +854,12 @@ protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolic
}));
}

protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
return getTopicPoliciesAsyncWithRetry(topicName)
protected CompletableFuture<Void> internalSetInactiveTopicPolicies
(InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setIsGlobal(isGlobal);
topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,12 @@ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetInactiveTopicPolicies(applied))
.thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
return null;
Expand All @@ -508,11 +509,12 @@ public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies))
.thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse);
Expand All @@ -529,11 +531,12 @@ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResp
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetInactiveTopicPolicies(null))
.thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -296,6 +298,30 @@ public void testReplicatorMessageDispatchRatePolicies() throws Exception {
assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName)));
}

@Test
public void testReplicatorInactiveTopicPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, persistentTopicName);

// set InactiveTopicPolicies
InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
Awaitility.await().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true)
.getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
Awaitility.await().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true)
.getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
// remove InactiveTopicPolicies
admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
}

private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,14 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+ " -e -t 1s -m delete_when_no_subscriptions"));
verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
Expand Down Expand Up @@ -1022,6 +1030,16 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+ " -e -t 1s -m delete_when_no_subscriptions -g"));
verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));



cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down Expand Up @@ -78,6 +81,11 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());

jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies());

}

@Parameters(commandDescription = "Get max consumers per subscription for a topic")
Expand Down Expand Up @@ -733,6 +741,87 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get the inactive topic policies on a topic")
private class GetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getInactiveTopicPolicies(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Set the inactive topic policies on a topic")
private class SetInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
private boolean enableDeleteWhileInactive = false;

@Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
private boolean disableDeleteWhileInactive = false;

@Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
private String deleteInactiveTopicsMaxInactiveDuration;

@Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
private String inactiveTopicDeleteMode;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
long maxInactiveDurationInSeconds;
try {
maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}

if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
}
InactiveTopicDeleteMode deleteMode;
try {
deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
} catch (IllegalArgumentException e) {
throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
}
getTopicPolicies(isGlobal).setInactiveTopicPolicies(persistentTopic,
new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
}
}

@Parameters(commandDescription = "Remove inactive topic policies from a topic")
private class RemoveInactiveTopicPolicies extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, the removing operation will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeInactiveTopicPolicies(persistentTopic);
}
}

private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("set-persistence");
cmdUsageFormatter.addDeprecatedCommand("remove-persistence");

cmdUsageFormatter.addDeprecatedCommand("get-inactive-topic-policies");
cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies");
cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies");

cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");
Expand Down

0 comments on commit 6ec66ae

Please sign in to comment.