Commit a9ed984

expose managedLedgerCache, managedLedger, loadBalance metrics to prometheus (apache#6705)

## Motivation
The managed ledger read cache metrics are currently exposed via /admin/broker-stats/metrics in JSON format, which is hard to parse, collect, and display. Moreover, the read cache is a very important module for message consumption throughput and latency, so collecting and displaying the read cache metrics is urgently needed when running Pulsar in production.

## Changes
I parse the JSON-format metrics into Prometheus metric types and expose them on the Prometheus metrics port, so those metrics can be displayed in Grafana.

Please help review these changes; if they look good, I will update the metrics documentation.
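
For illustration, the exported data follows the standard Prometheus text exposition format. The sketch below shows roughly what a scrape would contain for two of the cache gauges; the metric names come from this patch, but the cluster name, values, and timestamp are made-up placeholders (histogram-style metrics additionally carry `namespace` and `quantile` labels):

```
# TYPE pulsar_ml_cache_evictions gauge
pulsar_ml_cache_evictions{cluster="standalone"} 0.0 1587801600000
# TYPE pulsar_ml_cache_hits_rate gauge
pulsar_ml_cache_hits_rate{cluster="standalone"} 12.5 1587801600000
```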
hangc0276 authored Apr 25, 2020
1 parent 53407fc commit a9ed984
Showing 4 changed files with 189 additions and 1 deletion.
@@ -30,7 +30,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {

    public static final long[] ENTRY_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
            200_000, 1000_000 };
-   public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024, 2084, 4096, 16_384, 102_400, 1_232_896 };
+   public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024, 2048, 4096, 16_384, 102_400, 1_232_896 };

    private final ManagedLedgerImpl managedLedger;

@@ -20,10 +20,18 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Enumeration;

import org.apache.pulsar.broker.PulsarService;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;

import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

import io.netty.buffer.ByteBuf;
@@ -75,12 +83,78 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
            FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
                    pulsar.getConfiguration().getClusterName(), stream);

            generateBrokerBasicMetrics(pulsar, stream);

            out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
        } finally {
            buf.release();
        }
    }

    private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
        String clusterName = pulsar.getConfiguration().getClusterName();
        // generate managedLedgerCache metrics
        parseMetricsToPrometheusMetrics(new ManagedLedgerCacheMetrics(pulsar).generate(),
                clusterName, Collector.Type.GAUGE, stream);

        // generate managedLedger metrics
        parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
                clusterName, Collector.Type.GAUGE, stream);

        // generate loadBalance metrics
        parseMetricsToPrometheusMetrics(pulsar.getLoadManager().get().getLoadBalancingMetrics(),
                clusterName, Collector.Type.GAUGE, stream);
    }

    private static void parseMetricsToPrometheusMetrics(Collection<Metrics> metrics, String cluster,
                                                        Collector.Type metricType, SimpleTextOutputStream stream) {
        Set<String> names = new HashSet<>();
        for (Metrics metrics1 : metrics) {
            for (Map.Entry<String, Object> entry : metrics1.getMetrics().entrySet()) {
                String value = null;
                if (entry.getKey().contains(".")) {
                    try {
                        String key = entry.getKey();
                        int dotIndex = key.indexOf(".");
                        int nameIndex = key.substring(0, dotIndex).lastIndexOf("_");
                        if (nameIndex == -1) {
                            continue;
                        }

                        String name = key.substring(0, nameIndex);
                        value = key.substring(nameIndex + 1);
                        if (!names.contains(name)) {
                            stream.write("# TYPE ").write(name.replace("brk_", "pulsar_")).write(' ')
                                    .write(getTypeStr(metricType)).write("\n");
                            names.add(name);
                        }
                        stream.write(name.replace("brk_", "pulsar_"))
                                .write("{cluster=\"").write(cluster).write('"');
                    } catch (Exception e) {
                        continue;
                    }
                } else {
                    stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
                            .write(getTypeStr(metricType)).write('\n');
                    stream.write(entry.getKey().replace("brk_", "pulsar_"))
                            .write("{cluster=\"").write(cluster).write('"');
                }

                for (Map.Entry<String, String> metric : metrics1.getDimensions().entrySet()) {
                    if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) {
                        continue;
                    }
                    stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"');
                    if (value != null && !value.isEmpty()) {
                        stream.write(", ").write("quantile=\"").write(value).write('"');
                    }
                }
                stream.write("} ").write(String.valueOf(entry.getValue()))
                        .write(' ').write(System.currentTimeMillis()).write("\n");
            }
        }
    }
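
To make the bucket handling above easier to follow, here is a hypothetical walk-through (not part of the patch) of how a dotted, bucketed key from `ManagedLedgerMetrics` is split into the metric name and the `quantile` label:

```java
// Illustrative only: mirrors the key-splitting logic in parseMetricsToPrometheusMetrics.
String key = "brk_ml_AddEntryLatencyBuckets_0.0_0.5";          // example bucketed key
int dotIndex = key.indexOf(".");                                // first '.' (inside "0.0")
int nameIndex = key.substring(0, dotIndex).lastIndexOf("_");    // last '_' before that dot
String name = key.substring(0, nameIndex);                      // "brk_ml_AddEntryLatencyBuckets"
String quantile = key.substring(nameIndex + 1);                 // "0.0_0.5"
// After the brk_ -> pulsar_ rename this is emitted as:
// pulsar_ml_AddEntryLatencyBuckets{cluster="...", namespace="...", quantile="0.0_0.5"} <value> <timestamp>
```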

    private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
        Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
        while (metricFamilySamples.hasMoreElements()) {
@@ -243,6 +243,71 @@ public void testDuplicateMetricTypeDefinitions() throws Exception {
        p2.close();
    }

    @Test
    public void testManagedLedgerCacheStats() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            p1.send(message.getBytes());
            p2.send(message.getBytes());
        }

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
        String metricsStr = new String(statsOut.toByteArray());

        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        metrics.entries().forEach(e ->
                System.out.println(e.getKey() + ": " + e.getValue())
        );

        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cache_evictions");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");

        cm = (List<Metric>) metrics.get("pulsar_ml_cache_hits_rate");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");

        p1.close();
        p2.close();
    }

    @Test
    public void testManagedLedgerStats() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            p1.send(message.getBytes());
            p2.send(message.getBytes());
        }

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
        String metricsStr = new String(statsOut.toByteArray());

        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        metrics.entries().forEach(e ->
                System.out.println(e.getKey() + ": " + e.getValue())
        );

        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

        cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

        p1.close();
        p2.close();
    }

    /**
     * Hacky parsing of Prometheus text format. Should be good enough for unit tests
site2/docs/reference-metrics.md (49 additions, 0 deletions)
@@ -100,6 +100,8 @@ Broker has the following kinds of metrics:
* [Replication metrics](#replication-metrics)
* [Topic metrics](#topic-metrics)
* [Replication metrics](#replication-metrics-1)
* [ManagedLedgerCache metrics](#managedledgercache-metrics)
* [ManagedLedger metrics](#managedledger-metrics)
* [LoadBalancing metrics](#loadbalancing-metrics)
* [BundleUnloading metrics](#bundleunloading-metrics)
* [BundleSplit metrics](#bundlesplit-metrics)
@@ -193,6 +195,53 @@ All the replication metrics will also be labelled with `remoteCluster=${pulsar_r
| pulsar_replication_throughput_out | Gauge | The total throughput of the topic replicating to remote cluster (bytes/second). |
| pulsar_replication_backlog | Gauge | The total backlog of the topic replicating to remote cluster (messages). |

### ManagedLedgerCache metrics
All the ManagedLedgerCache metrics are labelled with the following labels:
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.

| Name | Type | Description |
| --- | --- | --- |
| pulsar_ml_cache_evictions | Gauge | The number of cache evictions during the last minute. |
| pulsar_ml_cache_hits_rate | Gauge | The number of cache hits per second. |
| pulsar_ml_cache_hits_throughput | Gauge | The amount of data retrieved from the cache (bytes/s) |
| pulsar_ml_cache_misses_rate | Gauge | The number of cache misses per second |
| pulsar_ml_cache_misses_throughput | Gauge | The amount of data that was not retrieved from the cache (bytes/s) |
| pulsar_ml_cache_pool_active_allocations | Gauge | The number of currently active allocations in direct arena |
| pulsar_ml_cache_pool_active_allocations_huge | Gauge | The number of currently active huge allocations in direct arena |
| pulsar_ml_cache_pool_active_allocations_normal | Gauge | The number of currently active normal allocations in direct arena |
| pulsar_ml_cache_pool_active_allocations_small | Gauge | The number of currently active small allocations in direct arena |
| pulsar_ml_cache_pool_active_allocations_tiny | Gauge | The number of currently active tiny allocations in direct arena |
| pulsar_ml_cache_pool_allocated | Gauge | The total allocated memory of chunk lists in direct arena |
| pulsar_ml_cache_pool_used | Gauge | The total used memory of chunk lists in direct arena |
| pulsar_ml_cache_used_size | Gauge | The size in bytes used to store the entries' payloads |
| pulsar_ml_count | Gauge | The number of currently opened managed ledgers |
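
Since these metrics are exported as plain gauges, they can be graphed directly in Grafana; for instance, a panel query could use a PromQL selector like the sketch below (the cluster name is just a placeholder):

```
pulsar_ml_cache_hits_rate{cluster="standalone"}
```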

### ManagedLedger metrics
All the ManagedLedger metrics are labelled with the following labels:
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
- namespace: namespace=${pulsar_namespace}. ${pulsar_namespace} is the namespace name.
- quantile: quantile=${quantile}. The quantile label is present only on `Histogram` type metrics, and identifies the bucket (threshold range) that the sample falls into; see the sample scrape output after the table below.

| Name | Type | Description |
| --- | --- | --- |
| pulsar_ml_AddEntryBytesRate | Gauge | The bytes/s rate of messages added |
| pulsar_ml_AddEntryErrors | Gauge | The number of addEntry requests that failed |
| pulsar_ml_AddEntryLatencyBuckets | Histogram | The add entry latency of a ledger with a given quantile (threshold).<br> Available quantile: <br><ul><li> quantile="0.0_0.5" is AddEntryLatency between (0.0ms, 0.5ms]</li> <li>quantile="0.5_1.0" is AddEntryLatency between (0.5ms, 1.0ms]</li><li>quantile="1.0_5.0" is AddEntryLatency between (1ms, 5ms]</li><li>quantile="5.0_10.0" is AddEntryLatency between (5ms, 10ms]</li><li>quantile="10.0_20.0" is AddEntryLatency between (10ms, 20ms]</li><li>quantile="20.0_50.0" is AddEntryLatency between (20ms, 50ms]</li><li>quantile="50.0_100.0" is AddEntryLatency between (50ms, 100ms]</li><li>quantile="100.0_200.0" is AddEntryLatency between (100ms, 200ms]</li><li>quantile="200.0_1000.0" is AddEntryLatency between (200ms, 1s]</li></ul>|
| pulsar_ml_AddEntryLatencyBuckets_OVERFLOW | Gauge | The add entry latency > 1s |
| pulsar_ml_AddEntryMessagesRate | Gauge | The msg/s rate of messages added |
| pulsar_ml_AddEntrySucceed | Gauge | The number of addEntry requests that succeeded |
| pulsar_ml_EntrySizeBuckets | Histogram | The add entry size of a ledger with given quantile.<br> Available quantile: <br><ul><li>quantile="0.0_128.0" is EntrySize between (0byte, 128byte]</li><li>quantile="128.0_512.0" is EntrySize between (128byte, 512byte]</li><li>quantile="512.0_1024.0" is EntrySize between (512byte, 1KB]</li><li>quantile="1024.0_2048.0" is EntrySize between (1KB, 2KB]</li><li>quantile="2048.0_4096.0" is EntrySize between (2KB, 4KB]</li><li>quantile="4096.0_16384.0" is EntrySize between (4KB, 16KB]</li><li>quantile="16384.0_102400.0" is EntrySize between (16KB, 100KB]</li><li>quantile="102400.0_1232896.0" is EntrySize between (100KB, 1MB]</li></ul> |
| pulsar_ml_EntrySizeBuckets_OVERFLOW |Gauge | The add entry size > 1MB |
| pulsar_ml_LedgerSwitchLatencyBuckets | Histogram | The ledger switch latency with given quantile. <br> Available quantile: <br><ul><li>quantile="0.0_0.5" is LedgerSwitchLatency between (0ms, 0.5ms]</li><li>quantile="0.5_1.0" is LedgerSwitchLatency between (0.5ms, 1ms]</li><li>quantile="1.0_5.0" is LedgerSwitchLatency between (1ms, 5ms]</li><li>quantile="5.0_10.0" is LedgerSwitchLatency between (5ms, 10ms]</li><li>quantile="10.0_20.0" is LedgerSwitchLatency between (10ms, 20ms]</li><li>quantile="20.0_50.0" is LedgerSwitchLatency between (20ms, 50ms]</li><li>quantile="50.0_100.0" is LedgerSwitchLatency between (50ms, 100ms]</li><li>quantile="100.0_200.0" is LedgerSwitchLatency between (100ms, 200ms]</li><li>quantile="200.0_1000.0" is LedgerSwitchLatency between (200ms, 1000ms]</li></ul> |
| pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW | Gauge | The ledger switch latency > 1s |
| pulsar_ml_MarkDeleteRate | Gauge | The rate of mark-delete ops/s |
| pulsar_ml_NumberOfMessagesInBacklog | Gauge | The number of backlog messages for all the consumers |
| pulsar_ml_ReadEntriesBytesRate | Gauge | The bytes/s rate of messages read |
| pulsar_ml_ReadEntriesErrors | Gauge | The number of readEntries requests that failed |
| pulsar_ml_ReadEntriesRate | Gauge | The msg/s rate of messages read |
| pulsar_ml_ReadEntriesSucceeded | Gauge | The number of readEntries requests that succeeded |
| pulsar_ml_StoredMessagesSize | Gauge | The total size of the messages in active ledgers (accounting for the multiple copies stored) |
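
For example (an illustrative sketch rather than output captured from a real broker; the cluster name, namespace, values, and timestamp are placeholders), a histogram-style metric such as `pulsar_ml_AddEntryLatencyBuckets` appears in the scrape output as one line per quantile bucket:

```
pulsar_ml_AddEntryLatencyBuckets{cluster="standalone", namespace="my-property/use/my-ns", quantile="0.0_0.5"} 10.0 1587801600000
pulsar_ml_AddEntryLatencyBuckets{cluster="standalone", namespace="my-property/use/my-ns", quantile="0.5_1.0"} 2.0 1587801600000
```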

### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.