Skip to content

Commit

Permalink
[FLINK-4445] [client] Add allowNonRestoredState flag to CLI
Browse files Browse the repository at this point in the history
Allow to specify whether a checkpoint restore should allow
checkpoint state that it cannot map to the program. This is
exposed via the CLI in the run command:

bin/flink run -s <savepointPath> -n ...

Furthermore, the savepoint restore settings are moved out of
the snapshotting settings.
  • Loading branch information
uce committed Nov 2, 2016
1 parent 4955441 commit 74c0770
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 119 deletions.
19 changes: 16 additions & 3 deletions docs/setup/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ The job will only be cancelled if the savepoint succeeds.

The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.

By default, we try to match all savepoint state to the job being submitted. If you want to allow to skip savepoint state that cannot be restored with the new job you can set the `allowNonRestoredState` flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.

{% highlight bash %}
./bin/flink run -s <savepointPath> -n ...
{% endhighlight %}

This is useful if your program dropped an operator that was part of the savepoint.

#### Dispose a savepoint

{% highlight bash %}
Expand Down Expand Up @@ -224,20 +232,25 @@ Action "run" compiles and runs a program.
JobManager than the one
specified in the
configuration.
-n,--allowNonRestoredState Allow non restored savepoint
state in case an operator has
been removed from the job.
-p,--parallelism <parallelism> The parallelism with which
to run the program. Optional
flag to override the default
value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging
output to standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to reset
the job back to (for example
file:///flink/savepoint-1537
-s,--fromSavepoint <savepointPath> Path to a savepoint to
restore the job from (for
example
hdfs:///flink/savepoint-1537
).
-z,--zookeeperNamespace <zookeeperNamespace> Namespace to create the
Zookeeper sub-paths for high
availability mode
Options for yarn-cluster mode:
-yD <arg> Dynamic properties
-yd,--yarndetached Start detached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ protected int run(String[] args) {
client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());

LOG.debug("Savepoint path is set to {}", options.getSavepointPath());
LOG.debug(options.getSavepointRestoreSettings().toString());

int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
Expand Down Expand Up @@ -890,7 +890,7 @@ else if (!jarFile.isFile()) {
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

program.setSavepointPath(options.getSavepointPath());
program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

return program;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public class CliFrontendParser {
"Use this flag to connect to a different JobManager than the one specified in the configuration.");

static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
"Allow to skip savepoint state that cannot be restored. " +
"You need to allow this if you removed an operator from your " +
"program that was part of the program when the savepoint was triggered.");

static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
"Path of savepoint to dispose.");
Expand Down Expand Up @@ -121,6 +126,8 @@ public class CliFrontendParser {
SAVEPOINT_PATH_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

Expand Down Expand Up @@ -155,7 +162,6 @@ public static Options getProgramSpecificOptions(Options options) {
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
options.addOption(ZOOKEEPER_NAMESPACE_OPTION);
return options;
}
Expand All @@ -166,13 +172,15 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
options.addOption(ZOOKEEPER_NAMESPACE_OPTION);
return options;
}

private static Options getRunOptions(Options options) {
options = getProgramSpecificOptions(options);
options.addOption(SAVEPOINT_PATH_OPTION);
options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);

options = getJobManagerAddressOption(options);
return addCustomCliOptions(options, true);
}
Expand Down Expand Up @@ -220,6 +228,9 @@ private static Options getSavepointOptions(Options options) {

private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
o.addOption(SAVEPOINT_PATH_OPTION);
o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);

return getJobManagerAddressOption(o);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -33,6 +34,7 @@
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;

/**
Expand All @@ -54,7 +56,7 @@ public abstract class ProgramOptions extends CommandLineOptions {

private final boolean detachedMode;

private final String savepointPath;
private final SavepointRestoreSettings savepointSettings;

protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
Expand Down Expand Up @@ -111,9 +113,11 @@ else if (args.length > 0) {
detachedMode = line.hasOption(DETACHED_OPTION.getOpt());

if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
} else {
savepointPath = null;
this.savepointSettings = SavepointRestoreSettings.none();
}
}

Expand Down Expand Up @@ -145,7 +149,7 @@ public boolean getDetachedMode() {
return detachedMode;
}

public String getSavepointPath() {
return savepointPath;
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
Expand Down Expand Up @@ -316,7 +317,7 @@ public JobSubmissionResult run(PackagedProgram prog, int parallelism)
jobWithJars = prog.getPlanWithJars();
}

return run(jobWithJars, parallelism, prog.getSavepointPath());
return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
Expand All @@ -330,7 +331,7 @@ else if (prog.isUsingInteractiveMode()) {

ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
prog.getSavepointPath());
prog.getSavepointSettings());
ContextEnvironment.setAsContext(factory);

try {
Expand Down Expand Up @@ -358,7 +359,7 @@ else if (prog.isUsingInteractiveMode()) {
}

public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
return run(program, parallelism, null);
return run(program, parallelism, SavepointRestoreSettings.none());
}

/**
Expand All @@ -375,27 +376,27 @@ public JobSubmissionResult run(JobWithJars program, int parallelism) throws Prog
* i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed.
*/
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, String savepointPath)
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}

OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointPath);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

public JobSubmissionResult run(
FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
return run(compiledPlan, libraries, classpaths, classLoader, null);
return run(compiledPlan, libraries, classpaths, classLoader, SavepointRestoreSettings.none());
}

public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath)
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
throws ProgramInvocationException
{
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}

Expand Down Expand Up @@ -647,15 +648,15 @@ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars pr
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}

public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
}

private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
JobGraph job;
if (optPlan instanceof StreamingPlan) {
job = ((StreamingPlan) optPlan).getJobGraph();
job.setSavepointPath(savepointPath);
job.setSavepointRestoreSettings(savepointSettings);
} else {
JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
job = gen.compileJobGraph((OptimizedPlan) optPlan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
import java.util.List;
Expand All @@ -42,23 +43,23 @@ public class ContextEnvironment extends ExecutionEnvironment {

protected final ClassLoader userCodeClassLoader;

protected final String savepointPath;
protected final SavepointRestoreSettings savepointSettings;

public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
ClassLoader userCodeClassLoader, String savepointPath) {
ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
this.classpathsToAttach = classpaths;
this.userCodeClassLoader = userCodeClassLoader;
this.savepointPath = savepointPath;
this.savepointSettings = savepointSettings;
}

@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
this.userCodeClassLoader);
this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointPath).getJobExecutionResult();
this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointSettings).getJobExecutionResult();
return this.lastJobExecutionResult;
}

Expand Down Expand Up @@ -99,8 +100,8 @@ public ClassLoader getUserCodeClassLoader() {
return userCodeClassLoader;
}

public String getSavepointPath() {
return savepointPath;
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.net.URL;
import java.util.List;
Expand All @@ -46,19 +47,19 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {

private ExecutionEnvironment lastEnvCreated;

private String savepointPath;
private SavepointRestoreSettings savepointSettings;

public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
boolean isDetached, String savepointPath)
boolean isDetached, SavepointRestoreSettings savepointSettings)
{
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
this.classpathsToAttach = classpathsToAttach;
this.userCodeClassLoader = userCodeClassLoader;
this.defaultParallelism = defaultParallelism;
this.isDetached = isDetached;
this.savepointPath = savepointPath;
this.savepointSettings = savepointSettings;
}

@Override
Expand All @@ -68,8 +69,8 @@ public ExecutionEnvironment createExecutionEnvironment() {
}

lastEnvCreated = isDetached ?
new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath):
new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings):
new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
if (defaultParallelism > 0) {
lastEnvCreated.setParallelism(defaultParallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,8 +47,8 @@ public DetachedEnvironment(
List<URL> jarFiles,
List<URL> classpaths,
ClassLoader userCodeClassLoader,
String savepointPath) {
super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointPath);
SavepointRestoreSettings savepointSettings) {
super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointSettings);
}

@Override
Expand All @@ -72,7 +73,7 @@ public void setDetachedPlan(FlinkPlan plan) {
* Finishes this Context Environment's execution by explicitly running the plan constructed.
*/
JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
}

public static final class DetachedJobExecutionResult extends JobExecutionResult {
Expand Down
Loading

0 comments on commit 74c0770

Please sign in to comment.