From 571b68464e25e8e71d3d278f58ba3a3df0c67a5b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 18 Jun 2019 01:55:43 -0700 Subject: [PATCH] Do not strip ExecutionException from the stack trace (#4493) * Do not strip ExecutionException from the stack trace --- .../client/api/PulsarClientException.java | 85 +++++++++++++++++++ .../pulsar/client/impl/ConsumerBase.java | 48 ++--------- .../client/impl/ConsumerBuilderImpl.java | 13 +-- .../pulsar/client/impl/ConsumerImpl.java | 19 ++--- .../client/impl/MultiTopicsConsumerImpl.java | 22 ++--- .../pulsar/client/impl/ProducerBase.java | 37 ++------ .../client/impl/ProducerBuilderImpl.java | 13 +-- .../pulsar/client/impl/PulsarClientImpl.java | 15 +--- .../pulsar/client/impl/ReaderBuilderImpl.java | 13 +-- .../client/impl/ZeroQueueConsumerImpl.java | 5 +- .../client/impl/auth/AuthenticationBasic.java | 2 +- .../functions/instance/ContextImpl.java | 12 +-- 12 files changed, 130 insertions(+), 154 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 7aa72e1367339..bb681ad18c9ca 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * Base type of exception thrown by Pulsar client @@ -62,12 +63,20 @@ public NotFoundException(Throwable t) { } public static class TimeoutException extends PulsarClientException { + public TimeoutException(Throwable t) { + super(t); + } + public TimeoutException(String msg) { super(msg); } } public static class IncompatibleSchemaException extends PulsarClientException { + public IncompatibleSchemaException(Throwable t) { + super(t); + } + public IncompatibleSchemaException(String msg) { super(msg); } @@ -86,6 +95,10 @@ public TooManyRequestsException(String msg) { } public static class ConnectException extends PulsarClientException { + public ConnectException(Throwable t) { + super(t); + } + public ConnectException(String msg) { super(msg); } @@ -212,4 +225,76 @@ public CryptoException(String msg) { super(msg); } } + + public static PulsarClientException unwrap(Throwable t) { + if (t instanceof PulsarClientException) { + return (PulsarClientException) t; + } else if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (!(t instanceof ExecutionException)) { + // Generic exception + return new PulsarClientException(t); + } else if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return new PulsarClientException(t); + } + + // Unwrap the exception to keep the same exception type but a stack trace that includes the application calling + // site + Throwable cause = t.getCause(); + String msg = cause.getMessage(); + if (cause instanceof TimeoutException) { + return new TimeoutException(msg); + } else if (cause instanceof InvalidConfigurationException) { + return new InvalidConfigurationException(msg); + } else if (cause instanceof AuthenticationException) { + return new AuthenticationException(msg); + } else if (cause instanceof IncompatibleSchemaException) { + return new IncompatibleSchemaException(msg); + } else if (cause instanceof TooManyRequestsException) { + return new TooManyRequestsException(msg); + } else if (cause instanceof LookupException) { + return new LookupException(msg); + } else if (cause instanceof ConnectException) { + return new ConnectException(msg); + } else if (cause instanceof AlreadyClosedException) { + return new AlreadyClosedException(msg); + } else if (cause instanceof TopicTerminatedException) { + return new TopicTerminatedException(msg); + } else if (cause instanceof AuthorizationException) { + return new AuthorizationException(msg); + } else if (cause instanceof GettingAuthenticationDataException) { + return new GettingAuthenticationDataException(msg); + } else if (cause instanceof UnsupportedAuthenticationException) { + return new UnsupportedAuthenticationException(msg); + } else if (cause instanceof BrokerPersistenceException) { + return new BrokerPersistenceException(msg); + } else if (cause instanceof BrokerMetadataException) { + return new BrokerMetadataException(msg); + } else if (cause instanceof ProducerBusyException) { + return new ProducerBusyException(msg); + } else if (cause instanceof ConsumerBusyException) { + return new ConsumerBusyException(msg); + } else if (cause instanceof NotConnectedException) { + return new NotConnectedException(); + } else if (cause instanceof InvalidMessageException) { + return new InvalidMessageException(msg); + } else if (cause instanceof InvalidTopicNameException) { + return new InvalidTopicNameException(msg); + } else if (cause instanceof NotSupportedException) { + return new NotSupportedException(msg); + } else if (cause instanceof ProducerQueueIsFullError) { + return new ProducerQueueIsFullError(msg); + } else if (cause instanceof ProducerBlockedQuotaExceededError) { + return new ProducerBlockedQuotaExceededError(msg); + } else if (cause instanceof ProducerBlockedQuotaExceededException) { + return new ProducerBlockedQuotaExceededException(msg); + } else if (cause instanceof ChecksumException) { + return new ChecksumException(msg); + } else if (cause instanceof CryptoException) { + return new CryptoException(msg); + } else { + return new PulsarClientException(t); + } + } } \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index f8f65697f80f8..fe9cf6669661d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -180,16 +180,8 @@ public void acknowledge(Message message) throws PulsarClientException { public void acknowledge(MessageId messageId) throws PulsarClientException { try { acknowledgeAsync(messageId).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -206,16 +198,8 @@ public void acknowledgeCumulative(Message message) throws PulsarClientExcepti public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException { try { acknowledgeCumulativeAsync(messageId).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -264,16 +248,8 @@ abstract protected CompletableFuture doAcknowledge(MessageId messageId, Ac public void unsubscribe() throws PulsarClientException { try { unsubscribeAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -284,16 +260,8 @@ public void unsubscribe() throws PulsarClientException { public void close() throws PulsarClientException { try { closeAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 0df9a89dd027f..691344253d27d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -87,16 +86,8 @@ public ConsumerBuilder clone() { public Consumer subscribe() throws PulsarClientException { try { return subscribeAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f329cd70f8639..a25a98a1d291d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -317,9 +317,8 @@ protected Message internalReceive() throws PulsarClientException { messageProcessed(interceptMsg); return interceptMsg; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); stats.incrementNumReceiveFailed(); - throw new PulsarClientException(e); + throw PulsarClientException.unwrap(e); } } @@ -363,12 +362,10 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl } return interceptMsg; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - State state = getState(); if (state != State.Closing && state != State.Closed) { stats.incrementNumReceiveFailed(); - throw new PulsarClientException(e); + throw PulsarClientException.unwrap(e); } else { return null; } @@ -1369,8 +1366,8 @@ private boolean processPossibleToDLQ(MessageIdImpl messageId) { public void seek(MessageId messageId) throws PulsarClientException { try { seekAsync(messageId).get(); - } catch (ExecutionException | InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -1378,8 +1375,8 @@ public void seek(MessageId messageId) throws PulsarClientException { public void seek(long timestamp) throws PulsarClientException { try { seekAsync(timestamp).get(); - } catch (ExecutionException | InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -1457,8 +1454,8 @@ public boolean hasMessageAvailable() throws PulsarClientException { } return hasMessageAvailableAsync().get(); - } catch (ExecutionException | InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f2f0c351339f5..a5e513870650e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -339,9 +339,8 @@ protected Message internalReceive() throws PulsarClientException { unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -356,9 +355,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl } resumeReceivingFromPausedConsumersIfNeeded(); return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -570,10 +568,8 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { public void seek(MessageId messageId) throws PulsarClientException { try { seekAsync(messageId).get(); - } catch (ExecutionException e) { - throw new PulsarClientException(e.getCause()); - } catch (InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -581,10 +577,8 @@ public void seek(MessageId messageId) throws PulsarClientException { public void seek(long timestamp) throws PulsarClientException { try { seekAsync(timestamp).get(); - } catch (ExecutionException e) { - throw new PulsarClientException(e.getCause()); - } catch (InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index fa046ca1bad6e..2913efd6a2e5b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -83,16 +82,8 @@ public MessageId send(Message message) throws PulsarClientException { } return sendFuture.get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -100,16 +91,8 @@ public MessageId send(Message message) throws PulsarClientException { public void flush() throws PulsarClientException { try { flushAsync().get(); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof PulsarClientException) { - throw (PulsarClientException) cause; - } else { - throw new PulsarClientException(cause); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -119,16 +102,8 @@ public void flush() throws PulsarClientException { public void close() throws PulsarClientException { try { closeAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 193d0341640e0..888b58e829ec6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -84,16 +83,8 @@ public ProducerBuilder clone() { public Producer create() throws PulsarClientException { try { return createAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 0e58796786b8b..764d25c52010a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -343,7 +343,7 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); } else { consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1, - consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, + consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos()); } @@ -519,15 +519,8 @@ public CompletableFuture> getSchema(String topic) { public void close() throws PulsarClientException { try { closeAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } @@ -580,7 +573,7 @@ public void shutdown() throws PulsarClientException { conf.getAuthentication().close(); } catch (Throwable t) { log.warn("Failed to shutdown Pulsar client", t); - throw new PulsarClientException(t); + throw PulsarClientException.unwrap(t); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index dc898cc9f4725..47f71064d0d03 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -66,16 +65,8 @@ public ReaderBuilder clone() { public Reader create() throws PulsarClientException { try { return createAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index b0dc4b357bb28..6beb0ebec17a3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -53,7 +53,7 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS); } - + public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId, Schema schema, @@ -117,9 +117,8 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { stats.updateNumMsgsReceived(message); return message; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); stats.incrementNumReceiveFailed(); - throw new PulsarClientException(e); + throw PulsarClientException.unwrap(e); } finally { // Finally blocked is invoked in case the block on incomingMessages is interrupted waitingOnReceiveForZeroQueueSize = false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java index 37454d2f06c4b..4aecb10da3303 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java @@ -48,7 +48,7 @@ public AuthenticationDataProvider getAuthData() throws PulsarClientException { try { return new AuthenticationDataBasic(userId, password); } catch (Exception e) { - throw new PulsarClientException(e); + throw PulsarClientException.unwrap(e); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index f81c582a1dd8d..e5de503b220fd 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -436,16 +436,8 @@ class MessageBuilderImpl implements TypedMessageBuilder { public MessageId send() throws PulsarClientException { try { return sendAsync().get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); } }