diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 61b488b29a25c..25efc0f946465 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1811,6 +1811,20 @@ public CompletableFuture getInternalStats(boolean cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage(); cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange(); cs.properties = cursor.getProperties(); + // subscription metrics + PersistentSubscription sub = subscriptions.get(Codec.decode(c.getName())); + if (sub != null) { + if (sub.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) sub + .getDispatcher(); + cs.subscriptionHavePendingRead = dispatcher.havePendingRead; + cs.subscriptionHavePendingReplayRead = dispatcher.havePendingReplayRead; + } else if (sub.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { + PersistentDispatcherSingleActiveConsumer dispatcher = (PersistentDispatcherSingleActiveConsumer) sub + .getDispatcher(); + cs.subscriptionHavePendingRead = dispatcher.havePendingRead; + } + } stats.cursors.put(cursor.getName(), cs); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index 0bdb2fe005949..ade1cc8c7fba2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -44,6 +44,7 @@ import java.net.URL; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; @@ -128,7 +129,8 @@ public void testTopicInternalStats() throws Exception { assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs); assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0 && (cursor.totalNonContiguousDeletedMessagesRange) < numberOfMsgs / 2); - + assertFalse(cursor.subscriptionHavePendingRead); + assertFalse(cursor.subscriptionHavePendingReplayRead); producer.close(); consumer.close(); log.info("-- Exiting {} test --", methodName); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java index 4bd61c001dc87..c9b5ae5ff15cb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java @@ -76,6 +76,8 @@ public static class CursorStats { public String state; public long numberOfEntriesSinceFirstNotAckedMessage; public int totalNonContiguousDeletedMessagesRange; + public boolean subscriptionHavePendingRead; + public boolean subscriptionHavePendingReplayRead; public Map properties; }