Skip to content

Commit

Permalink
non-persistent topic metrics (apache#13827)
Browse files Browse the repository at this point in the history
### Motivation
Non-persistent topic doesn't have subscription metrics

### Modifications

Expose a new non-persistent subscription metric: `pulsar_subscription_msg_drop_rate`
  • Loading branch information
gaozhangmin authored Feb 22, 2022
1 parent c153dea commit d548bc4
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {

long totalMsgExpired;

double msgDropRate;

public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
Expand Down Expand Up @@ -109,6 +112,37 @@ private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats,
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;

}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
Expand Down Expand Up @@ -141,7 +175,6 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
}

TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
Expand Down Expand Up @@ -175,37 +208,23 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
}
});

tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;

AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
if (topic instanceof PersistentTopic) {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, subscriptionStats, subsStats);
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
});
} else {
((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
.forEach((subName, nonPersistentSubscriptionStats) -> {
NonPersistentSubscriptionStatsImpl subscriptionStats =
(NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats;
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, subscriptionStats, subsStats);
subsStats.msgDropRate += subscriptionStats.getMsgDropRate();
});
}

// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ public void testBundlesMetrics() throws Exception {
.subscribe();

final int messages = 10;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
Expand Down Expand Up @@ -491,6 +490,45 @@ public void testBundlesMetrics() throws Exception {
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
}

@Test
public void testNonPersistentSubMetrics() throws Exception {
Producer<byte[]> p1 =
pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("non-persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

final int messages = 100;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
}

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out"));
assertTrue(metrics.containsKey("pulsar_throughput_out"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver"));
assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages"));
assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out"));
assertTrue(metrics.containsKey("pulsar_out_bytes_total"));
assertTrue(metrics.containsKey("pulsar_out_messages_total"));
assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
}

@Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Expand Down

0 comments on commit d548bc4

Please sign in to comment.