Skip to content

Commit

Permalink
Allow topic metrics to be exported to prometheus optionally. (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cckellogg authored and merlimat committed Jan 23, 2018
1 parent 5a8103a commit 3396abc
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 16 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,9 @@ webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8


### --- Metrics --- ###

# If true, export topic-level metrics to Prometheus in addition to the namespace-level metrics
exposeTopicLevelMetricsInPrometheus=true
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,9 @@ webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8


### --- Metrics --- ###

# If true, export topic-level metrics to Prometheus in addition to the namespace-level metrics
exposeTopicLevelMetricsInPrometheus=true
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Number of connections per Broker in Pulsar Client used in WebSocket proxy
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();

/**** --- Metrics --- ****/
// If true, export topic-level metrics in addition to the namespace-level metrics; if false, export only namespace-level metrics
private boolean exposeTopicLevelMetricsInPrometheus = true;


/**
 * @return the current value of the {@code zookeeperServers} setting
 */
public String getZookeeperServers() {
    return this.zookeeperServers;
}
Expand Down Expand Up @@ -1319,4 +1324,12 @@ public void setPreferLaterVersions(boolean preferLaterVersions) {
/**
 * @return the current value of the {@code webSocketConnectionsPerBroker} setting
 */
public int getWebSocketConnectionsPerBroker() {
    return this.webSocketConnectionsPerBroker;
}

/**
 * Stores a new value for the {@code webSocketConnectionsPerBroker} setting.
 *
 * @param webSocketConnectionsPerBroker the value to store
 */
public void setWebSocketConnectionsPerBroker(int webSocketConnectionsPerBroker) {
    this.webSocketConnectionsPerBroker = webSocketConnectionsPerBroker;
}

/**
 * @return the current value of the {@code exposeTopicLevelMetricsInPrometheus} flag
 */
public boolean exposeTopicLevelMetricsInPrometheus() {
    return this.exposeTopicLevelMetricsInPrometheus;
}

/**
 * Stores a new value for the {@code exposeTopicLevelMetricsInPrometheus} flag.
 *
 * @param exposeTopicLevelMetricsInPrometheus the value to store
 */
public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
    this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ public void start() throws PulsarServerException {
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true);

this.webService.addServlet("/metrics", new ServletHolder(new PrometheusMetricsServlet(this)), false);
this.webService.addServlet("/metrics",
new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus())), false);

if (config.isWebSocketServiceEnabled()) {
// Use local broker address to avoid different IP address when using a VIP for service discovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,39 @@ public class AggregatedNamespaceStats {

public Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();

/**
 * Folds the statistics of a single topic into this namespace-level aggregate.
 *
 * <p>Increments the topic counter by one, adds every scalar counter/rate of {@code stats} to the
 * matching field of this object, merges the latency and entry-size buckets, and accumulates the
 * per-remote replication stats keyed by the same map key used in {@code stats.replicationStats}.
 *
 * @param stats the per-topic statistics to add; it is only read, never modified
 */
void updateStats(TopicStats stats) {
    topicsCount++;

    subscriptionsCount += stats.subscriptionsCount;
    producersCount += stats.producersCount;
    consumersCount += stats.consumersCount;

    rateIn += stats.rateIn;
    rateOut += stats.rateOut;
    throughputIn += stats.throughputIn;
    throughputOut += stats.throughputOut;

    storageSize += stats.storageSize;

    storageWriteRate += stats.storageWriteRate;
    // Accumulate the topic's READ rate here (the write rate was previously added by mistake).
    storageReadRate += stats.storageReadRate;

    // Add the topic's backlog (previously the aggregate was added to itself, ignoring the topic).
    msgBacklog += stats.msgBacklog;

    storageWriteLatencyBuckets.addAll(stats.storageWriteLatencyBuckets);
    entrySizeBuckets.addAll(stats.entrySizeBuckets);

    stats.replicationStats.forEach((remote, topicReplStats) -> {
        // Create the aggregate entry lazily the first time a given key is seen.
        AggregatedReplicationStats replStats =
                replicationStats.computeIfAbsent(remote, k -> new AggregatedReplicationStats());
        replStats.msgRateIn += topicReplStats.msgRateIn;
        replStats.msgRateOut += topicReplStats.msgRateOut;
        replStats.msgThroughputIn += topicReplStats.msgThroughputIn;
        replStats.msgThroughputOut += topicReplStats.msgThroughputOut;
        replStats.replicationBacklog += topicReplStats.replicationBacklog;
    });
}

public void reset() {
topicsCount = 0;
subscriptionsCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,42 @@ protected AggregatedNamespaceStats initialValue() throws Exception {
}
};

public static void generate(PulsarService pulsar, SimpleTextOutputStream stream) {
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = new TopicStats();

pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();

bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
updateNamespaceStats(namespaceStats, topic);
getTopicStats(topic, topicStats);
namespaceStats.updateStats(topicStats);
if (includeTopicMetrics) {
TopicStats.printNamespaceStats(stream, cluster, namespace, name, topicStats);
}
});
});

printNamespaceStats(stream, cluster, namespace, namespaceStats);
});
}

private static void updateNamespaceStats(AggregatedNamespaceStats stats, Topic topic) {

private static void getTopicStats(Topic topic, TopicStats stats) {
stats.reset();

if(topic instanceof PersistentTopic) {
// Managed Ledger stats
// Managed Ledger stats
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic)topic).getManagedLedger().getStats();

stats.storageSize += mlStats.getStoredMessagesSize();
stats.storageSize = mlStats.getStoredMessagesSize();
stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());

stats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.storageReadRate = mlStats.getReadEntriesRate();
stats.storageReadRate = mlStats.getReadEntriesRate();
}

stats.topicsCount++;

topic.getProducers().forEach(producer -> {
if (producer.isRemote()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
import io.prometheus.client.hotspot.DefaultExports;

/**
* Generate metrics aggregated at the namespace level and formats them out in a text format suitable to be consumed by
* Prometheus. Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
* Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
* in a text format suitable to be consumed by Prometheus.
* Format specification can be found at
* <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Prometheus exposition formats</a>.
*/
public class PrometheusMetricsGenerator {

Expand All @@ -63,14 +64,14 @@ public double get() {
}).register(CollectorRegistry.defaultRegistry);
}

public static void generate(PulsarService pulsar, OutputStream out) throws IOException {
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, OutputStream out) throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);

generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());

NamespaceStatsAggregator.generate(pulsar, stream);
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);

out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ public class PrometheusMetricsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;

private final PulsarService pulsar;
private final boolean shouldExportTopicMetrics;

private ExecutorService executor = null;

public PrometheusMetricsServlet(PulsarService pulsar) {
public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
}

@Override
Expand All @@ -63,7 +66,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
try {
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
PrometheusMetricsGenerator.generate(pulsar, res.getOutputStream());
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, res.getOutputStream());
context.complete();

} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.utils.SimpleTextOutputStream;

import java.util.HashMap;
import java.util.Map;

/**
 * Mutable holder for the statistics of a single topic, plus the helpers that write a subset of
 * those statistics to a {@link SimpleTextOutputStream} as text lines labelled with cluster,
 * namespace and topic.
 *
 * <p>An instance can be recycled between topics by calling {@link #reset()}.
 */
class TopicStats {

    // Entity counts.
    int subscriptionsCount;
    int producersCount;
    int consumersCount;

    // Message rates and throughput.
    double rateIn;
    double rateOut;
    double throughputIn;
    double throughputOut;

    // Storage figures.
    long storageSize;
    public long msgBacklog;

    StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
    StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
    double storageWriteRate;
    double storageReadRate;

    // Replication statistics, keyed by a String identifying the replication peer.
    Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();

    /**
     * Restores every field to its empty state: scalar values become zero, the replication map is
     * cleared and both bucket collections are reset.
     */
    public void reset() {
        replicationStats.clear();
        storageWriteLatencyBuckets.reset();
        entrySizeBuckets.reset();

        subscriptionsCount = producersCount = consumersCount = 0;
        rateIn = rateOut = throughputIn = throughputOut = 0;
        storageSize = msgBacklog = 0;
        storageWriteRate = storageReadRate = 0;
    }

    /**
     * Writes the counts, rates, throughput, storage size and backlog of {@code stats} to
     * {@code stream}, one metric per line. Despite the method name, each line carries a
     * {@code topic} label, i.e. the output is per topic.
     *
     * @param stream    destination of the text output
     * @param cluster   value of the {@code cluster} label
     * @param namespace value of the {@code namespace} label
     * @param topic     value of the {@code topic} label
     * @param stats     the statistics to print
     */
    static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
            TopicStats stats) {
        // Counts
        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);

        // Rates and throughput
        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn);
        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut);
        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn);
        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut);

        // Storage
        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.storageSize);
        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
    }

    /**
     * Writes a single line of the form
     * {@code name{cluster="..", namespace="..", topic=".."} value timestampMillis}
     * followed by a newline, where the timestamp is {@link System#currentTimeMillis()} at the
     * time of the call.
     */
    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
            String name, double value) {
        stream.write(name)
                .write("{cluster=\"").write(cluster)
                .write("\", namespace=\"").write(namespace)
                .write("\", topic=\"").write(topic)
                .write("\"} ")
                .write(value)
                .write(' ')
                .write(System.currentTimeMillis())
                .write('\n');
    }
}

0 comments on commit 3396abc

Please sign in to comment.