Skip to content

Commit

Permalink
[hotfix][runtime] Adds missing configuration parameter to cleanup err…
Browse files Browse the repository at this point in the history
…or log message
  • Loading branch information
XComp committed Mar 24, 2022
1 parent 8e7266b commit a3aec76
Showing 1 changed file with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
Expand Down Expand Up @@ -620,26 +622,28 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
cleanupJobState ->
removeJob(jobId, cleanupJobState)
.exceptionally(
throwable -> {
log.warn(
"The cleanup of job {} failed. The job's artifacts in '{}' and its JobResultStore entry in '{}' needs to be cleaned manually.",
jobId,
configuration.get(
HighAvailabilityOptions
.HA_STORAGE_PATH),
configuration.get(
JobResultStoreOptions
.STORAGE_PATH),
throwable);
return null;
}));
throwable ->
logCleanupErrorWarning(jobId, throwable)));

FutureUtils.handleUncaughtException(
jobTerminationFuture,
(thread, throwable) -> fatalErrorHandler.onFatalError(throwable));
registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
}

@Nullable
private Void logCleanupErrorWarning(JobID jobId, Throwable cleanupError) {
log.warn(
"The cleanup of job {} failed. The job's artifacts in the different directories ('{}', '{}', '{}') and its JobResultStore entry in '{}' (in HA mode) should be checked for manual cleanup.",
jobId,
configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH),
configuration.get(BlobServerOptions.STORAGE_DIRECTORY),
configuration.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY),
configuration.get(JobResultStoreOptions.STORAGE_PATH),
cleanupError);
return null;
}

private CleanupJobState handleJobManagerRunnerResult(
JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) {
if (jobManagerRunnerResult.isInitializationFailure()
Expand Down

0 comments on commit a3aec76

Please sign in to comment.