Skip to content

Commit

Permalink
Expose lastExpireTimestamp for subscription stats. (apache#5721)
Browse files Browse the repository at this point in the history
### Motivation

Expose lastExpireTimestamp for subscription stats. Can be used to troubleshooting TTL related issues

### Verifying this change

Added new unit tests
  • Loading branch information
codelipenghui authored and sijie committed Nov 22, 2019
1 parent 0565c78 commit 2bbb10c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class PersistentSubscription implements Subscription {
private volatile int isFenced = FALSE;
private PersistentMessageExpiryMonitor expiryMonitor;

private long lastExpireTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

Expand Down Expand Up @@ -902,6 +904,7 @@ public List<Consumer> getConsumers() {

@Override
public void expireMessages(int messageTTLInSeconds) {
this.lastExpireTimestamp = System.currentTimeMillis();
if ((getNumberOfEntriesInBacklog() == 0) || (dispatcher != null && dispatcher.isConsumerConnected()
&& getNumberOfEntriesInBacklog() < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
&& !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
Expand All @@ -921,7 +924,7 @@ public long estimateBacklogSize() {

public SubscriptionStats getStats() {
SubscriptionStats subStats = new SubscriptionStats();

subStats.lastExpireTimestamp = lastExpireTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
dispatcher.getConsumers().forEach(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.lookup.data.LookupData;
Expand Down Expand Up @@ -147,6 +148,7 @@ public void setup() throws Exception {
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setMessageExpiryCheckIntervalInMinutes(1);

super.internalSetup();

Expand Down Expand Up @@ -2118,4 +2120,21 @@ public void testCompactionStatus() throws Exception {
LongRunningProcessStatus.Status.ERROR);
assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something"));
}

@Test(timeOut = 90000)
public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 60);
final String topic = "persistent://prop-xyz/ns1/testTopicStatsLastExpireTimestampForSubscription";
Consumer<byte[]> producer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscribe();

Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 1);
Assert.assertEquals(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp, 0L);

Thread.sleep(60000);

Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class SubscriptionStats {
/** Total rate of messages expired on this subscription (msg/s). */
public double msgRateExpired;

/** Last message expire execution timestamp. */
public long lastExpireTimestamp;

/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;

Expand All @@ -75,6 +78,7 @@ public void reset() {
msgBacklog = 0;
unackedMessages = 0;
msgRateExpired = 0;
lastExpireTimestamp = 0L;
consumers.clear();
}

Expand Down

0 comments on commit 2bbb10c

Please sign in to comment.