Skip to content

Commit

Permalink
[Admin] Expose offloaded storage size to the admin stats (apache#9335)
Browse files Browse the repository at this point in the history
*Motivation*

Add offloaded storage size in the topic stats.
  • Loading branch information
zymap authored Jan 28, 2021
1 parent 232b324 commit 2af0aed
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,7 @@ public TopicStats getStats(boolean getPreciseBacklog) {
stats.backlogSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
stats.topicEpoch = topicEpoch.orElse(null);
stats.offloadedStorageSize = ledger.getOffloadedSize();
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);

// storage stats
assertEquals(stats.offloadedStorageSize, 0);

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

Expand Down Expand Up @@ -206,6 +209,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertNotNull(subStats.consumers.get(0).getClientVersion());
assertEquals(stats.offloadedStorageSize, 0);

Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
Expand All @@ -218,6 +222,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
rolloverPerIntervalStats();
stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
assertEquals(stats.offloadedStorageSize, 0);

assertEquals(subStats.msgBacklog, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class TopicStats {
/** Get estimated total unconsumed or backlog size in bytes. */
public long backlogSize;

/** Space used to store the offloaded messages for the topic/. */
public long offloadedStorageSize;

/** List of connected publishers on this topic w/ their stats. */
public List<PublisherStats> publishers;

Expand Down Expand Up @@ -115,6 +118,7 @@ public void reset() {
this.topicEpoch = null;
this.nonContiguousDeletedMessagesRanges = 0;
this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
this.offloadedStorageSize = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand All @@ -135,6 +139,7 @@ public TopicStats add(TopicStats stats) {
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
if (this.publishers.size() != stats.publishers.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void testPersistentTopicStats() {
topicStats.msgThroughputOut = 1;
topicStats.averageMsgSize = 1;
topicStats.storageSize = 1;
topicStats.offloadedStorageSize = 1;
topicStats.publishers.add(new PublisherStats());
topicStats.subscriptions.put("test_ns", new SubscriptionStats());
topicStats.replication.put("test_ns", new ReplicatorStats());
Expand All @@ -47,6 +48,7 @@ public void testPersistentTopicStats() {
assertEquals(topicStats.msgRateOut, 1.0);
assertEquals(topicStats.msgThroughputOut, 1.0);
assertEquals(topicStats.averageMsgSize, 1.0);
assertEquals(topicStats.offloadedStorageSize, 1);
assertEquals(topicStats.storageSize, 1);
assertEquals(topicStats.publishers.size(), 1);
assertEquals(topicStats.subscriptions.size(), 1);
Expand All @@ -61,6 +63,7 @@ public void testPersistentTopicStats() {
assertEquals(topicStats.publishers.size(), 0);
assertEquals(topicStats.subscriptions.size(), 0);
assertEquals(topicStats.replication.size(), 0);
assertEquals(topicStats.offloadedStorageSize, 0);
}

@Test
Expand Down

0 comments on commit 2af0aed

Please sign in to comment.