Skip to content

Commit

Permalink
[FLINK-6058] Don't read DEFAULT_PARALLELISM from GlobalConfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored and aljoscha committed Jul 11, 2017
1 parent d0e5295 commit 4aa2ffc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -143,6 +144,8 @@ public class CliFrontend {

private final FiniteDuration clientTimeout;

private final int defaultParallelism;

/**
*
* @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
Expand All @@ -169,6 +172,9 @@ public CliFrontend(String configDir) throws Exception {
}

this.clientTimeout = AkkaUtils.getClientTimeout(config);
this.defaultParallelism = GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM);
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -250,6 +256,8 @@ protected int run(String[] args) {
+ client.getMaxSlots() + "). "
+ "To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
userParallelism = defaultParallelism;
}

return executeProgram(program, client, userParallelism);
Expand Down Expand Up @@ -314,6 +322,9 @@ protected int info(String[] args) {

try {
int parallelism = options.getParallelism();
if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
parallelism = defaultParallelism;
}

LOG.info("Creating program plan dump");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testRun() {
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, true, false);
assertEquals(0, testFrontend.run(parameters));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Preconditions;

Expand All @@ -45,10 +43,6 @@ protected StreamContextEnvironment(ContextEnvironment ctx) {
this.ctx = ctx;
if (ctx.getParallelism() > 0) {
setParallelism(ctx.getParallelism());
} else {
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
}
}

Expand Down

0 comments on commit 4aa2ffc

Please sign in to comment.