Skip to content

Commit

Permalink
Add metrics [AddEntryWithReplicasBytesRate] for namespace (apache#11472)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Aug 4, 2021
1 parent 2b72a1e commit 92e7825
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public interface ManagedLedgerMXBean {
*/
double getAddEntryBytesRate();

/**
* @return the bytes/s rate of messages added with replicas
*/
double getAddEntryWithReplicasBytesRate();

/**
* @return the msg/s rate of messages read
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermak;
private final double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand All @@ -64,7 +64,7 @@ public class EntryCacheManager {
public EntryCacheManager(ManagedLedgerFactoryImpl factory) {
this.maxSize = factory.getConfig().getMaxCacheSize();
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
this.cacheEvictionWatermak = factory.getConfig().getCacheEvictionWatermark();
this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark();
this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
this.mlFactory = factory;
this.mlFactoryMBean = factory.mbean;
Expand Down Expand Up @@ -109,7 +109,7 @@ boolean hasSpaceInCache() {
mlFactory.scheduledExecutor.execute(safeRun(() -> {
// Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermak);
long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermark);
long startTime = System.nanoTime();
log.info("Triggering cache eviction. total size: {} Mb -- Need to discard: {} Mb", currentSize / MB,
sizeToEvict / MB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final ManagedLedgerImpl managedLedger;

private final Rate addEntryOps = new Rate();
private final Rate addEntryWithReplicasOps = new Rate();
private final Rate addEntryOpsFailed = new Rate();
private final Rate readEntriesOps = new Rate();
private final Rate readEntriesOpsFailed = new Rate();
Expand All @@ -49,7 +50,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final LongAdder cursorLedgerCreateOp = new LongAdder();
private final LongAdder cursorLedgerDeleteOp = new LongAdder();

// addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue
// addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue
private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
// ledgerAddEntryLatencyStatsUsec measure latency to persist entry into ledger
private final StatsBuckets ledgerAddEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
Expand All @@ -63,6 +64,7 @@ public ManagedLedgerMBeanImpl(ManagedLedgerImpl managedLedger) {
public void refreshStats(long period, TimeUnit unit) {
double seconds = unit.toMillis(period) / 1000.0;
addEntryOps.calculateRate(seconds);
addEntryWithReplicasOps.calculateRate(seconds);
addEntryOpsFailed.calculateRate(seconds);
readEntriesOps.calculateRate(seconds);
readEntriesOpsFailed.calculateRate(seconds);
Expand All @@ -77,6 +79,7 @@ public void refreshStats(long period, TimeUnit unit) {
public void addAddEntrySample(long size) {
addEntryOps.recordEvent(size);
entryStats.addValue(size);
addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize());
}

public void addMarkDeleteOp() {
Expand Down Expand Up @@ -186,6 +189,11 @@ public double getAddEntryBytesRate() {
return addEntryOps.getValueRate();
}

@Override
public double getAddEntryWithReplicasBytesRate() {
return addEntryWithReplicasOps.getValueRate();
}

@Override
public double getReadEntriesRate() {
return readEntriesOps.getRate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void simple() throws Exception {
}).get();

assertEquals(mbean.getAddEntryBytesRate(), 0.0);
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0);
assertEquals(mbean.getAddEntryMessagesRate(), 0.0);
assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntryErrors(), 0);
Expand Down Expand Up @@ -101,6 +102,7 @@ public void simple() throws Exception {
}).get();

assertEquals(mbean.getAddEntryBytesRate(), 800.0);
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0);
assertEquals(mbean.getAddEntryMessagesRate(), 2.0);
assertEquals(mbean.getAddEntrySucceed(), 2);
assertEquals(mbean.getAddEntryErrors(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ private List<Metrics> aggregate(Map<Metrics, List<ManagedLedgerImpl>> ledgersByD

populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryBytesRate",
lStats.getAddEntryBytesRate());
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryWithReplicasBytesRate",
lStats.getAddEntryWithReplicasBytesRate());
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_AddEntryErrors",
(double) lStats.getAddEntryErrors());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ private static void printDefaultBrokerStats(SimpleTextOutputStream stream, Strin
metric(stream, cluster, "pulsar_throughput_in", 0);
metric(stream, cluster, "pulsar_throughput_out", 0);
metric(stream, cluster, "pulsar_storage_size", 0);
metric(stream, cluster, "pulsar_storage_logical_size", 0);
metric(stream, cluster, "pulsar_storage_write_rate", 0);
metric(stream, cluster, "pulsar_storage_read_rate", 0);
metric(stream, cluster, "pulsar_msg_backlog", 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,27 +119,27 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());

long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWritelatencyBuckets[0]);
ledgerWriteLatencyBuckets[0]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
ledgerWritelatencyBuckets[1]);
ledgerWriteLatencyBuckets[1]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
ledgerWritelatencyBuckets[2]);
ledgerWriteLatencyBuckets[2]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
ledgerWritelatencyBuckets[3]);
ledgerWriteLatencyBuckets[3]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
ledgerWritelatencyBuckets[4]);
ledgerWriteLatencyBuckets[4]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
ledgerWritelatencyBuckets[5]);
ledgerWriteLatencyBuckets[5]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
ledgerWritelatencyBuckets[6]);
ledgerWriteLatencyBuckets[6]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
ledgerWritelatencyBuckets[7]);
ledgerWriteLatencyBuckets[7]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
ledgerWritelatencyBuckets[8]);
ledgerWriteLatencyBuckets[8]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
ledgerWritelatencyBuckets[9]);
ledgerWriteLatencyBuckets[9]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testManagedLedgerMetrics() throws Exception{
metric = metrics.get("pulsar_storage_size");
assertEquals(metric.size(), 3);
metric = metrics.get("pulsar_storage_logical_size");
assertEquals(metric.size(), 2);
assertEquals(metric.size(), 3);
metric = metrics.get("pulsar_storage_backlog_size");
assertEquals(metric.size(), 2);
}
Expand Down
4 changes: 3 additions & 1 deletion site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ All the namespace metrics are labelled with the following labels:
| pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). |
| pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). |
| pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_logical_size | Gauge | The storage size (without accounting for replicas) of the topics in this namespace owned by this broker (bytes). |
| pulsar_storage_logical_size | Gauge | The storage size of topics in the namespace owned by the broker without replicas (in bytes). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). |
| pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). |
| pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). |
Expand Down Expand Up @@ -191,6 +191,7 @@ All the topic metrics are labelled with the following labels:
| pulsar_throughput_in | Gauge | The total throughput of the topic coming into this broker (bytes/second). |
| pulsar_throughput_out | Gauge | The total throughput of the topic going out from this broker (bytes/second). |
| pulsar_storage_size | Gauge | The total storage size of the topics in this topic owned by this broker (bytes). |
| pulsar_storage_logical_size | Gauge | The storage size of topics in the namespace owned by the broker without replicas (in bytes). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this topic owned by this broker (messages). |
| pulsar_storage_offloaded_size | Gauge | The total amount of the data in this topic offloaded to the tiered storage (bytes). |
| pulsar_storage_backlog_quota_limit | Gauge | The total amount of the data in this topic that limit the backlog quota (bytes). |
Expand Down Expand Up @@ -248,6 +249,7 @@ All the managedLedger metrics are labelled with the following labels:
| Name | Type | Description |
| --- | --- | --- |
| pulsar_ml_AddEntryBytesRate | Gauge | The bytes/s rate of messages added |
| pulsar_ml_AddEntryWithReplicasBytesRate | Gauge | The bytes/s rate of messages added with replicas |
| pulsar_ml_AddEntryErrors | Gauge | The number of addEntry requests that failed |
| pulsar_ml_AddEntryLatencyBuckets | Histogram | The add entry latency of a ledger with a given quantile (threshold).<br> Available quantile: <br><ul><li> quantile="0.0_0.5" is AddEntryLatency between (0.0ms, 0.5ms]</li> <li>quantile="0.5_1.0" is AddEntryLatency between (0.5ms, 1.0ms]</li><li>quantile="1.0_5.0" is AddEntryLatency between (1ms, 5ms]</li><li>quantile="5.0_10.0" is AddEntryLatency between (5ms, 10ms]</li><li>quantile="10.0_20.0" is AddEntryLatency between (10ms, 20ms]</li><li>quantile="20.0_50.0" is AddEntryLatency between (20ms, 50ms]</li><li>quantile="50.0_100.0" is AddEntryLatency between (50ms, 100ms]</li><li>quantile="100.0_200.0" is AddEntryLatency between (100ms, 200ms]</li><li>quantile="200.0_1000.0" is AddEntryLatency between (200ms, 1s]</li></ul>|
| pulsar_ml_AddEntryLatencyBuckets_OVERFLOW | Gauge | The add entry latency > 1s |
Expand Down

0 comments on commit 92e7825

Please sign in to comment.