Skip to content

Commit

Permalink
[pulsar-perf] record message failure and avoid exiting from process o…
Browse files Browse the repository at this point in the history
…n publish failure (apache#5441)

* [pulsar-perf] record message failure and avoid exiting from process on publish failure

* add flag to exit on publish failure (default: disable and continue)
  • Loading branch information
rdhabalia authored and merlimat committed Oct 28, 2019
1 parent 0c7319f commit c59a37b
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class PerformanceProducer {
.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec"));

private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder messagesFailed = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();

private static final LongAdder totalMessagesSent = new LongAdder();
Expand Down Expand Up @@ -185,6 +186,10 @@ static class Arguments {
@Parameter(names = { "-d",
"--delay" }, description = "Mark messages with a given delay in seconds")
public long delay = 0;

@Parameter(names = { "-ef",
"--exit-on-failure" }, description = "Exit from the process on publish failure (default: disable)")
public boolean exitOnFailure = false;
}

static class EncKeyReader implements CryptoKeyReader {
Expand Down Expand Up @@ -353,13 +358,15 @@ public static void main(String[] args) throws Exception {
double elapsed = (now - oldTime) / 1e9;

double rate = messagesSent.sumThenReset() / elapsed;
double failureRate = messagesFailed.sumThenReset() / elapsed;
double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;

reportHistogram = recorder.getIntervalHistogram(reportHistogram);

log.info(
"Throughput produced: {} msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
"Throughput produced: {} msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
throughputFormat.format(rate), throughputFormat.format(throughput),
throughputFormat.format(failureRate),
dec.format(reportHistogram.getMean() / 1000.0),
dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
Expand Down Expand Up @@ -506,7 +513,10 @@ private static void runProducer(Arguments arguments,
}
}).exceptionally(ex -> {
log.warn("Write error on message", ex);
System.exit(-1);
messagesFailed.increment();
if (arguments.exitOnFailure) {
System.exit(-1);
}
return null;
});
}
Expand Down

0 comments on commit c59a37b

Please sign in to comment.