Skip to content

Commit

Permalink
[Client] Continue shutdown of client when closing producers and consu…
Browse files Browse the repository at this point in the history
…mers timeout (apache#15921)

- don't return exception to caller
  • Loading branch information
lhotari authored Jun 3, 2022
1 parent 93c1a4c commit d73fce2
Showing 1 changed file with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,24 +750,23 @@ public CompletableFuture<Void> closeAsync() {
FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
PulsarClientImpl.class, "closeAsync"));
combinedFuture.whenComplete((__, t) -> new Thread(() -> {
combinedFuture.handle((__, t) -> {
if (t != null) {
log.error("Closing producers and consumers failed. Continuing with shutdown.", t);
}
shutdownExecutor.shutdownNow();
// All producers & consumers are now closed, we can stop the client safely
try {
shutdown();
closeFuture.complete(null);
new Thread(() -> {
shutdownExecutor.shutdownNow();
// All producers & consumers are now closed, we can stop the client safely
try {
shutdown();
} catch (PulsarClientException e) {
log.error("Shutdown failed. Ignoring the exception.", e);
}
state.set(State.Closed);
} catch (PulsarClientException e) {
closeFuture.completeExceptionally(e);
}
}, "pulsar-client-shutdown-thread").start()).exceptionally(exception -> {
closeFuture.completeExceptionally(exception);
closeFuture.complete(null);
}, "pulsar-client-shutdown-thread").start();
return null;
});

return closeFuture;
}

Expand Down

0 comments on commit d73fce2

Please sign in to comment.