From 3607575a234f65b5ea50d987e9bd9f01c7f4a605 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 1 Dec 2015 18:49:14 +0100 Subject: [PATCH] [FLINK-2976] [clients] Add savepoint commands to CliFrontend [comments] Use handleError(Throwable) --- .../org/apache/flink/client/CliFrontend.java | 140 +++++++- .../flink/client/cli/CliFrontendParser.java | 46 ++- .../flink/client/cli/ProgramOptions.java | 23 +- .../flink/client/cli/SavepointOptions.java | 51 +++ .../apache/flink/client/program/Client.java | 95 +++-- .../client/program/ContextEnvironment.java | 21 +- .../program/ContextEnvironmentFactory.java | 8 +- .../client/program/DetachedEnvironment.java | 11 +- .../flink/client/program/PackagedProgram.java | 14 +- .../flink/client/CliFrontendRunTest.java | 12 + .../client/CliFrontendSavepointTest.java | 328 ++++++++++++++++++ .../environment/StreamContextEnvironment.java | 2 +- 12 files changed, 689 insertions(+), 62 deletions(-) create mode 100644 flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java create mode 100644 flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index b201cf44f72bc..0363d6a2dd815 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -101,6 +101,7 @@ public class CliFrontend { public static final String ACTION_INFO = "info"; private static final String ACTION_LIST = "list"; private static final String ACTION_CANCEL = "cancel"; + private static final String ACTION_SAVEPOINT = "savepoint"; // config dir parameters private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR"; @@ -301,6 +302,8 @@ protected int run(String[] args) { client.setPrintStatusDuringExecution(options.getStdoutLogging()); LOG.debug("Client slots is set to 
{}", client.getMaxSlots()); + LOG.debug("Savepoint path is set to {}", options.getSavepointPath()); + try { if (client.getMaxSlots() != -1 && userParallelism == -1) { logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " + @@ -630,6 +633,135 @@ protected int cancel(String[] args) { } } + /** + * Executes the SAVEPOINT action. + * + * @param args Command line arguments for the savepoint action. + */ + protected int savepoint(String[] args) { + LOG.info("Running 'savepoint' command."); + + SavepointOptions options; + try { + options = CliFrontendParser.parseSavepointCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Throwable t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForSavepoint(); + return 0; + } + + if (options.isDispose()) { + // Discard + return disposeSavepoint(options, options.getDisposeSavepointPath()); + } + else { + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length > 0) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + return handleError(new IllegalArgumentException( + "Error: The value for the Job ID is not a valid ID.")); + } + } + else { + return handleError(new IllegalArgumentException( + "Error: The value for the Job ID is not a valid ID. " + + "Specify a Job ID to trigger a savepoint.")); + } + + return triggerSavepoint(options, jobId); + } + } + + /** + * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint} + * message to the job manager.
+ */ + private int triggerSavepoint(SavepointOptions options, JobID jobId) { + try { + ActorGateway jobManager = getJobManagerGateway(options); + Future response = jobManager.ask(new TriggerSavepoint(jobId), askTimeout); + + Object result; + try { + logAndSysout("Triggering savepoint for job " + jobId + ". Waiting for response..."); + result = Await.result(response, askTimeout); + } + catch (Exception e) { + throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e); + } + + if (result instanceof TriggerSavepointSuccess) { + TriggerSavepointSuccess success = (TriggerSavepointSuccess) result; + logAndSysout("Savepoint completed. Path: " + success.savepointPath()); + logAndSysout("You can resume your program from this savepoint with the run command."); + + return 0; + } + else if (result instanceof TriggerSavepointFailure) { + TriggerSavepointFailure failure = (TriggerSavepointFailure) result; + throw failure.cause(); + } + else { + throw new IllegalStateException("Unknown JobManager response of type " + + result.getClass()); + } + } + catch (Throwable t) { + return handleError(t); + } + } + + /** + * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint} + * message to the job manager. + */ + private int disposeSavepoint(SavepointOptions options, String savepointPath) { + try { + ActorGateway jobManager = getJobManagerGateway(options); + Future response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout); + + Object result; + try { + logAndSysout("Disposing savepoint '" + savepointPath + "'. 
Waiting for response..."); + result = Await.result(response, askTimeout); + } + catch (Exception e) { + throw new Exception("Disposing the savepoint with path " + savepointPath + " failed.", e); + } + + if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) { + logAndSysout("Savepoint '" + savepointPath + "' disposed."); + return 0; + } + else if (result instanceof DisposeSavepointFailure) { + DisposeSavepointFailure failure = (DisposeSavepointFailure) result; + throw failure.cause(); + } + else { + throw new IllegalStateException("Unknown JobManager response of type " + + result.getClass()); + } + } + catch (Throwable t) { + return handleError(t); + } + } + // -------------------------------------------------------------------------------------------- // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- @@ -719,9 +851,13 @@ else if (!jarFile.isFile()) { // Get assembler class String entryPointClass = options.getEntryPointClassName(); - return entryPointClass == null ? + PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) : new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); + + program.setSavepointPath(options.getSavepointPath()); + + return program; } /** @@ -993,6 +1129,8 @@ public Integer run() throws Exception { return info(params); case ACTION_CANCEL: return cancel(params); + case ACTION_SAVEPOINT: + return savepoint(params); case "-h": case "--help": CliFrontendParser.printHelp(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 1226d481c360a..4e081fdb9b74b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -71,6 +71,12 @@ public class CliFrontendParser { + "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " + "different JobManager than the one specified in the configuration."); + static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true, + "Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537)."); + + static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true, + "Disposes an existing savepoint."); + // list specific options static final Option RUNNING_OPTION = new Option("r", "running", false, "Show only running programs and their JobIDs"); @@ -105,13 +111,19 @@ public class CliFrontendParser { RUNNING_OPTION.setRequired(false); SCHEDULED_OPTION.setRequired(false); + + SAVEPOINT_PATH_OPTION.setRequired(false); + SAVEPOINT_PATH_OPTION.setArgName("savepointPath"); + + SAVEPOINT_DISPOSE_OPTION.setRequired(false); + SAVEPOINT_DISPOSE_OPTION.setArgName("savepointPath"); } private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options())); private static final Options INFO_OPTIONS =
getInfoOptions(buildGeneralOptions(new Options())); private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options())); private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options())); - + private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options())); private static Options buildGeneralOptions(Options options) { options.addOption(HELP_OPTION); @@ -128,6 +140,7 @@ public static Options getProgramSpecificOptions(Options options) { options.addOption(ARGS_OPTION); options.addOption(LOGGING_OPTION); options.addOption(DETACHED_OPTION); + options.addOption(SAVEPOINT_PATH_OPTION); // also add the YARN options so that the parser can parse them yarnSessionCLi.getYARNSessionCLIOptions(options); @@ -140,6 +153,7 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options.addOption(PARALLELISM_OPTION); options.addOption(LOGGING_OPTION); options.addOption(DETACHED_OPTION); + options.addOption(SAVEPOINT_PATH_OPTION); return options; } @@ -183,6 +197,12 @@ private static Options getCancelOptions(Options options) { return options; } + private static Options getSavepointOptions(Options options) { + options = getJobManagerAddressOption(options); + options.addOption(SAVEPOINT_DISPOSE_OPTION); + return options; + } + // -------------------------------------------------------------------------------------------- // Help // -------------------------------------------------------------------------------------------- @@ -199,6 +219,7 @@ public static void printHelp() { printHelpForInfo(); printHelpForList(); printHelpForCancel(); + printHelpForSavepoint(); System.out.println(); } @@ -255,6 +276,18 @@ public static void printHelpForCancel() { System.out.println(); } + public static void printHelpForSavepoint() { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction 
\"savepoint\" triggers savepoints for a running job or disposes existing ones."); + System.out.println("\n Syntax: savepoint [OPTIONS] "); + formatter.setSyntaxPrefix(" \"savepoint\" action options:"); + formatter.printHelp(" ", getSavepointOptions(new Options())); + System.out.println(); + } + // -------------------------------------------------------------------------------------------- // Line Parsing // -------------------------------------------------------------------------------------------- @@ -292,6 +325,17 @@ public static CancelOptions parseCancelCommand(String[] args) throws CliArgsExce } } + public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException { + try { + PosixParser parser = new PosixParser(); + CommandLine line = parser.parse(SAVEPOINT_OPTIONS, args, false); + return new SavepointOptions(line); + } + catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException { try { PosixParser parser = new PosixParser(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index 499d3cab2fd46..73d49b597b41c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -19,19 +19,20 @@ import org.apache.commons.cli.CommandLine; -import java.net.URL; import java.net.MalformedURLException; -import java.util.List; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION; import static 
org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION; -import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION; -import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION; -import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION; /** * Base class for command line options that refer to a JAR file program. @@ -52,6 +53,8 @@ public abstract class ProgramOptions extends CommandLineOptions { private final boolean detachedMode; + private final String savepointPath; + protected ProgramOptions(CommandLine line) throws CliArgsException { super(line); @@ -105,6 +108,12 @@ else if (args.length > 0) { stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt()); detachedMode = line.hasOption(DETACHED_OPTION.getOpt()); + + if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) { + savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); + } else { + savepointPath = null; + } } public String getJarFilePath() { @@ -134,4 +143,8 @@ public boolean getStdoutLogging() { public boolean getDetachedMode() { return detachedMode; } + + public String getSavepointPath() { + return savepointPath; + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java new file mode 100644 index 0000000000000..10af76ee9f01b --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION; + +/** + * Command line options for the SAVEPOINT command + */ +public class SavepointOptions extends CommandLineOptions { + + private final String[] args; + private boolean dispose; + private String disposeSavepointPath; + + public SavepointOptions(CommandLine line) { + super(line); + this.args = line.getArgs(); + this.dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt()); + this.disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt()); + } + + public String[] getArgs() { + return args == null ? 
new String[0] : args; + } + + public boolean isDispose() { + return dispose; + } + + public String getDisposeSavepointPath() { + return disposeSavepointPath; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 8f92c51aa00a2..ff1a0fde14dd3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -62,18 +62,16 @@ import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSystem; - /** * Encapsulates the functionality necessary to submit a program to a remote cluster. */ public class Client { - + private static final Logger LOG = LoggerFactory.getLogger(Client.class); - - + /** The optimizer used in the optimization of batch programs */ final Optimizer compiler; - + /** The actor system used to communicate with the JobManager */ private final ActorSystem actorSystem; @@ -96,9 +94,9 @@ public class Client { private boolean printStatusDuringExecution = true; /** - * For interactive invocations, the Job ID is only available after the ContextEnvironment has - * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment - * which lets us access the last JobID here. + * For interactive invocations, the Job ID is only available after the ContextEnvironment has + * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment + * which lets us access the last JobID here. */ private JobID lastJobID; @@ -112,7 +110,7 @@ public class Client { * if that is not possible. * * @param config The config used to obtain the job-manager's address, and used to configure the optimizer. - * + * * @throws java.io.IOException Thrown, if the client's actor system could not be started. * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved. 
*/ @@ -123,15 +121,14 @@ public Client(Configuration config) throws IOException { /** * Creates a new instance of the class that submits the jobs to a job-manager. * at the given address using the default port. - * + * * @param config The configuration for the client-side processes, like the optimizer. * @param maxSlots maxSlots The number of maxSlots on the cluster if != -1. - * + * * @throws java.io.IOException Thrown, if the client's actor system could not be started. - * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved. + * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved. */ public Client(Configuration config, int maxSlots) throws IOException { - this.config = Preconditions.checkNotNull(config); this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); this.maxSlots = maxSlots; @@ -147,7 +144,7 @@ public Client(Configuration config, int maxSlots) throws IOException { timeout = AkkaUtils.getTimeout(config); lookupTimeout = AkkaUtils.getTimeout(config); } - + // ------------------------------------------------------------------------ // Startup & Shutdown // ------------------------------------------------------------------------ @@ -161,15 +158,15 @@ public void shutdown() { this.actorSystem.awaitTermination(); } } - + // ------------------------------------------------------------------------ // Configuration // ------------------------------------------------------------------------ - + /** * Configures whether the client should print progress updates during the execution to {@code System.out}. * All updates are logged via the SLF4J loggers regardless of this setting. - * + * * @param print True to print updates to standard out during execution, false to not print them. 
*/ public void setPrintStatusDuringExecution(boolean print) { @@ -190,11 +187,11 @@ public boolean getPrintStatusDuringExecution() { public int getMaxSlots() { return this.maxSlots; } - + // ------------------------------------------------------------------------ // Access to the Program's Plan // ------------------------------------------------------------------------ - + public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { @@ -238,12 +235,14 @@ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int par public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - return runBlocking(prog.getPlanWithJars(), parallelism); + return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()); } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(), - prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true)); + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true, + prog.getSavepointPath())); + // invoke here try { prog.invokeInteractiveModeForExecution(); @@ -264,12 +263,13 @@ public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism) { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - return runDetached(prog.getPlanWithJars(), parallelism); + return runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()); } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), - 
prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false); + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false, + prog.getSavepointPath()); ContextEnvironment.setAsContext(factory); // invoke here @@ -286,6 +286,10 @@ else if (prog.isUsingInteractiveMode()) { } } + public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException { + return runBlocking(program, parallelism, null); + } + /** * Runs a program on the Flink cluster to which this client is connected. The call blocks until the * execution is complete, and returns afterwards. @@ -300,16 +304,19 @@ else if (prog.isUsingInteractiveMode()) { * i.e. the job-manager is unreachable, or due to the fact that the * parallel execution failed. */ - public JobExecutionResult runBlocking(JobWithJars program, int parallelism) - throws CompilerException, ProgramInvocationException - { + public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath) + throws CompilerException, ProgramInvocationException { ClassLoader classLoader = program.getUserCodeClassLoader(); if (classLoader == null) { throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); } OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); - return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader); + return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath); + } + + public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException { + return runDetached(program, parallelism, null); } /** @@ -325,30 +332,37 @@ public JobExecutionResult runBlocking(JobWithJars program, int parallelism) * or if the submission failed. That might be either due to an I/O problem, * i.e. the job-manager is unreachable. 
*/ - public JobSubmissionResult runDetached(JobWithJars program, int parallelism) - throws CompilerException, ProgramInvocationException - { + public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath) + throws CompilerException, ProgramInvocationException { ClassLoader classLoader = program.getUserCodeClassLoader(); if (classLoader == null) { throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); } OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism); - return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader); + return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath); + } + + public JobExecutionResult runBlocking( + FlinkPlan compiledPlan, List libraries, List classpaths, ClassLoader classLoader) throws ProgramInvocationException { + return runBlocking(compiledPlan, libraries, classpaths, classLoader, null); } - public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List libraries, List classpaths, - ClassLoader classLoader) throws ProgramInvocationException + ClassLoader classLoader, String savepointPath) throws ProgramInvocationException { - JobGraph job = getJobGraph(compiledPlan, libraries, classpaths); + JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath); return runBlocking(job, classLoader); } + public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List libraries, List classpaths, ClassLoader classLoader) throws ProgramInvocationException { + return runDetached(compiledPlan, libraries, classpaths, classLoader, null); + } + public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List libraries, List classpaths, - ClassLoader classLoader) throws ProgramInvocationException + ClassLoader classLoader, String savepointPath) throws ProgramInvocationException { - JobGraph job = getJobGraph(compiledPlan, libraries, 
classpaths); + JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath); return runDetached(job, classLoader); } @@ -519,13 +533,18 @@ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars pr } public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { - return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths()); + return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null); + } + + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException { + return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath); } - private JobGraph getJobGraph(FlinkPlan optPlan, List jarFiles, List classpaths) { + private JobGraph getJobGraph(FlinkPlan optPlan, List jarFiles, List classpaths, String savepointPath) { JobGraph job; if (optPlan instanceof StreamingPlan) { job = ((StreamingPlan) optPlan).getJobGraph(); + job.setSavepointPath(savepointPath); } else { JobGraphGenerator gen = new JobGraphGenerator(this.config); job = gen.compileJobGraph((OptimizedPlan) optPlan); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 1e3d0d49c7644..987558c7b6c20 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -18,9 +18,6 @@ package org.apache.flink.client.program; -import java.net.URL; -import java.util.List; - import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; @@ -28,8 +25,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; +import java.net.URL; 
+import java.util.List; + /** - * Execution Environment for remote execution with the Client in blocking fashion. + * Execution Environment for remote execution with the Client. */ public class ContextEnvironment extends ExecutionEnvironment { @@ -40,13 +40,16 @@ public class ContextEnvironment extends ExecutionEnvironment { protected final List classpathsToAttach; protected final ClassLoader userCodeClassLoader; + + protected final String savepointPath; public ContextEnvironment(Client remoteConnection, List jarFiles, List classpaths, - ClassLoader userCodeClassLoader) { + ClassLoader userCodeClassLoader, String savepointPath) { this.client = remoteConnection; this.jarFilesToAttach = jarFiles; this.classpathsToAttach = classpaths; this.userCodeClassLoader = userCodeClassLoader; + this.savepointPath = savepointPath; } @Override @@ -54,7 +57,7 @@ public JobExecutionResult execute(String jobName) throws Exception { Plan p = createProgramPlan(jobName); JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach, this.userCodeClassLoader); - this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism()); + this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism(), savepointPath); return this.lastJobExecutionResult; } @@ -94,7 +97,11 @@ public List getClasspaths(){ public ClassLoader getUserCodeClassLoader() { return userCodeClassLoader; } - + + public String getSavepointPath() { + return savepointPath; + } + // -------------------------------------------------------------------------------------------- static void setAsContext(ContextEnvironmentFactory factory) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java index 55f705b5ea2a1..e820bad205cff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java @@ -46,10 +46,11 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { private ExecutionEnvironment lastEnvCreated; + private String savepointPath; public ContextEnvironmentFactory(Client client, List jarFilesToAttach, List classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, - boolean wait) + boolean wait, String savepointPath) { this.client = client; this.jarFilesToAttach = jarFilesToAttach; @@ -57,6 +58,7 @@ public ContextEnvironmentFactory(Client client, List jarFilesToAttach, this.userCodeClassLoader = userCodeClassLoader; this.defaultParallelism = defaultParallelism; this.wait = wait; + this.savepointPath = savepointPath; } @Override @@ -66,8 +68,8 @@ public ExecutionEnvironment createExecutionEnvironment() { } lastEnvCreated = wait ? - new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) : - new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader); + new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath) : + new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); if (defaultParallelism > 0) { lastEnvCreated.setParallelism(defaultParallelism); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java index 0b1ae1d012b07..037c36b06bf30 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java @@ -41,8 +41,13 @@ public class DetachedEnvironment extends ContextEnvironment { private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class); - public DetachedEnvironment(Client 
remoteConnection, List jarFiles, List classpaths, ClassLoader userCodeClassLoader) { - super(remoteConnection, jarFiles, classpaths, userCodeClassLoader); + public DetachedEnvironment( + Client remoteConnection, + List jarFiles, + List classpaths, + ClassLoader userCodeClassLoader, + String savepointPath) { + super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointPath); } @Override @@ -67,7 +72,7 @@ public void setDetachedPlan(FlinkPlan plan) { * Finishes this Context Environment's execution by explicitly running the plan constructed. */ JobSubmissionResult finalizeExecute() throws ProgramInvocationException { - return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader); + return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); } public static final class DetachedJobExecutionResult extends JobExecutionResult { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 8375ec25a88fe..f78502ab483e0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -86,6 +86,8 @@ public class PackagedProgram { private Plan plan; + private String savepointPath; + /** * Creates an instance that wraps the plan defined in the jar file using the given * argument. 
@@ -254,9 +256,15 @@ public PackagedProgram(File jarFile, List classpaths, String entryPointClas Program.class.getName() + " interface."); } } - - - + + public void setSavepointPath(String savepointPath) { + this.savepointPath = savepointPath; + } + + public String getSavepointPath() { + return savepointPath; + } + public String[] getArguments() { return this.args; } diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java index 64c2709ac1737..56173bd793114 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java @@ -22,7 +22,9 @@ import static org.apache.flink.client.CliFrontendTestUtils.*; import static org.junit.Assert.*; +import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.PackagedProgram; import org.junit.BeforeClass; @@ -90,6 +92,16 @@ public void testRun() { CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); assertNotEquals(0, testFrontend.run(parameters)); } + + // test configure savepoint path + { + String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()}; + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, false, false); + assertEquals(0, testFrontend.run(parameters)); + + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + assertEquals("expectedSavepointPath", options.getSavepointPath()); + } } catch (Exception e) { e.printStackTrace(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java new file mode 100644 index 0000000000000..13c895c4c6753 
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CliFrontendSavepointTest {
+
+	private static PrintStream stdOut; // System.out as it was before capturing started
+	private static PrintStream stdErr; // System.err as it was before capturing started
+	private static ByteArrayOutputStream buffer; // receives everything printed while capturing
+
+	// ------------------------------------------------------------------------
+	// Trigger savepoint: savepoint(String[]) called with a job ID as its only argument
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTriggerSavepointSuccess() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			String savepointPath = "expectedSavepointPath";
+
+			triggerResponse.success(new JobManagerMessages
+					.TriggerSavepointSuccess(jobId, savepointPath)); // completed before the CLI call below
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertEquals(0, returnCode);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedSavepointPath")); // path must show up in the captured output
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailure() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			Exception testException = new Exception("expectedTestException");
+
+			triggerResponse.success(new JobManagerMessages
+					.TriggerSavepointFailure(jobId, testException)); // the future succeeds, carrying a failure message
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedTestException")); // cause must show up in the captured output
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailureIllegalJobID() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); // plain frontend, no mocked gateway
+
+			String[] parameters = { "invalid job id" };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			assertTrue(buffer.toString().contains("not a valid ID"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailureUnknownResponse() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			triggerResponse.success("UNKNOWN RESPONSE"); // neither a success nor a failure message
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			String errMsg = buffer.toString();
+			assertTrue(errMsg.contains("IllegalStateException"));
+			assertTrue(errMsg.contains("Unknown JobManager response"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Dispose savepoint
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testDisposeSavepointSuccess() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> disposeResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class))).thenReturn(disposeResponse.future());
+
+			disposeResponse.success(JobManagerMessages.getDisposeSavepointSuccess()); // completed before the CLI call below
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertEquals(0, returnCode);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			String outMsg = buffer.toString();
+			assertTrue(outMsg.contains(savepointPath));
+			assertTrue(outMsg.contains("disposed"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testDisposeSavepointFailure() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> disposeResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(disposeResponse.future());
+
+			Exception testException = new Exception("expectedTestException");
+
+			disposeResponse.success(new JobManagerMessages
+					.DisposeSavepointFailure(testException)); // the future succeeds, carrying a failure message
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedTestException")); // cause must show up in the captured output
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testDisposeSavepointFailureUnknownResponse() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> disposeResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); // stands in for the JobManager's reply
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(disposeResponse.future());
+
+			disposeResponse.success("UNKNOWN RESPONSE"); // neither a success nor a failure message
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			String errMsg = buffer.toString();
+			assertTrue(errMsg.contains("IllegalStateException"));
+			assertTrue(errMsg.contains("Unknown JobManager response"));
+		}
+		finally {
+			restoreStdOutAndStdErr(); // last statement on purpose: the streams must not stay redirected after the test
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class MockCliFrontend extends CliFrontend { // frontend that always talks to the supplied gateway
+
+		private final ActorGateway mockJobManager;
+
+		public MockCliFrontend(String configDir, ActorGateway mockJobManager) throws Exception {
+			super(configDir);
+			this.mockJobManager = mockJobManager;
+		}
+
+		@Override
+		protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
+			return mockJobManager; // the options are ignored
+		}
+	}
+
+	/** Saves the current System.out/System.err and redirects both into a fresh in-memory {@link #buffer}. */
+	private static void replaceStdOutAndStdErr() {
+		stdOut = System.out;
+		stdErr = System.err;
+		buffer = new ByteArrayOutputStream();
+		PrintStream capture = new PrintStream(buffer);
+		System.setOut(capture);
+		System.setErr(capture);
+	}
+
+	/** Puts back the streams saved by the most recent {@link #replaceStdOutAndStdErr()} call. */
+	private static void restoreStdOutAndStdErr() {
+		System.setOut(stdOut);
+		System.setErr(stdErr);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7a68fc52a556e..23fcfed58f75a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -60,7 +60,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
 			((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
 			return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
 		} else {
-			return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader());
+			return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath());
 		}
 	}
 }