diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 76c58aaf25fad..3e9e7de8884f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1690,6 +1690,7 @@ public TopicStats getStats(boolean getPreciseBacklog) { stats.backlogSize = ledger.getEstimatedBacklogSize(); stats.deduplicationStatus = messageDeduplication.getStatus().toString(); stats.topicEpoch = topicEpoch.orElse(null); + stats.offloadedStorageSize = ledger.getOffloadedSize(); return stats; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 97ddcca0de82c..5688f973bb0a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -169,6 +169,9 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertEquals(subStats.msgBacklog, 0); assertEquals(subStats.consumers.size(), 1); + // storage stats + assertEquals(stats.offloadedStorageSize, 0); + Producer producer = pulsarClient.newProducer().topic(topicName).create(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); @@ -206,6 +209,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut); assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut); assertNotNull(subStats.consumers.get(0).getClientVersion()); + assertEquals(stats.offloadedStorageSize, 0); Message msg; for (int i = 0; i < 10; i++) { @@ -218,6 +222,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { rolloverPerIntervalStats(); stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); + assertEquals(stats.offloadedStorageSize, 0); assertEquals(subStats.msgBacklog, 0); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index 9633e0f6aca9a..33a6f8210f5b4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -66,6 +66,9 @@ public class TopicStats { /** Get estimated total unconsumed or backlog size in bytes. */ public long backlogSize; + /** Space used to store the offloaded messages for the topic/. */ + public long offloadedStorageSize; + /** List of connected publishers on this topic w/ their stats. */ public List publishers; @@ -115,6 +118,7 @@ public void reset() { this.topicEpoch = null; this.nonContiguousDeletedMessagesRanges = 0; this.nonContiguousDeletedMessagesRangesSerializedSize = 0; + this.offloadedStorageSize = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -135,6 +139,7 @@ public TopicStats add(TopicStats stats) { this.averageMsgSize = newAverageMsgSize; this.storageSize += stats.storageSize; this.backlogSize += stats.backlogSize; + this.offloadedStorageSize += stats.offloadedStorageSize; this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; if (this.publishers.size() != stats.publishers.size()) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java index cdbfcd5ba5767..dcfbd2099c5cf 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java @@ -37,6 +37,7 @@ public void testPersistentTopicStats() { topicStats.msgThroughputOut = 1; topicStats.averageMsgSize = 1; topicStats.storageSize = 1; + topicStats.offloadedStorageSize = 1; topicStats.publishers.add(new PublisherStats()); topicStats.subscriptions.put("test_ns", new SubscriptionStats()); topicStats.replication.put("test_ns", new ReplicatorStats()); @@ -47,6 +48,7 @@ public void testPersistentTopicStats() { assertEquals(topicStats.msgRateOut, 1.0); assertEquals(topicStats.msgThroughputOut, 1.0); assertEquals(topicStats.averageMsgSize, 1.0); + assertEquals(topicStats.offloadedStorageSize, 1); assertEquals(topicStats.storageSize, 1); assertEquals(topicStats.publishers.size(), 1); assertEquals(topicStats.subscriptions.size(), 1); @@ -61,6 +63,7 @@ public void testPersistentTopicStats() { assertEquals(topicStats.publishers.size(), 0); assertEquals(topicStats.subscriptions.size(), 0); assertEquals(topicStats.replication.size(), 0); + assertEquals(topicStats.offloadedStorageSize, 0); } @Test