Skip to content

Commit

Permalink
Add subscription backlog size info for topicstats. (apache#9302)
Browse files Browse the repository at this point in the history
Fixes apache#9254

### Motivation
Add ability to fetch backlog size for subscription, add flag in topic-stats partitioned-topic-stats for getting backlog size for subscriptions.
Sample output
```
./pulsar-admin topics partitioned-stats zxc-t/zxc-ns/zxc-p -gpb -sbs
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 875,
  "backlogSize" : 585,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "zxc-sub-1" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 10,
      "backlogSize" : 585,
      "msgBacklogNoDelayed" : 10,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0
    },
    "zxc-sub-2" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 5,
      "backlogSize" : 295,
      "msgBacklogNoDelayed" : 5,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "metadata" : {
    "partitions" : 5
  },
  "partitions" : { }
}
```

### Modifications
In ManagedLedgerImpl add API to get backlog size starting from specific position. 
In Admin Rest API and CLI add option to get subscription backlog.

### Verifying this change

This change added tests and can be verified as follows:
Added check in existing test to verify backlog size.
  • Loading branch information
MarvinCai authored Feb 8, 2021
1 parent ff9923e commit ab94743
Show file tree
Hide file tree
Showing 24 changed files with 154 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,16 @@ public long getEstimatedBacklogSize() {
}
}

/**
* Get estimated backlog size from a specific position.
*/
public long getEstimatedBacklogSize(PositionImpl pos) {
if (pos == null) {
return 0;
}
return estimateBacklogFromPosition(pos);
}

long estimateBacklogFromPosition(PositionImpl pos) {
synchronized (this) {
LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean exposePreciseBacklogInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable expose the backlog size for each subscription when generating stats.\n" +
" Locking is used for fetching the status so default to false."
)
private boolean exposeSubscriptionBacklogSizeInPrometheus = false;

/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,14 +1139,15 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
}
}

protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog) {
protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog,
boolean subscriptionBacklogSize) {
validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
Topic topic = getTopicReference(topicName);
return topic.getStats(getPreciseBacklog);
return topic.getStats(getPreciseBacklog, subscriptionBacklogSize);
}

protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
Expand Down Expand Up @@ -1267,7 +1268,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition, boolean getPreciseBacklog) {
boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
Expand All @@ -1289,7 +1290,8 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
try {
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog));
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property,
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(false);
return ((NonPersistentTopic) topic).getStats(false, false);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetStats(authoritative, false);
return internalGetStats(authoritative, false, false);
}

@GET
Expand Down Expand Up @@ -352,7 +352,7 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,15 @@ public NonPersistentTopicStats getStats(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminOperationOnTopic(topicName, authoritative);
Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, subscriptionBacklogSize);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,13 @@ public TopicStats getStats(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetStats(authoritative, getPreciseBacklog);
return internalGetStats(authoritative, getPreciseBacklog, subscriptionBacklogSize);
}

@GET
Expand Down Expand Up @@ -1015,11 +1018,15 @@ public void getPartitionedStats(
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,11 @@ public long getBytesInCounter() {
}

public long getMsgOutCounter() {
return getStats(false).msgOutCounter;
return getStats(false, false).msgOutCounter;
}

public long getBytesOutCounter() {
return getStats(false).bytesOutCounter;
return getStats(false, false).bytesOutCounter;
}

public boolean isDeleteWhileInactive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ public String generateUniqueProducerName() {
public Map<String, TopicStats> getTopicStats() {
HashMap<String, TopicStats> stats = new HashMap<>();

forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false)));
forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false)));

return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();

TopicStats getStats(boolean getPreciseBacklog);
TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize);

CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}

@Override
public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {

NonPersistentTopicStats stats = new NonPersistentTopicStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}

public SubscriptionStats getStats(Boolean getPreciseBacklog) {
public SubscriptionStats getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Expand Down Expand Up @@ -956,6 +956,10 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
}
}
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
if (subscriptionBacklogSize) {
subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger())
.getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition());
}
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1631,7 +1631,7 @@ public double getLastUpdatedAvgPublishRateInByte() {
}

@Override
public TopicStats getStats(boolean getPreciseBacklog) {
public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {

TopicStats stats = new TopicStats();

Expand All @@ -1656,7 +1656,7 @@ public TopicStats getStats(boolean getPreciseBacklog) {
stats.waitingPublishers = getWaitingProducersCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics,
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus());
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());

if (includeTopicMetrics) {
topicsCount.add(1);
Expand All @@ -85,7 +86,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean getPreciseBacklog) {
boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
stats.reset();

if (topic instanceof PersistentTopic) {
Expand All @@ -110,7 +111,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.storageReadRate = mlStats.getReadEntriesRate();
}

org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog,
subscriptionBacklogSize);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,15 +1154,17 @@ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminExcept
TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);

topicStats = admin.topics().getStats(topic, true);
topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
consumer.acknowledge(message);

// wait for ack send
Thread.sleep(500);

// Consumer acks the message, so the precise backlog is 0
topicStats = admin.topics().getStats(topic, true);
topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.subscriptions.get(subName).backlogSize, 0);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);

topicStats = admin.topics().getStats(topic);
Expand Down Expand Up @@ -1207,7 +1209,7 @@ public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminExce
// not yet guaranteed to see the stats updated.
Thread.sleep(500);

TopicStats topicStats = admin.topics().getStats(topic, true);
TopicStats topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);

Expand All @@ -1216,7 +1218,7 @@ public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminExce
}
// Wait the ack send.
Thread.sleep(500);
topicStats = admin.topics().getStats(topic, true);
topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
}
Expand Down Expand Up @@ -1258,8 +1260,9 @@ public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 20);

topicStats = admin.topics().getPartitionedStats(topic, false, true);
topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43);
}

@Test(timeOut = 30000)
Expand Down Expand Up @@ -1296,17 +1299,19 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
}
}

TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true);
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
assertEquals(topicStats.subscriptions.get(subName).backlogSize, 470);
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);

for (int i = 0; i < 5; i++) {
consumer.acknowledge(consumer.receive());
}
// Wait the ack send.
Thread.sleep(500);
topicStats = admin.topics().getPartitionedStats(topic, false, true);
topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
assertEquals(topicStats.subscriptions.get(subName).backlogSize, 238);
assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
}

Expand Down
Loading

0 comments on commit ab94743

Please sign in to comment.