Skip to content

Commit

Permalink
[improve][broker] Improve knownBrokers update in ModularLoadManagerIm…
Browse files Browse the repository at this point in the history
…pl (apache#20196)
  • Loading branch information
AnonHxy authored Apr 28, 2023
1 parent 7ccd1ba commit a362eaa
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private long unloadBundleCount = 0;

private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
private final Set<String> knownBrokers = new HashSet<>();
private Map<String, String> bundleBrokerAffinityMap;

/**
Expand Down Expand Up @@ -480,13 +480,11 @@ public void updateAll() {
checkNamespaceBundleSplit();
}

private void cleanupDeadBrokersData() {
private synchronized void cleanupDeadBrokersData() {
final Set<String> activeBrokers = getAvailableBrokers();
final Set<String> knownBrokersCopy = new HashSet<>(this.knownBrokers);
Collection<String> newBrokers = CollectionUtils.subtract(activeBrokers, knownBrokersCopy);
this.knownBrokers.addAll(newBrokers);
Collection<String> deadBrokers = CollectionUtils.subtract(knownBrokersCopy, activeBrokers);
this.knownBrokers.removeAll(deadBrokers);
Collection<String> deadBrokers = CollectionUtils.subtract(knownBrokers, activeBrokers);
this.knownBrokers.clear();
this.knownBrokers.addAll(activeBrokers);
if (pulsar.getLeaderElectionService() != null
&& pulsar.getLeaderElectionService().isLeader()) {
deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync);
Expand Down

0 comments on commit a362eaa

Please sign in to comment.