Skip to content

Commit

Permalink
[FLINK-13573][coordination] Merge SubmittedJobGraph into JobGraph
Browse files Browse the repository at this point in the history
This closes apache#9514.
  • Loading branch information
tisonkun authored and tillrohrmann committed Aug 23, 2019
1 parent 48e1385 commit 2b9410e
Show file tree
Hide file tree
Showing 28 changed files with 273 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
Expand Down Expand Up @@ -97,13 +96,13 @@
* about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
DispatcherGateway, LeaderContender, JobGraphStore.JobGraphListener {

public static final String DISPATCHER_NAME = "dispatcher";

private final Configuration configuration;

private final SubmittedJobGraphStore submittedJobGraphStore;
private final JobGraphStore jobGraphStore;
private final RunningJobsRegistry runningJobsRegistry;

private final HighAvailabilityServices highAvailabilityServices;
Expand Down Expand Up @@ -138,7 +137,7 @@ public Dispatcher(
String endpointId,
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
SubmittedJobGraphStore submittedJobGraphStore,
JobGraphStore jobGraphStore,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
Expand All @@ -156,7 +155,7 @@ public Dispatcher(
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
this.blobServer = Preconditions.checkNotNull(blobServer);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
this.submittedJobGraphStore = Preconditions.checkNotNull(submittedJobGraphStore);
this.jobGraphStore = Preconditions.checkNotNull(jobGraphStore);
this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup);
this.metricServiceQueryAddress = metricServiceQueryAddress;

Expand Down Expand Up @@ -196,7 +195,7 @@ public void onStart() throws Exception {

private void startDispatcherServices() throws Exception {
try {
submittedJobGraphStore.start(this);
jobGraphStore.start(this);
leaderElectionService.start(this);

registerDispatcherMetrics(jobManagerMetricGroup);
Expand Down Expand Up @@ -239,7 +238,7 @@ private void stopDispatcherServices() throws Exception {
}

try {
submittedJobGraphStore.stop();
jobGraphStore.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
Expand Down Expand Up @@ -338,13 +337,13 @@ private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
}

private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));
jobGraphStore.putJobGraph(jobGraph);

final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
jobGraphStore.removeJobGraph(jobGraph.getJobID());
}
}));
}
Expand Down Expand Up @@ -666,7 +665,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
boolean cleanupHABlobs = false;
if (cleanupHA) {
try {
submittedJobGraphStore.removeJobGraph(jobId);
jobGraphStore.removeJobGraph(jobId);

// only clean up the HA blobs if we could remove the job from HA storage
cleanupHABlobs = true;
Expand All @@ -681,7 +680,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
}
} else {
try {
submittedJobGraphStore.releaseJobGraph(jobId);
jobGraphStore.releaseJobGraph(jobId);
} catch (Exception e) {
log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
}
Expand Down Expand Up @@ -715,15 +714,15 @@ private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFutur
@VisibleForTesting
Collection<JobGraph> recoverJobs() throws Exception {
log.info("Recovering all persisted jobs.");
final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
final Collection<JobID> jobIds = jobGraphStore.getJobIds();

try {
return recoverJobGraphs(jobIds);
} catch (Exception e) {
// release all recovered job graphs
for (JobID jobId : jobIds) {
try {
submittedJobGraphStore.releaseJobGraph(jobId);
jobGraphStore.releaseJobGraph(jobId);
} catch (Exception ie) {
e.addSuppressed(ie);
}
Expand Down Expand Up @@ -752,13 +751,7 @@ private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws E
@Nullable
private JobGraph recoverJob(JobID jobId) throws Exception {
log.debug("Recover job {}.", jobId);
final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);

if (submittedJobGraph != null) {
return submittedJobGraph.getJobGraph();
} else {
return null;
}
return jobGraphStore.recoverJobGraph(jobId);
}

protected void onFatalError(Throwable throwable) {
Expand Down Expand Up @@ -899,7 +892,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
} else {
for (JobGraph recoveredJob : recoveredJobs) {
submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
jobGraphStore.releaseJobGraph(recoveredJob.getJobID());
}
}
return null;
Expand Down Expand Up @@ -1012,7 +1005,7 @@ public void handleError(final Exception exception) {
}

//------------------------------------------------------
// SubmittedJobGraphListener
// JobGraphListener
//------------------------------------------------------

@Override
Expand All @@ -1021,8 +1014,8 @@ public void onAddedJobGraph(final JobID jobId) {
() -> {
if (!jobManagerRunnerFutures.containsKey(jobId)) {
// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
// the specified job is already removed from the SubmittedJobGraphStore. In this case,
// SubmittedJobGraphStore.recoverJob returns null.
// the specified job is already removed from the JobGraphStore. In this case,
// JobGraphStore.recoverJob returns null.
final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
getRpcService().getExecutor());
Expand All @@ -1033,7 +1026,7 @@ public void onAddedJobGraph(final JobID jobId) {
FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
if (!isRecoveredJobRunning) {
submittedJobGraphStore.releaseJobGraph(jobId);
jobGraphStore.releaseJobGraph(jobId);
}
}),
getRpcService().getExecutor())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public MiniDispatcher(
endpointId,
configuration,
highAvailabilityServices,
new SingleJobSubmittedJobGraphStore(jobGraph),
new SingleJobJobGraphStore(jobGraph),
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,27 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/**
* {@link SubmittedJobGraphStore} implementation for a single job.
* {@link JobGraphStore} implementation for a single job.
*/
public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore {
public class SingleJobJobGraphStore implements JobGraphStore {

private final JobGraph jobGraph;

public SingleJobSubmittedJobGraphStore(JobGraph jobGraph) {
public SingleJobJobGraphStore(JobGraph jobGraph) {
this.jobGraph = Preconditions.checkNotNull(jobGraph);
}

@Override
public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
public void start(JobGraphListener jobGraphListener) throws Exception {
// noop
}

Expand All @@ -50,17 +50,17 @@ public void stop() throws Exception {
}

@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
public JobGraph recoverJobGraph(JobID jobId) throws Exception {
if (jobGraph.getJobID().equals(jobId)) {
return new SubmittedJobGraph(jobGraph);
return jobGraph;
} else {
throw new FlinkException("Could not recover job graph " + jobId + '.');
}
}

@Override
public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
if (!jobGraph.getJobId().equals(jobGraph.getJobId())) {
public void putJobGraph(JobGraph jobGraph) throws Exception {
if (!Objects.equals(this.jobGraph.getJobID(), jobGraph.getJobID())) {
throw new FlinkException("Cannot put additional jobs into this submitted job graph store.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public StandaloneDispatcher(
endpointId,
configuration,
highAvailabilityServices,
highAvailabilityServices.getSubmittedJobGraphStore(),
highAvailabilityServices.getJobGraphStore(),
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

Expand All @@ -32,7 +32,7 @@
* The HighAvailabilityServices give access to all services needed for a highly-available
* setup. In particular, the services provide access to highly available storage and
* registries, as well as distributed counters and leader election.
*
*
* <ul>
* <li>ResourceManager leader election and leader retrieval</li>
* <li>JobManager leader election and leader retrieval</li>
Expand Down Expand Up @@ -137,7 +137,7 @@ public interface HighAvailabilityServices extends AutoCloseable {
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
JobGraphStore getJobGraphStore() throws Exception;

/**
* Gets the registry that holds information about whether jobs are currently running.
Expand All @@ -160,11 +160,11 @@ public interface HighAvailabilityServices extends AutoCloseable {

/**
* Closes the high availability services, releasing all resources.
*
*
* <p>This method <b>does not delete or clean up</b> any data stored in external stores
* (file systems, ZooKeeper, etc). Another instance of the high availability
* services will be able to recover the job.
*
*
* <p>If an exception occurs during closing services, this method will attempt to
* continue closing other services and report exceptions only after all services
* have been attempted to be closed.
Expand All @@ -177,14 +177,14 @@ public interface HighAvailabilityServices extends AutoCloseable {
/**
* Closes the high availability services (releasing all resources) and deletes
* all data stored by these services in external stores.
*
*
* <p>After this method was called, the any job or session that was managed by
* these high availability services will be unrecoverable.
*
*
* <p>If an exception occurs during cleanup, this method will attempt to
* continue the cleanup and report exceptions only after all cleanup steps have
* been attempted.
*
*
* @throws Exception Thrown, if an exception occurred while closing these services
* or cleaning up data stored by them.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;

import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
Expand Down Expand Up @@ -69,11 +69,11 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
}

@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
public JobGraphStore getJobGraphStore() throws Exception {
synchronized (lock) {
checkNotShutdown();

return new StandaloneSubmittedJobGraphStore();
return new StandaloneJobGraphStore();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
Expand Down Expand Up @@ -181,8 +181,8 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
}

@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
public JobGraphStore getJobGraphStore() throws Exception {
return ZooKeeperUtils.createJobGraphs(client, configuration);
}

@Override
Expand Down
Loading

0 comments on commit 2b9410e

Please sign in to comment.