Skip to content

Commit

Permalink
[pulsar-broker] add producer/consumer id in error-logging (apache#3961)
Browse files Browse the repository at this point in the history
### Motivation

Log Producer/Consumer Id when broker logs "Producer/consumer is already connected" to help in debugging when client is keep failing to create producer and broker is keep logging  the same error.
  • Loading branch information
rdhabalia authored and sijie committed Apr 8, 2019
1 parent acb1352 commit b41a020
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created: {}", remoteAddress,
consumer);
log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress,
consumerId, consumer);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return null;
} else {
Expand All @@ -663,8 +663,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress,
topicName, subscriptionName);
log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", remoteAddress,
topicName, subscriptionName, consumerId);
ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingConsumerFuture);
ctx.writeAndFlush(Commands.newError(requestId, error,
Expand Down Expand Up @@ -841,8 +841,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (existingProducerFuture != null) {
if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
producer);
log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
producerId, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName(),
producer.getSchemaVersion()));
return null;
Expand All @@ -856,8 +856,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// either complete or fails.
ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingProducerFuture);
log.warn("[{}][{}] Producer is already present on the connection", remoteAddress,
topicName);
log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
producerId, topicName);
ctx.writeAndFlush(Commands.newError(requestId, error,
"Producer is already present on the connection"));
return null;
Expand Down

0 comments on commit b41a020

Please sign in to comment.