Skip to content

Commit

Permalink
[FLINK-24113] Add option to disable automatic shutdown in application…
Browse files Browse the repository at this point in the history
… mode
  • Loading branch information
Nicolaus Weidner authored and zentol committed Nov 22, 2021
1 parent 10c7a78 commit 6c46c9b
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<td>List&lt;String&gt;</td>
<td>Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-application-finish</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether a Flink Application cluster should shut down automatically after its application finishes (either successfully or as result of a failure). Has no effect for other deployment modes.</td>
</tr>
<tr>
<td><h5>execution.shutdown-on-attached-exit</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
Expand Down Expand Up @@ -112,7 +113,7 @@ public ApplicationDispatcherBootstrap(
this.applicationCompletionFuture =
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
this.clusterShutdownFuture = shutDownClusterAsync(dispatcherGateway);
}

@Override
Expand Down Expand Up @@ -142,18 +143,22 @@ CompletableFuture<Acknowledge> getClusterShutdownFuture() {
}

/**
* Runs the user program entrypoint and shuts down the given dispatcherGateway when the
* application completes (either successfully or in case of failure).
* Shuts down the given dispatcherGateway when the application completes (either successfully or
* in case of failure).
*/
private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
private CompletableFuture<Acknowledge> shutDownClusterAsync(
final DispatcherGateway dispatcherGateway) {
boolean shouldShutDownOnFinish =
configuration.getBoolean(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);
return applicationCompletionFuture
.handle(
(ignored, t) -> {
if (t == null) {
LOG.info("Application completed SUCCESSFULLY");
return dispatcherGateway.shutDownCluster(
ApplicationStatus.SUCCEEDED);
return shouldShutDownOnFinish
? dispatcherGateway.shutDownCluster(
ApplicationStatus.SUCCEEDED)
: CompletableFuture.completedFuture(Acknowledge.get());
}

final Optional<UnsuccessfulExecutionException> maybeException =
Expand All @@ -165,11 +170,16 @@ private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
if (applicationStatus == ApplicationStatus.CANCELED
|| applicationStatus == ApplicationStatus.FAILED) {
LOG.info("Application {}: ", applicationStatus, t);
return dispatcherGateway.shutDownCluster(applicationStatus);
return shouldShutDownOnFinish
? dispatcherGateway.shutDownCluster(applicationStatus)
: CompletableFuture.completedFuture(Acknowledge.get());
}
}

LOG.warn("Application failed unexpectedly: ", t);
// errorHandler triggers a cluster shutdown. We never prevent shutdown
// in this case as the job likely failed on startup and never reached a
// running state.
this.errorHandler.onFatalError(
new FlinkException("Application failed unexpectedly.", t));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,51 @@ public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
assertFalse(maybeDuplicate.get().isGloballyTerminated());
}

@Test
public void testShutdownDisabledWithSuccessfulApplication() throws Exception {
testShutdownDisabled(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
}

@Test
public void testShutdownDisabledWithCanceledApplication() throws Exception {
testShutdownDisabled(JobStatus.CANCELED, ApplicationStatus.CANCELED);
}

@Test
public void testShutdownDisabledWithFailedApplication() throws Exception {
testShutdownDisabled(JobStatus.FAILED, ApplicationStatus.FAILED);
}

private void testShutdownDisabled(JobStatus jobStatus, ApplicationStatus applicationStatus)
throws Exception {
final Configuration configurationUnderTest = getConfiguration();
configurationUnderTest.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false);

final TestingDispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder()
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(
jobId -> CompletableFuture.completedFuture(jobStatus))
.setRequestJobResultFunction(
jobId ->
CompletableFuture.completedFuture(
createJobResult(jobId, applicationStatus)))
.setClusterShutdownFunction(
(status) -> {
fail("Cluster shutdown should not be called");
return CompletableFuture.completedFuture(Acknowledge.get());
})
.build();

ApplicationDispatcherBootstrap bootstrap =
createApplicationDispatcherBootstrap(
configurationUnderTest, dispatcherGateway, scheduledExecutor);

// Wait until bootstrap is finished to make sure cluster shutdown isn't called
bootstrap.getClusterShutdownFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

private CompletableFuture<Void> runApplication(
TestingDispatcherGateway.Builder dispatcherBuilder, int noOfJobs)
throws FlinkException {
Expand Down Expand Up @@ -838,11 +883,31 @@ private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
final ScheduledExecutor scheduledExecutor,
final FatalErrorHandler errorHandler)
throws FlinkException {
return createApplicationDispatcherBootstrap(
noOfJobs, getConfiguration(), dispatcherGateway, scheduledExecutor, errorHandler);
}

private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor)
throws FlinkException {
return createApplicationDispatcherBootstrap(
1, configuration, dispatcherGateway, scheduledExecutor, exception -> {});
}

private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
final int noOfJobs,
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final FatalErrorHandler errorHandler)
throws FlinkException {
final PackagedProgram program = getProgram(noOfJobs);
return new ApplicationDispatcherBootstrap(
program,
Collections.emptyList(),
getConfiguration(),
configuration,
dispatcherGateway,
scheduledExecutor,
errorHandler);
Expand All @@ -865,42 +930,37 @@ private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
}

private static JobResult createUnknownJobResult(final JobID jobId) {
return new JobResult.Builder()
.jobId(jobId)
.netRuntime(2L)
.applicationStatus(ApplicationStatus.UNKNOWN)
.serializedThrowable(
new SerializedThrowable(
new JobExecutionException(jobId, "unknown bla bla bla")))
.build();
return createJobResult(jobId, ApplicationStatus.UNKNOWN);
}

private static JobResult createFailedJobResult(final JobID jobId) {
return new JobResult.Builder()
.jobId(jobId)
.netRuntime(2L)
.applicationStatus(ApplicationStatus.FAILED)
.serializedThrowable(
new SerializedThrowable(new JobExecutionException(jobId, "bla bla bla")))
.build();
return createJobResult(jobId, ApplicationStatus.FAILED);
}

private static JobResult createSuccessfulJobResult(final JobID jobId) {
return new JobResult.Builder()
.jobId(jobId)
.netRuntime(2L)
.applicationStatus(ApplicationStatus.SUCCEEDED)
.build();
return createJobResult(jobId, ApplicationStatus.SUCCEEDED);
}

private static JobResult createCancelledJobResult(final JobID jobId) {
return new JobResult.Builder()
.jobId(jobId)
.netRuntime(2L)
.serializedThrowable(
new SerializedThrowable(new JobCancellationException(jobId, "Hello", null)))
.applicationStatus(ApplicationStatus.CANCELED)
.build();
return createJobResult(jobId, ApplicationStatus.CANCELED);
}

private static JobResult createJobResult(
final JobID jobID, final ApplicationStatus applicationStatus) {
JobResult.Builder builder =
new JobResult.Builder()
.jobId(jobID)
.netRuntime(2L)
.applicationStatus(applicationStatus);
if (applicationStatus == ApplicationStatus.CANCELED) {
builder.serializedThrowable(
new SerializedThrowable(new JobCancellationException(jobID, "Hello", null)));
} else if (applicationStatus == ApplicationStatus.FAILED
|| applicationStatus == ApplicationStatus.UNKNOWN) {
builder.serializedThrowable(
new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla")));
}
return builder.build();
}

private static <T, E extends Throwable> E assertException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public class DeploymentOptions {
.withDescription(
"Custom JobListeners to be registered with the execution environment."
+ " The registered listeners cannot have constructors with arguments.");

public static final ConfigOption<Boolean> SHUTDOWN_ON_APPLICATION_FINISH =
ConfigOptions.key("execution.shutdown-on-application-finish")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether a Flink Application cluster should shut down automatically after its application finishes"
+ " (either successfully or as result of a failure). Has no effect for other deployment modes.");
}

0 comments on commit 6c46c9b

Please sign in to comment.