Skip to content

Commit

Permalink
[broker] Optimize TopicPolicies#compactionThreshold with HierarchyTop…
Browse files Browse the repository at this point in the history
…icPolicies (apache#13710)
  • Loading branch information
AnonHxy authored Jan 14, 2022
1 parent 1fd4490 commit 8798392
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand All @@ -182,6 +183,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
Expand Down Expand Up @@ -230,6 +232,7 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,20 +1457,7 @@ public CompletableFuture<Boolean> isCompactionEnabled() {
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
Long compactionThreshold = getTopicPolicies()
.map(TopicPolicies::getCompactionThreshold)
.orElse(null);
if (compactionThreshold == null) {
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPolicies(name.getNamespaceObject())
.orElseThrow(() -> new MetadataStoreException.NotFoundException());
compactionThreshold = policies.compaction_threshold;
}
if (compactionThreshold == null) {
compactionThreshold = brokerService.pulsar().getConfiguration()
.getBrokerServiceCompactionThresholdInBytes();
}

long compactionThreshold = topicPolicies.getCompactionThreshold().get();
if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1873,9 +1873,10 @@ public void testCompactionTriggeredAfterThresholdFirstInvocation() throws Except

NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject();
doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().get();

topic.checkCompaction();

Expand Down Expand Up @@ -1907,9 +1908,10 @@ public void testCompactionTriggeredAfterThresholdSecondInvocation() throws Excep

NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject();
doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().get();

topic.checkCompaction();

Expand All @@ -1936,11 +1938,12 @@ public void testCompactionDisabledWithZeroThreshold() throws Exception {

NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns = TopicName.get(successTopicName).getNamespaceObject();
doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);

doReturn(1000L).when(ledgerMock).getEstimatedBacklogSize();

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().get();
topic.checkCompaction();
verify(compactor, times(0)).compact(anyString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class HierarchyTopicPolicies {
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Long> compactionThreshold;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
Expand All @@ -64,5 +65,6 @@ public HierarchyTopicPolicies() {
messageTTLInSeconds = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
}
}

0 comments on commit 8798392

Please sign in to comment.