From 77ea9b8340169ca3fd36a7e282d596cf256a8709 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Fri, 10 Jul 2020 21:43:20 -0700 Subject: [PATCH] Fix: Exception being ignored in PulsarAdmin (#7510) Co-authored-by: Jerry Peng --- .../client/admin/internal/FunctionsImpl.java | 61 +++++++++++++------ .../client/admin/internal/SinksImpl.java | 8 +++ .../client/admin/internal/SourcesImpl.java | 8 +++ 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 577f635c589b6..a536d85cff8f0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -357,6 +357,10 @@ public CompletableFuture createFunctionAsync(FunctionConfig functionConfig } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { @@ -472,6 +476,10 @@ public CompletableFuture updateFunctionAsync( } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); @@ -755,6 +763,10 @@ public CompletableFuture uploadFunctionAsync(String sourceFile, String pat } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); @@ -844,24 +856,29 @@ public void onThrowable(Throwable t) { } }).toCompletableFuture(); - statusFuture.thenAccept(status -> { - try { - os.close(); - } catch (Exception e) { - future.completeExceptionally(getApiException(e)); - return; - } - - if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { - future.completeExceptionally( - getApiException(Response - .status(status.getStatusCode()) - .entity(status.getStatusText()) - .build())); - } else { - future.complete(null); - } - }); + statusFuture + .thenAccept(status -> { + try { + os.close(); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + return; + } + + if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { + future.completeExceptionally( + getApiException(Response + .status(status.getStatusCode()) + .entity(status.getStatusText()) + .build())); + } else { + future.complete(null); + } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; + }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -978,6 +995,10 @@ public CompletableFuture putFunctionStateAsync( } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { @@ -1025,6 +1046,10 @@ public CompletableFuture updateOnWorkerLeaderAsync(String tenant, String n } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index 4a37c8bbed4ca..a4398234009ff 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -259,6 +259,10 @@ public CompletableFuture createSinkAsync(SinkConfig sinkConfig, String fil } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); @@ -358,6 +362,10 @@ public CompletableFuture updateSinkAsync( } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index 14459dce0b76b..155bb545a7fbc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -260,6 +260,10 @@ public CompletableFuture createSourceAsync(SourceConfig sourceConfig, Stri } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e)); @@ -360,6 +364,10 @@ public CompletableFuture updateSourceAsync( } else { future.complete(null); } + }) + .exceptionally(throwable -> { + future.completeExceptionally(getApiException(throwable)); + return null; }); } catch (Exception e) { future.completeExceptionally(getApiException(e));