Skip to content

Commit

Permalink
[FLINK-3000] Adds shutdown hook to clean up lingering yarn sessions
Browse files Browse the repository at this point in the history
This closes apache#1354
  • Loading branch information
sachingoel0101 authored and rmetzger committed Nov 25, 2015
1 parent 90c76ad commit aad99f2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -635,14 +635,6 @@ protected int cancel(String[] args) {
// --------------------------------------------------------------------------------------------

protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
// log message for detached yarn job
if (yarnCluster != null) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
}

LOG.info("Starting execution of program");

JobSubmissionResult result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
private Configuration conf;
private YarnClient yarnClient;
private YarnClientApplication yarnApplication;

private Thread deploymentFailureHook = new DeploymentFailureHook();

/**
* Files (usually in a distributed file system) used for the YARN session of Flink.
Expand Down Expand Up @@ -629,13 +629,20 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
appContext.setQueue(yarnQueue);
}

// add a hook to clean up in case deployment fails
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);

LOG.info("Waiting for the cluster to be allocated");
int waittime = 0;
loop: while( true ) {
ApplicationReport report = yarnClient.getApplicationReport(appId);
ApplicationReport report;
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage());
}
YarnApplicationState appState = report.getYarnApplicationState();
switch(appState) {
case FAILED:
Expand All @@ -660,6 +667,19 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
waittime += 1000;
Thread.sleep(1000);
}
// log the application id so that the user can cancel the detached session themselves.
if (isDetached()) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop " +
"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
"temporary files of the YARN session in the home directoy will not be removed.");
}
// since deployment was successful, remove the hook
try {
Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
} catch (IllegalStateException e) {
// we're already in the shut down hook.
}
// the Flink cluster is deployed in YARN. Represent cluster
return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
}
Expand Down Expand Up @@ -869,5 +889,20 @@ public YarnDeploymentException(String message, Throwable cause) {
}
}

/**
 * Shutdown hook thread that cleans up after a deployment that did not run to completion.
 *
 * <p>When run, it calls {@code failSessionDuringDeployment()} and then recursively deletes
 * {@code sessionFilesDir} from the file system obtained for {@code conf}. A failure to delete
 * the files is logged and not propagated.
 */
private class DeploymentFailureHook extends Thread {
    @Override
    public void run() {
        LOG.info("Cancelling deployment from Deployment Failure Hook");
        failSessionDuringDeployment();
        LOG.info("Deleting files in " + sessionFilesDir);
        // try-with-resources guarantees the file system handle is closed even when
        // the delete throws; previously close() was skipped on that path.
        try (FileSystem fs = FileSystem.get(conf)) {
            fs.delete(sessionFilesDir, true);
        } catch (IOException e) {
            LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
        }
    }
}
}

0 comments on commit aad99f2

Please sign in to comment.