Skip to content

Commit

Permalink
[improve][broker]PIP-214 Add broker level metrics statistics and expo…
Browse files Browse the repository at this point in the history
…se to prometheus (apache#19047)
  • Loading branch information
yyj8 authored Mar 16, 2023
1 parent da78879 commit 80c5791
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;

/**
 * Mutable accumulator holding broker-wide totals that are built up by summing
 * the statistics of individual topics.
 *
 * <p>Typical use: call {@link #reset()} to zero every field, then call
 * {@link #updateStats(TopicStats)} once for each topic to fold it into the totals.
 */
public class AggregatedBrokerStats {
    // Entity counts.
    public int topicsCount;
    public int subscriptionsCount;
    public int producersCount;
    public int consumersCount;
    // Message rates and byte throughput, summed over topics.
    public double rateIn;
    public double rateOut;
    public double throughputIn;
    public double throughputOut;
    // Storage figures, taken from each topic's managed-ledger stats.
    public long storageSize;
    public long storageLogicalSize;
    public double storageWriteRate;
    public double storageReadRate;
    // Backlog, summed over topics.
    public long msgBacklog;

    /**
     * Folds one topic into the running totals: the topic counter grows by one and
     * every other field grows by the corresponding value found in {@code stats}.
     *
     * @param stats statistics of the topic being added
     */
    void updateStats(TopicStats stats) {
        // Each call represents exactly one topic.
        this.topicsCount += 1;

        this.subscriptionsCount += stats.subscriptionsCount;
        this.producersCount += stats.producersCount;
        this.consumersCount += stats.consumersCount;

        this.rateIn += stats.rateIn;
        this.rateOut += stats.rateOut;
        this.throughputIn += stats.throughputIn;
        this.throughputOut += stats.throughputOut;

        // Storage values are read from the nested managed-ledger stats object.
        this.storageSize += stats.managedLedgerStats.storageSize;
        this.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
        this.storageWriteRate += stats.managedLedgerStats.storageWriteRate;
        this.storageReadRate += stats.managedLedgerStats.storageReadRate;

        this.msgBacklog += stats.msgBacklog;
    }

    /**
     * Sets every counter, rate and size back to zero so the instance can be reused
     * for a new aggregation pass.
     */
    public void reset() {
        topicsCount = subscriptionsCount = producersCount = consumersCount = 0;
        rateIn = rateOut = throughputIn = throughputOut = 0;
        storageSize = storageLogicalSize = 0;
        storageWriteRate = storageReadRate = 0;
        msgBacklog = 0;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@
@Slf4j
public class NamespaceStatsAggregator {

/**
 * Thread-local holder for the broker-level accumulator. Its initial value is a fresh
 * {@link AggregatedBrokerStats}; {@code generate(...)} fetches the instance and calls
 * {@code reset()} on it before aggregating, so the same object is reused across calls.
 */
private static final FastThreadLocal<AggregatedBrokerStats> localBrokerStats =
new FastThreadLocal<>() {
@Override
protected AggregatedBrokerStats initialValue() {
return new AggregatedBrokerStats();
}
};

private static final FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
new FastThreadLocal<>() {
@Override
Expand All @@ -64,14 +72,13 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
PrometheusMetricStreams stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedBrokerStats brokerStats = localBrokerStats.get();
brokerStats.reset();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();
Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
LongAdder topicsCount = new LongAdder();
Map<String, Long> localNamespaceTopicCount = new HashMap<>();

printDefaultBrokerStats(stream, cluster);

pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
Expand All @@ -83,6 +90,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
compactorMXBean
);

brokerStats.updateStats(topicStats);

if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
Expand All @@ -104,6 +113,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
if (includeTopicMetrics) {
printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
}

printBrokerStats(stream, cluster, brokerStats);
}

private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Expand Down Expand Up @@ -301,22 +312,23 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
});
}

private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the available brokers being
private static void printBrokerStats(PrometheusMetricStreams stream, String cluster,
AggregatedBrokerStats brokerStats) {
// Print metrics values. This is necessary to have the available brokers being
// reported in the brokers dashboard even if they don't have any topic or traffic
writeMetric(stream, "pulsar_topics_count", 0, cluster);
writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
writeMetric(stream, "pulsar_producers_count", 0, cluster);
writeMetric(stream, "pulsar_consumers_count", 0, cluster);
writeMetric(stream, "pulsar_rate_in", 0, cluster);
writeMetric(stream, "pulsar_rate_out", 0, cluster);
writeMetric(stream, "pulsar_throughput_in", 0, cluster);
writeMetric(stream, "pulsar_throughput_out", 0, cluster);
writeMetric(stream, "pulsar_storage_size", 0, cluster);
writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
writeMetric(stream, "pulsar_broker_topics_count", brokerStats.topicsCount, cluster);
writeMetric(stream, "pulsar_broker_subscriptions_count", brokerStats.subscriptionsCount, cluster);
writeMetric(stream, "pulsar_broker_producers_count", brokerStats.producersCount, cluster);
writeMetric(stream, "pulsar_broker_consumers_count", brokerStats.consumersCount, cluster);
writeMetric(stream, "pulsar_broker_rate_in", brokerStats.rateIn, cluster);
writeMetric(stream, "pulsar_broker_rate_out", brokerStats.rateOut, cluster);
writeMetric(stream, "pulsar_broker_throughput_in", brokerStats.throughputIn, cluster);
writeMetric(stream, "pulsar_broker_throughput_out", brokerStats.throughputOut, cluster);
writeMetric(stream, "pulsar_broker_storage_size", brokerStats.storageSize, cluster);
writeMetric(stream, "pulsar_broker_storage_logical_size", brokerStats.storageLogicalSize, cluster);
writeMetric(stream, "pulsar_broker_storage_write_rate", brokerStats.storageWriteRate, cluster);
writeMetric(stream, "pulsar_broker_storage_read_rate", brokerStats.storageReadRate, cluster);
writeMetric(stream, "pulsar_broker_msg_backlog", brokerStats.msgBacklog, cluster);
}

private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ public void testPerTopicStats() throws Exception {
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_producers_count");
assertEquals(cm.size(), 3);
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.size(), 2);
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("topic_load_times_count");
assertEquals(cm.size(), 1);
Expand Down Expand Up @@ -367,6 +367,97 @@ public void testPerTopicStats() throws Exception {
c2.close();
}

/**
 * Publishes and consumes a few messages on two topics, generates the Prometheus
 * output, and checks that each broker-level metric appears exactly once and is
 * tagged with the cluster name {@code "test"}.
 */
@Test
public void testPerBrokerStats() throws Exception {
    final String firstTopic = "persistent://my-property/use/my-ns/my-topic1";
    final String secondTopic = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(firstTopic).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(secondTopic).create();

    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(firstTopic)
            .subscriptionName("test")
            .subscribe();

    Consumer<byte[]> c2 = pulsarClient.newConsumer()
            .topic(secondTopic)
            .subscriptionName("test")
            .subscribe();

    final int messages = 10;

    // Publish the same payload sequence to both topics.
    for (int sent = 0; sent < messages; sent++) {
        String message = "my-message-" + sent;
        p1.send(message.getBytes());
        p2.send(message.getBytes());
    }

    // Drain and acknowledge everything that was published.
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);

    // Broker-level metric names, listed in the order they are verified.
    final String[] brokerMetricNames = {
        "pulsar_broker_topics_count",
        "pulsar_broker_subscriptions_count",
        "pulsar_broker_producers_count",
        "pulsar_broker_consumers_count",
        "pulsar_broker_rate_in",
        "pulsar_broker_rate_out",
        "pulsar_broker_throughput_in",
        "pulsar_broker_throughput_out",
        "pulsar_broker_storage_size",
        "pulsar_broker_storage_logical_size",
        "pulsar_broker_storage_write_rate",
        "pulsar_broker_storage_read_rate",
        "pulsar_broker_msg_backlog",
    };

    for (String metricName : brokerMetricNames) {
        Collection<Metric> samples = metrics.get(metricName);
        // Exactly one sample per broker-level metric, labelled with the cluster.
        assertEquals(samples.size(), 1);
        assertEquals(samples.stream().toList().get(0).tags.get("cluster"), "test");
    }

    p1.close();
    p2.close();
    c1.close();
    c2.close();
}

/**
* Test that the total message and byte counts for a topic are not reset when a consumer disconnects.
*
Expand Down Expand Up @@ -674,9 +765,9 @@ public void testPerNamespaceStats() throws Exception {
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_producers_count");
assertEquals(cm.size(), 2);
assertNull(cm.get(1).tags.get("topic"));
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.size(), 1);
assertNull(cm.get(0).tags.get("topic"));
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ public void testManagedLedgerMetrics() throws Exception {
metricsStr = statsOut.toString();
metrics = parseMetrics(metricsStr);
metric = metrics.get("pulsar_storage_size");
assertEquals(metric.size(), 3);
assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_logical_size");
assertEquals(metric.size(), 3);
assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_backlog_size");
assertEquals(metric.size(), 2);
}
Expand Down

0 comments on commit 80c5791

Please sign in to comment.