diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8ff51f6c5017f..becd0943b308b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -38,6 +38,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -1605,6 +1606,26 @@ public PersistentTopicInternalStats getInternalStats() { stats.ledgers.add(info); }); + // Add ledger info for compacted topic ledger if exist. + LedgerInfo info = new LedgerInfo(); + info.ledgerId = -1; + info.entries = -1; + info.size = -1; + + try { + Optional compactedTopicContext = + ((CompactedTopicImpl)compactedTopic).getCompactedTopicContext(); + if (compactedTopicContext.isPresent()) { + CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get(); + info.ledgerId = ledgerContext.getLedger().getId(); + info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; + info.size = ledgerContext.getLedger().getLength(); + } + } catch (ExecutionException | InterruptedException e) { + log.warn("[{}]Fail to get ledger information for compacted topic.", topic); + } + stats.compactedLedger = info; + stats.cursors = Maps.newTreeMap(); ml.getCursors().forEach(c -> { ManagedCursorImpl cursor = (ManagedCursorImpl) c; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 2d430a7853ade..74d24c6dc5a64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -28,8 +28,11 @@ import java.util.Enumeration; import java.util.List; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import lombok.Getter; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; @@ -250,7 +253,8 @@ private static CompletableFuture> readEntries(LedgerHandle lh, long }); } - static class CompactedTopicContext { + @Getter + public static class CompactedTopicContext { final LedgerHandle ledger; final AsyncLoadingCache cache; @@ -260,6 +264,14 @@ static class CompactedTopicContext { } } + /** + * Getter for CompactedTopicContext. + * @return CompactedTopicContext + */ + public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException { + return compactedTopicContext == null? Optional.empty() : Optional.of(compactedTopicContext.get()); + } + private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { return ComparisonChain.start() .compare(p.getLedgerId(), m.getLedgerId()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 2505a6eaffbb7..4ef450dc399b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -132,6 +133,12 @@ public void testCompaction() throws Exception { Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); compactor.compact(topic).get(); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic); + // Compacted topic ledger should have same number of entry equals to number of unique key. + Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries); + Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); + Assert.assertFalse(internalStats.compactedLedger.offloaded); + // consumer with readCompacted enabled only get compacted entries try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java index aa9c59582f337..63fd3cb1134e4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java @@ -44,6 +44,9 @@ public class PersistentTopicInternalStats { public List ledgers; public Map cursors; + // LedgerInfo for compacted topic if exist. + public LedgerInfo compactedLedger; + /** * Ledger information. */ diff --git a/site2/docs/admin-api-persistent-topics.md b/site2/docs/admin-api-persistent-topics.md index 340e7c760ec6f..df950f9905fc3 100644 --- a/site2/docs/admin-api-persistent-topics.md +++ b/site2/docs/admin-api-persistent-topics.md @@ -364,6 +364,16 @@ It shows detailed statistics of a topic. - **offloaded**: Whether this ledger is offloaded + - **compactedLedger**: The ledgers holding un-acked messages after topic compaction. + + - **ledgerId**: Id of this ledger + + - **entries**: Total number of entries belong to this ledger + + - **size**: Size of messages written to this ledger (in bytes) + + - **offloaded**: Will always be false for compacted topic ledger. + - **cursors**: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats. - **markDeletePosition**: All of messages before the markDeletePosition are acknowledged by the subscriber. @@ -403,9 +413,16 @@ It shows detailed statistics of a topic. { "ledgerId": 324711539, "entries": 0, - "size": 0 + "size": 0, + "offloaded": true } ], + "compactedLedger": { + "ledgerId": 324711540, + "entries": 10, + "size": 100, + "offloaded": false + }, "cursors": { "my-subscription": { "markDeletePosition": "324711539:3133",