Skip to content

Commit

Permalink
Log warn message if exception occurs while WebSocket proxy is sending…
Browse files Browse the repository at this point in the history
…/receiving message (apache#1537)
  • Loading branch information
massakam authored and merlimat committed Apr 10, 2018
1 parent 9525047 commit f4300bd
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -181,6 +182,14 @@ public void writeSuccess() {
service.getExecutor().execute(() -> receiveMessage());
}
}).exceptionally(exception -> {
if (exception.getCause() instanceof AlreadyClosedException) {
log.info("[{}/{}] Consumer was closed while receiving msg from broker", consumer.getTopic(),
subscription);
} else {
log.warn("[{}/{}] Error occurred while consumer handler was delivering msg to {}: {}",
consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString(),
exception.getMessage());
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ public void onWebSocketText(String message) {
sendAckResponse(new ProducerAck(messageId, sendRequest.context));
}
}).exceptionally(exception -> {
log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}", producer.getTopic(),
getRemote().getInetSocketAddress().toString(), exception.getMessage());
numMsgsFailed.increment();
sendAckResponse(
new ProducerAck(UnknownError, exception.getMessage(), null, sendRequest.context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -166,8 +167,12 @@ public void writeSuccess() {
service.getExecutor().execute(() -> receiveMessage());
}
}).exceptionally(exception -> {
log.warn("[{}/{}] Failed to deliver msg to {} {}", reader.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), exception);
if (exception.getCause() instanceof AlreadyClosedException) {
log.info("[{}/{}] Reader was closed while receiving msg from broker", reader.getTopic(), subscription);
} else {
log.warn("[{}/{}] Error occurred while reader handler was delivering msg to {}: {}", reader.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), exception.getMessage());
}
return null;
});
}
Expand Down

0 comments on commit f4300bd

Please sign in to comment.