Skip to content

Commit

Permalink
[FLINK-17765] Remove JobExecutionException from JobManagerRunnerImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jun 9, 2020
1 parent 8fb330a commit 0864b15
Showing 1 changed file with 21 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
Expand Down Expand Up @@ -114,40 +113,31 @@ public JobManagerRunnerImpl(
this.terminationFuture = new CompletableFuture<>();
this.leadershipOperation = CompletableFuture.completedFuture(null);

// make sure we cleanly shut down out JobManager services if initialization fails
try {
this.jobGraph = checkNotNull(jobGraph);
this.classLoaderLease = checkNotNull(classLoaderLease);
this.executor = checkNotNull(executor);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

// libraries and class loader first
final ClassLoader userCodeLoader;
try {
userCodeLoader = classLoaderLease.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
} catch (IOException e) {
throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
}

// high availability services next
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
this.jobGraph = checkNotNull(jobGraph);
this.classLoaderLease = checkNotNull(classLoaderLease);
this.executor = checkNotNull(executor);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

this.leaderGatewayFuture = new CompletableFuture<>();
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

// now start the JobManager
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
// libraries and class loader first
final ClassLoader userCodeLoader;
try {
userCodeLoader = classLoaderLease.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
} catch (IOException e) {
throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
}
catch (Throwable t) {
terminationFuture.completeExceptionally(t);
resultFuture.completeExceptionally(t);

throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t);
}
// high availability services next
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

this.leaderGatewayFuture = new CompletableFuture<>();

// now start the JobManager
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
}

//----------------------------------------------------------------------------------------------
Expand Down

0 comments on commit 0864b15

Please sign in to comment.