Skip to content

Commit

Permalink
[fix][broker] Fix deadlock in broker after race condition in topic cr…
Browse files Browse the repository at this point in the history
…eation failure (apache#15570)

* Fix deadlock in broker after race condition in topic creation failure

* Fixed checkstyle
  • Loading branch information
merlimat authored May 13, 2022
1 parent aa03ccd commit 5db06a1
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1400,9 +1400,10 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
});
persistentTopic.stopReplProducers()
.whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
}, executor());
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
Expand All @@ -1411,10 +1412,10 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
.exceptionally((ex) -> {
log.warn("Replication or dedup check failed."
+ " Removing topic from topics list {}, {}", topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});
}, executor());
return null;
});
} catch (PulsarServerException e) {
Expand Down

0 comments on commit 5db06a1

Please sign in to comment.