Skip to content

Commit

Permalink
[pulsar-broker] add pending read subscription metrics to stats-intern…
Browse files Browse the repository at this point in the history
…al (apache#9788)
  • Loading branch information
rdhabalia authored Mar 8, 2021
1 parent a81b332 commit 926bb69
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,20 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
cs.properties = cursor.getProperties();
// subscription metrics
PersistentSubscription sub = subscriptions.get(Codec.decode(c.getName()));
if (sub != null) {
if (sub.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) sub
.getDispatcher();
cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
cs.subscriptionHavePendingReplayRead = dispatcher.havePendingReplayRead;
} else if (sub.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
PersistentDispatcherSingleActiveConsumer dispatcher = (PersistentDispatcherSingleActiveConsumer) sub
.getDispatcher();
cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
}
}
stats.cursors.put(cursor.getName(), cs);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.net.URL;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -128,7 +129,8 @@ public void testTopicInternalStats() throws Exception {
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
&& (cursor.totalNonContiguousDeletedMessagesRange) < numberOfMsgs / 2);

assertFalse(cursor.subscriptionHavePendingRead);
assertFalse(cursor.subscriptionHavePendingReplayRead);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static class CursorStats {
public String state;
public long numberOfEntriesSinceFirstNotAckedMessage;
public int totalNonContiguousDeletedMessagesRange;
public boolean subscriptionHavePendingRead;
public boolean subscriptionHavePendingReplayRead;

public Map<String, Long> properties;
}
Expand Down

0 comments on commit 926bb69

Please sign in to comment.