Skip to content

Commit

Permalink
support shrink for map or set (apache#14663)
Browse files Browse the repository at this point in the history
* support shrink for map or set

* check style

* check style
  • Loading branch information
lordcheng10 authored Mar 14, 2022
1 parent fe7e55d commit 1d10dff
Show file tree
Hide file tree
Showing 29 changed files with 860 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ public static void fillNamespaceToBundlesMap(final Set<String> bundles,
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
target.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
});
}

Expand Down Expand Up @@ -263,8 +265,12 @@ public static void removeMostServicingBrokersForNamespace(

for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size();
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
Expand All @@ -276,8 +282,12 @@ public static void removeMostServicingBrokersForNamespace(

final int finalLeastBundles = leastBundles;
candidates.removeIf(
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder().build())
.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.size() > finalLeastBundles);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
Expand Down Expand Up @@ -583,7 +586,10 @@ private void updateBundleData() {
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker, k ->
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
Expand Down Expand Up @@ -866,9 +872,13 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker.get(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
return broker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ public SimpleLoadManagerImpl() {
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
brokerToNamespaceToBundleRange =
ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
.build();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -851,8 +854,12 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
// same broker.
brokerToNamespaceToBundleRange
.computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
k -> new ConcurrentOpenHashMap<>())
.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build())
.computeIfAbsent(namespaceName, k ->
ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
Expand Down Expand Up @@ -1271,7 +1278,10 @@ private synchronized void updateBrokerToNamespaceToBundle() {
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
.computeIfAbsent(broker.replace("http://", ""),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public NamespaceService(PulsarService pulsar) {
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
this.namespaceClients = new ConcurrentOpenHashMap<>();
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
Expand Down Expand Up @@ -356,9 +357,15 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
}

private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
findingBundlesAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
findingBundlesNotAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();

/**
* Main internal method to lookup and setup ownership of service unit to a broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ private CompletableFuture<Void> lookUpBrokerForTopic(TopicName partitionedTopicN
partitionedTopicName, result.getLookupData());
}
pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
.getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>())
.getPartitionedTopicName(),
(key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
.add(partitionedTopicName.getPartitionIndex());
completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,28 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
this.clusterAdmins = new ConcurrentOpenHashMap<>();
this.topics =
ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder()
.build();
this.replicationClients =
ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build();
this.clusterAdmins =
ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build();
this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
this.configRegisteredListeners =
ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build();
this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();

this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
this.owningTopics = new ConcurrentOpenHashMap<>();
this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder()
.build();
this.owningTopics = ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<Integer>>newBuilder()
.build();
this.pulsarStats = new PulsarStats(pulsar);
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
this.offlineTopicStatCache =
ConcurrentOpenHashMap.<TopicName,
PersistentOfflineTopicStats>newBuilder().build();

this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
Expand Down Expand Up @@ -327,7 +338,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers = new ConcurrentOpenHashSet<>();
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
Expand Down Expand Up @@ -1589,8 +1601,12 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
synchronized (multiLayerTopicsMap) {
String serviceUnit = namespaceBundle.toString();
multiLayerTopicsMap //
.computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) //
.computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) //
.computeIfAbsent(topicName.getNamespace(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashMap<String, Topic>>newBuilder()
.build()) //
.computeIfAbsent(serviceUnit,
k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) //
.put(topicName.toString(), topic);
}
}
Expand Down Expand Up @@ -2412,7 +2428,8 @@ public static boolean validateDynamicConfiguration(String key, String value) {
}

private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
Expand All @@ -2425,7 +2442,8 @@ private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigur
}

private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>();
ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap =
ConcurrentOpenHashMap.<String, Object>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,16 @@ public void reset() {
public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);

this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions =
ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators =
ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.isFenced = false;
registerTopicPolicyListener();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,20 @@ public MessageDupUnknownException() {
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,14 @@ public void reset() {
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
super(topic, brokerService);
this.ledger = ledger;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
registerTopicPolicyListener();
Expand Down Expand Up @@ -346,8 +352,14 @@ public CompletableFuture<Void> initialize() {
super(topic, brokerService);
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
Expand Down
Loading

0 comments on commit 1d10dff

Please sign in to comment.