Skip to content

Commit

Permalink
[FLINK-4606] integrate features of old ResourceManager
Browse files Browse the repository at this point in the history
This closes apache#2540
  • Loading branch information
mxm authored and StephanEwen committed Dec 23, 2016
1 parent 7a23786 commit 9d1b5fb
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.RpcGateway;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
Expand All @@ -48,11 +52,10 @@
import scala.concurrent.duration.FiniteDuration;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -64,36 +67,43 @@
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* <ul>
* <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends RpcEndpoint<ResourceManagerGateway>
implements LeaderContender {

/** The exit code with which the process is stopped in case of a fatal error */
protected static final int EXIT_CODE_FATAL_ERROR = -13;

private final Map<JobID, JobMasterGateway> jobMasterGateways;

private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;

private final Map<ResourceID, WorkerType> taskExecutorGateways;

private final HighAvailabilityServices highAvailabilityServices;

private LeaderElectionService leaderElectionService;

private final SlotManager slotManager;

private LeaderElectionService leaderElectionService;

private UUID leaderSessionID;

private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;

public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) {
private final Time timeout = Time.seconds(5);

public ResourceManager(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
this.slotManager = checkNotNull(slotManager);
this.jobMasterLeaderRetrievalListeners = new HashSet<>();
this.jobMasterLeaderRetrievalListeners = new HashMap<>();
this.taskExecutorGateways = new HashMap<>();
infoMessageListeners = new HashMap<>();
}
Expand All @@ -105,6 +115,7 @@ public void start() {
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
leaderElectionService.start(this);
slotManager.setupResourceManagerServices(new DefaultResourceManagerServices());
// framework specific initialization
initialize();
} catch (Throwable e) {
Expand All @@ -117,7 +128,7 @@ public void start() {
public void shutDown() {
try {
leaderElectionService.stop();
for(JobID jobID : jobMasterGateways.keySet()) {
for (JobID jobID : jobMasterGateways.keySet()) {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
}
super.shutDown();
Expand Down Expand Up @@ -189,15 +200,17 @@ public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable t
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
try {
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
try {
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
}
jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
}
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
log.info("Replacing gateway for registered JobID {}.", jobID);
Expand Down Expand Up @@ -232,7 +245,6 @@ public TaskExecutorGateway call() throws Exception {
resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
throw new Exception("Invalid leader session id");
}

return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
}
}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
Expand All @@ -241,24 +253,14 @@ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throw
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
WorkerType startedWorker = taskExecutorGateways.get(resourceID);
if(startedWorker != null) {
String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress();
if (taskExecutorAddress.equals(oldWorkerAddress)) {
log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
} else {
log.warn("Receive a duplicate registration from TaskExecutor {} at different address, previous ({}), new ({})",
resourceID, oldWorkerAddress, taskExecutorAddress);
// TODO :: suggest old taskExecutor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
startedWorker = workerStarted(resourceID, taskExecutorGateway);
taskExecutorGateways.put(resourceID, startedWorker);
}
} else {
startedWorker = workerStarted(resourceID, taskExecutorGateway);
taskExecutorGateways.put(resourceID, startedWorker);
WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
if (oldWorker != null) {
// TODO :: suggest old taskExecutor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
}
return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
WorkerType newWorker = workerStarted(resourceID);
taskExecutorGateways.put(resourceID, newWorker);
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
}
}, getMainThreadExecutor());
Expand All @@ -271,11 +273,20 @@ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throw
* @return Slot assignment
*/
@RpcMethod
public SlotRequestReply requestSlot(SlotRequest slotRequest) {
final JobID jobId = slotRequest.getJobId();
final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
public SlotRequestReply requestSlot(
UUID jobMasterLeaderID,
UUID resourceManagerLeaderID,
SlotRequest slotRequest) {

JobID jobId = slotRequest.getJobId();
JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);

UUID leaderID = jobMasterLeaderListener.getLeaderID();

if (jobMasterGateway != null) {
if (jobMasterGateway != null
&& jobMasterLeaderID.equals(leaderID)
&& resourceManagerLeaderID.equals(leaderSessionID)) {
return slotManager.requestSlot(slotRequest);
} else {
log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
Expand Down Expand Up @@ -379,7 +390,7 @@ public void unRegisterInfoMessageListener(final String infoMessageListenerAddres
}

/**
* Shutdowns cluster
* Cleanup application and shut down cluster
*
* @param finalStatus
* @param optionalDiagnostics
Expand Down Expand Up @@ -446,17 +457,11 @@ public void run() {
protected abstract void initialize() throws Exception;

/**
* Callback when a task executor register.
* Notifies the resource master of a fatal error.
*
* @param resourceID The worker resource id
* @param taskExecutorGateway the task executor gateway
*/
protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);

/**
* Callback when a resource manager faced a fatal error
* @param message
* @param error
* <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
* such a way that a high-availability setting would restart this or fail over
* to another master.
*/
protected abstract void fatalError(String message, Throwable error);

Expand All @@ -472,6 +477,19 @@ public void run() {
*/
protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics);

/**
* Allocates a resource using the resource profile.
* @param resourceProfile The resource description
*/
@VisibleForTesting
public abstract void startNewWorker(ResourceProfile resourceProfile);

/**
* Callback when a worker was started.
* @param resourceID The worker resource id
*/
protected abstract WorkerType workerStarted(ResourceID resourceID);

// ------------------------------------------------------------------------
// Info messaging
// ------------------------------------------------------------------------
Expand All @@ -489,6 +507,24 @@ public void run() {
});
}

private class DefaultResourceManagerServices implements ResourceManagerServices {

@Override
public void allocateResource(ResourceProfile resourceProfile) {
ResourceManager.this.startNewWorker(resourceProfile);
}

@Override
public Executor getAsyncExecutor() {
return ResourceManager.this.getRpcService().getExecutor();
}

@Override
public Executor getExecutor() {
return ResourceManager.this.getMainThreadExecutor();
}
}

private static class JobMasterLeaderListener implements LeaderRetrievalListener {

private final JobID jobID;
Expand All @@ -498,6 +534,14 @@ private JobMasterLeaderListener(JobID jobID) {
this.jobID = jobID;
}

public JobID getJobID() {
return jobID;
}

public UUID getLeaderID() {
return leaderID;
}

@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
this.leaderID = leaderSessionID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ Future<RegistrationResponse> registerJobMaster(
* @param slotRequest Slot request
* @return Future slot assignment
*/
Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
Future<SlotRequestReply> requestSlot(
UUID jobMasterLeaderID,
UUID resourceManagerLeaderID,
SlotRequest slotRequest,
@RpcTimeout Time timeout);

/**
* Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

import java.util.concurrent.Executor;

/**
* Interface which provides access to services of the ResourceManager.
*/
public interface ResourceManagerServices {

/**
* Allocates a resource according to the resource profile.
*/
void allocateResource(ResourceProfile resourceProfile);

/**
* Gets the async excutor which executes outside of the main thread of the ResourceManager
*/
Executor getAsyncExecutor();

/**
* Gets the executor which executes in the main thread of the ResourceManager
*/
Executor getExecutor();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@

import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;

/**
* A standalone implementation of the resource manager. Used when the system is started in
* standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
*
* This ResourceManager doesn't acquire new resources.
*/
public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
public class StandaloneResourceManager extends ResourceManager<ResourceID> {

public StandaloneResourceManager(RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand All @@ -51,14 +52,16 @@ protected void fatalError(final String message, final Throwable error) {
}

@Override
protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
InstanceID instanceID = new InstanceID();
TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID);
return taskExecutorRegistration;
protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
}

@Override
protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
public void startNewWorker(ResourceProfile resourceProfile) {
}

@Override
protected ResourceID workerStarted(ResourceID resourceID) {
return resourceID;
}

}
Loading

0 comments on commit 9d1b5fb

Please sign in to comment.