diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 6a79677fc8a6d..5a57bf4a077a0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -635,14 +635,6 @@ protected int cancel(String[] args) { // -------------------------------------------------------------------------------------------- protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) { - // log message for detached yarn job - if (yarnCluster != null) { - logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + - "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + - "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); - } - LOG.info("Starting execution of program"); JobSubmissionResult result; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index 8c4a53e9c1d71..74ef5c39b17fb 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -110,7 +110,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { private Configuration conf; private YarnClient yarnClient; private YarnClientApplication yarnApplication; - + private Thread deploymentFailureHook = new DeploymentFailureHook(); /** * Files (usually in a distributed file system) used for the YARN session of Flink. @@ -629,13 +629,20 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception { appContext.setQueue(yarnQueue); } + // add a hook to clean up in case deployment fails + Runtime.getRuntime().addShutdownHook(deploymentFailureHook); LOG.info("Submitting application master " + appId); yarnClient.submitApplication(appContext); LOG.info("Waiting for the cluster to be allocated"); int waittime = 0; loop: while( true ) { - ApplicationReport report = yarnClient.getApplicationReport(appId); + ApplicationReport report; + try { + report = yarnClient.getApplicationReport(appId); + } catch (IOException e) { + throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage()); + } YarnApplicationState appState = report.getYarnApplicationState(); switch(appState) { case FAILED: @@ -660,6 +667,19 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception { waittime += 1000; Thread.sleep(1000); } + // print the application id for user to cancel themselves. + if (isDetached()) { + LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop " + + "it:\nyarn application -kill " + appId + "\nPlease also note that the " + + "temporary files of the YARN session in the home directoy will not be removed."); + } + // since deployment was successful, remove the hook + try { + Runtime.getRuntime().removeShutdownHook(deploymentFailureHook); + } catch (IllegalStateException e) { + // we're already in the shut down hook. + } // the Flink cluster is deployed in YARN. Represent cluster return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached); } @@ -869,5 +889,20 @@ public YarnDeploymentException(String message, Throwable cause) { } } + private class DeploymentFailureHook extends Thread { + @Override + public void run() { + LOG.info("Cancelling deployment from Deployment Failure Hook"); + failSessionDuringDeployment(); + LOG.info("Deleting files in " + sessionFilesDir); + try { + FileSystem fs = FileSystem.get(conf); + fs.delete(sessionFilesDir, true); + fs.close(); + } catch (IOException e) { + LOG.error("Failed to delete Flink Jar and conf files in HDFS", e); + } + } + } }