Skip to content

Commit

Permalink
[FLINK-6270] port some memory and network task manager options to Con…
Browse files Browse the repository at this point in the history
…figOption

This closes apache#3683.
  • Loading branch information
Nico Kruber authored and zentol committed Apr 6, 2017
1 parent d1d761e commit e2a4f47
Show file tree
Hide file tree
Showing 42 changed files with 209 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down Expand Up @@ -98,7 +99,7 @@ public static void prepare() throws IOException, ClassNotFoundException {
// start also a re-usable Flink mini cluster
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

flink = new LocalFlinkMiniCluster(flinkConfig, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.storm.api;

import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.KillOptions;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void submitTopologyWithOpts(final String topologyName, final Map conf, fi
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

this.flink = new LocalFlinkMiniCluster(configuration, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,34 +210,52 @@ public final class ConfigConstants {
* The config parameter defining the amount of memory to be allocated by the task manager's
* memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined
* by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}.
*
* @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size";

/**
* The config parameter defining the fraction of free memory allocated by the memory manager.
*
* @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";

/**
* The config parameter defining the memory allocation method (JVM heap or off-heap).
*/
*
* @deprecated Use {@link TaskManagerOptions#MEMORY_OFF_HEAP} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";

/**
* The config parameter for specifying whether TaskManager managed memory should be preallocated
* when the TaskManager is starting. (default is false)
*
* @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY = "taskmanager.memory.preallocate";

/**
* The config parameter defining the number of buffers used in the network stack. This defines the
* number of possible tasks and shuffles.
*
* @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead
*/
@Deprecated
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";

/**
* Config parameter defining the size of memory buffers used by the network stack and the memory manager.
*
* @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead
*/
@Deprecated
public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";

/**
Expand Down Expand Up @@ -1126,20 +1144,29 @@ public final class ConfigConstants {
* The default directory for temporary files of the task manager.
*/
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");

/**
* The default fraction of the free memory allocated by the task manager's memory manager.
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} provides the default value now
*/
@Deprecated
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;

/**
* Default number of buffers used in the network stack.
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;

/**
* Default size of memory segments in the network stack and the memory manager.
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} provides the default value now
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768;

/**
Expand Down Expand Up @@ -1179,8 +1206,11 @@ public final class ConfigConstants {
public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";

/**
* The default setting for TaskManager memory eager allocation of managed memory
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} provides the default value now
*/
@Deprecated
public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;

/** @deprecated Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,53 @@ public class TaskManagerOptions {
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false);

/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
.defaultValue(32768);

/**
* Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
* set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
*/
public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
key("taskmanager.memory.size")
.defaultValue(-1L);

/**
* Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
* not set.
*/
public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
key("taskmanager.memory.fraction")
.defaultValue(0.7f);

/**
* Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
* as well as the network buffers.
**/
public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
key("taskmanager.memory.off-heap")
.defaultValue(false);

/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
key("taskmanager.memory.preallocate")
.defaultValue(false);

// ------------------------------------------------------------------------
// Network Options
// ------------------------------------------------------------------------

/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*/
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);


/** Minimum backoff for partition requests of input channels. */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.net.request-backoff.initial")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -142,19 +143,14 @@ public static ContaineredTaskManagerParameters create(

// (2) split the Java memory between heap and off-heap

final boolean useOffHeap = config.getBoolean(
ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false);
final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

final long heapSizeMB;
if (useOffHeap) {
long offHeapSize = config.getLong(
ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);

if (offHeapSize <= 0) {
double fraction = config.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);

double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);

offHeapSize = (long) (fraction * javaMemorySizeMB);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
Expand Down Expand Up @@ -199,7 +199,7 @@ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) t
numRequiredBuffers,
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY));
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()));
}

this.numTotalRequiredBuffers += numRequiredBuffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
Expand Down Expand Up @@ -50,9 +50,12 @@
* this state, different reader variants are returned (see
* {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
*
* <p>Since the network buffer pool size is usually quite small (default is
* {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
* spillable partitions will be spilled for real-world data sets.
* <p>Since the network buffer pool size for outgoing partitions is usually
* quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
* and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
* for bounded channels or from the default value of
* {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions
* will be spilled for real-world data sets.
*/
class SpillableSubpartition extends ResultSubpartition {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -165,7 +164,7 @@ public Configuration generateConfiguration() {
Configuration newConfiguration = new Configuration(config);
// set the memory
long memory = getOrCalculateManagedMemoryPerTaskManager();
newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);

return newConfiguration;
}
Expand Down Expand Up @@ -196,29 +195,20 @@ public String toString() {
private long getOrCalculateManagedMemoryPerTaskManager() {
if (managedMemoryPerTaskManager == -1) {
// no memory set in the mini cluster configuration
final ConfigOption<Integer> memorySizeOption = ConfigOptions
.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
.defaultValue(-1);

int memorySize = config.getInteger(memorySizeOption);
long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);

if (memorySize == -1) {
// we could probably use config.contains() but the previous implementation compared to
// the default (-1) thus allowing the user to explicitly specify this as well
// -> don't change this behaviour now
if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
// no memory set in the flink configuration
// share the available memory among all running components
final ConfigOption<Integer> bufferSizeOption = ConfigOptions
.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);

final ConfigOption<Long> bufferMemoryOption = ConfigOptions
.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
.defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);

final ConfigOption<Float> memoryFractionOption = ConfigOptions
.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);

float memoryFraction = config.getFloat(memoryFractionOption);
long networkBuffersMemory = config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption);
float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
long networkBuffersMemory =
(long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) *
(long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);

long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();

Expand Down
Loading

0 comments on commit e2a4f47

Please sign in to comment.