Skip to content

Commit

Permalink
Message TTL support cross multiple clusters (apache#13484)
Browse files Browse the repository at this point in the history
Message TTL support cross multiple clusters
  • Loading branch information
315157973 authored Dec 25, 2021
1 parent 9f599c9 commit 09b2d4c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2914,17 +2914,18 @@ protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
});
}

protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond, boolean isGlobal) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond < 0) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Invalid value for message TTL"));
}

return getTopicPoliciesAsyncWithRetry(topicName)
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMessageTTLInSeconds(ttlInSecond);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenRun(() ->
log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,11 +1782,12 @@ public void getMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> asyncResponse.resume(op
.map(TopicPolicies::getMessageTTLInSeconds)
.orElseGet(() -> {
Expand Down Expand Up @@ -1818,11 +1819,12 @@ public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "TTL in seconds for the specified namespace", required = true)
@QueryParam("messageTTL") Integer messageTTL,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMessageTTL(messageTTL))
.thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setMessageTTL", ex, asyncResponse);
Expand All @@ -1845,10 +1847,11 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetMessageTTL(null))
.thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("removeMessageTTL", ex, asyncResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,25 @@ public void testReplicateQuotaTopicPolicies() throws Exception {
assertEquals(admin3.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0));
}

@Test
public void testReplicateMessageTTLPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
init(namespace, topic);
// set message ttl
admin1.topicPolicies(true).setMessageTTL(topic, 10);
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
Awaitility.await().ignoreExceptions().untilAsserted(() ->
assertEquals(admin3.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
//remove message ttl
admin1.topicPolicies(true).removeMessageTTL(topic);
Awaitility.await().untilAsserted(() ->
assertNull(admin2.topicPolicies(true).getMessageTTL(topic)));
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getMessageTTL(topic)));
}

@Test
public void testReplicatePersistentPolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");

// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g"));
Expand Down Expand Up @@ -946,6 +953,13 @@ public void topicPolicies() throws Exception {
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1 -t message_age"));
verify(mockTopicsPolicies).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.message_age);

cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0 -g"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class CmdTopicPolicies extends CmdBase {
public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
super("topicPolicies", admin);

jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());
Expand All @@ -51,6 +54,65 @@ public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("remove-persistence", new RemovePersistence());
}

@Parameters(commandDescription = "Get the message TTL for a topic")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getTopicPolicies(isGlobal).getMessageTTL(persistentTopic, applied));
}
}

@Parameters(commandDescription = "Set message TTL for a topic")
private class SetMessageTTL extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-t", "--ttl" }, description = "Message TTL for topic in second, allowed range from 1 to Integer.MAX_VALUE", required = true)
private int messageTTLInSecond;

@Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
if (messageTTLInSecond < 0) {
throw new ParameterException(String.format("Invalid retention policy type '%d'. ", messageTTLInSecond));
}

String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setMessageTTL(persistentTopic, messageTTLInSecond);
}
}

@Parameters(commandDescription = "Remove message TTL for a topic")
private class RemoveMessageTTL extends CliCommand {

@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).removeMessageTTL(persistentTopic);
}
}

@Parameters(commandDescription = "Get the retention policy for a topic")
private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ private void initDeprecatedCommands() {
cmdUsageFormatter.addDeprecatedCommand("disable-deduplication");
cmdUsageFormatter.addDeprecatedCommand("get-deduplication-enabled");

cmdUsageFormatter.addDeprecatedCommand("get-message-ttl");
cmdUsageFormatter.addDeprecatedCommand("set-message-ttl");
cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");

cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-consumer");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-consumer");
Expand Down

0 comments on commit 09b2d4c

Please sign in to comment.