Skip to content

Commit

Permalink
[broker] Optimize TopicPolicies#messageTTLInSeconds with HierarchyTop…
Browse files Browse the repository at this point in the history
…icPolicies (apache#13241)

### Motivation

This is one of the serial topic policy optimization with `HierarchyTopicPolicies`. 

Update topic policy with HierarchyTopicPolicies comes with these benefits:
- All topic policy related settings will goes into AbstractTopic#topicPolicies, easier to understand, check, review, and modify.
- Unify policy update to AbstractTopic. And easier to find which policy is not applied to non-persistent topic yet. And we can easily add support for it.
- Unify policy value with 3 level settings (topic/namespace/broker), and priority with topic > namespace > broker.  And it's easier to find which level settings is missing.
- All values are updated at creation or by trigger. We can save some resource to update it or recalculate each time we use it. 
- etc.

### Modifications

Add new field `messageTTLInSeconds` in org.apache.pulsar.broker.service.AbstractTopic#topicPolicies.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as 
- class org.apache.pulsar.broker.admin.TopicMessageTTLTest
- org.apache.pulsar.client.api.KeySharedSubscriptionTest#testContinueDispatchMessagesWhenMessageTTL
- org.apache.pulsar.broker.stats.PrometheusMetricsTest#testPerTopicExpiredStat
- org.apache.pulsar.client.impl.TopicsConsumerImplTest#testDefaultBacklogTTL

Some checklist for updating topic policy with `HierarchyTopicPolicies`.
  • Loading branch information
Jason918 authored Dec 12, 2021
1 parent d4ed376 commit 057edd8
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand All @@ -171,6 +172,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
Expand Down Expand Up @@ -202,6 +204,7 @@ private void updateTopicPolicyByBrokerConfig() {
.updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1379,12 +1379,10 @@ public CompletableFuture<Void> checkReplication() {

CompletableFuture<List<String>> replicationClustersFuture = getReplicationClusters(name);

CompletableFuture<Integer> ttlFuture = getMessageTTL();

return CompletableFuture.allOf(replicationClustersFuture, ttlFuture)
return CompletableFuture.allOf(replicationClustersFuture)
.thenCompose(__ -> {
List<String> configuredClusters = replicationClustersFuture.join();
int newMessageTTLinSeconds = ttlFuture.join();
int newMessageTTLinSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

Expand Down Expand Up @@ -1454,15 +1452,12 @@ public CompletableFuture<List<String>> getReplicationClusters(TopicName topicNam

@Override
public void checkMessageExpiry() {
getMessageTTL().thenAccept(messageTtlInSeconds -> {
//If topic level policy or message ttl is not set, fall back to namespace level config.

if (messageTtlInSeconds != 0) {
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
});
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTtlInSeconds != 0) {
subscriptions.forEach((__, sub) -> sub.expireMessages(messageTtlInSeconds));
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
}

@Override
Expand Down Expand Up @@ -2882,31 +2877,6 @@ public synchronized OffloadProcessStatus offloadStatus() {
}
}

/**
* Get message TTL for this topic.
* @return Message TTL in second.
*/
private CompletableFuture<Integer> getMessageTTL() {
//Return Topic level message TTL if exist. If topic level policy or message ttl is not set,
//fall back to namespace level message ttl then message ttl set for current broker.
Optional<Integer> messageTtl = getTopicPolicies().map(TopicPolicies::getMessageTTLInSeconds);
if (messageTtl.isPresent()) {
return CompletableFuture.completedFuture(messageTtl.get());
}

return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(optPolicies -> {
if (optPolicies.isPresent()) {
if (optPolicies.get().message_ttl_in_seconds != null) {
return optPolicies.get().message_ttl_in_seconds;
}
}

return brokerService.getPulsar().getConfiguration().getTtlDurationDefaultInSeconds();
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.concurrent.CompletableFuture;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -35,9 +35,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.lang.reflect.Method;
import java.util.UUID;

@Slf4j
@Test(groups = "flaky")
public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -161,27 +158,29 @@ public void testDifferentLevelPolicyPriority() throws Exception {
final String topicName = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
Method method = PersistentTopic.class.getDeclaredMethod("getMessageTTL");
method.setAccessible(true);

Integer namespaceMessageTTL = admin.namespaces().getNamespaceMessageTTL(myNamespace);
Assert.assertNull(namespaceMessageTTL);
Assert.assertEquals((int) ((CompletableFuture<Integer>) method.invoke(persistentTopic)).join(), 3600);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(
(int) persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get(), 3600));

admin.namespaces().setNamespaceMessageTTL(myNamespace, 10);
Awaitility.await().untilAsserted(()
-> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 10));
Assert.assertEquals((int) ((CompletableFuture<Integer>) method.invoke(persistentTopic)).join(), 10);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(
(int) persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get(), 10));

admin.namespaces().setNamespaceMessageTTL(myNamespace, 0);
Awaitility.await().untilAsserted(()
-> Assert.assertEquals(admin.namespaces().getNamespaceMessageTTL(myNamespace).intValue(), 0));
Assert.assertEquals((int) ((CompletableFuture<Integer>) method.invoke(persistentTopic)).join(), 0);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(
(int) persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get(), 0));

admin.namespaces().removeNamespaceMessageTTL(myNamespace);
Awaitility.await().untilAsserted(()
-> Assert.assertNull(admin.namespaces().getNamespaceMessageTTL(myNamespace)));
Assert.assertEquals((int) ((CompletableFuture<Integer>) method.invoke(persistentTopic)).join(), 3600);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(
(int) persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get(), 3600));
}

@Test(dataProvider = "isV1")
Expand All @@ -202,9 +201,8 @@ public void testNamespaceTTL(boolean isV1) throws Exception {
public void testDifferentLevelPolicyApplied() throws Exception {
final String topicName = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topicName);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
Method method = PersistentTopic.class.getDeclaredMethod("getMessageTTL");
method.setAccessible(true);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
//namespace-level default value is null
Integer namespaceMessageTTL = admin.namespaces().getNamespaceMessageTTL(myNamespace);
Assert.assertNull(namespaceMessageTTL);
Expand Down Expand Up @@ -237,7 +235,8 @@ public void testDifferentLevelPolicyApplied() throws Exception {
admin.topics().removeMessageTTL(topicName);
Awaitility.await().untilAsserted(()
-> Assert.assertEquals(admin.topics().getMessageTTL(topicName, true).intValue(), 3600));
Assert.assertEquals((int) ((CompletableFuture<Integer>)method.invoke(persistentTopic)).join(), 3600);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(
(int) persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get(), 3600));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void testPerTopicExpiredStat() throws Exception {
for (String topic : topicList) {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
}
pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
//wait for checkMessageExpiry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> maxProducersPerTopic;
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;

public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
Expand All @@ -50,5 +51,6 @@ public HierarchyTopicPolicies() {
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
}
}

0 comments on commit 057edd8

Please sign in to comment.