Skip to content

Commit

Permalink
Reset msg backlog and backlog size (apache#7082)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored May 28, 2020
1 parent a4a4a18 commit e86c4c8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ void updateStats(TopicStats stats) {
storageSize += stats.storageSize;
backlogSize += stats.backlogSize;
offloadedStorageUsed += stats.offloadedStorageUsed;
backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);

storageWriteRate += stats.storageWriteRate;
storageReadRate += stats.storageReadRate;
Expand Down Expand Up @@ -128,10 +129,13 @@ public void reset() {
throughputOut = 0;

storageSize = 0;
backlogSize = 0;
msgBacklog = 0;
msgDelayed = 0;
storageWriteRate = 0;
storageReadRate = 0;
offloadedStorageUsed = 0;
backlogQuotaLimit= 0;

replicationStats.clear();
subscriptionStats.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
Expand All @@ -150,6 +148,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});

// Consumer stats can be a lot if a subscription has many consumers
Expand Down

0 comments on commit e86c4c8

Please sign in to comment.