Skip to content

Commit

Permalink
Fix compaction not working for system topic (apache#10941)
Browse files Browse the repository at this point in the history
If a topic only have non-durable subscriptions but not durable subscriptions,
and the non-durable subscription reach the end of the topic, we will get 0 estimated backlog size
So that the compaction will never been triggered.

The expected behavior is if we have no durable subscriptions, we should use the total size for triggering
the compaction.
  • Loading branch information
codelipenghui authored Jun 17, 2021
1 parent 202da11 commit 797cb12
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,8 @@ public void checkCompaction() {
} else {
// compaction has never run, so take full backlog size,
// or total size if we have no durable subs yet.
backlogEstimate = subscriptions.isEmpty()
backlogEstimate = subscriptions.isEmpty() || subscriptions.values().stream()
.noneMatch(sub -> sub.getCursor().isDurable())
? ledger.getTotalSize()
: ledger.getEstimatedBacklogSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
Expand All @@ -54,6 +53,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -91,6 +91,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {

private final String persistenceTopic = "persistent://" + myNamespace + "/test-set-persistence";

private final String topicPolicyEventsTopic = "persistent://" + myNamespace + "/__change_events";

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -2278,4 +2280,31 @@ public void testProduceConsumeOnTopicPolicy() {
}
}

@Test
public void testSystemTopicShouldBeCompacted() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build();
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);

Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

admin.topics().setBacklogQuota(testTopic, backlogQuota);
log.info("Backlog quota set success on topic: {}", testTopic);

Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));

pulsar.getBrokerService().checkCompaction();

Awaitility.await()
.untilAsserted(() -> {
TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic);
Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction"));
});
}

}

0 comments on commit 797cb12

Please sign in to comment.