From c59a37b2a718e45866dc5b6b23d8daef9aa1e307 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 28 Oct 2019 08:22:28 -0700 Subject: [PATCH] [pulsar-perf] record message failure and avoid exiting from process on publish failure (#5441) * [pulsar-perf] record message failure and avoid exiting from process on publish failure * add flag to exit on publish failure (default: disable and continue) --- .../pulsar/testclient/PerformanceProducer.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index e976fcc1740dc..216202da98aee 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -81,6 +81,7 @@ public class PerformanceProducer { .newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec")); private static final LongAdder messagesSent = new LongAdder(); + private static final LongAdder messagesFailed = new LongAdder(); private static final LongAdder bytesSent = new LongAdder(); private static final LongAdder totalMessagesSent = new LongAdder(); @@ -185,6 +186,10 @@ static class Arguments { @Parameter(names = { "-d", "--delay" }, description = "Mark messages with a given delay in seconds") public long delay = 0; + + @Parameter(names = { "-ef", + "--exit-on-failure" }, description = "Exit from the process on publish failure (default: disable)") + public boolean exitOnFailure = false; } static class EncKeyReader implements CryptoKeyReader { @@ -353,13 +358,15 @@ public static void main(String[] args) throws Exception { double elapsed = (now - oldTime) / 1e9; double rate = messagesSent.sumThenReset() / elapsed; + double failureRate = messagesFailed.sumThenReset() / elapsed; double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8; reportHistogram = recorder.getIntervalHistogram(reportHistogram); log.info( - "Throughput produced: {} msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", + "Throughput produced: {} msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", throughputFormat.format(rate), throughputFormat.format(throughput), + throughputFormat.format(failureRate), dec.format(reportHistogram.getMean() / 1000.0), dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0), dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0), @@ -506,7 +513,10 @@ private static void runProducer(Arguments arguments, } }).exceptionally(ex -> { log.warn("Write error on message", ex); - System.exit(-1); + messagesFailed.increment(); + if (arguments.exitOnFailure) { + System.exit(-1); + } return null; }); }