Skip to content

Commit

Permalink
Add debug logging when consumer fails to send permits to Broker (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
devinbost authored Apr 28, 2021
1 parent 89a808c commit 8767c8e
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -866,8 +866,19 @@ private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages);
}

cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
if (log.isDebugEnabled()) {
cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages))
.addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.debug("Consumer {} failed to send {} permits to broker: {}", consumerId, numMessages,
writeFuture.cause().getMessage());
} else {
log.debug("Consumer {} sent {} permits to broker", consumerId, numMessages);
}
});
} else {
cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
}
}
}

Expand Down

0 comments on commit 8767c8e

Please sign in to comment.