Skip to content

Commit

Permalink
Report compacted topic ledger info when calling get internal stats. (a…
Browse files Browse the repository at this point in the history
…pache#7988)

Fixes apache#7895

### Motivation
For get-internal-stats of persistent topic admin cli: https://pulsar.apache.org/docs/en/2.6.0/admin-api-persistent-topics/#get-internal-stats, we can also return the compacted topic ledger id if compaction is enabled. So we'll able to read from ledger without creating additional subscription, it can benefit like querying compacted topic from Pulsar SQL.

### Modifications
Expose CompactedTopicContext from CompactedTopicImpl, try to get ledger information of compacted topic ledger if exist in PersistentTopic.

### Verifying this change

This change added tests and can be verified as follows:
- Added unit test to verify correct compacted ledger info is returned after compaction.


* Report compacted topic ledger info when calling get internal stats.

* Update documentation to add information about returning compacted topic ledger when get-internal-stats.
  • Loading branch information
MarvinCai authored Sep 8, 2020
1 parent 7b5301d commit 97ba09e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -1605,6 +1606,26 @@ public PersistentTopicInternalStats getInternalStats() {
stats.ledgers.add(info);
});

// Add ledger info for compacted topic ledger if exist.
LedgerInfo info = new LedgerInfo();
info.ledgerId = -1;
info.entries = -1;
info.size = -1;

try {
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
((CompactedTopicImpl)compactedTopic).getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
stats.compactedLedger = info;

stats.cursors = Maps.newTreeMap();
ml.getCursors().forEach(c -> {
ManagedCursorImpl cursor = (ManagedCursorImpl) c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import lombok.Getter;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
Expand Down Expand Up @@ -250,7 +253,8 @@ private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long
});
}

static class CompactedTopicContext {
@Getter
public static class CompactedTopicContext {
final LedgerHandle ledger;
final AsyncLoadingCache<Long,MessageIdData> cache;

Expand All @@ -260,6 +264,14 @@ static class CompactedTopicContext {
}
}

/**
* Getter for CompactedTopicContext.
* @return CompactedTopicContext
*/
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
return compactedTopicContext == null? Optional.empty() : Optional.of(compactedTopicContext.get());
}

private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -132,6 +133,12 @@ public void testCompaction() throws Exception {
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();

PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic);
// Compacted topic ledger should have same number of entry equals to number of unique key.
Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries);
Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
Assert.assertFalse(internalStats.compactedLedger.offloaded);

// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class PersistentTopicInternalStats {
public List<LedgerInfo> ledgers;
public Map<String, CursorStats> cursors;

// LedgerInfo for compacted topic if exist.
public LedgerInfo compactedLedger;

/**
* Ledger information.
*/
Expand Down
19 changes: 18 additions & 1 deletion site2/docs/admin-api-persistent-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ It shows detailed statistics of a topic.

- **offloaded**: Whether this ledger is offloaded

- **compactedLedger**: The ledgers holding un-acked messages after topic compaction.

- **ledgerId**: Id of this ledger

- **entries**: Total number of entries belong to this ledger

- **size**: Size of messages written to this ledger (in bytes)

- **offloaded**: Will always be false for compacted topic ledger.

- **cursors**: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.

- **markDeletePosition**: All of messages before the markDeletePosition are acknowledged by the subscriber.
Expand Down Expand Up @@ -403,9 +413,16 @@ It shows detailed statistics of a topic.
{
"ledgerId": 324711539,
"entries": 0,
"size": 0
"size": 0,
"offloaded": true
}
],
"compactedLedger": {
"ledgerId": 324711540,
"entries": 10,
"size": 100,
"offloaded": false
},
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
Expand Down

0 comments on commit 97ba09e

Please sign in to comment.