diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java index fbb9837be555a..fec0b55f48c54 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java @@ -57,10 +57,6 @@ public final class HybridMemorySegment extends MemorySegment { @Nullable private final ByteBuffer offHeapBuffer; - /** The cleaner is called to free the underlying native memory. */ - @Nullable - private final Runnable cleaner; - /** * Creates a new memory segment that represents the memory backing the given direct byte buffer. * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)}, @@ -70,13 +66,11 @@ public final class HybridMemorySegment extends MemorySegment { * * @param buffer The byte buffer whose memory is represented by this memory segment. * @param owner The owner references by this memory segment. - * @param cleaner optional action to run upon freeing the segment. * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct. */ - HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable Runnable cleaner) { + HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) { super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner); this.offHeapBuffer = buffer; - this.cleaner = cleaner; } /** @@ -90,7 +84,6 @@ public final class HybridMemorySegment extends MemorySegment { HybridMemorySegment(byte[] buffer, Object owner) { super(buffer, owner); this.offHeapBuffer = null; - this.cleaner = null; } // ------------------------------------------------------------------------- @@ -133,14 +126,6 @@ public ByteBuffer wrap(int offset, int length) { } } - @Override - public void free() { - super.free(); - if (cleaner != null) { - cleaner.run(); - } - } - // ------------------------------------------------------------------------ // Random Access get() and put() methods // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java index c297a2648047e..26f05825793ba 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java @@ -95,7 +95,7 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size) { */ public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) { ByteBuffer memory = ByteBuffer.allocateDirect(size); - return new HybridMemorySegment(memory, owner, null); + return new HybridMemorySegment(memory, owner); } /** @@ -112,7 +112,8 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner) { long address = MemoryUtils.allocateUnsafe(size); ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size); - return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address)); + MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address); + return new HybridMemorySegment(offHeapBuffer, owner); } /** @@ -126,7 +127,7 @@ public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner) * @return A new memory segment representing the given off-heap memory. */ public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) { - return new HybridMemorySegment(memory, null, null); + return new HybridMemorySegment(memory, null); } } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java index 3e3e267a44c86..09619cdd2ad97 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java @@ -46,12 +46,12 @@ public void testHybridHeapNullBuffer2() { @Test(expected = NullPointerException.class) public void testHybridOffHeapNullBuffer2() { - new HybridMemorySegment(null, new Object(), () -> {}); + new HybridMemorySegment((ByteBuffer) null, new Object()); } @Test(expected = IllegalArgumentException.class) public void testHybridNonDirectBuffer() { - new HybridMemorySegment(ByteBuffer.allocate(1024), new Object(), () -> {}); + new HybridMemorySegment(ByteBuffer.allocate(1024), new Object()); } @Test(expected = IllegalArgumentException.class)