diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 731ae09e4c035..06393f210b14b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -79,6 +79,9 @@ public class Consumer { private final Rate msgOut; private final Rate msgRedeliver; + private long lastConsumedTimestamp; + private long lastAckedTimestamp; + // Represents how many messages we can safely send to the consumer without // overflowing its receiving queue. The consumer will use Flow commands to // increase its availability @@ -188,6 +191,7 @@ public boolean readCompacted() { */ public ChannelPromise sendMessages(final List entries, EntryBatchSizes batchSizes, int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) { + this.lastConsumedTimestamp = System.currentTimeMillis(); final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); @@ -335,6 +339,7 @@ void doUnsubscribe(final long requestId) { } void messageAcked(CommandAck ack) { + this.lastAckedTimestamp = System.currentTimeMillis(); Map properties = Collections.emptyMap(); if (ack.getPropertiesCount() > 0) { properties = ack.getPropertiesList().stream() @@ -450,6 +455,8 @@ public void updateRates() { } public ConsumerStats getStats() { + stats.lastAckedTimestamp = lastAckedTimestamp; + stats.lastConsumedTimestamp = lastConsumedTimestamp; stats.availablePermits = getAvailablePermits(); stats.unackedMessages = unackedMessages; stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 508f30148545f..88d8e45c8b3cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -89,6 +89,7 @@ public class PersistentSubscription implements Subscription { private PersistentMessageExpiryMonitor expiryMonitor; private long lastExpireTimestamp = 0L; + private long lastConsumedFlowTimestamp = 0L; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; @@ -315,6 +316,7 @@ public void deactivateCursor() { @Override public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { + this.lastConsumedFlowTimestamp = System.currentTimeMillis(); dispatcher.consumerFlow(consumer, additionalNumberOfMessages); } @@ -935,6 +937,7 @@ public long estimateBacklogSize() { public SubscriptionStats getStats() { SubscriptionStats subStats = new SubscriptionStats(); subStats.lastExpireTimestamp = lastExpireTimestamp; + subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; Dispatcher dispatcher = this.dispatcher; if (dispatcher != null) { dispatcher.getConsumers().forEach(consumer -> { @@ -944,6 +947,8 @@ public SubscriptionStats getStats() { subStats.msgThroughputOut += consumerStats.msgThroughputOut; subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; subStats.unackedMessages += consumerStats.unackedMessages; + subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); + subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index ffa44a5a31a86..e65aac2d4d5db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -21,6 +21,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -31,6 +32,7 @@ import com.google.common.collect.Sets; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -60,6 +62,8 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; @@ -948,4 +952,110 @@ public void testCreateNamespaceWithNoClusters() throws PulsarAdminException { assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace), Collections.singletonList(localCluster)); } + + @Test(timeOut = 30000) + public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException { + long timestamp = System.currentTimeMillis(); + final String topicName = "consumer-stats-" + timestamp; + final String subscribeName = topicName + "-test-stats-sub"; + final String topic = "persistent://prop-xyz/ns1/" + topicName; + final String producerName = "producer-" + topicName; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + Producer producer = client.newProducer().topic(topic) + .enableBatching(false) + .producerName(producerName) + .create(); + + // a. Send a message to the topic. + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + + // b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed + // by the broker, the lastConsumedTimestamp will as the consume subscribe time. + Consumer consumer = client.newConsumer().topic(topic) + .subscriptionName(subscribeName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message message = consumer.receive(); + + // Get the consumer stats. + TopicStats topicStats = admin.topics().getStats(topic); + SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName); + long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + ConsumerStats consumerStats = subscriptionStats.consumers.get(0); + long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp; + long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp; + + // Because the message was pushed by the broker, the consumedTimestamp should not as 0. + assertNotEquals(0, startConsumedTimestampInConsumerStats); + // There is no consumer ack the message, so the lastAckedTimestamp still as 0. + assertEquals(0, startAckedTimestampInConsumerStats); + assertNotEquals(0, startConsumedFlowTimestamp); + assertEquals(0, startAckedTimestampInSubStats); + + // c. The Consumer receives the message and acks the message. + consumer.acknowledge(message); + // Waiting for the ack command send to the broker. + while (true) { + topicStats = admin.topics().getStats(topic); + if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != 0) { + break; + } + TimeUnit.MILLISECONDS.sleep(100); + } + + // Get the consumer stats. + topicStats = admin.topics().getStats(topic); + subscriptionStats = topicStats.subscriptions.get(subscribeName); + long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + consumerStats = subscriptionStats.consumers.get(0); + long consumedTimestamp = consumerStats.lastConsumedTimestamp; + long ackedTimestamp = consumerStats.lastAckedTimestamp; + + // The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the + // consumer does not pull any messages. + assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp); + assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp); + assertNotEquals(0, consumedFlowTimestamp); + assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats); + + // d. Send another messages. The lastConsumedTimestamp should be updated. + producer.send("message-2".getBytes(StandardCharsets.UTF_8)); + + // e. Receive the message and ack it. + message = consumer.receive(); + consumer.acknowledge(message); + // Waiting for the ack command send to the broker. + while (true) { + topicStats = admin.topics().getStats(topic); + if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != ackedTimestampInSubStats) { + break; + } + TimeUnit.MILLISECONDS.sleep(100); + } + + // Get the consumer stats again. + topicStats = admin.topics().getStats(topic); + subscriptionStats = topicStats.subscriptions.get(subscribeName); + long lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; + long lastConsumedTimestampInSubStats = subscriptionStats.lastConsumedTimestamp; + long lastAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; + consumerStats = subscriptionStats.consumers.get(0); + long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; + long lastAckedTimestamp = consumerStats.lastAckedTimestamp; + + assertTrue(consumedTimestamp < lastConsumedTimestamp); + assertTrue(ackedTimestamp < lastAckedTimestamp); + assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp); + assertTrue(startAckedTimestampInConsumerStats < lastAckedTimestamp); + assertTrue(consumedFlowTimestamp == lastConsumedFlowTimestamp); + assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats); + assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index f929e22792a4d..7411f03a29f4c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -59,6 +59,9 @@ public class ConsumerStats { private int clientVersionOffset = -1; private int clientVersionLength; + public long lastAckedTimestamp; + public long lastConsumedTimestamp; + /** Metadata (key/value strings) associated with this consumer. */ public Map metadata; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 30b04c9c569a4..a4c299493be79 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -61,6 +61,15 @@ public class SubscriptionStats { /** Last message expire execution timestamp. */ public long lastExpireTimestamp; + /** Last received consume flow command timestamp. */ + public long lastConsumedFlowTimestamp; + + /** Last consume message timestamp. */ + public long lastConsumedTimestamp; + + /** Last acked message timestamp. */ + public long lastAckedTimestamp; + /** List of connected consumers on this subscription w/ their stats. */ public List consumers; diff --git a/site2/docs/admin-api-persistent-topics.md b/site2/docs/admin-api-persistent-topics.md index 1f688c547021e..b09040d098016 100644 --- a/site2/docs/admin-api-persistent-topics.md +++ b/site2/docs/admin-api-persistent-topics.md @@ -222,6 +222,14 @@ It shows current statistics of a given non-partitioned topic. - **type**: This subscription type - **msgRateExpired**: The rate at which messages were discarded instead of dispatched from this subscription due to TTL + + - **lastExpireTimestamp**: The last message expire execution timestamp + + - **lastConsumedFlowTimestamp**: The last flow command received timestamp + + - **lastConsumedTimestamp**: The latest timestamp of all the consumed timestamp of the consumers + + - **lastAckedTimestamp**: The latest timestamp of all the acked timestamp of the consumers - **consumers**: The list of connected consumers for this subscription @@ -236,6 +244,10 @@ It shows current statistics of a given non-partitioned topic. - **unackedMessages**: Number of unacknowledged messages for the consumer - **blockedConsumerOnUnackedMsgs**: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages + + - **lastConsumedTimestamp**: The timestamp of the consumer last consume a message + + - **lastAckedTimestamp**: The timestamp of the consumer last ack a message - **replication**: This section gives the stats for cross-colo replication of this topic