Skip to content

Commit

Permalink
Do not strip ExecutionException from the stack trace (apache#4493)
Browse files Browse the repository at this point in the history
* Do not strip ExecutionException from the stack trace
  • Loading branch information
merlimat authored and codelipenghui committed Jun 18, 2019
1 parent 2f53fb6 commit 571b684
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* Base type of exception thrown by Pulsar client
Expand Down Expand Up @@ -62,12 +63,20 @@ public NotFoundException(Throwable t) {
}

public static class TimeoutException extends PulsarClientException {
public TimeoutException(Throwable t) {
super(t);
}

public TimeoutException(String msg) {
super(msg);
}
}

public static class IncompatibleSchemaException extends PulsarClientException {
public IncompatibleSchemaException(Throwable t) {
super(t);
}

public IncompatibleSchemaException(String msg) {
super(msg);
}
Expand All @@ -86,6 +95,10 @@ public TooManyRequestsException(String msg) {
}

public static class ConnectException extends PulsarClientException {
public ConnectException(Throwable t) {
super(t);
}

public ConnectException(String msg) {
super(msg);
}
Expand Down Expand Up @@ -212,4 +225,76 @@ public CryptoException(String msg) {
super(msg);
}
}

public static PulsarClientException unwrap(Throwable t) {
if (t instanceof PulsarClientException) {
return (PulsarClientException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (!(t instanceof ExecutionException)) {
// Generic exception
return new PulsarClientException(t);
} else if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
return new PulsarClientException(t);
}

// Unwrap the exception to keep the same exception type but a stack trace that includes the application calling
// site
Throwable cause = t.getCause();
String msg = cause.getMessage();
if (cause instanceof TimeoutException) {
return new TimeoutException(msg);
} else if (cause instanceof InvalidConfigurationException) {
return new InvalidConfigurationException(msg);
} else if (cause instanceof AuthenticationException) {
return new AuthenticationException(msg);
} else if (cause instanceof IncompatibleSchemaException) {
return new IncompatibleSchemaException(msg);
} else if (cause instanceof TooManyRequestsException) {
return new TooManyRequestsException(msg);
} else if (cause instanceof LookupException) {
return new LookupException(msg);
} else if (cause instanceof ConnectException) {
return new ConnectException(msg);
} else if (cause instanceof AlreadyClosedException) {
return new AlreadyClosedException(msg);
} else if (cause instanceof TopicTerminatedException) {
return new TopicTerminatedException(msg);
} else if (cause instanceof AuthorizationException) {
return new AuthorizationException(msg);
} else if (cause instanceof GettingAuthenticationDataException) {
return new GettingAuthenticationDataException(msg);
} else if (cause instanceof UnsupportedAuthenticationException) {
return new UnsupportedAuthenticationException(msg);
} else if (cause instanceof BrokerPersistenceException) {
return new BrokerPersistenceException(msg);
} else if (cause instanceof BrokerMetadataException) {
return new BrokerMetadataException(msg);
} else if (cause instanceof ProducerBusyException) {
return new ProducerBusyException(msg);
} else if (cause instanceof ConsumerBusyException) {
return new ConsumerBusyException(msg);
} else if (cause instanceof NotConnectedException) {
return new NotConnectedException();
} else if (cause instanceof InvalidMessageException) {
return new InvalidMessageException(msg);
} else if (cause instanceof InvalidTopicNameException) {
return new InvalidTopicNameException(msg);
} else if (cause instanceof NotSupportedException) {
return new NotSupportedException(msg);
} else if (cause instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (cause instanceof ProducerBlockedQuotaExceededError) {
return new ProducerBlockedQuotaExceededError(msg);
} else if (cause instanceof ProducerBlockedQuotaExceededException) {
return new ProducerBlockedQuotaExceededException(msg);
} else if (cause instanceof ChecksumException) {
return new ChecksumException(msg);
} else if (cause instanceof CryptoException) {
return new CryptoException(msg);
} else {
return new PulsarClientException(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,8 @@ public void acknowledge(Message<?> message) throws PulsarClientException {
public void acknowledge(MessageId messageId) throws PulsarClientException {
try {
acknowledgeAsync(messageId).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand All @@ -206,16 +198,8 @@ public void acknowledgeCumulative(Message<?> message) throws PulsarClientExcepti
public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
try {
acknowledgeCumulativeAsync(messageId).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down Expand Up @@ -264,16 +248,8 @@ abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, Ac
public void unsubscribe() throws PulsarClientException {
try {
unsubscribeAsync().get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand All @@ -284,16 +260,8 @@ public void unsubscribe() throws PulsarClientException {
public void close() throws PulsarClientException {
try {
closeAsync().get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -87,16 +86,8 @@ public ConsumerBuilder<T> clone() {
public Consumer<T> subscribe() throws PulsarClientException {
try {
return subscribeAsync().get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ protected Message<T> internalReceive() throws PulsarClientException {
messageProcessed(interceptMsg);
return interceptMsg;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
stats.incrementNumReceiveFailed();
throw new PulsarClientException(e);
throw PulsarClientException.unwrap(e);
}
}

Expand Down Expand Up @@ -363,12 +362,10 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
}
return interceptMsg;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumReceiveFailed();
throw new PulsarClientException(e);
throw PulsarClientException.unwrap(e);
} else {
return null;
}
Expand Down Expand Up @@ -1369,17 +1366,17 @@ private boolean processPossibleToDLQ(MessageIdImpl messageId) {
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
} catch (ExecutionException | InterruptedException e) {
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

@Override
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
} catch (ExecutionException | InterruptedException e) {
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down Expand Up @@ -1457,8 +1454,8 @@ public boolean hasMessageAvailable() throws PulsarClientException {
}

return hasMessageAvailableAsync().get();
} catch (ExecutionException | InterruptedException e) {
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,8 @@ protected Message<T> internalReceive() throws PulsarClientException {
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand All @@ -356,9 +355,8 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down Expand Up @@ -570,21 +568,17 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
} catch (ExecutionException e) {
throw new PulsarClientException(e.getCause());
} catch (InterruptedException e) {
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

@Override
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
} catch (ExecutionException e) {
throw new PulsarClientException(e.getCause());
} catch (InterruptedException e) {
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -83,33 +82,17 @@ public MessageId send(Message<T> message) throws PulsarClientException {
}

return sendFuture.get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

@Override
public void flush() throws PulsarClientException {
try {
flushAsync().get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof PulsarClientException) {
throw (PulsarClientException) cause;
} else {
throw new PulsarClientException(cause);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand All @@ -119,16 +102,8 @@ public void flush() throws PulsarClientException {
public void close() throws PulsarClientException {
try {
closeAsync().get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

Expand Down
Loading

0 comments on commit 571b684

Please sign in to comment.