Skip to content

Commit

Permalink
Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGen…
Browse files Browse the repository at this point in the history
…erator"

This reverts commit 9cfae89.

The fixes around FLINK-5808 introduced follow-up issues.
  • Loading branch information
aljoscha committed Apr 18, 2017
1 parent a13750c commit 0182141
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);

/** The default parallelism used when creating a local environment */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

/** The configuration to use for the mini cluster */
private final Configuration conf;

Expand All @@ -65,7 +62,6 @@ public Flip6LocalStreamEnvironment() {
* @param config The configuration used to configure the local executor.
*/
public Flip6LocalStreamEnvironment(Configuration config) {
super(defaultLocalParallelism);
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@
@Public
public class LocalStreamEnvironment extends StreamExecutionEnvironment {

/** The default parallelism used when creating a local environment */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);

/** The configuration to use for the local cluster */
Expand All @@ -58,43 +55,24 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
* Creates a new local stream environment that uses the default configuration.
*/
public LocalStreamEnvironment() {
this(defaultLocalParallelism);
this(null);
}

/**
* Creates a new local stream environment that uses the default configuration.
*/
public LocalStreamEnvironment(int parallelism) {
this(null, parallelism);
}


/**
* Creates a new local stream environment that configures its local executor with the given configuration.
*
* @param config The configuration used to configure the local executor.
*/
public LocalStreamEnvironment(Configuration config) {
this(config, defaultLocalParallelism);
}

/**
* Creates a new local stream environment that configures its local executor with the given configuration.
*
* @param config The configuration used to configure the local executor.
*/
public LocalStreamEnvironment(Configuration config, int parallelism) {
super(parallelism);
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
"or running in a TestEnvironment context.");
}

this.conf = config == null ? new Configuration() : config;
}


/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
* specified name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -130,10 +129,6 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
* The protocol must be supported by the {@link java.net.URLClassLoader}.
*/
public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
super(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));

if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be used when submitting a program through a client, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private final ContextEnvironment ctx;

protected StreamContextEnvironment(ContextEnvironment ctx) {
// if the batch ContextEnvironment has a parallelism this must have come from
// the CLI Client. We should set that as our default parallelism
super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
this.ctx = ctx;
if (ctx.getParallelism() > 0) {
setParallelism(ctx.getParallelism());
} else {
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public abstract class StreamExecutionEnvironment {
/** The environment of the context (local by default, cluster if invoked through command line) */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;

/** The default parallelism used when creating a local environment */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

// ------------------------------------------------------------------------

/** The execution configuration for this environment */
Expand All @@ -132,23 +135,11 @@ public abstract class StreamExecutionEnvironment {
/** The time characteristic used by the data streams */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;

/** The parallelism to use when no parallelism is set on an operation. */
private final int defaultParallelism;


// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------


public StreamExecutionEnvironment() {
this(ConfigConstants.DEFAULT_PARALLELISM);
}

public StreamExecutionEnvironment(int defaultParallelism) {
this.defaultParallelism = defaultParallelism;
}

/**
* Gets the config object.
*/
Expand Down Expand Up @@ -1527,7 +1518,7 @@ public StreamGraph getStreamGraph() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
return StreamGraphGenerator.generate(this, transformations, defaultParallelism);
return StreamGraphGenerator.generate(this, transformations);
}

/**
Expand Down Expand Up @@ -1615,7 +1606,7 @@ public static StreamExecutionEnvironment getExecutionEnvironment() {
* @return A local execution environment.
*/
public static LocalStreamEnvironment createLocalEnvironment() {
return new LocalStreamEnvironment();
return createLocalEnvironment(defaultLocalParallelism);
}

/**
Expand All @@ -1624,12 +1615,14 @@ public static LocalStreamEnvironment createLocalEnvironment() {
* environment was created in. It will use the parallelism specified in the
* parameter.
*
* @param defaultParallelism The default parallelism for the local environment.
*
* @param parallelism
* The parallelism for the local environment.
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) {
return new LocalStreamEnvironment(defaultParallelism);
public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
LocalStreamEnvironment env = new LocalStreamEnvironment();
env.setParallelism(parallelism);
return env;
}

/**
Expand All @@ -1638,13 +1631,16 @@ public static LocalStreamEnvironment createLocalEnvironment(int defaultParalleli
* environment was created in. It will use the parallelism specified in the
* parameter.
*
* @param defaultParallelism The parallelism for the local environment.
* @param configuration Pass a custom configuration into the cluster
*
* @param parallelism
* The parallelism for the local environment.
* @param configuration
* Pass a custom configuration into the cluster
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) {
return new LocalStreamEnvironment(configuration, defaultParallelism);
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
return currentEnvironment;
}

/**
Expand All @@ -1669,6 +1665,7 @@ public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configu
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
localEnv.setParallelism(defaultLocalParallelism);

return localEnv;
}
Expand Down Expand Up @@ -1754,6 +1751,28 @@ public static StreamExecutionEnvironment createRemoteEnvironment(
return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
}

/**
* Gets the default parallelism that will be used for the local execution environment created by
* {@link #createLocalEnvironment()}.
*
* @return The default local parallelism
*/
@PublicEvolving
public static int getDefaultLocalParallelism() {
return defaultLocalParallelism;
}

/**
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
*
* @param parallelism The parallelism to use as the default local parallelism.
*/
@PublicEvolving
public static void setDefaultLocalParallelism(int parallelism) {
defaultLocalParallelism = parallelism;
}

// --------------------------------------------------------------------------------------------
// Methods to control the context and local environments for execution from packaged programs
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
private ExecutionEnvironment env;

protected StreamPlanEnvironment(ExecutionEnvironment env) {
super(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));

super();
this.env = env;

int parallelism = env.getParallelism();
if (parallelism > 0) {
setParallelism(parallelism);
} else {
// determine parallelism
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ public class StreamGraph extends StreamingPlan {
private AbstractStateBackend stateBackend;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

private final int defaultParallelism;

public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) {
public StreamGraph(StreamExecutionEnvironment environment) {
this.environment = environment;
this.executionConfig = environment.getConfig();
this.checkpointConfig = environment.getCheckpointConfig();

this.defaultParallelism = defaultParallelism;
// create an empty new stream graph.
clear();
}
Expand Down Expand Up @@ -657,7 +655,7 @@ public JobGraph getJobGraph() {
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}

StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);

return jobgraphGenerator.createJobGraph();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ public static int getNewIterationNodeId() {
// we have loops, i.e. feedback edges.
private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;


/**
* Private constructor. The generator should only be invoked using {@link #generate}.
*/
private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) {
this.streamGraph = new StreamGraph(env, defaultParallelism);
private StreamGraphGenerator(StreamExecutionEnvironment env) {
this.streamGraph = new StreamGraph(env);
this.streamGraph.setChaining(env.isChainingEnabled());
this.streamGraph.setStateBackend(env.getStateBackend());
this.env = env;
Expand All @@ -119,11 +120,8 @@ private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallel
*
* @return The generated {@code StreamGraph}
*/
public static StreamGraph generate(
StreamExecutionEnvironment env,
List<StreamTransformation<?>> transformations,
int defaultParallelism) {
return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations);
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand Down Expand Up @@ -93,13 +92,10 @@ public class StreamingJobGraphGenerator {
private final StreamGraphHasher defaultStreamGraphHasher;
private final List<StreamGraphHasher> legacyStreamGraphHashers;

private final int defaultParallelism;

public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
public StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
this.defaultParallelism = defaultParallelism;
}

private void init() {
Expand Down Expand Up @@ -342,12 +338,12 @@ private StreamConfig createJobVertex(

int parallelism = streamNode.getParallelism();

if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
parallelism = defaultParallelism;
if (parallelism > 0) {
jobVertex.setParallelism(parallelism);
} else {
parallelism = jobVertex.getParallelism();
}

jobVertex.setParallelism(parallelism);

jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

if (LOG.isDebugEnabled()) {
Expand Down
Loading

0 comments on commit 0182141

Please sign in to comment.