Skip to content

Commit

Permalink
Add more information in send timeout exception (apache#8931)
Browse files Browse the repository at this point in the history
*Motivation*

Currently the TimeoutException doesn't provide any useful information
for troubleshooting. This change adds more information for troubleshooting.
  • Loading branch information
sijie authored Dec 16, 2020
1 parent 37f3e65 commit c875573
Showing 1 changed file with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand Down Expand Up @@ -749,13 +750,15 @@ private static final class WriteInEventLoopCallback implements Runnable {
private ByteBufPair cmd;
private long sequenceId;
private ClientCnx cnx;
private OpSendMsg op;

static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
WriteInEventLoopCallback c = RECYCLER.get();
c.producer = producer;
c.cnx = cnx;
c.sequenceId = op.sequenceId;
c.cmd = op.cmd;
c.op = op;
return c;
}

Expand All @@ -768,6 +771,7 @@ public void run() {

try {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
} finally {
recycle();
}
Expand All @@ -778,6 +782,7 @@ private void recycle() {
cnx = null;
cmd = null;
sequenceId = -1;
op = null;
recyclerHandle.recycle(this);
}

Expand Down Expand Up @@ -836,7 +841,7 @@ public CompletableFuture<Void> closeAsync() {
format("The producer %s of the topic %s was already closed when closing the producers",
producerName, topic));
pendingMessages.forEach(msg -> {
msg.callback.sendComplete(ex);
msg.sendComplete(ex);
msg.cmd.release();
msg.recycle();
});
Expand Down Expand Up @@ -960,7 +965,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le

// Need to protect ourselves from any exception being thrown in the future handler from the
// application
op.callback.sendComplete(null);
op.sendComplete(null);
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
producerName, sequenceId, t);
Expand Down Expand Up @@ -1017,7 +1022,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
try {
op.callback.sendComplete(
op.sendComplete(
new PulsarClientException.ChecksumException(
format("The checksum of the message which is produced by producer %s to the topic " +
"%s is corrupted", producerName, topic)));
Expand Down Expand Up @@ -1051,7 +1056,7 @@ protected synchronized void recoverNotAllowedError(long sequenceId) {
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
try {
op.callback.sendComplete(
op.sendComplete(
new PulsarClientException.NotAllowedException(
format("The size of the message which is produced by producer %s to the topic " +
"%s is not allowed", producerName, topic)));
Expand Down Expand Up @@ -1113,6 +1118,9 @@ protected static final class OpSendMsg {
Runnable rePopulate;
long sequenceId;
long createdAt;
long firstSentAt;
long lastSentAt;
int retryCount;
long batchSizeByte = 0;
int numMessagesInBatch = 1;
long highestSequenceId;
Expand Down Expand Up @@ -1151,6 +1159,38 @@ static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long lowestS
return op;
}

void updateSentTimestamp() {
this.lastSentAt = System.nanoTime();
if (this.firstSentAt == -1L) {
this.firstSentAt = this.lastSentAt;
}
++this.retryCount;
}

void sendComplete(final Exception e) {
SendCallback callback = this.callback;
if (null != callback) {
Exception finalEx = e;
if (finalEx != null && finalEx instanceof TimeoutException) {
TimeoutException te = (TimeoutException) e;
long sequenceId = te.getSequenceId();
long ns = System.nanoTime();
String errMsg = String.format(
"%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s",
te.getMessage(),
ns - this.createdAt,
ns - this.firstSentAt,
ns - this.lastSentAt,
retryCount
);

finalEx = new TimeoutException(errMsg, sequenceId);
}

callback.sendComplete(finalEx);
}
}

void recycle() {
msg = null;
msgs = null;
Expand All @@ -1159,6 +1199,8 @@ void recycle() {
rePopulate = null;
sequenceId = -1L;
createdAt = -1L;
firstSentAt = -1L;
lastSentAt = -1L;
highestSequenceId = -1L;
totalChunks = 0;
chunkId = -1;
Expand Down Expand Up @@ -1538,7 +1580,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
op.callback.sendComplete(ex);
op.sendComplete(ex);
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
Expand Down Expand Up @@ -1662,13 +1704,13 @@ protected void processOpSendMsg(OpSendMsg op) {
Thread.currentThread().interrupt();
releaseSemaphoreForSendOp(op);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId));
op.sendComplete(new PulsarClientException(ie, op.sequenceId));
}
} catch (Throwable t) {
releaseSemaphoreForSendOp(op);
log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(t, op.sequenceId));
op.sendComplete(new PulsarClientException(t, op.sequenceId));
}
}
}
Expand Down Expand Up @@ -1711,6 +1753,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
cnx.channel(), op.sequenceId);
}
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
cnx.ctx().flush();
Expand Down

0 comments on commit c875573

Please sign in to comment.