From 5494fe68d867694a1ce14eb613a087b99fcd6509 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Mon, 18 Jul 2022 18:08:35 +0800 Subject: [PATCH] [hotfix] Introduce BufferIndexAndChannel and make HsSpillingStrategy using it instead of BufferWithIdentity. --- .../hybrid/BufferIndexAndChannel.java | 39 +++++++++++++++++++ .../hybrid/HsFullSpillingStrategy.java | 14 +++---- .../hybrid/HsSelectiveSpillingStrategy.java | 6 +-- .../hybrid/HsSpillingInfoProvider.java | 2 +- .../partition/hybrid/HsSpillingStrategy.java | 25 ++++++------ .../hybrid/HsSpillingStrategyUtils.java | 28 ++++++------- .../hybrid/HsFullSpillingStrategyTest.java | 23 ++++++----- .../HsSelectiveSpillingStrategyTest.java | 23 ++++++----- .../hybrid/HsSpillingStrategyTestUtils.java | 23 ++++------- .../hybrid/HsSpillingStrategyUtilsTest.java | 24 ++++++------ .../hybrid/TestingSpillingInfoProvider.java | 14 +++---- 11 files changed, 127 insertions(+), 94 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java new file mode 100644 index 0000000000000..3d76244f0cd4f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +/** Integrate the buffer index and the channel id which it belongs. */ +public class BufferIndexAndChannel { + private final int bufferIndex; + + private final int channel; + + public BufferIndexAndChannel(int bufferIndex, int channel) { + this.bufferIndex = bufferIndex; + this.channel = channel; + } + + public int getBufferIndex() { + return bufferIndex; + } + + public int getChannel() { + return channel; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index bfddc73dc45f7..3c4d58369e062 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -57,7 +57,7 @@ public Optional onBufferFinished(int numTotalUnSpillBuffers) { // For the case of buffer consumed, there is no need to take action for HsFullSpillingStrategy. @Override - public Optional onBufferConsumed(BufferWithIdentity consumedBuffer) { + public Optional onBufferConsumed(BufferIndexAndChannel consumedBuffer) { return Optional.of(Decision.NO_ACTION); } @@ -85,7 +85,7 @@ private void checkSpill(HsSpillingInfoProvider spillingInfoProvider, Decision.Bu return; } // Spill all not spill buffers. - List unSpillBuffers = new ArrayList<>(); + List unSpillBuffers = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { unSpillBuffers.addAll( spillingInfoProvider.getBuffersInOrder( @@ -105,13 +105,13 @@ private void checkRelease( int releaseNum = (int) (spillingInfoProvider.getPoolSize() * releaseBufferRatio); // first, release all consumed buffers - TreeMap> consumedBuffersToRelease = new TreeMap<>(); + TreeMap> consumedBuffersToRelease = new TreeMap<>(); int numConsumedBuffers = 0; for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); subpartitionId++) { - Deque consumedSpillSubpartitionBuffers = + Deque consumedSpillSubpartitionBuffers = spillingInfoProvider.getBuffersInOrder( subpartitionId, SpillStatus.SPILL, ConsumeStatus.CONSUMED); numConsumedBuffers += consumedSpillSubpartitionBuffers.size(); @@ -119,9 +119,9 @@ private void checkRelease( } // make up the releaseNum with unconsumed buffers, if needed, w.r.t. the consuming priority - TreeMap> unconsumedBufferToRelease = new TreeMap<>(); + TreeMap> unconsumedBufferToRelease = new TreeMap<>(); if (releaseNum > numConsumedBuffers) { - TreeMap> unconsumedBuffers = new TreeMap<>(); + TreeMap> unconsumedBuffers = new TreeMap<>(); for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); subpartitionId++) { @@ -138,7 +138,7 @@ private void checkRelease( } // collect results in order - List toRelease = new ArrayList<>(); + List toRelease = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new ArrayDeque<>())); toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new ArrayList<>())); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index 6b8f0b2a1d5fe..cd8c6edb8aec9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -52,7 +52,7 @@ public Optional onBufferFinished(int numTotalUnSpillBuffers) { // For the case of buffer consumed, this buffer need release. The control of the buffer is taken // over by the downstream task. @Override - public Optional onBufferConsumed(BufferWithIdentity consumedBuffer) { + public Optional onBufferConsumed(BufferIndexAndChannel consumedBuffer) { return Optional.of(Decision.builder().addBufferToRelease(consumedBuffer).build()); } @@ -80,7 +80,7 @@ public Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoPr int spillNum = (int) (spillingInfoProvider.getPoolSize() * spillBufferRatio); - TreeMap> subpartitionToBuffers = new TreeMap<>(); + TreeMap> subpartitionToBuffers = new TreeMap<>(); for (int channel = 0; channel < spillingInfoProvider.getNumSubpartitions(); channel++) { subpartitionToBuffers.put( channel, @@ -88,7 +88,7 @@ public Decision decideActionWithGlobalInfo(HsSpillingInfoProvider spillingInfoPr channel, SpillStatus.NOT_SPILL, ConsumeStatus.NOT_CONSUMED)); } - TreeMap> subpartitionToHighPriorityBuffers = + TreeMap> subpartitionToHighPriorityBuffers = getBuffersByConsumptionPriorityInOrder( spillingInfoProvider.getNextBufferIndexToConsume(), subpartitionToBuffers, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java index 01d2dfcee0476..6168c637de406 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java @@ -48,7 +48,7 @@ public interface HsSpillingInfoProvider { * according to bufferIndex from small to large, in other words, head is the buffer with the * minimum bufferIndex in the current subpartition. */ - Deque getBuffersInOrder( + Deque getBuffersInOrder( int subpartitionId, SpillStatus spillStatus, ConsumeStatus consumeStatus); /** Get total number of not decided to spill buffers. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java index 16f75d0cd0b26..01041aaed261d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java @@ -57,7 +57,7 @@ public interface HsSpillingStrategy { * @return A {@link Decision} based on the provided information, or {@link Optional#empty()} if * the decision cannot be made, which indicates global information is needed. */ - Optional onBufferConsumed(BufferWithIdentity consumedBuffer); + Optional onBufferConsumed(BufferIndexAndChannel consumedBuffer); /** * Make a decision based on global information. Because this method will directly touch the @@ -74,25 +74,26 @@ public interface HsSpillingStrategy { */ class Decision { /** A collection of buffer that needs to be spilled to disk. */ - private final List bufferToSpill; + private final List bufferToSpill; /** A collection of buffer that needs to be released. */ - private final List bufferToRelease; + private final List bufferToRelease; public static final Decision NO_ACTION = new Decision(Collections.emptyList(), Collections.emptyList()); private Decision( - List bufferToSpill, List bufferToRelease) { + List bufferToSpill, + List bufferToRelease) { this.bufferToSpill = bufferToSpill; this.bufferToRelease = bufferToRelease; } - public List getBufferToSpill() { + public List getBufferToSpill() { return bufferToSpill; } - public List getBufferToRelease() { + public List getBufferToRelease() { return bufferToRelease; } @@ -103,29 +104,29 @@ public static Builder builder() { /** Builder for {@link Decision}. */ static class Builder { /** A collection of buffer that needs to be spilled to disk. */ - private final List bufferToSpill = new ArrayList<>(); + private final List bufferToSpill = new ArrayList<>(); /** A collection of buffer that needs to be released. */ - private final List bufferToRelease = new ArrayList<>(); + private final List bufferToRelease = new ArrayList<>(); private Builder() {} - public Builder addBufferToSpill(BufferWithIdentity buffer) { + public Builder addBufferToSpill(BufferIndexAndChannel buffer) { bufferToSpill.add(buffer); return this; } - public Builder addBufferToSpill(List buffers) { + public Builder addBufferToSpill(List buffers) { bufferToSpill.addAll(buffers); return this; } - public Builder addBufferToRelease(BufferWithIdentity buffer) { + public Builder addBufferToRelease(BufferIndexAndChannel buffer) { bufferToRelease.add(buffer); return this; } - public Builder addBufferToRelease(List buffers) { + public Builder addBufferToRelease(List buffers) { bufferToRelease.addAll(buffers); return this; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java index b5283aeeb79a4..5dd11719f5730 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java @@ -45,10 +45,11 @@ public class HsSpillingStrategyUtils { * @return mapping for subpartitionId to buffers, the value of map entry must be order by * bufferIndex ascending. */ - public static TreeMap> getBuffersByConsumptionPriorityInOrder( - List nextBufferIndexToConsume, - TreeMap> subpartitionToAllBuffers, - int expectedSize) { + public static TreeMap> + getBuffersByConsumptionPriorityInOrder( + List nextBufferIndexToConsume, + TreeMap> subpartitionToAllBuffers, + int expectedSize) { if (expectedSize <= 0) { return new TreeMap<>(); } @@ -63,17 +64,17 @@ public static TreeMap> getBuffersByConsumption } }); - TreeMap> subpartitionToHighPriorityBuffers = + TreeMap> subpartitionToHighPriorityBuffers = new TreeMap<>(); for (int i = 0; i < expectedSize; i++) { if (heap.isEmpty()) { break; } BufferConsumptionPriorityIterator bufferConsumptionPriorityIterator = heap.poll(); - BufferWithIdentity bufferWithIdentity = bufferConsumptionPriorityIterator.next(); + BufferIndexAndChannel bufferIndexAndChannel = bufferConsumptionPriorityIterator.next(); subpartitionToHighPriorityBuffers - .computeIfAbsent(bufferWithIdentity.getChannelIndex(), ArrayList::new) - .add(bufferWithIdentity); + .computeIfAbsent(bufferIndexAndChannel.getChannel(), ArrayList::new) + .add(bufferIndexAndChannel); // if this iterator has next, re-added it. if (bufferConsumptionPriorityIterator.hasNext()) { heap.add(bufferConsumptionPriorityIterator); @@ -89,24 +90,25 @@ public static TreeMap> getBuffersByConsumption /** * Special {@link Iterator} for hybrid shuffle mode that wrapped a deque of {@link - * BufferWithIdentity}. Tow iterator can compare by compute consumption priority of peek + * BufferIndexAndChannel}. Tow iterator can compare by compute consumption priority of peek * element. */ private static class BufferConsumptionPriorityIterator - implements Comparable, Iterator { + implements Comparable, + Iterator { private final int consumptionProgress; - private final PeekingIterator bufferIterator; + private final PeekingIterator bufferIterator; public BufferConsumptionPriorityIterator( - Deque bufferQueue, int consumptionProgress) { + Deque bufferQueue, int consumptionProgress) { this.consumptionProgress = consumptionProgress; this.bufferIterator = Iterators.peekingIterator(bufferQueue.descendingIterator()); } // move the iterator to next item. - public BufferWithIdentity next() { + public BufferIndexAndChannel next() { return bufferIterator.next(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index d39ad315a057b..eee3ecb3165c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -27,8 +27,7 @@ import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsFullSpillingStrategy}. */ @@ -67,9 +66,9 @@ void testOnBufferFinishedUnSpillBufferEqualToOrGreatThenThreshold() { @Test void testOnBufferConsumed() { - BufferWithIdentity bufferWithIdentity = new BufferWithIdentity(createBuffer(), 0, 0); + BufferIndexAndChannel bufferIndexAndChannel = new BufferIndexAndChannel(0, 0); Optional bufferConsumedDecision = - spillStrategy.onBufferConsumed(bufferWithIdentity); + spillStrategy.onBufferConsumed(bufferIndexAndChannel); assertThat(bufferConsumedDecision).hasValue(Decision.NO_ACTION); } @@ -96,16 +95,16 @@ void testDecideActionWithGlobalInfo() { final int progress1 = 10; final int progress2 = 20; - List subpartitionBuffers1 = - createBufferWithIdentitiesList( + List subpartitionBuffers1 = + createBufferIndexAndChannelsList( subpartition1, progress1, progress1 + 2, progress1 + 4, progress1 + 6, progress1 + 8); - List subpartitionBuffers2 = - createBufferWithIdentitiesList( + List subpartitionBuffers2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 3, @@ -132,13 +131,13 @@ void testDecideActionWithGlobalInfo() { Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); // all not spilled buffers need to spill. - ArrayList expectedSpillBuffers = + ArrayList expectedSpillBuffers = new ArrayList<>(subpartitionBuffers1.subList(4, 5)); expectedSpillBuffers.add(subpartitionBuffers2.get(0)); expectedSpillBuffers.addAll(subpartitionBuffers2.subList(4, 5)); assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers); - ArrayList expectedReleaseBuffers = new ArrayList<>(); + ArrayList expectedReleaseBuffers = new ArrayList<>(); // all consumed spill buffers should release. expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(0, 2)); // priority higher buffers should release. @@ -154,8 +153,8 @@ void testDecideActionWithGlobalInfo() { @Test void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() { final int subpartitionId = 0; - List subpartitionBuffers = - createBufferWithIdentitiesList(subpartitionId, 0, 1, 2, 3, 4); + List subpartitionBuffers = + createBufferIndexAndChannelsList(subpartitionId, 0, 1, 2, 3, 4); final int poolSize = 5; TestingSpillingInfoProvider spillInfoProvider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java index fd5548d641a92..eee7be9f40985 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java @@ -28,8 +28,7 @@ import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsSelectiveSpillingStrategy}. */ @@ -55,15 +54,15 @@ void testOnBufferFinished() { @Test void testOnBufferConsumed() { - BufferWithIdentity bufferWithIdentity = new BufferWithIdentity(createBuffer(), 0, 0); - Optional consumedDecision = spillStrategy.onBufferConsumed(bufferWithIdentity); + BufferIndexAndChannel bufferIndexAndChannel = new BufferIndexAndChannel(0, 0); + Optional consumedDecision = spillStrategy.onBufferConsumed(bufferIndexAndChannel); assertThat(consumedDecision) .hasValueSatisfying( (decision -> { assertThat(decision.getBufferToRelease()) .hasSize(1) .element(0) - .isEqualTo(bufferWithIdentity); + .isEqualTo(bufferIndexAndChannel); assertThat(decision.getBufferToSpill()).isEmpty(); })); } @@ -87,14 +86,14 @@ void testOnUsedMemoryHigh() { final int progress2 = 20; final int progress3 = 30; - List subpartitionBuffer1 = - createBufferWithIdentitiesList( + List subpartitionBuffer1 = + createBufferIndexAndChannelsList( subpartition1, progress1 + 0, progress1 + 3, progress1 + 6, progress1 + 9); - List subpartitionBuffer2 = - createBufferWithIdentitiesList( + List subpartitionBuffer2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 4, progress2 + 7); - List subpartitionBuffer3 = - createBufferWithIdentitiesList( + List subpartitionBuffer3 = + createBufferIndexAndChannelsList( subpartition3, progress3 + 2, progress3 + 5, progress3 + 8); final int bufferPoolSize = 10; @@ -121,7 +120,7 @@ void testOnUsedMemoryHigh() { // progress1 + 9 has the highest priority, but it cannot be decided to spill, as its // spillStatus is SPILL. expected buffer's index : progress1 + 6, progress2 + 7, progress3 + // 8 - List expectedBuffers = new ArrayList<>(); + List expectedBuffers = new ArrayList<>(); expectedBuffers.addAll(subpartitionBuffer1.subList(2, 3)); expectedBuffers.addAll(subpartitionBuffer2.subList(2, 3)); expectedBuffers.addAll(subpartitionBuffer3.subList(2, 3)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java index c95646ac10cc1..e959c150f6347 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java @@ -20,7 +20,6 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import java.util.ArrayDeque; @@ -32,30 +31,24 @@ public class HsSpillingStrategyTestUtils { public static final int MEMORY_SEGMENT_SIZE = 128; - public static List createBufferWithIdentitiesList( + public static List createBufferIndexAndChannelsList( int subpartitionId, int... bufferIndexes) { - List bufferWithIdentityList = new ArrayList<>(); + List bufferIndexAndChannels = new ArrayList<>(); for (int bufferIndex : bufferIndexes) { MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE); NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {}); - bufferWithIdentityList.add(new BufferWithIdentity(buffer, bufferIndex, subpartitionId)); + bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId)); } - return bufferWithIdentityList; + return bufferIndexAndChannels; } - public static Deque createBufferWithIdentitiesDeque( + public static Deque createBufferIndexAndChannelsDeque( int subpartitionId, int... bufferIndexes) { - Deque bufferWithIdentityList = new ArrayDeque<>(); + Deque bufferIndexAndChannels = new ArrayDeque<>(); for (int bufferIndex : bufferIndexes) { - Buffer buffer = createBuffer(); - bufferWithIdentityList.add(new BufferWithIdentity(buffer, bufferIndex, subpartitionId)); + bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId)); } - return bufferWithIdentityList; - } - - public static Buffer createBuffer() { - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE); - return new NetworkBuffer(segment, (ignore) -> {}); + return bufferIndexAndChannels; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java index 721699ff8a445..98d3ab7907f79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java @@ -26,18 +26,18 @@ import java.util.List; import java.util.TreeMap; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesDeque; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsDeque; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsSpillingStrategyUtils}. */ class HsSpillingStrategyUtilsTest { @Test void testGetBuffersByConsumptionPriorityInOrderEmptyExpectedSize() { - TreeMap> subpartitionToAllBuffers = new TreeMap<>(); - subpartitionToAllBuffers.put(0, createBufferWithIdentitiesDeque(0, 0, 1)); - subpartitionToAllBuffers.put(1, createBufferWithIdentitiesDeque(1, 2, 4)); - TreeMap> buffersByConsumptionPriorityInOrder = + TreeMap> subpartitionToAllBuffers = new TreeMap<>(); + subpartitionToAllBuffers.put(0, createBufferIndexAndChannelsDeque(0, 0, 1)); + subpartitionToAllBuffers.put(1, createBufferIndexAndChannelsDeque(1, 2, 4)); + TreeMap> buffersByConsumptionPriorityInOrder = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder( Arrays.asList(0, 1), subpartitionToAllBuffers, 0); assertThat(buffersByConsumptionPriorityInOrder).isEmpty(); @@ -51,17 +51,17 @@ void testGetBuffersByConsumptionPriorityInOrder() { final int progress1 = 10; final int progress2 = 20; - TreeMap> subpartitionBuffers = new TreeMap<>(); - List subpartitionBuffers1 = - createBufferWithIdentitiesList( + TreeMap> subpartitionBuffers = new TreeMap<>(); + List subpartitionBuffers1 = + createBufferIndexAndChannelsList( subpartition1, progress1, progress1 + 2, progress1 + 6); - List subpartitionBuffers2 = - createBufferWithIdentitiesList( + List subpartitionBuffers2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 2, progress2 + 5); subpartitionBuffers.put(subpartition1, new ArrayDeque<>(subpartitionBuffers1)); subpartitionBuffers.put(subpartition2, new ArrayDeque<>(subpartitionBuffers2)); - TreeMap> buffersByConsumptionPriorityInOrder = + TreeMap> buffersByConsumptionPriorityInOrder = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder( Arrays.asList(progress1, progress2), subpartitionBuffers, 5); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java index 21beb5f1ebe74..f8b303479f098 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java @@ -42,7 +42,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { private final Supplier getNumSubpartitionsSupplier; - private final Map> allBuffers; + private final Map> allBuffers; private final Map> spillBufferIndexes; @@ -54,7 +54,7 @@ public TestingSpillingInfoProvider( Supplier getNumTotalRequestedBuffersSupplier, Supplier getPoolSizeSupplier, Supplier getNumSubpartitionsSupplier, - Map> allBuffers, + Map> allBuffers, Map> spillBufferIndexes, Map> consumedBufferIndexes) { this.getNextBufferIndexToConsumeSupplier = getNextBufferIndexToConsumeSupplier; @@ -78,11 +78,11 @@ public List getNextBufferIndexToConsume() { } @Override - public Deque getBuffersInOrder( + public Deque getBuffersInOrder( int subpartitionId, SpillStatus spillStatus, ConsumeStatus consumeStatus) { - Deque buffersInOrder = new ArrayDeque<>(); + Deque buffersInOrder = new ArrayDeque<>(); - List subpartitionBuffers = allBuffers.get(subpartitionId); + List subpartitionBuffers = allBuffers.get(subpartitionId); if (subpartitionBuffers == null) { return buffersInOrder; } @@ -159,7 +159,7 @@ public static class Builder { private Supplier getNumSubpartitionsSupplier = () -> 0; - private final Map> allBuffers = new HashMap<>(); + private final Map> allBuffers = new HashMap<>(); private final Map> spillBufferIndexes = new HashMap<>(); @@ -197,7 +197,7 @@ public Builder setGetNumSubpartitionsSupplier( } public Builder addSubpartitionBuffers( - int subpartitionId, List subpartitionBuffers) { + int subpartitionId, List subpartitionBuffers) { allBuffers.computeIfAbsent(subpartitionId, ArrayList::new).addAll(subpartitionBuffers); return this; }