Skip to content

Commit

Permalink
[FLINK-33668][runtime] Make the SortBuffer be able to return unused m…
Browse files Browse the repository at this point in the history
…emory segments
  • Loading branch information
jiangxin369 authored and reswqa committed Mar 8, 2024
1 parent 88138d0 commit ac2ec34
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public abstract class SortBuffer implements DataBuffer {
private final int bufferSize;

/** Number of guaranteed buffers can be allocated from the buffer pool for data sort. */
private final int numGuaranteedBuffers;
private int numGuaranteedBuffers;

// ---------------------------------------------------------------------------------------------
// Statistics and states
Expand Down Expand Up @@ -184,6 +184,27 @@ public boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType
return false;
}

/**
* Try to release some unused memory segments.
*
* <p>Note that this class is not thread safe, so please make sure to call {@link
* #append(ByteBuffer source, int targetSubpartition, Buffer.DataType dataType)} and this method
* with lock acquired.
*
* @param numFreeSegments the number of segments to be released.
* @return true if released successfully, otherwise false.
*/
public boolean returnFreeSegments(int numFreeSegments) {
if (numFreeSegments < numGuaranteedBuffers - segments.size()) {
for (int i = 0; i < numFreeSegments; i++) {
bufferRecycler.recycle(freeSegments.poll());
}
numGuaranteedBuffers -= numFreeSegments;
return true;
}
return false;
}

private void writeIndex(int subpartitionIndex, int numRecordBytes, Buffer.DataType dataType) {
MemorySegment segment = segments.get(writeSegmentIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public void setup(TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> buff
int newSize = this.memoryManager.getBufferPoolSize();
int oldSize = poolSize.getAndSet(newSize);
if (oldSize > newSize) {
flushCurrentDataBuffer();
if (!returnFreeSegments(oldSize - newSize)) {
flushCurrentDataBuffer();
}
}
},
poolSizeCheckInterval,
Expand Down Expand Up @@ -273,6 +275,16 @@ private synchronized void flushCurrentDataBuffer() {
}
}

private synchronized boolean returnFreeSegments(int numSegments) {
if (currentDataBuffer == null
|| currentDataBuffer.isReleased()
|| !currentDataBuffer.hasRemaining()) {
return false;
} else {
return currentDataBuffer.returnFreeSegments(numSegments);
}
}

private void writeLargeRecord(ByteBuffer record, int subpartitionId, Buffer.DataType dataType) {

checkState(dataType != Buffer.DataType.EVENT_BUFFER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -194,6 +195,33 @@ void testBufferIsRecycledWhenGetEvent() throws Exception {
assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffersForSort);
}

@Test
void testReturnFreeSegments() throws IOException, InterruptedException {
final int bufferPoolSize = 512;
final int numBuffersForSort = 20;
NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE_BYTES);
BufferPool bufferPool =
globalPool.createBufferPool(bufferPoolSize, bufferPoolSize, bufferPoolSize);

LinkedList<MemorySegment> segments = new LinkedList<>();
for (int i = 0; i < numBuffersForSort; ++i) {
segments.add(bufferPool.requestMemorySegmentBlocking());
}
TieredStorageSortBuffer sortBuffer =
new TieredStorageSortBuffer(
segments, bufferPool, 1, BUFFER_SIZE_BYTES, numBuffersForSort, true);

for (int i = 0; i < 5; i++) {
byte[] bytes = new byte[BUFFER_SIZE_BYTES];
ByteBuffer record = ByteBuffer.wrap(bytes);
sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
}
assertThat(sortBuffer.returnFreeSegments(10)).isTrue();
assertThat(sortBuffer.returnFreeSegments(10)).isFalse();

sortBuffer.finish();
}

private static BufferWithSubpartition copyIntoSegment(SortBuffer dataBuffer) {
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE_BYTES);
return dataBuffer.getNextBuffer(segment);
Expand Down

0 comments on commit ac2ec34

Please sign in to comment.