Skip to content

Commit

Permalink
[pulsar-broker]Fix: race condition while deleting global topic (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed May 18, 2019
1 parent 364ed5e commit 78e794a
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,6 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
closeClientFuture.thenAccept(delete -> {
if (USAGE_COUNT_UPDATER.get(this) == 0) {
isFenced = true;

List<CompletableFuture<Void>> futures = Lists.newArrayList();

if (failIfHasSubscriptions) {
Expand Down Expand Up @@ -824,9 +823,14 @@ public void deleteLedgerComplete(Object ctx) {

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
isFenced = false;
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
if (exception.getCause() instanceof KeeperException.NoNodeException) {
log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
isFenced = false;
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
}
Expand Down Expand Up @@ -984,7 +988,7 @@ public CompletableFuture<Void> checkReplication() {
// doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
configuredClusters);
topic, configuredClusters);
return deleteForcefully();
}

Expand Down

0 comments on commit 78e794a

Please sign in to comment.