[pulsar-broker] capture managed-ledger add-latency (apache#4419)
### Motivation
With apache#4290, the broker can capture end-to-end publish latency from the time a publish request arrives until it completes. To break that down further, we also want to capture the bk-client latency, i.e. the broker-to-bookie portion. The broker already captures a managed-ledger add latency, but that timer starts as soon as the add operation is inserted into the queue, so it includes queue waiting time and does not give the correct broker-to-bookie latency.


### Modification
To capture broker-to-bookie latency, start the managed-ledger add-operation latency timer when the BookKeeper add-entry request is initiated, rather than when the operation is queued (see the sketch below).
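
As a rough illustration, here is a minimal sketch of where the two timers start and stop. The class and the `LatencySink` interface are hypothetical stand-ins, and the assumption that `lastInitTime` is recorded just before the BookKeeper add is issued follows from the `updateLatency()` change in the diff further down.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the timing change; not the actual OpAddEntry code.
class AddEntryTimingSketch {

    // Stand-in for the two managed-ledger mbean sample methods touched by this change.
    interface LatencySink {
        void addAddEntryLatencySample(long latency, TimeUnit unit);
        void addLedgerAddEntryLatencySample(long latency, TimeUnit unit);
    }

    private long startTime;     // captured when the add op is created and queued
    private long lastInitTime;  // captured when the BookKeeper add-entry request is initiated

    void enqueue() {
        startTime = System.nanoTime();
        // the op now waits in the managed-ledger pending-add queue
    }

    void initiate() {
        lastInitTime = System.nanoTime();
        // ledgerHandle.asyncAddEntry(...) would be issued here
    }

    void addComplete(LatencySink mbean) {
        long now = System.nanoTime();
        // existing metric: includes queue waiting time
        mbean.addAddEntryLatencySample(now - startTime, TimeUnit.NANOSECONDS);
        // new metric: covers only the time from the bk-client call to completion
        mbean.addLedgerAddEntryLatencySample(now - lastInitTime, TimeUnit.NANOSECONDS);
    }
}
```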

### Result
With this change, the broker can report the bookie persistence latency.

This adds a new set of metrics, `brk_ml_LedgerAddEntryLatencyBuckets`:
```
 "brk_ml_AddEntryErrors": 0.0,
        "brk_ml_AddEntryLatencyBuckets_0.0_0.5": 0.0,
        "brk_ml_AddEntryLatencyBuckets_0.5_1.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_1.0_5.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_10.0_20.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_100.0_200.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_20.0_50.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_200.0_1000.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_5.0_10.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_50.0_100.0": 0.0,
        "brk_ml_AddEntryLatencyBuckets_OVERFLOW": 0.0,
        "brk_ml_AddEntryMessagesRate": 0.0,
        "brk_ml_AddEntrySucceed": 0.0,
        "brk_ml_EntrySizeBuckets_0.0_128.0": 0.0,
        "brk_ml_EntrySizeBuckets_1024.0_2084.0": 0.0,
        "brk_ml_EntrySizeBuckets_102400.0_1232896.0": 0.0,
        "brk_ml_EntrySizeBuckets_128.0_512.0": 0.0,
        "brk_ml_EntrySizeBuckets_16384.0_102400.0": 0.0,
        "brk_ml_EntrySizeBuckets_2084.0_4096.0": 0.0,
        "brk_ml_EntrySizeBuckets_4096.0_16384.0": 0.0,
        "brk_ml_EntrySizeBuckets_512.0_1024.0": 0.0,
        "brk_ml_EntrySizeBuckets_OVERFLOW": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_0.0_0.5": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_0.5_1.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_1.0_5.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_10.0_20.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_100.0_200.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_20.0_50.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_200.0_1000.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_5.0_10.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_50.0_100.0": 0.0,
        "brk_ml_LedgerAddEntryLatencyBuckets_OVERFLOW": 0.0,
```
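
For reference, each `..._<lo>_<hi>` bucket above counts samples whose latency (in milliseconds) falls within that range, with `OVERFLOW` catching anything above 1000 ms. The following is a simplified, hypothetical re-creation of that bucketing, not Pulsar's actual `StatsBuckets` class; its output matches the expectations in the unit test shown in the diff below.

```java
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical re-creation of the latency bucketing used by these metrics.
class LatencyBucketsSketch {

    // Upper bounds in milliseconds, matching the metric suffixes
    // 0.0_0.5, 0.5_1.0, ..., 200.0_1000.0, plus an OVERFLOW bucket.
    static final double[] UPPER_BOUNDS_MS = {0.5, 1, 5, 10, 20, 50, 100, 200, 1000};

    final long[] counts = new long[UPPER_BOUNDS_MS.length + 1];

    void addValue(long latency, TimeUnit unit) {
        double ms = unit.toMicros(latency) / 1000.0;
        for (int i = 0; i < UPPER_BOUNDS_MS.length; i++) {
            if (ms <= UPPER_BOUNDS_MS[i]) {
                counts[i]++;
                return;
            }
        }
        counts[counts.length - 1]++; // OVERFLOW
    }

    public static void main(String[] args) {
        LatencyBucketsSketch b = new LatencyBucketsSketch();
        b.addValue(1, TimeUnit.MILLISECONDS);   // -> bucket 0.5_1.0
        b.addValue(10, TimeUnit.MILLISECONDS);  // -> bucket 5.0_10.0
        b.addValue(1, TimeUnit.SECONDS);        // -> bucket 200.0_1000.0
        System.out.println(java.util.Arrays.toString(b.counts));
        // prints [0, 1, 0, 1, 0, 0, 0, 0, 1, 0], matching the unit test below
    }
}
```
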
rdhabalia authored Nov 4, 2020
1 parent 75a26a8 commit 04b6468
Showing 8 changed files with 80 additions and 1 deletion.
@@ -107,4 +107,10 @@ public interface ManagedLedgerMXBean {
StatsBuckets getInternalEntrySizeBuckets();

PendingBookieOpsStats getPendingBookieOpsStats();

double getLedgerAddEntryLatencyAverageUsec();

long[] getLedgerAddEntryLatencyBuckets();

StatsBuckets getInternalLedgerAddEntryLatencyBuckets();
}
@@ -49,7 +49,10 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final LongAdder cursorLedgerCreateOp = new LongAdder();
private final LongAdder cursorLedgerDeleteOp = new LongAdder();

// addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue
private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
// ledgerAddEntryLatencyStatsUsec measure latency to persist entry into ledger
private final StatsBuckets ledgerAddEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
private final StatsBuckets ledgerSwitchLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
private final StatsBuckets entryStats = new StatsBuckets(ENTRY_SIZE_BUCKETS_BYTES);

@@ -66,6 +69,7 @@ public void refreshStats(long period, TimeUnit unit) {
markDeleteOps.calculateRate(seconds);

addEntryLatencyStatsUsec.refresh();
ledgerAddEntryLatencyStatsUsec.refresh();
ledgerSwitchLatencyStatsUsec.refresh();
entryStats.refresh();
}
@@ -91,6 +95,10 @@ public void addAddEntryLatencySample(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
}

public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
}

public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
}
@@ -233,6 +241,16 @@ public long[] getAddEntryLatencyBuckets() {
return addEntryLatencyStatsUsec.getBuckets();
}

@Override
public double getLedgerAddEntryLatencyAverageUsec() {
return ledgerAddEntryLatencyStatsUsec.getAvg();
}

@Override
public long[] getLedgerAddEntryLatencyBuckets() {
return ledgerAddEntryLatencyStatsUsec.getBuckets();
}

@Override
public long[] getLedgerSwitchLatencyBuckets() {
return ledgerSwitchLatencyStatsUsec.getBuckets();
@@ -243,6 +261,11 @@ public StatsBuckets getInternalAddEntryLatencyBuckets() {
return addEntryLatencyStatsUsec;
}

@Override
public StatsBuckets getInternalLedgerAddEntryLatencyBuckets() {
return ledgerAddEntryLatencyStatsUsec;
}

@Override
public StatsBuckets getInternalEntrySizeBuckets() {
return entryStats;
@@ -220,6 +220,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {

private void updateLatency() {
ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
ml.mbean.addLedgerAddEntryLatencySample(System.nanoTime() - lastInitTime, TimeUnit.NANOSECONDS);
}

/**
@@ -59,6 +59,10 @@ public void simple() throws Exception {
mbean.addAddEntryLatencySample(1, TimeUnit.MILLISECONDS);
mbean.addAddEntryLatencySample(10, TimeUnit.MILLISECONDS);
mbean.addAddEntryLatencySample(1, TimeUnit.SECONDS);

mbean.addLedgerAddEntryLatencySample(1, TimeUnit.MILLISECONDS);
mbean.addLedgerAddEntryLatencySample(10, TimeUnit.MILLISECONDS);
mbean.addLedgerAddEntryLatencySample(1, TimeUnit.SECONDS);

mbean.addLedgerSwitchLatencySample(1, TimeUnit.MILLISECONDS);
mbean.addLedgerSwitchLatencySample(10, TimeUnit.MILLISECONDS);
@@ -81,6 +85,8 @@ public void simple() throws Exception {

assertEquals(mbean.getAddEntryLatencyBuckets(), new long[] { 0, 1, 0, 1, 0, 0, 0, 0, 1, 0 });
assertEquals(mbean.getAddEntryLatencyAverageUsec(), 337_000.0);
assertEquals(mbean.getLedgerAddEntryLatencyBuckets(), new long[] { 0, 1, 0, 1, 0, 0, 0, 0, 1, 0 });
assertEquals(mbean.getLedgerAddEntryLatencyAverageUsec(), 337_000.0);
assertEquals(mbean.getEntrySizeBuckets(), new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });

assertEquals(mbean.getLedgerSwitchLatencyBuckets(), new long[] { 0, 1, 0, 1, 0, 0, 0, 0, 1, 0 });
@@ -99,7 +99,8 @@ private List<Metrics> aggregate(Map<Metrics, List<ManagedLedgerImpl>> ledgersByD
// handle bucket entries initialization here
populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_AddEntryLatencyBuckets",
ENTRY_LATENCY_BUCKETS_MS, lStats.getAddEntryLatencyBuckets());

populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerAddEntryLatencyBuckets",
ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerAddEntryLatencyBuckets());
populateBucketEntries(tempAggregatedMetricsMap, "brk_ml_LedgerSwitchLatencyBuckets",
ENTRY_LATENCY_BUCKETS_MS, lStats.getLedgerSwitchLatencyBuckets());

@@ -49,6 +49,8 @@ public class AggregatedNamespaceStats {

public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
public StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(
ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
public StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);

public double storageWriteRate;
@@ -86,6 +88,7 @@ void updateStats(TopicStats stats) {
msgBacklog += stats.msgBacklog;

storageWriteLatencyBuckets.addAll(stats.storageWriteLatencyBuckets);
storageLedgerWriteLatencyBuckets.addAll(stats.storageLedgerWriteLatencyBuckets);
entrySizeBuckets.addAll(stats.entrySizeBuckets);

stats.replicationStats.forEach((n, as) -> {
@@ -141,6 +144,7 @@ public void reset() {
subscriptionStats.clear();

storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
}
}
@@ -99,6 +99,9 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include

stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
stats.storageWriteLatencyBuckets.refresh();
stats.storageLedgerWriteLatencyBuckets.addAll(mlStats.getInternalLedgerAddEntryLatencyBuckets());
stats.storageLedgerWriteLatencyBuckets.refresh();

stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
stats.entrySizeBuckets.refresh();

@@ -254,6 +257,23 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
stats.storageWriteLatencyBuckets.getSum());

stats.storageLedgerWriteLatencyBuckets.refresh();
long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow", ledgerWritelatencyBuckets[9]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
stats.storageLedgerWriteLatencyBuckets.getCount());
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
stats.storageLedgerWriteLatencyBuckets.getSum());

stats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
@@ -48,6 +48,7 @@ class TopicStats {
long backlogQuotaLimit;

StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets entrySizeBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
double storageWriteRate;
double storageReadRate;
@@ -83,6 +84,7 @@ public void reset() {
replicationStats.clear();
subscriptionStats.clear();
storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
}

@@ -123,6 +125,22 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
stats.storageWriteLatencyBuckets.getSum());

long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow", ledgerWritelatencyBuckets[9]);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
stats.storageLedgerWriteLatencyBuckets.getCount());
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
stats.storageLedgerWriteLatencyBuckets.getSum());

long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]);