[FLINK-32410] Replace new HashSet<>(size) with CollectionUtil.newHashSetWithExpectedSize(size)

StefanRRichter committed Jun 30, 2023
1 parent ab9445a commit 4532e41
Showing 17 changed files with 39 additions and 29 deletions.
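
Background for the change: new HashSet<>(n) interprets n as the initial capacity of the backing hash table, not as the expected number of elements. With the default load factor of 0.75, inserting n elements into such a set crosses the resize threshold at 0.75 * n and forces at least one rehash mid-stream. Sizing the table as ceil(n / 0.75) up front lets all n insertions complete without a resize. A minimal sketch of the idea, assuming the default load factor (the class name ExpectedSizeSketch is illustrative; the real implementation in org.apache.flink.util.CollectionUtil may differ in detail):

    import java.util.HashSet;
    import java.util.Set;

    public final class ExpectedSizeSketch {

        // Default load factor shared by java.util.HashMap and HashSet.
        private static final float DEFAULT_LOAD_FACTOR = 0.75f;

        // Size the backing table so that expectedSize insertions fit
        // without triggering a rehash.
        public static <E> Set<E> newHashSetWithExpectedSize(int expectedSize) {
            return new HashSet<>((int) Math.ceil(expectedSize / (double) DEFAULT_LOAD_FACTOR));
        }

        public static void main(String[] args) {
            // new HashSet<>(64) would rehash at the 49th insertion (threshold
            // 64 * 0.75 = 48); the sized variant holds all 64 elements directly.
            Set<Integer> set = newHashSetWithExpectedSize(64);
            for (int i = 0; i < 64; i++) {
                set.add(i);
            }
            System.out.println(set.size()); // 64, no intermediate rehash
        }
    }

The same arithmetic applies to any HashMap-backed structure, which is why the change touches call sites across checkpointing, memory management, metrics handling, and the YARN client below. In the hunks that follow, removed lines are prefixed with - and their replacements with +.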

File 1 of 17

@@ -55,6 +55,7 @@
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution;
import org.apache.flink.table.types.DataType;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
@@ -1150,7 +1151,7 @@ private RelNode genGBRelNode(
// Grouping sets: we need to transform them into ImmutableBitSet objects for Calcite
List<ImmutableBitSet> transformedGroupSets = null;
if (hasGroupSets) {
- Set<ImmutableBitSet> set = new HashSet<>(groupSets.size());
+ Set<ImmutableBitSet> set = CollectionUtil.newHashSetWithExpectedSize(groupSets.size());
for (int val : groupSets) {
set.add(convert(val, groupSet.cardinality()));
}

File 2 of 17

@@ -24,9 +24,9 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
+ import org.apache.flink.util.CollectionUtil;

import java.util.Collections;
- import java.util.HashSet;
import java.util.List;
import java.util.Set;

@@ -53,7 +53,8 @@ public ArtificialKeyedStateMapper(

this.mapFunction = mapFunction;
this.artificialStateBuilders = artificialStateBuilders;
- Set<String> stateNames = new HashSet<>(this.artificialStateBuilders.size());
+ Set<String> stateNames =
+         CollectionUtil.newHashSetWithExpectedSize(this.artificialStateBuilders.size());
for (ArtificialStateBuilder<IN> stateBuilder : this.artificialStateBuilders) {
if (!stateNames.add(stateBuilder.getStateName())) {
throw new IllegalArgumentException(

File 3 of 17

@@ -172,7 +172,9 @@ public PendingCheckpoint(
operatorCoordinatorsToConfirm.isEmpty()
? Collections.emptySet()
: new HashSet<>(operatorCoordinatorsToConfirm);
- this.acknowledgedTasks = new HashSet<>(checkpointPlan.getTasksToWaitFor().size());
+ this.acknowledgedTasks =
+         CollectionUtil.newHashSetWithExpectedSize(
+                 checkpointPlan.getTasksToWaitFor().size());
this.onCompletionPromise = checkNotNull(onCompletionPromise);
this.pendingCheckpointStats = pendingCheckpointStats;
this.masterTriggerCompletionPromise = checkNotNull(masterTriggerCompletionPromise);

File 4 of 17

@@ -19,12 +19,12 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.util.IntArrayList;
+ import org.apache.flink.util.CollectionUtil;

import java.io.Serializable;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
- import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

@@ -115,7 +115,8 @@ public RescaleMappings invert() {
}

public Set<Integer> getAmbiguousTargets() {
- final Set<Integer> ambiguousTargets = new HashSet<>(numberOfTargets);
+ final Set<Integer> ambiguousTargets =
+         CollectionUtil.newHashSetWithExpectedSize(numberOfTargets);
final BitSet usedTargets = new BitSet(numberOfTargets);

for (int[] targets : mappings) {

File 5 of 17

@@ -22,6 +22,7 @@
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoader;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -402,7 +403,7 @@ private ResolvedClassLoader(
// NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons
// see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
// -> alternatively, compare their string representation
- this.classPaths = new HashSet<>(requiredClassPaths.size());
+ this.classPaths = CollectionUtil.newHashSetWithExpectedSize(requiredClassPaths.size());
for (URL url : requiredClassPaths) {
classPaths.add(url.toString());
}
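
An aside on the FindBugs pattern cited in the hunk above (DMI_COLLECTION_OF_URLS): java.net.URL's equals() and hashCode() may resolve host names over the network, so every insertion or lookup in a URL-keyed collection can block on DNS, and results can even vary with network state. A small illustration of the string-based workaround the surrounding code uses (class and variable names are illustrative, not from the Flink sources):

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.HashSet;
    import java.util.Set;

    public final class UrlSetSketch {

        public static void main(String[] args) throws MalformedURLException {
            // Anti-pattern: URL.hashCode()/equals() may trigger host name
            // resolution, so add() and contains() can block on the network.
            Set<URL> urlKeyed = new HashSet<>();
            urlKeyed.add(new URL("https://flink.apache.org/"));

            // The workaround used by ResolvedClassLoader above: store the
            // string representation and compare strings instead.
            Set<String> stringKeyed = new HashSet<>();
            stringKeyed.add(new URL("https://flink.apache.org/").toString());
            System.out.println(stringKeyed.contains("https://flink.apache.org/")); // true
        }
    }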

File 6 of 17

@@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.LongFunctionWithException;
@@ -34,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
- import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -238,7 +238,7 @@ public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
(o, currentSegmentsForOwner) -> {
Set<MemorySegment> segmentsForOwner =
currentSegmentsForOwner == null
-         ? new HashSet<>(numberOfPages)
+         ? CollectionUtil.newHashSetWithExpectedSize(numberOfPages)
: currentSegmentsForOwner;
for (long i = numberOfPages; i > 0; i--) {
MemorySegment segment =

File 7 of 17

@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+ import org.apache.flink.util.CollectionUtil;

import java.io.File;
import java.util.HashSet;
@@ -36,8 +37,8 @@ final class SpillChannelManager implements AutoCloseable {
private volatile boolean closed;

public SpillChannelManager() {
- this.channelsToDeleteAtShutdown = new HashSet<>(64);
- this.openChannels = new HashSet<>(64);
+ this.channelsToDeleteAtShutdown = CollectionUtil.newHashSetWithExpectedSize(64);
+ this.openChannels = CollectionUtil.newHashSetWithExpectedSize(64);
}

/**

File 8 of 17

@@ -33,6 +33,7 @@
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -43,7 +44,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
- import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -182,7 +182,7 @@ protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(
*/
private static Collection<String> getAvailableMetrics(
Collection<? extends MetricStore.ComponentMetricStore> stores) {
- Set<String> uniqueMetrics = new HashSet<>(32);
+ Set<String> uniqueMetrics = CollectionUtil.newHashSetWithExpectedSize(32);
for (MetricStore.ComponentMetricStore store : stores) {
uniqueMetrics.addAll(store.metrics.keySet());
}

File 9 of 17

@@ -1578,7 +1578,8 @@ private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
taskSlotTable.getAllocatedSlots(jobId);
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

- final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
+ final Collection<SlotOffer> reservedSlots =
+         CollectionUtil.newHashSetWithExpectedSize(2);

while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();

File 10 of 17

@@ -18,13 +18,13 @@
package org.apache.flink.runtime.taskexecutor.partition;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import javax.annotation.concurrent.ThreadSafe;

import java.util.Collection;
import java.util.Collections;
- import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,7 +54,7 @@ public void startTrackingPartitions(K key, Collection<ResultPartitionID> newPartitionIds) {
key,
(ignored, partitionIds) -> {
if (partitionIds == null) {
- partitionIds = new HashSet<>(8);
+ partitionIds = CollectionUtil.newHashSetWithExpectedSize(8);
}
partitionIds.addAll(newPartitionIds);
return partitionIds;

File 11 of 17

@@ -338,7 +338,7 @@ public boolean allocateSlot(
Set<AllocationID> slots = slotsPerJob.get(jobId);

if (slots == null) {
- slots = new HashSet<>(4);
+ slots = CollectionUtil.newHashSetWithExpectedSize(4);
slotsPerJob.put(jobId, slots);
}


File 12 of 17

@@ -31,6 +31,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -155,7 +156,7 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
"message-acknowledging-source-state",
new JavaSerializer<>()));

- this.idsForCurrentCheckpoint = new HashSet<>(64);
+ this.idsForCurrentCheckpoint = CollectionUtil.newHashSetWithExpectedSize(64);
this.pendingCheckpoints = new ArrayDeque<>();
this.idsProcessedButNotAcknowledged = new HashSet<>();

@@ -235,7 +236,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {

pendingCheckpoints.addLast(
new Tuple2<>(context.getCheckpointId(), idsForCurrentCheckpoint));
- idsForCurrentCheckpoint = new HashSet<>(64);
+ idsForCurrentCheckpoint = CollectionUtil.newHashSetWithExpectedSize(64);

this.checkpointedState.clear();
this.checkpointedState.add(

File 13 of 17

@@ -22,10 +22,10 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+ import org.apache.flink.util.CollectionUtil;

import java.io.IOException;
import java.util.ArrayDeque;
- import java.util.HashSet;
import java.util.Set;

/** This class represents serialized checkpoint data for a collection of elements. */
@@ -165,7 +165,7 @@ public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
deser.setBuffer(serializedData);
}

- final Set<T> ids = new HashSet<>(checkpoint.getNumIds());
+ final Set<T> ids = CollectionUtil.newHashSetWithExpectedSize(checkpoint.getNumIds());
final int numIds = checkpoint.getNumIds();

for (int i = 0; i < numIds; i++) {

File 14 of 17

@@ -24,11 +24,11 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+ import org.apache.flink.util.CollectionUtil;

import javax.annotation.Nonnull;

import java.io.IOException;
- import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -225,7 +225,7 @@ public final InternalTimersSnapshot<K, N> readTimersSnapshot(DataInputView in)
// read the event time timers
int sizeOfEventTimeTimers = in.readInt();
Set<TimerHeapInternalTimer<K, N>> restoredEventTimers =
-         new HashSet<>(sizeOfEventTimeTimers);
+         CollectionUtil.newHashSetWithExpectedSize(sizeOfEventTimeTimers);
if (sizeOfEventTimeTimers > 0) {
for (int i = 0; i < sizeOfEventTimeTimers; i++) {
TimerHeapInternalTimer<K, N> timer = timerSerializer.deserialize(in);
@@ -237,7 +237,7 @@ public final InternalTimersSnapshot<K, N> readTimersSnapshot(DataInputView in)
// read the processing time timers
int sizeOfProcessingTimeTimers = in.readInt();
Set<TimerHeapInternalTimer<K, N>> restoredProcessingTimers =
-         new HashSet<>(sizeOfProcessingTimeTimers);
+         CollectionUtil.newHashSetWithExpectedSize(sizeOfProcessingTimeTimers);
if (sizeOfProcessingTimeTimers > 0) {
for (int i = 0; i < sizeOfProcessingTimeTimers; i++) {
TimerHeapInternalTimer<K, N> timer = timerSerializer.deserialize(in);

File 15 of 17

@@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -33,7 +34,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
- import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
@@ -214,7 +214,7 @@ static class Segment<OUT> {
private final Queue<StreamElementQueueEntry<OUT>> completedElements;

Segment(int initialCapacity) {
- incompleteElements = new HashSet<>(initialCapacity);
+ incompleteElements = CollectionUtil.newHashSetWithExpectedSize(initialCapacity);
completedElements = new ArrayDeque<>(initialCapacity);
}


File 16 of 17

@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.operators.sort;

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+ import org.apache.flink.util.CollectionUtil;

import java.io.Closeable;
import java.io.File;
@@ -37,8 +38,8 @@ public class SpillChannelManager implements Closeable {
private volatile boolean closed;

public SpillChannelManager() {
- this.channels = new HashSet<>(64);
- this.openChannels = new HashSet<>(64);
+ this.channels = CollectionUtil.newHashSetWithExpectedSize(64);
+ this.openChannels = CollectionUtil.newHashSetWithExpectedSize(64);
}

/** Add a new File channel. */

File 17 of 17

@@ -834,7 +834,7 @@ private ApplicationReport startAppMaster(
getFileReplication());

// The files need to be shipped and added to classpath.
- Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
+ Set<File> systemShipFiles = CollectionUtil.newHashSetWithExpectedSize(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
