Skip to content

Commit

Permalink
[improve][client] support aggregate metrics for partition topic stats (
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Oct 27, 2022
1 parent b061c6a commit 5e3f8ba
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand All @@ -50,6 +51,8 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
private final LongAdder totalBytesSent;
private final LongAdder totalSendFailed;
private final LongAdder totalAcksReceived;
private final DoubleAdder sendMsgsRateAggregate;
private final DoubleAdder sendBytesRateAggregate;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
private final transient DoublesSketch ds;
Expand All @@ -58,6 +61,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {

private volatile double sendMsgsRate;
private volatile double sendBytesRate;
private int partitions = 0;
private volatile double[] latencyPctValues = new double[PERCENTILES.length];
private volatile double[] batchSizePctValues = new double[PERCENTILES.length];
private volatile double[] msgSizePctValues = new double[PERCENTILES.length];
Expand All @@ -73,6 +77,8 @@ public ProducerStatsRecorderImpl() {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
sendMsgsRateAggregate = new DoubleAdder();
sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
Expand All @@ -91,6 +97,8 @@ public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigur
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
sendMsgsRateAggregate = new DoubleAdder();
sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
Expand Down Expand Up @@ -239,6 +247,7 @@ void reset() {
totalBytesSent.reset();
totalSendFailed.reset();
totalAcksReceived.reset();
partitions = 0;
}

void updateCumulativeStats(ProducerStats stats) {
Expand All @@ -253,6 +262,10 @@ void updateCumulativeStats(ProducerStats stats) {
totalBytesSent.add(stats.getTotalBytesSent());
totalSendFailed.add(stats.getTotalSendFailed());
totalAcksReceived.add(stats.getTotalAcksReceived());
// update rates
sendMsgsRateAggregate.add(stats.getSendMsgsRate());
sendBytesRateAggregate.add(stats.getSendBytesRate());
partitions++;
}

@Override
Expand Down Expand Up @@ -293,12 +306,12 @@ public long getTotalAcksReceived() {

@Override
public double getSendMsgsRate() {
return sendMsgsRate;
return partitions != 0 ? sendMsgsRateAggregate.doubleValue() / partitions : sendMsgsRate;
}

@Override
public double getSendBytesRate() {
return sendBytesRate;
return partitions != 0 ? sendBytesRateAggregate.doubleValue() / partitions : sendBytesRate;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -74,4 +76,15 @@ public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() {
recorder.cancelStatsTimeout();
assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
}

@Test
public void testPartitionTopicAggegationStats() {
ProducerStatsRecorderImpl recorder1 = spy(new ProducerStatsRecorderImpl());
ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl();
when(recorder1.getSendMsgsRate()).thenReturn(1000.0);
when(recorder1.getSendBytesRate()).thenReturn(1000.0);
recorder2.updateCumulativeStats(recorder1);
assertTrue(recorder2.getSendBytesRate() > 0);
assertTrue(recorder2.getSendMsgsRate() > 0);
}
}

0 comments on commit 5e3f8ba

Please sign in to comment.