Skip to content

Commit

Permalink
Add more info in the error messages (apache#5714)
Browse files Browse the repository at this point in the history
*Modifications*

Add topic and subscription info in the error message to help locate the problem that which topic or subscription throws the error.
  • Loading branch information
zymap authored and sijie committed Dec 9, 2019
1 parent 5504dc6 commit a8c8a7e
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import java.io.IOException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/**
Expand Down Expand Up @@ -577,6 +578,75 @@ public CryptoException(String msg) {
}
}

// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
// wrap an exception with new message info
if (t instanceof TimeoutException) {
return new TimeoutException(msg);
} else if (t instanceof InvalidConfigurationException) {
return new InvalidConfigurationException(msg);
} else if (t instanceof AuthenticationException) {
return new AuthenticationException(msg);
} else if (t instanceof IncompatibleSchemaException) {
return new IncompatibleSchemaException(msg);
} else if (t instanceof TooManyRequestsException) {
return new TooManyRequestsException(msg);
} else if (t instanceof LookupException) {
return new LookupException(msg);
} else if (t instanceof ConnectException) {
return new ConnectException(msg);
} else if (t instanceof AlreadyClosedException) {
return new AlreadyClosedException(msg);
} else if (t instanceof TopicTerminatedException) {
return new TopicTerminatedException(msg);
} else if (t instanceof AuthorizationException) {
return new AuthorizationException(msg);
} else if (t instanceof GettingAuthenticationDataException) {
return new GettingAuthenticationDataException(msg);
} else if (t instanceof UnsupportedAuthenticationException) {
return new UnsupportedAuthenticationException(msg);
} else if (t instanceof BrokerPersistenceException) {
return new BrokerPersistenceException(msg);
} else if (t instanceof BrokerMetadataException) {
return new BrokerMetadataException(msg);
} else if (t instanceof ProducerBusyException) {
return new ProducerBusyException(msg);
} else if (t instanceof ConsumerBusyException) {
return new ConsumerBusyException(msg);
} else if (t instanceof NotConnectedException) {
return new NotConnectedException();
} else if (t instanceof InvalidMessageException) {
return new InvalidMessageException(msg);
} else if (t instanceof InvalidTopicNameException) {
return new InvalidTopicNameException(msg);
} else if (t instanceof NotSupportedException) {
return new NotSupportedException(msg);
} else if (t instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (t instanceof ProducerBlockedQuotaExceededError) {
return new ProducerBlockedQuotaExceededError(msg);
} else if (t instanceof ProducerBlockedQuotaExceededException) {
return new ProducerBlockedQuotaExceededException(msg);
} else if (t instanceof ChecksumException) {
return new ChecksumException(msg);
} else if (t instanceof CryptoException) {
return new CryptoException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
return t;
} else if (t instanceof RuntimeException) {
return new RuntimeException(msg, t.getCause());
} else if (t instanceof InterruptedException) {
return t;
} else if (t instanceof ExecutionException) {
return t;
}

return t;
}

public static PulsarClientException unwrap(Throwable t) {
if (t instanceof PulsarClientException) {
return (PulsarClientException) t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s , partitions with %s",
lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
lookupDataResult.redirect, topicName.toString(), lookupDataResult.partitions,
e.getMessage())));
}
}).exceptionally((e) -> {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
Expand Down Expand Up @@ -251,8 +252,10 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
}).exceptionally((e) -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
topicsFuture.completeExceptionally(new PulsarClientException
.TimeoutException("Could not getTopicsUnderNamespace within configured timeout."));
topicsFuture.completeExceptionally(
new PulsarClientException.TimeoutException(
format("Could not get topics of namespace %s within configured timeout",
namespace.toString())));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,17 @@ public CompletableFuture<Void> unsubscribeAsync() {
}).exceptionally(e -> {
log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(e.getCause());
unsubscribeFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(new PulsarClientException("Not connected to broker"));
unsubscribeFuture.completeExceptionally(
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the " +
"subscription %s of the topic %s", subscription, topicName.toString())));
}
return unsubscribeFuture;
}
Expand Down Expand Up @@ -623,7 +629,9 @@ && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
// unable to create new consumer, fail operation
setState(State.Failed);
closeConsumerTasks();
subscribeFuture.completeExceptionally(e);
subscribeFuture.completeExceptionally(
PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s with subscription " +
"name %s when connecting to the broker", topicName.toString(), subscription)));
client.cleanupConsumer(this);
} else if (e.getCause() instanceof TopicDoesNotExistException) {
// The topic was deleted after the consumer was created, and we're
Expand Down Expand Up @@ -775,7 +783,10 @@ private void failPendingReceive() {
CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
if (receiveFuture != null) {
receiveFuture.completeExceptionally(
new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
new PulsarClientException.AlreadyClosedException(
String.format("The consumer which subscribes the topic %s with subscription name %s " +
"was already closed when cleaning and closing the consumers",
topicName.toString(), subscription)));
} else {
break;
}
Expand Down Expand Up @@ -1442,11 +1453,15 @@ public void seek(long timestamp) throws PulsarClientException {
public CompletableFuture<Void> seekAsync(long timestamp) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
"%s to the timestamp %d", consumerName, subscription, topicName.toString(), timestamp)));
}

if (!isConnected()) {
return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
return FutureUtil.failedFuture(new PulsarClientException(
String.format("The client is not connected to the broker when seeking the subscription %s of the " +
"topic %s to the timestamp %d", subscription, topicName.toString(), timestamp)));
}

final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
Expand All @@ -1466,7 +1481,10 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(e.getCause());
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to the timestamp %d",
subscription, topicName.toString(), timestamp)));
return null;
});
return seekFuture;
Expand All @@ -1476,11 +1494,16 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
public CompletableFuture<Void> seekAsync(MessageId messageId) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
"%s to the message %s", consumerName, subscription, topicName.toString(),
messageId.toString())));
}

if (!isConnected()) {
return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
return FutureUtil.failedFuture(new PulsarClientException(
String.format("The client is not connected to the broker when seeking the subscription %s of the " +
"topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
}

final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
Expand All @@ -1501,7 +1524,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(e.getCause());
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("[%s][%s] Failed to seek the subscription %s of the topic %s to the message %s",
subscription, topicName.toString(), messageId.toString())));
return null;
});
return seekFuture;
Expand Down Expand Up @@ -1562,8 +1588,10 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastD
public CompletableFuture<MessageId> getLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
}
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s " +
"getting the last message id", consumerName, subscription, topicName.toString())));
}

AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
Expand All @@ -1584,9 +1612,11 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
future.completeExceptionally(new PulsarClientException
.NotSupportedException("GetLastMessageId Not supported for ProtocolVersion: " +
cnx.getRemoteEndpointProtocolVersion()));
future.completeExceptionally(
new PulsarClientException.NotSupportedException(
String.format("The command `GetLastMessageId` is not supported for the protocol version %d. " +
"The consumer is %s, topic %s, subscription %s", cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
}

long requestId = client.newRequestId();
Expand All @@ -1600,14 +1630,19 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
result.getEntryId(), result.getPartition()));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(e.getCause());
future.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(new PulsarClientException
.TimeoutException("Could not getLastMessageId within configured timeout."));
future.completeExceptionally(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id " +
"withing configured timeout", subscription, topicName.toString())));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarCli
.exceptionally(e -> {
log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",
topicName, numPartitions, e);
subscribeFuture.completeExceptionally(((Throwable)e).getCause());
subscribeFuture.completeExceptionally(
PulsarClientException.wrap(((Throwable) e).getCause(), String.format("Failed to subscribe %s with %d partitions", topicName, numPartitions)));
return null;
});;
return consumer;
Expand Down
Loading

0 comments on commit a8c8a7e

Please sign in to comment.