Skip to content

Commit

Permalink
[FLINK-8502] [flip6] Remove LibraryCacheManager from JobMaster
Browse files Browse the repository at this point in the history
This commit removes the LibraryCacheManager from the JobMaster since it is
no longer needed. The JobMaster is started with the correct user code class
loader and, thus, does not need the LibraryCacheManager.

This commit also changes the JobManagerServices#shutdown method so that it
no longer closes the BlobServer.

This closes apache#5352.
  • Loading branch information
tillrohrmann committed Feb 6, 2018
1 parent 0f0a637 commit 2af2b73
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final ResourceManagerGateway resourceManagerGateway;
private final JobManagerServices jobManagerServices;
private final HeartbeatServices heartbeatServices;
private final BlobServer blobServer;
private final MetricRegistry metricRegistry;

private final FatalErrorHandler fatalErrorHandler;
Expand Down Expand Up @@ -119,13 +120,15 @@ protected Dispatcher(
this.configuration = Preconditions.checkNotNull(configuration);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway);
this.jobManagerServices = JobManagerServices.fromConfiguration(
configuration,
Preconditions.checkNotNull(blobServer));
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
this.blobServer = Preconditions.checkNotNull(blobServer);
this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);

this.jobManagerServices = JobManagerServices.fromConfiguration(
configuration,
this.blobServer);

this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();

Expand Down Expand Up @@ -228,6 +231,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
getRpcService(),
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
new DispatcherOnCompleteActions(jobGraph.getJobID()),
Expand Down Expand Up @@ -396,7 +400,7 @@ public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskMana

@Override
public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
return CompletableFuture.completedFuture(blobServer.getPort());
}

@Override
Expand Down Expand Up @@ -526,6 +530,7 @@ protected abstract JobManagerRunner createJobManagerRunner(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ protected JobManagerRunner createJobManagerRunner(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
Expand All @@ -87,6 +88,7 @@ protected JobManagerRunner createJobManagerRunner(
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
onCompleteActions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ protected void startClusterComponents(
highAvailabilityServices,
jobManagerServices,
heartbeatServices,
blobServer,
metricRegistry,
this,
jobMasterRestEndpoint.getRestAddress());
Expand Down Expand Up @@ -191,6 +192,7 @@ protected JobManagerRunner createJobManagerRunner(
HighAvailabilityServices highAvailabilityServices,
JobManagerServices jobManagerServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception {
Expand All @@ -204,6 +206,7 @@ protected JobManagerRunner createJobManagerRunner(
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
new TerminatingOnCompleteActions(jobGraph.getJobID()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
Expand Down Expand Up @@ -113,6 +114,7 @@ public JobManagerRunner(
final RpcService rpcService,
final HighAvailabilityServices haServices,
final HeartbeatServices heartbeatServices,
final BlobServer blobServer,
final JobManagerServices jobManagerServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
Expand Down Expand Up @@ -161,8 +163,7 @@ public JobManagerRunner(
haServices,
heartbeatServices,
jobManagerServices.executorService,
jobManagerServices.blobServer,
jobManagerServices.libraryCacheManager,
blobServer,
jobManagerServices.restartStrategyFactory,
jobManagerServices.rpcAskTimeout,
jobManagerMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -46,7 +45,6 @@ public class JobManagerServices {

public final ScheduledExecutorService executorService;

public final BlobServer blobServer;
public final BlobLibraryCacheManager libraryCacheManager;

public final RestartStrategyFactory restartStrategyFactory;
Expand All @@ -55,24 +53,23 @@ public class JobManagerServices {

public JobManagerServices(
ScheduledExecutorService executorService,
BlobServer blobServer,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout) {

this.executorService = checkNotNull(executorService);
this.blobServer = checkNotNull(blobServer);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
}

/**
*
* Shutdown the {@link JobMaster} services.
*
* <p>This method makes sure all services are closed or shut down, even when an exception occurred
* in the shutdown of one component. The first encountered exception is thrown, with successive
* exceptions added as suppressed exceptions.
*
*
* @throws Exception The first Exception encountered during shutdown.
*/
public void shutdown() throws Exception {
Expand All @@ -85,33 +82,22 @@ public void shutdown() throws Exception {
}

libraryCacheManager.shutdown();
try {
blobServer.close();
}
catch (Throwable t) {
if (firstException == null) {
firstException = t;
} else {
firstException.addSuppressed(t);
}
}

if (firstException != null) {
ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services");
}
}

// ------------------------------------------------------------------------
// Creating the components from a configuration
// Creating the components from a configuration
// ------------------------------------------------------------------------


public static JobManagerServices fromConfiguration(
Configuration config,
BlobServer blobServer) throws Exception {

Preconditions.checkNotNull(config);
Preconditions.checkNotNull(blobServer);
checkNotNull(config);
checkNotNull(blobServer);

final String classLoaderResolveOrder =
config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
Expand Down Expand Up @@ -139,7 +125,6 @@ public static JobManagerServices fromConfiguration(

return new JobManagerServices(
futureExecutor,
blobServer,
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
Time.of(timeout.length(), timeout.unit()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -149,9 +148,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
/** Logical representation of the job. */
private final JobGraph jobGraph;

/** Configuration of the JobManager. */
private final Configuration configuration;

private final Time rpcTimeout;

/** Service to contend for and retrieve the leadership of JM and RM. */
Expand All @@ -160,12 +156,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
/** Blob server used across jobs. */
private final BlobServer blobServer;

/** Blob library cache manager used across jobs. */
private final BlobLibraryCacheManager libraryCacheManager;

/** The metrics for the JobManager itself. */
private final MetricGroup jobManagerMetricGroup;

/** The metrics for the job. */
private final MetricGroup jobMetricGroup;

Expand Down Expand Up @@ -218,7 +208,6 @@ public JobMaster(
HeartbeatServices heartbeatServices,
ScheduledExecutorService executor,
BlobServer blobServer,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout,
@Nullable JobManagerMetricGroup jobManagerMetricGroup,
Expand All @@ -234,11 +223,9 @@ public JobMaster(

this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.configuration = checkNotNull(configuration);
this.rpcTimeout = rpcAskTimeout;
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobServer = checkNotNull(blobServer);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.executor = checkNotNull(executor);
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.errorHandler = checkNotNull(errorHandler);
Expand All @@ -260,10 +247,8 @@ public JobMaster(
final JobID jid = jobGraph.getJobID();

if (jobManagerMetricGroup != null) {
this.jobManagerMetricGroup = jobManagerMetricGroup;
this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
} else {
this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,46 +60,49 @@ public class MiniClusterJobDispatcher {

// ------------------------------------------------------------------------

/** lock to ensure that this dispatcher executes only one job at a time */
/** lock to ensure that this dispatcher executes only one job at a time. */
private final Object lock = new Object();

/** the configuration with which the mini cluster was started */
/** the configuration with which the mini cluster was started. */
private final Configuration configuration;

/** the RPC services to use by the job managers */
/** the RPC services to use by the job managers. */
private final RpcService[] rpcServices;

/** services for discovery, leader election, and recovery */
/** services for discovery, leader election, and recovery. */
private final HighAvailabilityServices haServices;

/** services for heartbeating */
/** services for heartbeating. */
private final HeartbeatServices heartbeatServices;

/** all the services that the JobManager needs, such as BLOB service, factories, etc */
/** BlobServer for storing blobs. */
private final BlobServer blobServer;

/** all the services that the JobManager needs, such as the executor, library cache manager, factories, etc. */
private final JobManagerServices jobManagerServices;

/** Registry for all metrics in the mini cluster */
/** Registry for all metrics in the mini cluster. */
private final MetricRegistry metricRegistry;

/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
/** The number of JobManagers to launch (more than one simulates a high-availability setup). */
private final int numJobManagers;

/** The runner for the job and master. non-null if a job is currently running */
/** The runner for the job and master. non-null if a job is currently running. */
private volatile JobManagerRunner[] runners;

/** flag marking the dispatcher as shut down */
/** flag marking the dispatcher as shut down. */
private volatile boolean shutdown;


/**
* Starts a mini cluster job dispatcher.
*
*
* <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
* non-highly-available setup.
*
*
* @param config The configuration of the mini cluster
* @param haServices Access to the discovery, leader election, and recovery services
*
*
* @throws Exception Thrown, if the services for the JobMaster could not be started.
*/
public MiniClusterJobDispatcher(
Expand All @@ -124,11 +127,11 @@ public MiniClusterJobDispatcher(
*
* <p>The dispatcher may kick off more than one JobManager per job, thus simulating
* a highly-available setup.
*
*
* @param config The configuration of the mini cluster
* @param haServices Access to the discovery, leader election, and recovery services
* @param numJobManagers The number of JobMasters to start for each job.
*
*
* @throws Exception Thrown, if the services for the JobMaster could not be started.
*/
public MiniClusterJobDispatcher(
Expand All @@ -147,6 +150,7 @@ public MiniClusterJobDispatcher(
this.rpcServices = rpcServices;
this.haServices = checkNotNull(haServices);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.blobServer = checkNotNull(blobServer);
this.metricRegistry = checkNotNull(metricRegistry);
this.numJobManagers = numJobManagers;

Expand Down Expand Up @@ -280,6 +284,7 @@ private JobManagerRunner[] startJobRunners(
rpcServices[i],
haServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
onCompletion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,26 @@ protected JobManagerRunner createJobManagerRunner(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
JobManagerServices jobManagerServices,
MetricRegistry metricRegistry,
OnCompletionActions onCompleteActions,
FatalErrorHandler fatalErrorHandler) throws Exception {
assertEquals(expectedJobId, jobGraph.getJobID());

return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService,
highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry,
onCompleteActions, fatalErrorHandler, null);
return new JobManagerRunner(
resourceId,
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
blobServer,
jobManagerServices,
metricRegistry,
onCompleteActions,
fatalErrorHandler,
null);
}

@Override
Expand Down
Loading

0 comments on commit 2af2b73

Please sign in to comment.