Skip to content

Commit

Permalink
[Broker] Fix NPE when subscription is already removed (apache#14363)
Browse files Browse the repository at this point in the history
* [Broker] Fix NPE when subscription is already removed

* Cover same case for NonPersistentTopic

Master Issue: apache#14362

### Motivation

There is current a race condition when we remove a subscription. The race and how to reproduce it is described in the apache#14362. One of the consequences of the race is that there is a chance we try to remove the subscription from the topic twice. This leads to an NPE, as described in the issue.

### Modifications

* Verify that the `sub` is not null before getting its stats.

### Verifying this change

This is a trivial change.

(cherry picked from commit aee1e7d)
  • Loading branch information
michaeljmarshall committed Mar 25, 2022
1 parent 766e5fe commit 2318a18
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1063,10 +1063,12 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
// That creates deadlock. so, execute remove it in different thread.
return CompletableFuture.runAsync(() -> {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
// preserve accumulative stats form removed subscription
SubscriptionStatsImpl stats = sub.getStats();
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
if (sub != null) {
// preserve accumulative stats form removed subscription
SubscriptionStatsImpl stats = sub.getStats();
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
}, brokerService.executor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,12 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {

void removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
// preserve accumulative stats form removed subscription
SubscriptionStatsImpl stats = sub.getStats(false, false, false);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
if (sub != null) {
// preserve accumulative stats form removed subscription
SubscriptionStatsImpl stats = sub.getStats(false, false, false);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
}

/**
Expand Down

0 comments on commit 2318a18

Please sign in to comment.