Skip to content

Commit

Permalink
[FLINK-1320] [core] Add an off-heap variant of the managed memory
Browse files Browse the repository at this point in the history
This closes apache#1093
  • Loading branch information
StephanEwen committed Sep 8, 2015
1 parent 1800434 commit 655a891
Show file tree
Hide file tree
Showing 195 changed files with 14,373 additions and 3,243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";

/**
* The key for the config parameter defining whether the memory manager allocates memory lazy.
* The fraction of off-heap memory relative to the heap size.
*/
public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio";

/**
* The config parameter defining the memory allocation method (JVM heap or off-heap).
*/
public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";

/**
* The config parameter defining the number of buffers used in the network stack. This defines the
Expand Down Expand Up @@ -542,6 +547,11 @@ public final class ConfigConstants {
*/
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;

/**
* The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap memory.
*/
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f;

/**
* Default number of buffers used in the network stack.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.memory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
* This class represents a piece of heap memory managed by Flink.
* The segment is backed by a byte array and features random put and get methods for the basic types,
* as well as compare and swap methods.
* <p>
* This class specialized byte access and byte copy calls for heap memory, while reusing the
* multi-byte type accesses and cross-segment operations from the MemorySegment.
* <p>
* Note that memory segments should usually not be allocated manually, but rather through the
* {@link MemorySegmentFactory}.
*/
public final class HeapMemorySegment extends MemorySegment {

/** An extra reference to the heap memory, so we can let byte array checks fail
* by the built-in checks automatically without extra checks */
private byte[] memory;

/**
* Creates a new memory segment that represents the data in the given byte array.
* The owner of this memory segment is null.
*
* @param memory The byte array that holds the data.
*/
HeapMemorySegment(byte[] memory) {
this(memory, null);
}

/**
* Creates a new memory segment that represents the data in the given byte array.
* The memory segment references the given owner.
*
* @param memory The byte array that holds the data.
* @param owner The owner referenced by the memory segment.
*/
HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}

// -------------------------------------------------------------------------
// MemorySegment operations
// -------------------------------------------------------------------------

@Override
public void free() {
super.free();
this.memory = null;
}

@Override
public ByteBuffer wrap(int offset, int length) {
try {
return ByteBuffer.wrap(this.memory, offset, length);
}
catch (NullPointerException e) {
throw new IllegalStateException("segment has been freed");
}
}

/**
* Gets the byte array that backs this memory segment.
*
* @return The byte array that backs this memory segment, or null, if the segment has been freed.
*/
public byte[] getArray() {
return this.heapMemory;
}

// ------------------------------------------------------------------------
// Random Access get() and put() methods
// ------------------------------------------------------------------------

@Override
public final byte get(int index) {
return this.memory[index];
}

@Override
public final void put(int index, byte b) {
this.memory[index] = b;
}

@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}

@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}

@Override
public final void get(int index, byte[] dst, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(this.memory, index, dst, offset, length);
}

@Override
public final void put(int index, byte[] src, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(src, offset, this.memory, index, length);
}

@Override
public final boolean getBoolean(int index) {
return this.memory[index] != 0;
}

@Override
public final void putBoolean(int index, boolean value) {
this.memory[index] = (byte) (value ? 1 : 0);
}

// -------------------------------------------------------------------------
// Bulk Read and Write Methods
// -------------------------------------------------------------------------

@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
out.write(this.memory, offset, length);
}

@Override
public final void put(DataInput in, int offset, int length) throws IOException {
in.readFully(this.memory, offset, length);
}

@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// ByteBuffer performs the boundary checks
target.put(this.memory, offset, numBytes);
}

@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// ByteBuffer performs the boundary checks
source.get(this.memory, offset, numBytes);
}

// -------------------------------------------------------------------------
// Factoring
// -------------------------------------------------------------------------

/**
* A memory segment factory that produces heap memory segments. Note that this factory does not
* support to allocate off-heap memory.
*/
public static final class HeapMemorySegmentFactory implements MemorySegmentFactory.Factory {

@Override
public HeapMemorySegment wrap(byte[] memory) {
return new HeapMemorySegment(memory);
}

@Override
public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HeapMemorySegment(new byte[size], owner);
}

@Override
public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HeapMemorySegment(memory, owner);
}

@Override
public HeapMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
throw new UnsupportedOperationException(
"The MemorySegment factory was not initialized for off-heap memory.");
}

/** prevent external instantiation */
HeapMemorySegmentFactory() {}
};

public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}
Loading

0 comments on commit 655a891

Please sign in to comment.