Skip to content

Commit

Permalink
[FLINK-14377] Parse the ProgramOptions to a Configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Oct 31, 2019
1 parent bf955c5 commit ea6fdde
Show file tree
Hide file tree
Showing 20 changed files with 528 additions and 162 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/deployment_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
<td style="word-wrap: break-word;">false</td>
<td>Specifies if the pipeline is submitted in attached or detached mode.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-attached-exit</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.</td>
</tr>
<tr>
<td><h5>execution.target</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
21 changes: 21 additions & 0 deletions docs/_includes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>pipeline.classpaths</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.</td>
</tr>
<tr>
<td><h5>pipeline.jars</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.</td>
</tr>
</tbody>
</table>
21 changes: 21 additions & 0 deletions docs/_includes/generated/savepoint_config_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Allow to skip savepoint state that cannot be restored. Allow this if you removed an operator from your pipeline after the savepoint was triggered.</td>
</tr>
<tr>
<td><h5>execution.savepoint.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).</td>
</tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
### Execution

{% include generated/deployment_configuration.html %}
{% include generated/savepoint_config_configuration.html %}

### JobManager

Expand Down Expand Up @@ -188,6 +189,10 @@ The configuration keys in this section are independent of the used resource mana

{% include generated/environment_configuration.html %}

### Pipeline

{% include generated/pipeline_configuration.html %}

### Checkpointing

{% include generated/checkpointing_configuration.html %}
Expand Down
5 changes: 5 additions & 0 deletions docs/ops/config.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
### Execution

{% include generated/deployment_configuration.html %}
{% include generated/savepoint_config_configuration.html %}

### JobManager

Expand Down Expand Up @@ -188,6 +189,10 @@ The configuration keys in this section are independent of the used resource mana

{% include generated/environment_configuration.html %}

### Pipeline

{% include generated/pipeline_configuration.html %}

### Checkpointing

{% include generated/checkpointing_configuration.html %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.commons.cli.CommandLine;
Expand All @@ -83,6 +82,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -184,42 +184,44 @@ protected void run(String[] args) throws Exception {

final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);

final RunOptions runOptions = new RunOptions(commandLine);
final ProgramOptions programOptions = new ProgramOptions(commandLine);
final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);

// evaluate help flag
if (runOptions.isPrintHelp()) {
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}

if (!runOptions.isPython()) {
if (!programOptions.isPython()) {
// Java program should be specified a JAR file
if (runOptions.getJarFilePath() == null) {
if (executionParameters.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}

final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
program = buildProgram(runOptions);
program = buildProgram(programOptions, executionParameters);
}
catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file.", e);
}

final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
final Configuration executionConfig = executionParameters.getConfiguration();
try {
runProgram(executorConfig, runOptions, program);
runProgram(executorConfig, executionConfig, program);
} finally {
program.deleteExtractedLibraries();
}
}

private <T> void runProgram(
Configuration executorConfig,
RunOptions runOptions,
Configuration executionConfig,
PackagedProgram program) throws ProgramInvocationException, FlinkException {

final ClusterClientFactory<T> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
Expand All @@ -229,20 +231,20 @@ private <T> void runProgram(

try {
final T clusterId = clusterClientFactory.getClusterId(executorConfig);

final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
final ClusterClient<T> client;

// directly deploy the job if the cluster is started in job mode and detached
if (clusterId == null && runOptions.getDetachedMode()) {
int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
if (clusterId == null && executionParameters.getDetachedMode()) {
int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();

final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
client = clusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
runOptions.getDetachedMode());
executionParameters.getDetachedMode());

logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

Expand All @@ -263,19 +265,17 @@ private <T> void runProgram(
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
// there's a race-condition here if cli is killed before shutdown hook is installed
if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
} else {
shutdownHook = null;
}
}

try {
client.setDetached(runOptions.getDetachedMode());

LOG.debug("{}", runOptions.getSavepointRestoreSettings());
client.setDetached(executionParameters.getDetachedMode());

int userParallelism = runOptions.getParallelism();
int userParallelism = executionParameters.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
userParallelism = defaultParallelism;
Expand Down Expand Up @@ -323,25 +323,26 @@ protected void info(String[] args) throws CliArgsException, FileNotFoundExceptio

final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true);

InfoOptions infoOptions = new InfoOptions(commandLine);
final ProgramOptions programOptions = new ProgramOptions(commandLine);
final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);

// evaluate help flag
if (infoOptions.isPrintHelp()) {
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForInfo();
return;
}

if (infoOptions.getJarFilePath() == null) {
if (programOptions.getJarFilePath() == null) {
throw new CliArgsException("The program JAR file was not specified.");
}

// -------- build the packaged program -------------

LOG.info("Building program from JAR file");
final PackagedProgram program = buildProgram(infoOptions);
final PackagedProgram program = buildProgram(programOptions, executionParameters);

try {
int parallelism = infoOptions.getParallelism();
int parallelism = programOptions.getParallelism();
if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
parallelism = defaultParallelism;
}
Expand Down Expand Up @@ -722,7 +723,7 @@ private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, Str
* Sends a SavepointDisposalRequest to the job manager.
*/
private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException {
Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
"Usage: bin/flink savepoint -d <savepoint-path>");

logAndSysout("Disposing savepoint '" + savepointPath + "'.");
Expand Down Expand Up @@ -769,15 +770,17 @@ protected void executeProgram(PackagedProgram program, ClusterClient<?> client,
*
* @return A PackagedProgram (upon success)
*/
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
String[] programArgs = options.getProgramArgs();
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
PackagedProgram buildProgram(
final ProgramOptions runOptions,
final ExecutionConfigAccessor executionParameters) throws FileNotFoundException, ProgramInvocationException {
String[] programArgs = runOptions.getProgramArgs();
String jarFilePath = executionParameters.getJarFilePath();
List<URL> classpaths = executionParameters.getClasspaths();

// Get assembler class
String entryPointClass = options.getEntryPointClassName();
String entryPointClass = runOptions.getEntryPointClassName();
File jarFile = null;
if (options.isPython()) {
if (runOptions.isPython()) {
// If the job is specified a jar file
if (jarFilePath != null) {
jarFile = getJarFile(jarFilePath);
Expand All @@ -799,7 +802,7 @@ PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundExceptio
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());

return program;
}
Expand Down Expand Up @@ -1190,7 +1193,7 @@ private static CustomCommandLine loadCustomCommandLine(String className, Object.
// construct class types from the parameters
Class<?>[] types = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
types[i] = params[i].getClass();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public class CliFrontendParser {
PYMODULE_OPTION.setArgName("pyModule");
}

private static final Options RUN_OPTIONS = getRunCommandOptions();
static final Options RUN_OPTIONS = getRunCommandOptions();

private static Options buildGeneralOptions(Options options) {
options.addOption(HELP_OPTION);
Expand Down Expand Up @@ -445,17 +445,6 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin
// Line Parsing
// --------------------------------------------------------------------------------------------

public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
try {
DefaultParser parser = new DefaultParser();
CommandLine line = parser.parse(RUN_OPTIONS, args, true);
return new RunOptions(line);
}
catch (ParseException e) {
throw new CliArgsException(e.getMessage());
}
}

public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) throws CliArgsException {
final DefaultParser parser = new DefaultParser();

Expand Down
Loading

0 comments on commit ea6fdde

Please sign in to comment.