Skip to content

Commit

Permalink
Support set message TTL on topic level. (apache#7738)
Browse files Browse the repository at this point in the history
### Motivation
Master Issue: apache#2688
Add Topic level policy support for message TTL.

### Modifications

Support set/get/remove Message TTL on topic level.

### Verifying this change
This change added tests and can be verified as follows:

  - *Added Unit test to verify set/get/remove message TTL at Topic level work as expected when Topic level policy is enabled/disabled*
  - *Added test case in PersistentTopicE2ETest to verify Topic level message TTL is used when set and will fall back to namespace message TTL if Topic level message TTL is removed.*
  • Loading branch information
MarvinCai authored Aug 6, 2020
1 parent 3e9db09 commit 8a763ab
Show file tree
Hide file tree
Showing 9 changed files with 498 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,47 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota
});
}

protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
TopicPolicies topicPolicies;
//Update existing topic policy or create a new one if not exist.
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName);
asyncResponse.resume(new RestException(e));
return;
}
if (topicPolicies == null){
topicPolicies = new TopicPolicies();
}
topicPolicies.setMessageTTLInSeconds(ttlInSecond);
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed set message ttl for topic",ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
ttlInSecond);
asyncResponse.resume(Response.noContent().build());
}
});
}


private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies();
if (retentionPolicies == null){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,53 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Get message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
public int getMessageTTL(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
return getTopicPolicies(topicName)
.map(TopicPolicies::getMessageTTLInSeconds)
.orElse(0); //same as default ttl at namespace level
}

@POST
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Set message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Invalid message TTL value") })
public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "TTL in seconds for the specified namespace", required = true)
@QueryParam("messageTTL") int messageTTL) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMessageTTL(asyncResponse, messageTTL);
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Remove message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Invalid message TTL value") })
public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetMessageTTL(asyncResponse, null);
}

@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
@ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand Down Expand Up @@ -1065,7 +1066,8 @@ public CompletableFuture<Void> checkReplication() {
return future;
}

final int newMessageTTLinSeconds = policies.message_ttl_in_seconds;
//Ignore current broker's config for messageTTL for replication.
final int newMessageTTLinSeconds = getMessageTTL(getTopicPolicies(name), policies, -1);

Set<String> configuredClusters;
if (policies.replication_clusters != null) {
Expand Down Expand Up @@ -1122,8 +1124,10 @@ public void checkMessageExpiry() {
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
: policies.message_ttl_in_seconds;
TopicPolicies topicPolicies = getTopicPolicies(name);
//If topic level policy or message ttl is not set, fall back to namespace level config.
int message_ttl_in_seconds = getMessageTTL(topicPolicies, policies, defaultTTL);

if (message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
Expand Down Expand Up @@ -2108,6 +2112,37 @@ public synchronized OffloadProcessStatus offloadStatus() {
}
}

/**
* Get {@link TopicPolicies} for this topic.
* @param topicName
* @return TopicPolicies is exist else return null.
*/
private TopicPolicies getTopicPolicies(TopicName topicName) {
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
}
}

/**
* Get message TTL for this topic.
* @param topicPolicies TopicPolicies
* @param policies NameSpace policy
* @param brokerDefaultMessageTTL
* @return Message TTL in second.
*/
private int getMessageTTL(TopicPolicies topicPolicies, Policies policies, int brokerDefaultMessageTTL) {
//Return Topic level message TTL if exist. If topic level policy or message ttl is not set,
//fall back to namespace level message ttl then message ttl set for current broker.
return (topicPolicies == null || topicPolicies.getMessageTTLInSeconds() == null) ?
((policies.message_ttl_in_seconds <= 0 && brokerDefaultMessageTTL > 0) ?
brokerDefaultMessageTTL :
policies.message_ttl_in_seconds) :
topicPolicies.getMessageTTLInSeconds();
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected void setup() throws Exception {
admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "lookup-topic").create();
producer.close();
Thread.sleep(3000);
}

@AfterMethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Slf4j
public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogQuotaTest.class);

private final String testTenant = "my-tenant";

private final String testNamespace = "my-namespace";

private final String myNamespace = testTenant + "/" + testNamespace;

private final String testTopic = "persistent://" + myNamespace + "/test-topic-message-ttl";

@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant(this.testTenant, tenantInfo);
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
admin.topics().createPartitionedTopic(testTopic, 2);
Producer producer = pulsarClient.newProducer().topic(testTenant + "/" + testNamespace + "/" + "dummy-topic").create();
producer.close();
Thread.sleep(3000);
}

@AfterMethod
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSetThenRemoveMessageTTL() throws Exception {
admin.topics().setMessageTTL(testTopic, 100);
log.info("Message TTL set success on topic: {}", testTopic);

Thread.sleep(3000);
Integer messageTTL = admin.topics().getMessageTTL(testTopic);
log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
Assert.assertEquals(messageTTL.intValue(), 100);

Thread.sleep(3000);
admin.topics().removeMessageTTL(testTopic);
messageTTL = admin.topics().getMessageTTL(testTopic);
log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
Assert.assertEquals(messageTTL.intValue(), 0);
}

@Test
public void testSetInvalidMessageTTL() throws Exception {
try {
admin.topics().setMessageTTL(testTopic, -100);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}

try {
admin.topics().setMessageTTL(testTopic, (int)2147483650L);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 412);
}
}

@Test
public void testGetMessageTTL() throws Exception {
// Check default topic level message TTL.
Integer messageTTL = admin.topics().getMessageTTL(testTopic);
log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
Assert.assertEquals(messageTTL.intValue(), 0);

admin.topics().setMessageTTL(testTopic, 200);
log.info("Message TTL set success on topic: {}", testTopic);

Thread.sleep(3000);
messageTTL = admin.topics().getMessageTTL(testTopic);
log.info("Message TTL {} get on topic: {}", testTopic, messageTTL);
Assert.assertEquals(messageTTL.intValue(), 200);
}

@Test
public void testTopicPolicyDisabled() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();

admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant(this.testTenant, tenantInfo);
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
admin.topics().createPartitionedTopic(testTopic, 2);

try {
admin.topics().getMessageTTL(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}

try {
admin.topics().setMessageTTL(testTopic, 200);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}

}
Loading

0 comments on commit 8a763ab

Please sign in to comment.