Skip to content

Commit

Permalink
[broker] Optimize retentionPolicies with HierarchyTopicPolicies (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Jan 24, 2022
1 parent c9f68f0 commit 1568a7a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand Down Expand Up @@ -157,6 +158,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
// Only use namespace level setting for system topic.
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
Expand Down Expand Up @@ -185,6 +187,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies);
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
Expand Down Expand Up @@ -224,6 +227,8 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
config.getBrokerDeduplicationSnapshotIntervalSeconds());
//init backlogQuota
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2375,32 +2375,7 @@ public void checkDeduplicationSnapshot() {
* marked as inactive.
*/
private boolean shouldTopicBeRetained() {
RetentionPolicies retentionPolicies = null;
try {
retentionPolicies = getTopicPolicies()
.map(TopicPolicies::getRetentionPolicies)
.orElse(null);
if (retentionPolicies == null){
TopicName name = TopicName.get(topic);
retentionPolicies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPolicies(name.getNamespaceObject())
.map(p -> p.retention_policies)
.orElse(null);
}
if (retentionPolicies == null){
// If no policies, the default is to have no retention and delete the inactive topic
retentionPolicies = new RetentionPolicies(
brokerService.pulsar().getConfiguration().getDefaultRetentionTimeInMinutes(),
brokerService.pulsar().getConfiguration().getDefaultRetentionSizeInMB());
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
// Don't delete in case we cannot get the policies
return true;
}

RetentionPolicies retentionPolicies = topicPolicies.getRetentionPolicies().get();
long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
@Getter
public class HierarchyTopicPolicies {
final PolicyHierarchyValue<List<String>> replicationClusters;
final PolicyHierarchyValue<RetentionPolicies> retentionPolicies;
final PolicyHierarchyValue<Boolean> deduplicationEnabled;
final PolicyHierarchyValue<Integer> deduplicationSnapshotIntervalSeconds;
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
Expand All @@ -51,6 +52,7 @@ public class HierarchyTopicPolicies {

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
retentionPolicies = new PolicyHierarchyValue<>();
deduplicationEnabled = new PolicyHierarchyValue<>();
deduplicationSnapshotIntervalSeconds = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
Expand Down

0 comments on commit 1568a7a

Please sign in to comment.