Skip to content

Commit

Permalink
Showing 10 changed files with 267 additions and 279 deletions.
1 change: 0 additions & 1 deletion flink-runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -443,7 +443,6 @@ under the License.
**/runtime/jobmanager/**,
**/runtime/jobmaster/**,
**/runtime/leaderelection/**,
**/runtime/memory/**,
**/runtime/messages/**,
**/runtime/minicluster/**,
**/runtime/operators/**,
Original file line number Diff line number Diff line change
@@ -16,46 +16,45 @@
* limitations under the License.
*/


package org.apache.flink.runtime.memory;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;

import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;


/**
* The base class for all input views that are backed by multiple memory pages. This base class contains all
* decoding methods to read data from a page and detect page boundary crossing. The concrete sub classes must
* implement the methods to provide the next memory page once the boundary is crossed.
*/
public abstract class AbstractPagedInputView implements DataInputView {

private MemorySegment currentSegment;

protected final int headerLength; // the number of bytes to skip at the beginning of each segment

private int positionInSegment; // the offset in the current segment

private int limitInSegment; // the limit in the current segment before switching to the next

private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding


// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------

/**
* Creates a new view that starts with the given segment. The input starts directly after the header
* of the given page. If the header size is zero, it starts at the beginning. The specified initial
* limit describes up to which position data may be read from the current segment, before the view must
* advance to the next segment.
*
*
* @param initialSegment The memory segment to start reading from.
* @param initialLimit The position one after the last valid byte in the initial segment.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header. This
@@ -66,14 +65,14 @@ protected AbstractPagedInputView(MemorySegment initialSegment, int initialLimit,
this.positionInSegment = headerLength;
seekInput(initialSegment, headerLength, initialLimit);
}

/**
* Creates a new view that is initially not bound to a memory segment. This constructor is typically
* for views that always seek first.
* <p>
* WARNING: The view is not readable until the first call to either {@link #advance()},
*
* <p>WARNING: The view is not readable until the first call to either {@link #advance()},
* or to {@link #seekInput(MemorySegment, int, int)}.
*
*
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedInputView(int headerLength) {
@@ -83,73 +82,73 @@ protected AbstractPagedInputView(int headerLength) {
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------

/**
* Gets the memory segment that will be used to read the next bytes from. If the segment is exactly exhausted,
* meaning that the last byte read was the last byte available in the segment, then this segment will
* not serve the next bytes. The segment to serve the next bytes will be obtained through the
* {@link #nextSegment(MemorySegment)} method.
*
*
* @return The current memory segment.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}

/**
* Gets the position from which the next byte will be read. If that position is equal to the current limit,
* then the next byte will be read from next segment.
*
*
* @return The position from which the next byte will be read.
* @see #getCurrentSegmentLimit()
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}

/**
* Gets the current limit in the memory segment. This value points to the byte one after the last valid byte
* in the memory segment.
*
*
* @return The current limit in the memory segment.
* @see #getCurrentPositionInSegment()
*/
public int getCurrentSegmentLimit() {
return this.limitInSegment;
}

/**
* The method by which concrete subclasses realize page crossing. This method is invoked when the current page
* is exhausted and a new page is required to continue the reading. If no further page is available, this
* method must throw an {@link EOFException}.
*
*
* @param current The current page that was read to its limit. May be {@code null}, if this method is
* invoked for the first time.
* @return The next page from which the reading should continue. May not be {@code null}. If the input is
* exhausted, an {@link EOFException} must be thrown instead.
*
*
* @throws EOFException Thrown, if no further segment is available.
* @throws IOException Thrown, if the method cannot provide the next page due to an I/O related problem.
*/
protected abstract MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException;

/**
* Gets the limit for reading bytes from the given memory segment. This method must return the position
* of the byte after the last valid byte in the given memory segment. When the position returned by this
* method is reached, the view will attempt to switch to the next memory segment.
*
*
* @param segment The segment to determine the limit for.
* @return The limit for the given memory segment.
*/
protected abstract int getLimitForSegment(MemorySegment segment);

/**
* Advances the view to the next memory segment. The reading will continue after the header of the next
* segment. This method uses {@link #nextSegment(MemorySegment)} and {@link #getLimitForSegment(MemorySegment)}
* to get the next segment and set its limit.
*
*
* @throws IOException Thrown, if the next segment could not be obtained.
*
*
* @see #nextSegment(MemorySegment)
* @see #getLimitForSegment(MemorySegment)
*/
@@ -160,11 +159,11 @@ protected final void advance() throws IOException {
this.limitInSegment = getLimitForSegment(this.currentSegment);
this.positionInSegment = this.headerLength;
}

/**
* Sets the internal state of the view such that the next bytes will be read from the given memory segment,
* starting at the given position. The memory segment will provide bytes up to the given limit position.
*
*
* @param segment The segment to read the next bytes from.
* @param positionInSegment The position in the segment to start reading from.
* @param limitInSegment The limit in the segment. When reached, the view will attempt to switch to
@@ -175,7 +174,7 @@ protected void seekInput(MemorySegment segment, int positionInSegment, int limit
this.positionInSegment = positionInSegment;
this.limitInSegment = limitInSegment;
}

/**
* Clears the internal state of the view. After this call, all read attempts will fail, until the
* {@link #advance()} or {@link #seekInput(MemorySegment, int, int)} method have been invoked.
@@ -185,14 +184,14 @@ protected void clear() {
this.positionInSegment = this.headerLength;
this.limitInSegment = headerLength;
}

// --------------------------------------------------------------------------------------------
// Data Input Specific methods
// --------------------------------------------------------------------------------------------

@Override
public int read(byte[] b) throws IOException{
return read(b,0,b.length);
return read(b, 0, b.length);
}

@Override
@@ -220,7 +219,7 @@ public int read(byte[] b, int off, int len) throws IOException{

int bytesRead = 0;
while (true) {
int toRead = Math.min(remaining, len-bytesRead);
int toRead = Math.min(remaining, len - bytesRead);
this.currentSegment.get(this.positionInSegment, b, off, toRead);
off += toRead;
bytesRead += toRead;
@@ -243,17 +242,17 @@ public int read(byte[] b, int off, int len) throws IOException{
return len;
}
}

@Override
public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
}

@Override
public void readFully(byte[] b, int off, int len) throws IOException {
int bytesRead = read(b,off,len);
int bytesRead = read(b, off, len);

if(bytesRead < len){
if (bytesRead < len){
throw new EOFException("There is no enough data left in the DataInputView.");
}
}
@@ -384,7 +383,7 @@ public double readDouble() throws IOException {
@Override
public String readLine() throws IOException {
final StringBuilder bld = new StringBuilder(32);

try {
int b;
while ((b = readUnsignedByte()) != '\n') {
@@ -398,7 +397,7 @@ public String readLine() throws IOException {
if (bld.length() == 0) {
return null;
}

// trim a trailing carriage return
int len = bld.length();
if (len > 0 && bld.charAt(len - 1) == '\r') {
@@ -410,10 +409,10 @@ public String readLine() throws IOException {
@Override
public String readUTF() throws IOException {
final int utflen = readUnsignedShort();

final byte[] bytearr;
final char[] chararr;

if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
bytearr = new byte[utflen];
this.utfByteBuffer = bytearr;
@@ -429,7 +428,7 @@ public String readUTF() throws IOException {

int c, char2, char3;
int count = 0;
int chararr_count = 0;
int chararrCount = 0;

readFully(bytearr, 0, utflen);

@@ -439,7 +438,7 @@ public String readUTF() throws IOException {
break;
}
count++;
chararr[chararr_count++] = (char) c;
chararr[chararrCount++] = (char) c;
}

while (count < utflen) {
@@ -455,7 +454,7 @@ public String readUTF() throws IOException {
case 7:
/* 0xxxxxxx */
count++;
chararr[chararr_count++] = (char) c;
chararr[chararrCount++] = (char) c;
break;
case 12:
case 13:
@@ -468,7 +467,7 @@ public String readUTF() throws IOException {
if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
}
chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
@@ -481,23 +480,23 @@ public String readUTF() throws IOException {
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
}
chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
break;
default:
/* 10xx xxxx, 1111 xxxx */
throw new UTFDataFormatException("malformed input around byte " + count);
}
}
// The number of chars produced may be less than utflen
return new String(chararr, 0, chararr_count);
return new String(chararr, 0, chararrCount);
}

@Override
public int skipBytes(int n) throws IOException {
if (n < 0) {
throw new IllegalArgumentException();
}

int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= n) {
this.positionInSegment += n;
@@ -512,20 +511,20 @@ public int skipBytes(int n) throws IOException {
}
remaining = this.limitInSegment - this.positionInSegment;
}

int skipped = 0;
while (true) {
int toSkip = Math.min(remaining, n);
n -= toSkip;
skipped += toSkip;

if (n > 0) {
try {
advance();
} catch (EOFException eofex) {
return skipped;
}
remaining = this.limitInSegment - this.positionInSegment;
remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += toSkip;
@@ -541,7 +540,7 @@ public void skipBytesToRead(int numBytes) throws IOException {
if (numBytes < 0) {
throw new IllegalArgumentException();
}

int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= numBytes) {
this.positionInSegment += numBytes;
@@ -551,12 +550,12 @@ public void skipBytesToRead(int numBytes) throws IOException {
advance();
remaining = this.limitInSegment - this.positionInSegment;
}

while (true) {
if (numBytes > remaining) {
numBytes -= remaining;
advance();
remaining = this.limitInSegment - this.positionInSegment;
remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += numBytes;
Original file line number Diff line number Diff line change
@@ -16,46 +16,45 @@
* limitations under the License.
*/


package org.apache.flink.runtime.memory;

import java.io.IOException;
import java.io.UTFDataFormatException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;

import java.io.IOException;
import java.io.UTFDataFormatException;


/**
* The base class for all output views that are backed by multiple memory pages. This base class contains all
* encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
* implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
* <p>
* The paging assumes that all memory segments are of the same size.
*
* <p>The paging assumes that all memory segments are of the same size.
*/
public abstract class AbstractPagedOutputView implements DataOutputView {

private MemorySegment currentSegment; // the current memory segment to write to

protected final int segmentSize; // the size of the memory segments

protected final int headerLength; // the number of bytes to skip at the beginning of each segment

private int positionInSegment; // the offset in the current segment

private byte[] utfBuffer; // the reusable array for UTF encodings


// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------

/**
* Creates a new output view that writes initially to the given initial segment. All segments in the
* view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
* at the beginning of each segment.
*
*
* @param initialSegment The segment that the view starts writing to.
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
@@ -69,105 +68,103 @@ protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize,
this.currentSegment = initialSegment;
this.positionInSegment = headerLength;
}

/**
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedOutputView(int segmentSize, int headerLength)
{
protected AbstractPagedOutputView(int segmentSize, int headerLength) {
this.segmentSize = segmentSize;
this.headerLength = headerLength;
}


// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------

/**
*
* This method must return a segment. If no more segments are available, it must throw an
* {@link java.io.EOFException}.
*
*
* @param current The current memory segment
* @param positionInCurrent The position in the segment, one after the last valid byte.
* @return The next memory segment.
*
* @return The next memory segment.
*
* @throws IOException
*/
protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;


/**
* Gets the segment that the view currently writes to.
*
*
* @return The segment the view currently writes to.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}

/**
* Gets the current write position (the position where the next bytes will be written)
* in the current memory segment.
*
*
* @return The current write offset in the current memory segment.
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}

/**
* Gets the size of the segments used by this view.
*
*
* @return The memory segment size.
*/
public int getSegmentSize() {
return this.segmentSize;
}

/**
* Moves the output view to the next page. This method invokes internally the
* {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
* {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
* implementation and obtain the next segment to write to. Writing will continue inside the new segment
* after the header.
*
*
* @throws IOException Thrown, if the current segment could not be processed or a new segment could not
* be obtained.
* be obtained.
*/
protected void advance() throws IOException {
this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
this.positionInSegment = this.headerLength;
}

/**
* Sets the internal state to the given memory segment and the given position within the segment.
*
* Sets the internal state to the given memory segment and the given position within the segment.
*
* @param seg The memory segment to write the next bytes to.
* @param position The position to start writing the next bytes to.
*/
protected void seekOutput(MemorySegment seg, int position) {
this.currentSegment = seg;
this.positionInSegment = position;
}

/**
* Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
* {@link #seekOutput(MemorySegment, int)} is called.
*
* {@link #seekOutput(MemorySegment, int)} is called.
*
* @see #advance()
* @see #seekOutput(MemorySegment, int)
*/
protected void clear() {
this.currentSegment = null;
this.positionInSegment = this.headerLength;
}

// --------------------------------------------------------------------------------------------
// Data Output Specific methods
// --------------------------------------------------------------------------------------------

@Override
public void write(int b) throws IOException {
writeByte(b);
@@ -195,11 +192,11 @@ public void write(byte[] b, int off, int len) throws IOException {
this.currentSegment.put(this.positionInSegment, b, off, toPut);
off += toPut;
len -= toPut;

if (len > 0) {
this.positionInSegment = this.segmentSize;
advance();
remaining = this.segmentSize - this.positionInSegment;
remaining = this.segmentSize - this.positionInSegment;
}
else {
this.positionInSegment += toPut;
@@ -349,7 +346,7 @@ public void writeUTF(String str) throws IOException {
final byte[] bytearr = this.utfBuffer;

bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ( utflen & 0xFF);
bytearr[count++] = (byte) (utflen & 0xFF);

int i;
for (i = 0; i < strlen; i++) {
@@ -368,10 +365,10 @@ public void writeUTF(String str) throws IOException {
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ( c & 0x3F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ( c & 0x3F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
}
}

@@ -401,13 +398,13 @@ public void write(DataInputView source, int numBytes) throws IOException {
this.positionInSegment += numBytes;
return;
}

if (remaining > 0) {
this.currentSegment.put(source, this.positionInSegment, remaining);
this.positionInSegment = this.segmentSize;
numBytes -= remaining;
}

advance();
}
}
Original file line number Diff line number Diff line change
@@ -16,26 +16,23 @@
* limitations under the License.
*/


package org.apache.flink.runtime.memory;

import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;

import java.util.List;

/**
* Simple memory segment source that draws segments from a list.
*
*
*/
public class ListMemorySegmentSource implements MemorySegmentSource
{
public class ListMemorySegmentSource implements MemorySegmentSource {
private final List<MemorySegment> segments;

public ListMemorySegmentSource(final List<MemorySegment> memorySegments) {
this.segments = memorySegments;
}


@Override
public MemorySegment nextSegment() {
Original file line number Diff line number Diff line change
@@ -16,14 +16,13 @@
* limitations under the License.
*/


package org.apache.flink.runtime.memory;

/**
* An exception to be thrown when a memory allocation operation is not successful.
*/
public class MemoryAllocationException extends Exception {

private static final long serialVersionUID = -403983866457947012L;

public MemoryAllocationException() {
Original file line number Diff line number Diff line change
@@ -18,6 +18,15 @@

package org.apache.flink.runtime.memory;

import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.util.MathUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -29,24 +38,16 @@
import java.util.List;
import java.util.Set;

import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory
* is represented in segments of equal size. Operators allocate the memory by requesting a number
* of memory segments.
* <p>
* The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
*
* <p>The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
* memory regions ({@link HybridMemorySegment}). Which kind of memory the MemoryManager serves can
* be passed as an argument to the initialization.
* <p>
* The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
*
* <p>The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
* former version, memory will be occupied and reserved from start on, which means that no OutOfMemoryError
* can come while requesting memory. Released memory will also return to the MemoryManager's pool.
* On-demand allocation means that the memory manager only keeps track how many memory segments are
@@ -69,35 +70,35 @@ public class MemoryManager {

/** The memory pool from which we draw memory segments. Specific to on-heap or off-heap memory */
private final MemoryPool memoryPool;
/** Memory segments allocated per memory owner */

/** Memory segments allocated per memory owner. */
private final HashMap<Object, Set<MemorySegment>> allocatedSegments;

/** The type of memory governed by this memory manager */
/** The type of memory governed by this memory manager. */
private final MemoryType memoryType;
/** mask used to round down sizes to multiples of the page size */

/** Mask used to round down sizes to multiples of the page size. */
private final long roundingMask;

/** The size of the memory segments */
/** The size of the memory segments. */
private final int pageSize;

/** The initial total size, for verification. */
private final int totalNumPages;

/** The total size of the memory managed by this memory manager */
/** The total size of the memory managed by this memory manager. */
private final long memorySize;

/** Number of slots of the task manager */
/** Number of slots of the task manager. */
private final int numberOfSlots;

/** Flag marking whether the memory manager immediately allocates the memory */
/** Flag marking whether the memory manager immediately allocates the memory. */
private final boolean isPreAllocated;

/** The number of memory pages that have not been allocated and are available for lazy allocation */
/** The number of memory pages that have not been allocated and are available for lazy allocation. */
private int numNonAllocatedPages;

/** flag whether the close() has already been invoked */
/** Flag whether the close() has already been invoked. */
private boolean isShutDown;


@@ -160,13 +161,13 @@ public MemoryManager(long memorySize, int numberOfSlots, int pageSize,

this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;

switch (memoryType) {
case HEAP:
this.memoryPool = new HeapMemoryPool(memToAllocate, pageSize);
break;
case OFF_HEAP:
if(!preAllocateMemory) {
if (!preAllocateMemory) {
LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
" the memory type 'taskmanager.memory.off-heap' is set to true.");
}
@@ -189,8 +190,7 @@ public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
*/
public void shutdown() {
// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (lock)
{
synchronized (lock) {
if (!isShutDown) {
// mark as shutdown and release memory
isShutDown = true;
@@ -202,7 +202,7 @@ public void shutdown() {
seg.free();
}
}

memoryPool.clear();
}
}
@@ -242,7 +242,7 @@ public boolean verifyEmpty() {
*
* @param owner The owner to associate with the memory segment, for the fallback release.
* @param numPages The number of pages to allocate.
* @return A list with the memory segments.
* @return A list with the memory segments.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
*/
@@ -256,16 +256,15 @@ public List<MemorySegment> allocatePages(Object owner, int numPages) throws Memo
* Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the
* segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated
* as part of this call.
*
*
* @param owner The owner to associate with the memory segment, for the fallback release.
* @param target The list into which to put the allocated memory pages.
* @param numPages The number of pages to allocate.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
*/
public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
throws MemoryAllocationException
{
throws MemoryAllocationException {
// sanity check
if (owner == null) {
throw new IllegalArgumentException("The memory owner must not be null.");
@@ -277,8 +276,7 @@ public void allocatePages(Object owner, List<MemorySegment> target, int numPages
}

// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (lock)
{
synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -319,8 +317,8 @@ public void allocatePages(Object owner, List<MemorySegment> target, int numPages
/**
* Tries to release the memory for the specified segment. If the segment has already been released or
* is null, the request is simply ignored.
* <p>
* If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
*
* <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
* Otherwise, the segment is only freed and made eligible for reclamation by the GC.
*
* @param segment The segment to be released.
@@ -333,10 +331,9 @@ public void release(MemorySegment segment) {
}

final Object owner = segment.getOwner();

// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (lock)
{
synchronized (lock) {
// prevent double return to this memory manager
if (segment.isFreed()) {
return;
@@ -374,10 +371,10 @@ public void release(MemorySegment segment) {

/**
* Tries to release many memory segments together.
* <p>
* If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
*
* <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
* Otherwise, the segment is only freed and made eligible for reclamation by the GC.
*
*
* @param segments The segments to be released.
* @throws NullPointerException Thrown, if the given collection is null.
* @throws IllegalArgumentException Thrown, id the segments are of an incompatible type.
@@ -388,8 +385,7 @@ public void release(Collection<MemorySegment> segments) {
}

// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (lock)
{
synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -459,7 +455,7 @@ public void release(Collection<MemorySegment> segments) {
}

/**
* Releases all memory segments for the given owner.
* Releases all memory segments for the given owner.
*
* @param owner The owner memory segments are to be released.
*/
@@ -469,8 +465,7 @@ public void releaseAll(Object owner) {
}

// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (lock)
{
synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -507,7 +502,7 @@ public void releaseAll(Object owner) {

/**
* Gets the type of memory (heap / off-heap) managed by this memory manager.
*
*
* @return The type of memory managed by this memory manager.
*/
public MemoryType getMemoryType() {
@@ -556,14 +551,14 @@ public int getTotalNumPages() {
* than the page size) is not included.
*
* @param fraction the fraction of the total memory per slot
* @return The number of pages to which
* @return The number of pages to which
*/
public int computeNumberOfPages(double fraction) {
if (fraction <= 0 || fraction > 1) {
throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
}

return (int)(totalNumPages * fraction / numberOfSlots);
return (int) (totalNumPages * fraction / numberOfSlots);
}

/**
@@ -584,13 +579,13 @@ public long computeMemorySize(double fraction) {
public long roundDownToPageSizeMultiple(long numBytes) {
return numBytes & roundingMask;
}


// ------------------------------------------------------------------------
// Memory Pools
// ------------------------------------------------------------------------

static abstract class MemoryPool {
abstract static class MemoryPool {

abstract int getNumberOfAvailableMemorySegments();

@@ -599,21 +594,21 @@ static abstract class MemoryPool {
abstract MemorySegment requestSegmentFromPool(Object owner);

abstract void returnSegmentToPool(MemorySegment segment);

abstract void clear();
}

static final class HeapMemoryPool extends MemoryPool {

/** The collection of available memory segments */
/** The collection of available memory segments. */
private final ArrayDeque<byte[]> availableMemory;

private final int segmentSize;

public HeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<byte[]>(numInitialSegments);
this.segmentSize = segmentSize;

for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(new byte[segmentSize]);
}
@@ -652,10 +647,10 @@ void clear() {
availableMemory.clear();
}
}

static final class HybridOffHeapMemoryPool extends MemoryPool {

/** The collection of available memory segments */
/** The collection of available memory segments. */
private final ArrayDeque<ByteBuffer> availableMemory;

private final int segmentSize;
Original file line number Diff line number Diff line change
@@ -20,15 +20,18 @@

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;

import org.junit.Test;

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;

import static org.junit.Assert.*;

import static org.junit.Assert.fail;

/**
* Validate memory release under concurrent modification exceptions.
*/
public class MemoryManagerConcurrentModReleaseTest {

@Test
@@ -49,48 +52,47 @@ public void testConcurrentModificationOnce() {
fail(e.getMessage());
}
}

@Test
public void testConcurrentModificationWhileReleasing() {
try {
final int numSegments = 10000;
final int segmentSize = 4096;

MemoryManager memMan = new MemoryManager(numSegments * segmentSize, 1, segmentSize, MemoryType.HEAP, true);

ArrayList<MemorySegment> segs = new ArrayList<>(numSegments);
memMan.allocatePages(this, segs, numSegments);

// start a thread that performs concurrent modifications
Modifier mod = new Modifier(segs);
Thread modRunner = new Thread(mod);
modRunner.start();

// give the thread some time to start working
Thread.sleep(500);

try {
memMan.release(segs);
}
finally {
mod.cancel();
}

modRunner.join();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private class Modifier implements Runnable {

private final ArrayList<MemorySegment> toModify;

private volatile boolean running = true;


private Modifier(ArrayList<MemorySegment> toModify) {
this.toModify = toModify;
}
@@ -112,13 +114,13 @@ public void run() {
}
}
}

private class ListWithConcModExceptionOnFirstAccess<E> extends ArrayList<E> {

private static final long serialVersionUID = -1623249699823349781L;

private boolean returnedIterator;

@Override
public Iterator<E> iterator() {
if (returnedIterator) {
@@ -130,8 +132,7 @@ public Iterator<E> iterator() {
}
}
}



private class ConcFailingIterator<E> implements Iterator<E> {

@Override
Original file line number Diff line number Diff line change
@@ -38,20 +38,19 @@
* Tests for the memory manager, in the mode where it pre-allocates all memory.
*/
public class MemoryManagerLazyAllocationTest {

private static final long RANDOM_SEED = 643196033469871L;

private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes

private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes

private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;

private MemoryManager memoryManager;

private Random random;


@Before
public void setUp() {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, false);
@@ -72,7 +71,7 @@ public void allocateAllSingle() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
List<MemorySegment> segments = new ArrayList<MemorySegment>();

try {
for (int i = 0; i < NUM_PAGES; i++) {
segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,7 +80,7 @@ public void allocateAllSingle() {
catch (MemoryAllocationException e) {
fail("Unable to allocate memory");
}

for (MemorySegment seg : segments) {
this.memoryManager.release(seg);
}
@@ -91,59 +90,59 @@ public void allocateAllSingle() {
fail(e.getMessage());
}
}

@Test
public void allocateAllMulti() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
final List<MemorySegment> segments = new ArrayList<MemorySegment>();

try {
for(int i = 0; i < NUM_PAGES / 2; i++) {
for (int i = 0; i < NUM_PAGES / 2; i++) {
segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
}
} catch (MemoryAllocationException e) {
Assert.fail("Unable to allocate memory");
}

this.memoryManager.release(segments);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void allocateMultipleOwners() {
final int NUM_OWNERS = 17;
final int numOwners = 17;

try {
AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
AbstractInvokable[] owners = new AbstractInvokable[numOwners];

@SuppressWarnings("unchecked")
List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
for (int i = 0; i < NUM_OWNERS; i++) {
List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

for (int i = 0; i < numOwners; i++) {
owners[i] = new DummyInvokable();
mems[i] = new ArrayList<MemorySegment>(64);
}

// allocate all memory to the different owners
for (int i = 0; i < NUM_PAGES; i++) {
final int owner = this.random.nextInt(NUM_OWNERS);
final int owner = this.random.nextInt(numOwners);
mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
}

// free one owner at a time
for (int i = 0; i < NUM_OWNERS; i++) {
for (int i = 0; i < numOwners; i++) {
this.memoryManager.releaseAll(owners[i]);
owners[i] = null;
Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
mems[i] = null;

// check that the owner owners were not affected
for (int k = i+1; k < NUM_OWNERS; k++) {
for (int k = i + 1; k < numOwners; k++) {
Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
}
}
@@ -153,32 +152,32 @@ public void allocateMultipleOwners() {
fail(e.getMessage());
}
}

@Test
public void allocateTooMuch() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();

List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

try {
this.memoryManager.allocatePages(mockInvoke, 1);
Assert.fail("Expected MemoryAllocationException.");
} catch (MemoryAllocationException maex) {
// expected
}

Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));

this.memoryManager.releaseAll(mockInvoke);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (seg.isFreed()) {
@@ -187,7 +186,7 @@ private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
}
return true;
}

private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (!seg.isFreed()) {
Original file line number Diff line number Diff line change
@@ -18,40 +18,39 @@

package org.apache.flink.runtime.memory;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.junit.Assert;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import static org.junit.Assert.fail;

/**
* Tests for the memory manager, in the mode where it pre-allocates all memory.
*/
public class MemoryManagerTest {

private static final long RANDOM_SEED = 643196033469871L;

private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes

private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes

private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;

private MemoryManager memoryManager;

private Random random;


@Before
public void setUp() {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
@@ -72,7 +71,7 @@ public void allocateAllSingle() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
List<MemorySegment> segments = new ArrayList<MemorySegment>();

try {
for (int i = 0; i < NUM_PAGES; i++) {
segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,67 +80,67 @@ public void allocateAllSingle() {
catch (MemoryAllocationException e) {
fail("Unable to allocate memory");
}

this.memoryManager.release(segments);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void allocateAllMulti() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
final List<MemorySegment> segments = new ArrayList<MemorySegment>();

try {
for(int i = 0; i < NUM_PAGES / 2; i++) {
for (int i = 0; i < NUM_PAGES / 2; i++) {
segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
}
} catch (MemoryAllocationException e) {
Assert.fail("Unable to allocate memory");
}

this.memoryManager.release(segments);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void allocateMultipleOwners() {
final int NUM_OWNERS = 17;
final int numOwners = 17;

try {
AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
AbstractInvokable[] owners = new AbstractInvokable[numOwners];

@SuppressWarnings("unchecked")
List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
for (int i = 0; i < NUM_OWNERS; i++) {
List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

for (int i = 0; i < numOwners; i++) {
owners[i] = new DummyInvokable();
mems[i] = new ArrayList<MemorySegment>(64);
}

// allocate all memory to the different owners
for (int i = 0; i < NUM_PAGES; i++) {
final int owner = this.random.nextInt(NUM_OWNERS);
final int owner = this.random.nextInt(numOwners);
mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
}

// free one owner at a time
for (int i = 0; i < NUM_OWNERS; i++) {
for (int i = 0; i < numOwners; i++) {
this.memoryManager.releaseAll(owners[i]);
owners[i] = null;
Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
mems[i] = null;

// check that the owner owners were not affected
for (int k = i+1; k < NUM_OWNERS; k++) {
for (int k = i + 1; k < numOwners; k++) {
Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
}
}
@@ -151,32 +150,32 @@ public void allocateMultipleOwners() {
fail(e.getMessage());
}
}

@Test
public void allocateTooMuch() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();

List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

try {
this.memoryManager.allocatePages(mockInvoke, 1);
Assert.fail("Expected MemoryAllocationException.");
} catch (MemoryAllocationException maex) {
// expected
}

Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));

this.memoryManager.releaseAll(mockInvoke);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (seg.isFreed()) {
@@ -185,7 +184,7 @@ private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
}
return true;
}

private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (!seg.isFreed()) {
Original file line number Diff line number Diff line change
@@ -18,26 +18,29 @@

package org.apache.flink.runtime.memory;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.Random;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.core.memory.MemorySegment;

import org.junit.Assert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Random;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Test reading and writing primitive types to {@link MemorySegment}.
*/
public class MemorySegmentSimpleTest {

public static final long RANDOM_SEED = 643196033469871L;

public static final int MANAGED_MEMORY_SIZE = 1024 * 1024 * 16;
@@ -67,7 +70,7 @@ public void tearDown() {
this.manager.release(this.segment);
this.random = null;
this.segment = null;

if (!this.manager.verifyEmpty()) {
Assert.fail("Not all memory has been properly released.");
}
@@ -430,7 +433,7 @@ public void longAccess() {
assertEquals(random.nextLong(), segment.getLong(i));
}
}

// test unaligned offsets
{
final long seed = random.nextLong();
@@ -440,7 +443,7 @@ public void longAccess() {
long value = random.nextLong();
segment.putLong(offset, value);
}

random.setSeed(seed);
for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
long shouldValue = random.nextLong();
@@ -547,22 +550,22 @@ public void shortAccess() {
}
}
}

@Test
public void testByteBufferWrapping() {
try {
MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);

ByteBuffer buf1 = seg.wrap(13, 47);
assertEquals(13, buf1.position());
assertEquals(60, buf1.limit());
assertEquals(47, buf1.remaining());

ByteBuffer buf2 = seg.wrap(500, 267);
assertEquals(500, buf2.position());
assertEquals(767, buf2.limit());
assertEquals(267, buf2.remaining());

ByteBuffer buf3 = seg.wrap(0, 1024);
assertEquals(0, buf3.position());
assertEquals(1024, buf3.limit());

0 comments on commit 7c150a6

Please sign in to comment.