Skip to content

Commit

Permalink
[FLINK-4537] rebase and refine
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm authored and StephanEwen committed Dec 23, 2016
1 parent 5cb81c4 commit b7db70e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,47 @@

package org.apache.flink.runtime.resourcemanager;

<<<<<<< HEAD
import org.apache.flink.api.common.JobID;
=======
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
>>>>>>> db98efb... rsourceManager registration with JobManager
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;

import java.io.Serializable;
import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
*/
public class JobMasterRegistration implements Serializable {
public class JobMasterRegistration implements LeaderRetrievalListener {

<<<<<<< HEAD
private final String address;
private final JobMasterGateway gateway;
private final JobID jobID;
private final UUID leaderSessionID;
private LeaderRetrievalListener retriever;

public JobMasterRegistration(String address, JobID jobID) {
this.address = address;
public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
this.gateway = gateway;
this.jobID = jobID;
=======
private static final long serialVersionUID = -2316627821716999527L;

private final JobMasterGateway jobMasterGateway;

private UUID jobMasterLeaderSessionID;

public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
this.jobMasterGateway = checkNotNull(jobMasterGateway);
this.leaderSessionID = leaderSessionID;
}

public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
this.jobMasterGateway = checkNotNull(jobMasterGateway);
this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
public JobMasterGateway getGateway() {
return gateway;
}

public JobMasterGateway getJobMasterGateway() {
return jobMasterGateway;
public UUID getLeaderSessionID() {
return leaderSessionID;
}

public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
>>>>>>> db98efb... rsourceManager registration with JobManager
public JobID getJobID() {
return jobID;
}

public UUID getJobMasterLeaderSessionID() {
return jobMasterLeaderSessionID;
@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {

}

public JobID getJobID() {
return jobID;
@Override
public void handleError(Exception exception) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.resourcemanager;

import akka.dispatch.Futures;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand All @@ -29,26 +28,31 @@
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
<<<<<<< HEAD
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
=======
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
>>>>>>> db98efb... rsourceManager registration with JobManager
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.registration.RegistrationResponse;

import org.apache.flink.runtime.concurrent.Future;

import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -62,17 +66,13 @@
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
<<<<<<< HEAD
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {

private final Logger LOG = LoggerFactory.getLogger(getClass());

private final Map<JobID, JobMasterGateway> jobMasterGateways;
=======
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
/** the mapping relationship of JobID and JobMasterGateway */
private final Map<JobID, JobMasterRegistration> jobMasters;
>>>>>>> db98efb... rsourceManager registration with JobManager

private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;

private final HighAvailabilityServices highAvailabilityServices;

Expand All @@ -88,12 +88,9 @@ public ResourceManager(
SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
<<<<<<< HEAD
this.jobMasterGateways = new HashMap<>();
this.slotManager = slotManager;
=======
this.jobMasters = new HashMap<>(16);
>>>>>>> db98efb... rsourceManager registration with JobManager
this.jobMasterLeaderRetrievalListeners = new HashSet<>();
}

@Override
Expand All @@ -113,7 +110,7 @@ public void start() {
public void shutDown() {
try {
leaderElectionService.stop();
for(JobID jobID : jobMasters.keySet()) {
for(JobID jobID : jobMasterGateways.keySet()) {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
}
super.shutDown();
Expand Down Expand Up @@ -142,52 +139,64 @@ UUID getLeaderSessionID() {
* @return Future registration response
*/
@RpcMethod
<<<<<<< HEAD
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
final Future<JobMasterGateway> jobMasterFuture =
getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
final JobID jobID = jobMasterRegistration.getJobID();
=======
public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
public Future<RegistrationResponse> registerJobMaster(
final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
final String jobMasterAddress, final JobID jobID) {

checkNotNull(resourceManagerLeaderId);
checkNotNull(jobMasterAddress);
checkNotNull(jobID);

// TODO mxm The leader retrieval needs to be split up in an async part which runs outside the main execution thread
// The state updates should be performed inside the main thread

final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>();

if(!leaderSessionID.equals(resourceManagerLeaderId)) {
log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
" did not equal the received leader session ID {}",
jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
future.complete(new RegistrationResponse.Decline("Invalid leader session id"));
return future;
}

Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
>>>>>>> db98efb... rsourceManager registration with JobManager
final LeaderConnectionInfo jobMasterLeaderInfo;
try {
jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
} catch (Exception e) {
LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
return future;
}

if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
future.complete(new RegistrationResponse.Decline("JobManager is not leading"));
return future;
}

return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
Future<JobMasterGateway> jobMasterGatewayFuture =
getRpcService().connect(jobMasterAddress, JobMasterGateway.class);

return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
<<<<<<< HEAD

final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
try {
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
}
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
LOG.info("Replacing existing gateway {} for JobID {} with {}.",
existingGateway, jobID, jobMasterGateway);
}
return new RegistrationResponse(true);
=======
if (jobMasters.containsKey(jobID)) {
JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
jobMasters.put(jobID, jobMasterRegistration);
log.info("Replacing gateway for registered JobID {}.", jobID);
} else {
JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
jobMasters.put(jobID, jobMasterRegistration);
try {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
} catch(Throwable e) {
log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
jobID, jobMasterAddress);
return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
}
}

return new JobMasterRegistrationSuccess(5000);
>>>>>>> db98efb... rsourceManager registration with JobManager
}
}, getMainThreadExecutor());
}
Expand Down Expand Up @@ -228,26 +237,9 @@ public RegistrationResponse registerTaskExecutor(
}


<<<<<<< HEAD
// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
=======
/**
* Callback method when current resourceManager lose leadership.
*/
@Override
public void revokeLeadership() {
runAsync(new Runnable() {
@Override
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
jobMasters.clear();
leaderSessionID = null;
}
});
}
>>>>>>> db98efb... rsourceManager registration with JobManager

/**
* Callback method when current resourceManager is granted leadership
Expand All @@ -263,7 +255,7 @@ public void run() {
// confirming the leader session ID might be blocking,
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
// notify SlotManager
slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
slotManager.setLeaderUUID(leaderSessionID);
ResourceManager.this.leaderSessionID = leaderSessionID;
}
});
Expand All @@ -279,7 +271,8 @@ public void revokeLeadership() {
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
jobMasterGateways.clear();
ResourceManager.this.leaderSessionID = null;
slotManager.clearState();
leaderSessionID = null;
}
});
}
Expand All @@ -291,46 +284,28 @@ public void run() {
*/
@Override
public void handleError(final Exception exception) {
runAsync(new Runnable() {
@Override
public void run() {
log.error("ResourceManager received an error from the LeaderElectionService.", exception);
// notify SlotManager
slotManager.handleError(exception);
// terminate ResourceManager in case of an error
shutDown();
}
});
log.error("ResourceManager received an error from the LeaderElectionService.", exception);
// terminate ResourceManager in case of an error
shutDown();
}

private class JobMasterLeaderListener implements LeaderRetrievalListener {
private static class JobMasterLeaderListener implements LeaderRetrievalListener {

private final JobID jobID;
private UUID leaderID;

private JobMasterLeaderListener(JobID jobID) {
this.jobID = jobID;
}

@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
// update job master leader session id
JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
}
});
this.leaderID = leaderSessionID;
}

@Override
public void handleError(final Exception exception) {
runAsync(new Runnable() {
@Override
public void run() {
log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
}
});
// TODO
}
}
}
Loading

0 comments on commit b7db70e

Please sign in to comment.