Skip to content

Commit

Permalink
Add producer metrics for Prometheus (apache#9541)
Browse files Browse the repository at this point in the history
### Motivation
There are subscriptions and consumers metrics for prometheus but miss producers' metrics.

### Modifications
1. Add producer metrics for prometheus.
2. Add a configration for whether export producer metrics, default is false.

### Verifying this change
Add test case testPerProducerStats() in PrometheusMetricsTest
  • Loading branch information
wangjialing218 authored Feb 17, 2021
1 parent c54a66f commit 7bf7cdd
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 22 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,9 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable consumer level metrics. default is false
exposeConsumerLevelMetricsInPrometheus=false

# Enable producer level metrics. default is false
exposeProducerLevelMetricsInPrometheus=false

# Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
# jvmGCMetricsLoggerClassName=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "If true, export consumer level metrics otherwise namespace level"
)
private boolean exposeConsumerLevelMetricsInPrometheus = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, export producer level metrics otherwise namespace level"
)
private boolean exposeProducerLevelMetricsInPrometheus = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ public Boolean get() {
"org.apache.pulsar.broker.lookup", true, attributeMap);
this.metricsServlet = new PrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus());
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;

public class AggregatedProducerStats {

public long producerId;

public double msgRateIn;

public double msgThroughputIn;

public double averageMsgSize;

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected TopicStats initialValue() throws Exception {
};

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
SimpleTextOutputStream stream) {
boolean includeProducerMetrics, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats.resetTypes();
Expand All @@ -62,7 +62,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b

bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics,
getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());

Expand All @@ -86,7 +86,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
}

private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
stats.reset();

if (topic instanceof PersistentTopic) {
Expand Down Expand Up @@ -131,6 +131,15 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.producersCount++;
stats.rateIn += producer.getStats().msgRateIn;
stats.throughputIn += producer.getStats().msgThroughputIn;

if (includeProducerMetrics) {
AggregatedProducerStats producerStats = stats.producerStats.computeIfAbsent(
producer.getProducerName(), k -> new AggregatedProducerStats());
producerStats.producerId = producer.getStats().producerId;
producerStats.msgRateIn = producer.getStats().msgRateIn;
producerStats.msgThroughputIn = producer.getStats().msgThroughputIn;
producerStats.averageMsgSize = producer.getStats().averageMsgSize;
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,21 @@ public double get() {
}

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null);
boolean includeProducerMetrics, OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, out, null);
}

public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
boolean includeProducerMetrics, OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders)
throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);

generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());

NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics, stream);
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
includeProducerMetrics, stream);

if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ public class PrometheusMetricsServlet extends HttpServlet {
private final PulsarService pulsar;
private final boolean shouldExportTopicMetrics;
private final boolean shouldExportConsumerMetrics;
private final boolean shouldExportProducerMetrics;
private List<PrometheusRawMetricsProvider> metricsProviders;

private ExecutorService executor = null;

public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics) {
public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean shouldExportProducerMetrics) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
this.shouldExportConsumerMetrics = includeConsumerMetrics;
this.shouldExportProducerMetrics = shouldExportProducerMetrics;
}

@Override
Expand All @@ -67,7 +70,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
res.getOutputStream(), metricsProviders);
shouldExportProducerMetrics, res.getOutputStream(), metricsProviders);
context.complete();

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class TopicStats {

Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
Map<String, AggregatedProducerStats> producerStats = new HashMap<>();

// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();
Expand Down Expand Up @@ -82,6 +83,7 @@ public void reset() {

replicationStats.clear();
subscriptionStats.clear();
producerStats.clear();
storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
Expand Down Expand Up @@ -163,6 +165,15 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());

stats.producerStats.forEach((p, producerStats) -> {
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
producerStats.msgRateIn);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
producerStats.msgThroughputIn);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
producerStats.averageMsgSize);
});

stats.subscriptionStats.forEach((n, subsStats) -> {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
subsStats.msgBacklog);
Expand Down Expand Up @@ -265,6 +276,15 @@ private static void metric(SimpleTextOutputStream stream, String cluster, String
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}

private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String producerName, long produceId, String name, double value) {
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",producer_name=\"").write(producerName)
.write("\",producer_id=\"").write(produceId).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}

private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String subscription, String name, double value) {
metricType(stream, name);
Expand Down
Loading

0 comments on commit 7bf7cdd

Please sign in to comment.