Skip to content

Commit

Permalink
[FLINK-3701] enable reuse of ExecutionConfig
Browse files Browse the repository at this point in the history
Depending on the context, the ExecutionConfig's type fields may either
be deserialized using a custom class loader or the default class
loader. It may be explicitly serialized for the Task or shipped inside
the PojoSerializer where it is serialized or directly passed in local
mode. An ExecutionConfig may be reused and thus its fields can't be set
to null after it has been shipped once.

The entire ExecutionConfig is now serialized upon setting it on the
JobGraph. It is not passed through the JobGraph's constructor but set
explicitly on the JobGraph. If no ExecutionConfig has been set, the
default is used. Unlike before, no code may modify the ExecutionConfig
after it has been set on the JobGraph.

This closes apache#1913
  • Loading branch information
mxm committed May 13, 2016
1 parent 099fdfa commit 48b469a
Show file tree
Hide file tree
Showing 65 changed files with 437 additions and 486 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.util.SerializedValue;


import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
Expand Down Expand Up @@ -128,7 +127,7 @@ public class ExecutionConfig implements Serializable {

// ------------------------------- User code values --------------------------------------------

private transient GlobalJobParameters globalJobParameters;
private GlobalJobParameters globalJobParameters;

// Serializers and types registered with Kryo and the PojoSerializer
// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
Expand All @@ -145,22 +144,6 @@ public class ExecutionConfig implements Serializable {

private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();

// ----------------------- Helper values for serialized user objects ---------------------------

private SerializedValue<GlobalJobParameters> serializedGlobalJobParameters;

private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedRegisteredTypesWithKryoSerializers;

private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedRegisteredTypesWithKryoSerializerClasses;

private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedDefaultKryoSerializers;

private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedDefaultKryoSerializerClasses;

private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredKryoTypes;

private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredPojoTypes;

// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -695,79 +678,6 @@ public void disableAutoTypeRegistration() {
this.autoTypeRegistrationEnabled = false;
}

/**
 * Restores the user code objects of this config from their serialized counterparts,
 * resolving classes through the given user code class loader.
 *
 * <p>For every registered-type collection (Kryo types, POJO types, Kryo serializer
 * instances/classes, default Kryo serializer instances/classes) the serialized helper
 * value is deserialized if it is present; otherwise the corresponding field is reset to
 * an empty collection. The global job parameters are only overwritten when a serialized
 * value is present.
 *
 * @param userCodeClassLoader User code class loader
 * @throws IOException Thrown if an IOException occurs while loading the classes
 * @throws ClassNotFoundException Thrown if the given class cannot be loaded
 */
public void deserializeUserCode(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
    if (serializedRegisteredKryoTypes != null) {
        registeredKryoTypes = serializedRegisteredKryoTypes.deserializeValue(userCodeClassLoader);
    } else {
        registeredKryoTypes = new LinkedHashSet<>();
    }

    if (serializedRegisteredPojoTypes != null) {
        registeredPojoTypes = serializedRegisteredPojoTypes.deserializeValue(userCodeClassLoader);
    } else {
        registeredPojoTypes = new LinkedHashSet<>();
    }

    if (serializedRegisteredTypesWithKryoSerializerClasses != null) {
        registeredTypesWithKryoSerializerClasses = serializedRegisteredTypesWithKryoSerializerClasses.deserializeValue(userCodeClassLoader);
    } else {
        registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    }

    if (serializedRegisteredTypesWithKryoSerializers != null) {
        registeredTypesWithKryoSerializers = serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
    } else {
        // Reset the serializer-instance map itself. Assigning the serializer-class map
        // here would discard the value restored just above and leave this field unset.
        registeredTypesWithKryoSerializers = new LinkedHashMap<>();
    }

    if (serializedDefaultKryoSerializers != null) {
        defaultKryoSerializers = serializedDefaultKryoSerializers.deserializeValue(userCodeClassLoader);
    } else {
        defaultKryoSerializers = new LinkedHashMap<>();
    }

    if (serializedDefaultKryoSerializerClasses != null) {
        defaultKryoSerializerClasses = serializedDefaultKryoSerializerClasses.deserializeValue(userCodeClassLoader);
    } else {
        defaultKryoSerializerClasses = new LinkedHashMap<>();
    }

    // No fallback here: without a serialized value the current parameters stay as they are.
    if (serializedGlobalJobParameters != null) {
        globalJobParameters = serializedGlobalJobParameters.deserializeValue(userCodeClassLoader);
    }
}

/**
 * Wraps every user code object of this config in a {@code SerializedValue} helper field
 * and then sets the original field to {@code null}.
 *
 * <p>Each field is serialized and cleared in turn, so if serialization of one field fails,
 * the fields handled before it have already been cleared. After this method returns, the
 * plain fields are {@code null} until {@code deserializeUserCode(ClassLoader)} restores them.
 *
 * @throws IOException Thrown if one of the user code objects cannot be serialized
 */
public void serializeUserCode() throws IOException {
    // registered Kryo / POJO types
    this.serializedRegisteredKryoTypes = new SerializedValue<>(this.registeredKryoTypes);
    this.registeredKryoTypes = null;

    this.serializedRegisteredPojoTypes = new SerializedValue<>(this.registeredPojoTypes);
    this.registeredPojoTypes = null;

    // types registered together with a Kryo serializer (class or instance)
    this.serializedRegisteredTypesWithKryoSerializerClasses =
            new SerializedValue<>(this.registeredTypesWithKryoSerializerClasses);
    this.registeredTypesWithKryoSerializerClasses = null;

    this.serializedRegisteredTypesWithKryoSerializers =
            new SerializedValue<>(this.registeredTypesWithKryoSerializers);
    this.registeredTypesWithKryoSerializers = null;

    // default Kryo serializers (instance or class)
    this.serializedDefaultKryoSerializers = new SerializedValue<>(this.defaultKryoSerializers);
    this.defaultKryoSerializers = null;

    this.serializedDefaultKryoSerializerClasses =
            new SerializedValue<>(this.defaultKryoSerializerClasses);
    this.defaultKryoSerializerClasses = null;

    // global job parameters
    this.serializedGlobalJobParameters = new SerializedValue<>(this.globalJobParameters);
    this.globalJobParameters = null;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ExecutionConfig) {
Expand Down Expand Up @@ -854,10 +764,10 @@ public static class GlobalJobParameters implements Serializable {
* Convert UserConfig into a {@code Map<String, String>} representation.
* This can be used by the runtime, for example for presenting the user config in the web frontend.
*
* @return Key/Value representation of the UserConfig, or null.
* @return Key/Value representation of the UserConfig
*/
public Map<String, String> toMap() {
return null;
return Collections.emptyMap();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.api.common;

import org.apache.flink.util.SerializedValue;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -74,4 +76,29 @@ public void testConfigurationOfParallelism() {

assertEquals(parallelism, config.getParallelism());
}

/**
 * Test helper that builds a fresh, default-constructed {@link ExecutionConfig} and
 * returns it wrapped in a {@link SerializedValue}.
 *
 * @return The serialized default ExecutionConfig
 * @throws RuntimeException if serializing the config fails with an IOException
 */
public static SerializedValue<ExecutionConfig> getSerializedConfig() {
    final ExecutionConfig defaultConfig = new ExecutionConfig();
    try {
        return new SerializedValue<>(defaultConfig);
    } catch (IOException serializationFailure) {
        throw new RuntimeException("Couldn't create new ExecutionConfig for test.", serializationFailure);
    }
}

/**
 * Test helper that deserializes the given ExecutionConfig using the class loader that
 * loaded {@code ExecutionConfigTest}.
 *
 * @param serializedConfig The serialized ExecutionConfig
 * @return The deserialized ExecutionConfig
 * @throws RuntimeException wrapping any exception raised during deserialization
 */
public static ExecutionConfig deserializeConfig(SerializedValue<ExecutionConfig> serializedConfig) {
    try {
        final ClassLoader testClassLoader = ExecutionConfigTest.class.getClassLoader();
        return serializedConfig.deserializeValue(testClassLoader);
    } catch (Exception failure) {
        throw new RuntimeException("Could not deserialize ExecutionConfig for test.", failure);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -215,7 +214,8 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
// ----------- finalize the job graph -----------

// create the job graph object
JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig());
JobGraph graph = new JobGraph(jobId, program.getJobName());
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());

graph.setAllowQueuedScheduling(false);
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
Expand Down Expand Up @@ -243,18 +243,10 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
this.iterations = null;
this.iterationStack = null;

try {
// make sure that we can send the ExecutionConfig using the system class loader
graph.getExecutionConfig().serializeUserCode();
} catch (IOException e) {
throw new CompilerException("Could not serialize the user code object in the " +
"ExecutionConfig.", e);
}

// return job graph
return graph;
}

/**
* This method implements the pre-visiting during a depth-first traversal. It creates the job vertex and
* sets local strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
gen.writeStringField("jid", graph.getJobID().toString());
gen.writeStringField("name", graph.getJobName());

ExecutionConfig ec = graph.getExecutionConfig();
ExecutionConfig ec;
try {
ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
} catch (Exception e) {
throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e);
}

if (ec != null) {
gen.writeObjectFieldStart("execution-config");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBackPressuredProducer() throws Exception {
final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);

// The JobGraph
final JobGraph jobGraph = new JobGraph(new ExecutionConfig());
final JobGraph jobGraph = new JobGraph();
final int parallelism = 4;

final JobVertex task = new JobVertex("Task");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testTaskClearedWhileSampling() throws Exception {
final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);

// The JobGraph
final JobGraph jobGraph = new JobGraph(new ExecutionConfig());
final JobGraph jobGraph = new JobGraph();
final int parallelism = 1;

final JobVertex task = new JobVertex("Task");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
private final SerializedValue<StateHandle<?>> operatorState;

/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
private final ExecutionConfig executionConfig;
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;

private long recoveryTimestamp;

Expand All @@ -101,7 +101,7 @@ public TaskDeploymentDescriptor(
JobID jobID,
JobVertexID vertexID,
ExecutionAttemptID executionId,
ExecutionConfig executionConfig,
SerializedValue<ExecutionConfig> serializedExecutionConfig,
String taskName,
int indexInSubtaskGroup,
int numberOfSubtasks,
Expand All @@ -125,7 +125,7 @@ public TaskDeploymentDescriptor(
this.jobID = checkNotNull(jobID);
this.vertexID = checkNotNull(vertexID);
this.executionId = checkNotNull(executionId);
this.executionConfig = checkNotNull(executionConfig);
this.serializedExecutionConfig = checkNotNull(serializedExecutionConfig);
this.taskName = checkNotNull(taskName);
this.indexInSubtaskGroup = indexInSubtaskGroup;
this.numberOfSubtasks = numberOfSubtasks;
Expand All @@ -146,7 +146,7 @@ public TaskDeploymentDescriptor(
JobID jobID,
JobVertexID vertexID,
ExecutionAttemptID executionId,
ExecutionConfig executionConfig,
SerializedValue<ExecutionConfig> serializedExecutionConfig,
String taskName,
int indexInSubtaskGroup,
int numberOfSubtasks,
Expand All @@ -164,7 +164,7 @@ public TaskDeploymentDescriptor(
jobID,
vertexID,
executionId,
executionConfig,
serializedExecutionConfig,
taskName,
indexInSubtaskGroup,
numberOfSubtasks,
Expand All @@ -185,8 +185,8 @@ public TaskDeploymentDescriptor(
* Returns the execution configuration (see {@link ExecutionConfig}) related to the
* specific job.
*/
public ExecutionConfig getExecutionConfig() {
return executionConfig;
public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
return serializedExecutionConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public class ExecutionGraph implements Serializable {
// ------ Configuration of the Execution -------

/** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */
private ExecutionConfig executionConfig;
private SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
* to deploy them immediately. */
Expand Down Expand Up @@ -245,15 +245,15 @@ public class ExecutionGraph implements Serializable {
JobID jobId,
String jobName,
Configuration jobConfig,
ExecutionConfig config,
SerializedValue<ExecutionConfig> serializedConfig,
FiniteDuration timeout,
RestartStrategy restartStrategy) {
this(
executionContext,
jobId,
jobName,
jobConfig,
config,
serializedConfig,
timeout,
restartStrategy,
new ArrayList<BlobKey>(),
Expand All @@ -267,7 +267,7 @@ public ExecutionGraph(
JobID jobId,
String jobName,
Configuration jobConfig,
ExecutionConfig config,
SerializedValue<ExecutionConfig> serializedConfig,
FiniteDuration timeout,
RestartStrategy restartStrategy,
List<BlobKey> requiredJarFiles,
Expand Down Expand Up @@ -301,7 +301,7 @@ public ExecutionGraph(
this.requiredJarFiles = requiredJarFiles;
this.requiredClasspaths = requiredClasspaths;

this.executionConfig = checkNotNull(config);
this.serializedExecutionConfig = checkNotNull(serializedConfig);

this.timeout = timeout;

Expand Down Expand Up @@ -962,12 +962,12 @@ public void prepareForArchiving() {
}

/**
* Returns the {@link ExecutionConfig}.
* Returns the serialized {@link ExecutionConfig}.
*
* @return ExecutionConfig
*/
public ExecutionConfig getExecutionConfig() {
return executionConfig;
public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
return serializedExecutionConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,15 +667,15 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
}

ExecutionConfig config = getExecutionGraph().getExecutionConfig();
SerializedValue<ExecutionConfig> serializedConfig = getExecutionGraph().getSerializedExecutionConfig();
List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();

return new TaskDeploymentDescriptor(
getJobId(),
getJobvertexId(),
executionId,
config,
serializedConfig,
getTaskName(),
subTaskIndex,
getTotalNumberOfParallelSubtasks(),
Expand Down
Loading

0 comments on commit 48b469a

Please sign in to comment.