Skip to content

Commit

Permalink
Monitor if a cursor moves its mark-delete position (apache#8930)
Browse files Browse the repository at this point in the history
Motivation

msgBacklog or storageSize doesn't provide a clear idea
if mark-delete position is advanced or not. Add a new metric
in SubscriptionStat to monitor if its mark-delete position
is advanced or not.
  • Loading branch information
sijie authored Jan 6, 2021
1 parent c12765a commit 2fd878a
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn
/**
* Return the URL of the broker who's owning a particular service unit in asynchronous way.
*
* If the service unit is not owned, return a CompletableFuture with empty optional
* If the service unit is not owned, return a CompletableFuture with empty optional.
*/
public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -134,7 +135,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryComplete(Position position, Object ctx) {
if (position != null) {
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
subscription.updateLastMarkDeleteAdvancedTimestamp();
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] No messages to expire", topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class PersistentSubscription implements Subscription {

private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;
private long lastMarkDeleteAdvancedTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
Expand Down Expand Up @@ -137,6 +138,11 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
IS_FENCED_UPDATER.set(this, FALSE);
}

public void updateLastMarkDeleteAdvancedTimestamp() {
this.lastMarkDeleteAdvancedTimestamp =
Math.max(this.lastMarkDeleteAdvancedTimestamp, System.currentTimeMillis());
}

@Override
public BrokerInterceptor interceptor() {
return topic.getBrokerService().getInterceptor();
Expand Down Expand Up @@ -333,6 +339,8 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
}

if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
this.updateLastMarkDeleteAdvancedTimestamp();

// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
Expand Down Expand Up @@ -899,6 +907,7 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public class AggregatedSubscriptionStats {

long lastExpireTimestamp;

long lastConsumedFlowTimestamp;

long lastConsumedTimestamp;

long lastAckedTimestamp;

long lastMarkDeleteAdvancedTimestamp;

double msgRateExpired;

long totalMsgExpired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
subsStats.msgOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
subsStats.lastExpireTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
subsStats.lastAckedTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
subsStats.lastConsumedFlowTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
subsStats.lastConsumedTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
subsStats.lastMarkDeleteAdvancedTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
subsStats.msgRateExpired);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2097,17 +2097,29 @@ public void testPersistentTopicsExpireMessages() throws Exception {
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async

topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(subStats1.msgBacklog, 0);
assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(subStats2.msgBacklog, 10);
assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
SubscriptionStats subStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(subStats3.msgBacklog, 10);
assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);

admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async

topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 0);
assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 0);
SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
assertEquals(newSubStats1.msgBacklog, 0);
assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
assertEquals(newSubStats2.msgBacklog, 0);
assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
assertEquals(newSubStats3.msgBacklog, 0);
assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);

consumer1.close();
consumer2.close();
Expand Down Expand Up @@ -2547,15 +2559,19 @@ public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exc
producer.send(new byte[1024 * i * 5]);
}

TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp, 0L);

for (int i = 0; i < messages; i++) {
consumer.acknowledgeCumulative(consumer.receive());
}

// Wait ack send
Thread.sleep(1000);

TopicStats topicStats = admin.topics().getStats(topic);
topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.backlogSize, 0);
assertTrue(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp > 0L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class SubscriptionStats {
/** Last acked message timestamp. */
public long lastAckedTimestamp;

/** Last MarkDelete position advanced timesetamp. */
public long lastMarkDeleteAdvancedTimestamp;

/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;

Expand Down Expand Up @@ -121,6 +124,7 @@ public void reset() {
msgRateExpired = 0;
totalMsgExpired = 0;
lastExpireTimestamp = 0L;
lastMarkDeleteAdvancedTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;
Expand Down

0 comments on commit 2fd878a

Please sign in to comment.