Skip to content

Commit

Permalink
[Transaction] Fix performance (apache#13253)
Browse files Browse the repository at this point in the history
### Motivation
There is one omission and several irregular log formats.
### Modification
1. messagesSent.increment()
2. log format
  • Loading branch information
liangyepianzhou authored Dec 13, 2021
1 parent e9aae76 commit c531c1c
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ public static void main(String[] args) throws Exception {
dec.format(rateAck));
}
log.info(
"Throughput received: {} msg --- {} msg/s -- {} Mbit/s "
"Throughput received: {} msg --- {} msg/s --- {} Mbit/s "
+ "--- Latency: mean: {} ms - med: {} "
+ "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
intFormat.format(total),
Expand Down Expand Up @@ -622,7 +622,7 @@ private static void printAggregatedThroughput(long start, Arguments arguments) {
}
log.info(
"Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s"
+ "--- AckRate: {} msg/s --- ack failed {} msg",
+ " --- AckRate: {} msg/s --- ack failed {} msg",
totalMessagesReceived.sum(),
dec.format(rate),
dec.format(throughput),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public static void main(String[] args) throws Exception {
totalTxnOpSuccess = totalEndTxnOpSuccessNum.sum();
totalTxnOpFail = totalEndTxnOpFailNum.sum();
rateOpenTxn = numTxnOpSuccess.sumThenReset() / elapsed;
log.info("--- Transaction : {} transaction end successfully ---{} transaction end failed "
log.info("--- Transaction : {} transaction end successfully --- {} transaction end failed "
+ "--- {} Txn/s",
totalTxnOpSuccess, totalTxnOpFail, totalFormat.format(rateOpenTxn));
}
Expand Down Expand Up @@ -728,6 +728,7 @@ private static void runProducer(int producerId,
PulsarClient pulsarClient = client;
messageBuilder.sendAsync().thenRun(() -> {
bytesSent.add(payloadData.length);
messagesSent.increment();

totalMessagesSent.increment();
totalBytesSent.add(payloadData.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ public static void main(String[] args)
? "Throughput transaction: {} transaction executes --- {} transaction/s"
: "Throughput task: {} task executes --- {} task/s";
log.info(
txnOrTaskLog + " ---send Latency: mean: {} ms - med: {} "
+ "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + "---ack Latency: "
txnOrTaskLog + " --- send Latency: mean: {} ms - med: {} "
+ "- 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}" + " --- ack Latency: "
+ "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
intFormat.format(total),
dec.format(rate),
Expand Down Expand Up @@ -582,8 +582,8 @@ private static void printTxnAggregatedThroughput(long start) {
"Aggregated throughput stats --- {} transaction executed --- {} transaction/s "
+ " --- {} transaction open successfully --- {} transaction open failed"
+ " --- {} transaction end successfully --- {} transaction end failed"
+ "--- {} message ack failed --- {} message send failed"
+ "--- {} message ack success --- {} message send success ",
+ " --- {} message ack failed --- {} message send failed"
+ " --- {} message ack success --- {} message send success ",
total,
dec.format(rate),
numTransactionOpenSuccess,
Expand All @@ -606,9 +606,9 @@ private static void printAggregatedThroughput(long start) {
long numMessageSendFailed = numMessagesSendFailed.sum();
long numMessageSendSuccess = numMessagesSendSuccess.sum();
log.info(
"Aggregated throughput stats --- {} task executed --- {} task/s "
+ "--- {} message ack failed --- {} message send failed"
+ "--- {} message ack success --- {} message send success ",
"Aggregated throughput stats --- {} task executed --- {} task/s"
+ " --- {} message ack failed --- {} message send failed"
+ " --- {} message ack success --- {} message send success",
total,
totalFormat.format(rate),
numMessageAckFailed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testConsumeTxnMessage() throws InterruptedException, PulsarClientExc
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
for (int i = 0; i < 505; i++) {
producer.newMessage().send();
producer.newMessage().value("messages for test transaction consumer".getBytes()).send();
}
Thread thread = new Thread(() -> {
try {
Expand Down

0 comments on commit c531c1c

Please sign in to comment.