Skip to content

Commit

Permalink
[FLINK-4537] [cluster management] ResourceManager registration with J…
Browse files Browse the repository at this point in the history
…obManager
  • Loading branch information
beyond1920 authored and StephanEwen committed Dec 23, 2016
1 parent 62b5731 commit 5cb81c4
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface HighAvailabilityServices {
*/
LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;

/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @return
* @throws Exception
*/
LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception;

/**
* Gets the leader election service for the cluster's resource manager.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.highavailability;

import org.apache.flink.api.common.JobID;
import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
Expand All @@ -42,13 +43,26 @@ public class NonHaServices implements HighAvailabilityServices {
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;

private final ConcurrentHashMap<JobID, String> jobMastersAddress;

/**
* Creates a new services class for the fix pre-defined leaders.
*
* @param resourceManagerAddress The fix address of the ResourceManager
*/
public NonHaServices(String resourceManagerAddress) {
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
this.jobMastersAddress = new ConcurrentHashMap<>(16);
}

/**
* Binds address of a specified job master
*
* @param jobID JobID for the specified job master
* @param jobMasterAddress address for the specified job master
*/
public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
jobMastersAddress.put(jobID, jobMasterAddress);
}

// ------------------------------------------------------------------------
Expand All @@ -60,6 +74,11 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti
return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
}

@Override
public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
}

@Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
return new StandaloneLeaderElectionService();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.registration.RegistrationResponse;

/**
* Base class for responses from the ResourceManager to a registration attempt by a JobMaster.
*/
public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {

private static final long serialVersionUID = 5577641250204140415L;

private final long heartbeatInterval;

public JobMasterRegistrationSuccess(long heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}

/**
* Gets the interval in which the ResourceManager will heartbeat the JobMaster.
*
* @return the interval in which the ResourceManager will heartbeat the JobMaster
*/
public long getHeartbeatInterval() {
return heartbeatInterval;
}

@Override
public String toString() {
return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,56 @@

package org.apache.flink.runtime.resourcemanager;

<<<<<<< HEAD
import org.apache.flink.api.common.JobID;
=======
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
>>>>>>> db98efb... rsourceManager registration with JobManager

import java.io.Serializable;
import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
*/
public class JobMasterRegistration implements Serializable {
private static final long serialVersionUID = 8411214999193765202L;

<<<<<<< HEAD
private final String address;
private final JobID jobID;

public JobMasterRegistration(String address, JobID jobID) {
this.address = address;
this.jobID = jobID;
=======
private static final long serialVersionUID = -2316627821716999527L;

private final JobMasterGateway jobMasterGateway;

private UUID jobMasterLeaderSessionID;

public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
this.jobMasterGateway = checkNotNull(jobMasterGateway);
}

public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
this.jobMasterGateway = checkNotNull(jobMasterGateway);
this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
}

public JobMasterGateway getJobMasterGateway() {
return jobMasterGateway;
}

public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
>>>>>>> db98efb... rsourceManager registration with JobManager
}

public String getAddress() {
return address;
public UUID getJobMasterLeaderSessionID() {
return jobMasterLeaderSessionID;
}

public JobID getJobID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@

package org.apache.flink.runtime.resourcemanager;

import akka.dispatch.Futures;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
<<<<<<< HEAD
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
=======
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
>>>>>>> db98efb... rsourceManager registration with JobManager
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.registration.RegistrationResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,15 +58,21 @@
*
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* <ul>
* <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
<<<<<<< HEAD
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {

private final Logger LOG = LoggerFactory.getLogger(getClass());

private final Map<JobID, JobMasterGateway> jobMasterGateways;
=======
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
/** the mapping relationship of JobID and JobMasterGateway */
private final Map<JobID, JobMasterRegistration> jobMasters;
>>>>>>> db98efb... rsourceManager registration with JobManager

private final HighAvailabilityServices highAvailabilityServices;

Expand All @@ -74,8 +88,12 @@ public ResourceManager(
SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
<<<<<<< HEAD
this.jobMasterGateways = new HashMap<>();
this.slotManager = slotManager;
=======
this.jobMasters = new HashMap<>(16);
>>>>>>> db98efb... rsourceManager registration with JobManager
}

@Override
Expand All @@ -95,8 +113,11 @@ public void start() {
public void shutDown() {
try {
leaderElectionService.stop();
for(JobID jobID : jobMasters.keySet()) {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
}
super.shutDown();
} catch(Throwable e) {
} catch (Throwable e) {
log.error("A fatal error happened when shutdown the ResourceManager", e);
throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
}
Expand All @@ -115,24 +136,58 @@ UUID getLeaderSessionID() {
/**
* Register a {@link JobMaster} at the resource manager.
*
* @param jobMasterRegistration Job master registration information
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param jobMasterAddress The address of the JobMaster that registers
* @param jobID The Job ID of the JobMaster that registers
* @return Future registration response
*/
@RpcMethod
<<<<<<< HEAD
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
final Future<JobMasterGateway> jobMasterFuture =
getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
final JobID jobID = jobMasterRegistration.getJobID();
=======
public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {

if(!leaderSessionID.equals(resourceManagerLeaderId)) {
log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
}

Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
>>>>>>> db98efb... rsourceManager registration with JobManager

return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
<<<<<<< HEAD
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
LOG.info("Replacing existing gateway {} for JobID {} with {}.",
existingGateway, jobID, jobMasterGateway);
}
return new RegistrationResponse(true);
=======
if (jobMasters.containsKey(jobID)) {
JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
jobMasters.put(jobID, jobMasterRegistration);
log.info("Replacing gateway for registered JobID {}.", jobID);
} else {
JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
jobMasters.put(jobID, jobMasterRegistration);
try {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
} catch(Throwable e) {
log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
jobID, jobMasterAddress);
return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
}
}

return new JobMasterRegistrationSuccess(5000);
>>>>>>> db98efb... rsourceManager registration with JobManager
}
}, getMainThreadExecutor());
}
Expand All @@ -158,26 +213,41 @@ public SlotRequestReply requestSlot(SlotRequest slotRequest) {


/**
*
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param taskExecutorAddress The address of the TaskExecutor that registers
* @param resourceID The resource ID of the TaskExecutor that registers
*
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param taskExecutorAddress The address of the TaskExecutor that registers
* @param resourceID The resource ID of the TaskExecutor that registers
* @return The response by the ResourceManager.
*/
@RpcMethod
public org.apache.flink.runtime.registration.RegistrationResponse registerTaskExecutor(
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID) {
public RegistrationResponse registerTaskExecutor(
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID) {

return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}


<<<<<<< HEAD
// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
=======
/**
* Callback method when current resourceManager lose leadership.
*/
@Override
public void revokeLeadership() {
runAsync(new Runnable() {
@Override
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
jobMasters.clear();
leaderSessionID = null;
}
});
}
>>>>>>> db98efb... rsourceManager registration with JobManager

/**
* Callback method when current resourceManager is granted leadership
Expand Down Expand Up @@ -232,4 +302,35 @@ public void run() {
}
});
}

private class JobMasterLeaderListener implements LeaderRetrievalListener {
private final JobID jobID;

private JobMasterLeaderListener(JobID jobID) {
this.jobID = jobID;
}

@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(new Runnable() {
@Override
public void run() {
log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
// update job master leader session id
JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
}
});
}

@Override
public void handleError(final Exception exception) {
runAsync(new Runnable() {
@Override
public void run() {
log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
}
});
}
}
}
Loading

0 comments on commit 5cb81c4

Please sign in to comment.