[FLINK-19154] Pass DispatcherGateway to DispatcherBootstrap
In order to make sure that all calls from the
ApplicationDispatcherBootstrap are executed in the
Dispatcher's main thread, we pass the DispatcherGateway
to the bootstrap instead of the Dispatcher itself.

This closes apache#13699
kl0u committed Oct 26, 2020
1 parent f29a18e commit 021456d
Showing 18 changed files with 176 additions and 138 deletions.
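Before the diff, a minimal sketch of the bootstrap contract this commit moves to may help orientation. It is reconstructed from the hunks below, not copied from the Flink sources, so the real org.apache.flink.runtime.dispatcher.DispatcherBootstrap interface may declare additional members; the point it illustrates is that the bootstrap only ever receives the dispatcher's RPC gateway, so every call it makes (job submission, cluster shutdown) goes through the Dispatcher's main-thread executor rather than touching the Dispatcher instance directly.

import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;

/**
 * Sketch (not the literal Flink interface) of the contract that
 * ApplicationDispatcherBootstrap implements after this change: it is handed a
 * thread-safe RPC gateway instead of the Dispatcher object itself.
 */
public interface DispatcherBootstrap {

    /**
     * Initializes the bootstrap. Calls made on {@code dispatcherGateway} (e.g.
     * submitting job graphs or shutting down the cluster) are executed in the
     * dispatcher's main thread; {@code scheduledExecutor} can be used to run
     * the user application asynchronously.
     */
    void initialize(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor);
}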
@@ -33,8 +33,6 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.dispatcher.AbstractDispatcherBootstrap;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -43,6 +41,7 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,15 +70,15 @@
* if it should submit a job for execution (in case of a new job) or the job was already recovered and is running.
*/
@Internal
-public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap {
+public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {

private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

private final PackagedProgram application;

-private final Collection<JobGraph> recoveredJobs;
+private final Collection<JobID> recoveredJobIds;

private final Configuration configuration;

@@ -91,22 +90,21 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap

public ApplicationDispatcherBootstrap(
final PackagedProgram application,
-final Collection<JobGraph> recoveredJobs,
+final Collection<JobID> recoveredJobIds,
final Configuration configuration,
final FatalErrorHandler errorHandler) {
this.configuration = checkNotNull(configuration);
-this.recoveredJobs = checkNotNull(recoveredJobs);
+this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.application = checkNotNull(application);
this.errorHandler = checkNotNull(errorHandler);
}

@Override
-public void initialize(final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) {
-checkNotNull(dispatcher);
-launchRecoveredJobGraphs(dispatcher, recoveredJobs);
+public void initialize(final DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) {
+checkNotNull(dispatcherGateway);

runApplicationAndShutdownClusterAsync(
-dispatcher,
+dispatcherGateway,
scheduledExecutor);
}

@@ -127,16 +125,15 @@ ScheduledFuture<?> getApplicationExecutionFuture() {
}

/**
-* Runs the user program entrypoint using {@link #runApplicationAsync(DispatcherGateway,
-* ScheduledExecutor, boolean)} and shuts down the given dispatcher when the application
-* completes (either successfully or in case of failure).
+* Runs the user program entrypoint and shuts down the given dispatcherGateway when
+* the application completes (either successfully or in case of failure).
*/
@VisibleForTesting
CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
-final DispatcherGateway dispatcher,
+final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor) {

-applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
+applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

return applicationCompletionFuture
.handle((r, t) -> {
@@ -150,38 +147,38 @@ CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(

if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) {
LOG.info("Application {}: ", applicationStatus, t);
-return dispatcher.shutDownCluster(applicationStatus);
+return dispatcherGateway.shutDownCluster(applicationStatus);
}
}

LOG.warn("Exiting with Application Status UNKNOWN: ", t);
-this.errorHandler.onFatalError(t);
+this.errorHandler.onFatalError(new FlinkException("Application failed unexpectedly.", t));

return FutureUtils.<Acknowledge>completedExceptionally(t);
}

LOG.info("Application completed SUCCESSFULLY");
-return dispatcher.shutDownCluster(ApplicationStatus.SUCCEEDED);
+return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
})
.thenCompose(Function.identity());
}

@VisibleForTesting
CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
-final DispatcherGateway dispatcher,
+final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor) {

final Optional<String> configuredJobId =
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);

if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration) && !configuredJobId.isPresent()) {
-return runApplicationAsync(dispatcher, scheduledExecutor, false);
+return runApplicationAsync(dispatcherGateway, scheduledExecutor, false);
}

if (!configuredJobId.isPresent()) {
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, ZERO_JOB_ID.toHexString());
}
-return runApplicationAsync(dispatcher, scheduledExecutor, true);
+return runApplicationAsync(dispatcherGateway, scheduledExecutor, true);
}

/**
@@ -190,7 +187,7 @@ CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
* succeeded. if any of them fails, or if job submission fails.
*/
private CompletableFuture<Void> runApplicationAsync(
-final DispatcherGateway dispatcher,
+final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution) {
final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>();
@@ -200,14 +197,14 @@ private CompletableFuture<Void> runApplicationAsync(
applicationExecutionTask = scheduledExecutor.schedule(
() -> runApplicationEntryPoint(
applicationExecutionFuture,
-dispatcher,
+dispatcherGateway,
scheduledExecutor,
enforceSingleJobExecution),
0L,
TimeUnit.MILLISECONDS);

return applicationExecutionFuture.thenCompose(
-jobIds -> getApplicationResult(dispatcher, jobIds, scheduledExecutor));
+jobIds -> getApplicationResult(dispatcherGateway, jobIds, scheduledExecutor));
}

/**
@@ -218,16 +215,15 @@ private CompletableFuture<Void> runApplicationAsync(
*/
private void runApplicationEntryPoint(
final CompletableFuture<List<JobID>> jobIdsFuture,
-final DispatcherGateway dispatcher,
+final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution) {
try {
-final List<JobID> applicationJobIds =
-new ArrayList<>(getRecoveredJobIds(recoveredJobs));
+final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);

final PipelineExecutorServiceLoader executorServiceLoader =
new EmbeddedExecutorServiceLoader(
-applicationJobIds, dispatcher, scheduledExecutor);
+applicationJobIds, dispatcherGateway, scheduledExecutor);

ClientUtils.executeProgram(
executorServiceLoader,
@@ -288,11 +284,4 @@ private CompletableFuture<JobResult> unwrapJobResultException(final CompletableF
result, application.getUserCodeClassLoader()));
});
}
-
-private List<JobID> getRecoveredJobIds(final Collection<JobGraph> recoveredJobs) {
-return recoveredJobs
-.stream()
-.map(JobGraph::getJobID)
-.collect(Collectors.toList());
-}
}
@@ -19,6 +19,7 @@
package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +35,8 @@
import org.apache.flink.util.FlinkRuntimeException;

import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -77,12 +80,15 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {

+final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
+
final Dispatcher dispatcher;
try {
dispatcher = dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
-errorHandler -> new ApplicationDispatcherBootstrap(application, recoveredJobs, configuration, errorHandler),
+recoveredJobs,
+errorHandler -> new ApplicationDispatcherBootstrap(application, recoveredJobIds, configuration, errorHandler),
PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
@@ -92,4 +98,11 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(

return DefaultDispatcherGatewayService.from(dispatcher);
}
+
+private List<JobID> getRecoveredJobIds(final Collection<JobGraph> recoveredJobs) {
+return recoveredJobs
+.stream()
+.map(JobGraph::getJobID)
+.collect(Collectors.toList());
+}
}