Skip to content

Commit

Permalink
[FLINK-6379] [tests] Fix race condition in MesosResourceManagerTest
Browse files Browse the repository at this point in the history
The MesosResourceManagerTest#testAdapter tests the AkkaAdapter class. The tests
are executed asynchronously and thus it is necessary to introduce timeouts for
the verify calls. This commit fixes the test instability by introducing timeouts.
  • Loading branch information
tillrohrmann committed Jun 29, 2017
1 parent 3fe27ac commit 4bb488c
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
Expand All @@ -80,8 +81,6 @@
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

/**
* The Mesos implementation of the resource manager.
*/
Expand Down Expand Up @@ -121,7 +120,7 @@ public class MesosResourceManager extends ResourceManager<MesosResourceManagerGa

private ActorRef connectionMonitor;

private ActorRef taskRouter;
private ActorRef taskMonitor;

private ActorRef launchCoordinator;

Expand Down Expand Up @@ -164,20 +163,20 @@ public MesosResourceManager(
jobLeaderIdService,
fatalErrorHandler);

this.actorSystem = actorSystem;
this.actorSystem = Preconditions.checkNotNull(actorSystem);

this.flinkConfig = requireNonNull(flinkConfig);
this.mesosConfig = requireNonNull(mesosConfig);
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.mesosConfig = Preconditions.checkNotNull(mesosConfig);

this.workerStore = requireNonNull(workerStore);
this.artifactResolver = requireNonNull(artifactResolver);
this.workerStore = Preconditions.checkNotNull(workerStore);
this.artifactResolver = Preconditions.checkNotNull(artifactResolver);

this.taskManagerParameters = requireNonNull(taskManagerParameters);
this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters);
this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec);

this.workersInNew = new HashMap<>();
this.workersInLaunch = new HashMap<>();
this.workersBeingReturned = new HashMap<>();
this.workersInNew = new HashMap<>(8);
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);
}

protected ActorRef createSelfActor() {
Expand Down Expand Up @@ -257,7 +256,7 @@ protected void initialize() throws ResourceManagerException {
connectionMonitor = createConnectionMonitor();
launchCoordinator = createLaunchCoordinator();
reconciliationCoordinator = createReconciliationCoordinator();
taskRouter = createTaskRouter();
taskMonitor = createTaskRouter();

// recover state
try {
Expand Down Expand Up @@ -307,7 +306,7 @@ private void recoverWorkers() throws Exception {
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
break;
}
taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
}

// tell the launch coordinator about prior assignments
Expand Down Expand Up @@ -352,14 +351,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());

// tell the task router about the new plans
taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
// tell the task monitor about the new plans
taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);

// tell the launch coordinator to launch the new tasks
launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
}
catch(Exception ex) {
onFatalErrorAsync(new ResourceManagerException("unable to request new workers", ex));
onFatalErrorAsync(new ResourceManagerException("Unable to request new workers.", ex));
}
}

Expand All @@ -370,6 +369,7 @@ public void stopWorker(InstanceID instanceId) {

/**
* Callback when a worker was started.
*
* @param resourceID The worker resource id (as provided by the TaskExecutor)
*/
@Override
Expand All @@ -379,11 +379,11 @@ protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
MesosWorkerStore.Worker inLaunch = workersInLaunch.get(resourceID);
if (inLaunch != null) {
return new RegisteredMesosWorkerNode(inLaunch);
} else {
// the worker is unrecognized or was already released
// return null to indicate that TaskExecutor registration should be declined
return null;
}

// the worker is unrecognized or was already released
// return null to indicate that TaskExecutor registration should be declined
return null;
}

// ------------------------------------------------------------------------
Expand All @@ -397,12 +397,13 @@ public void registered(Registered message) {
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
}
catch(Exception ex) {
onFatalError(new ResourceManagerException("unable to store the assigned framework ID", ex));
onFatalError(new ResourceManagerException("Unable to store the assigned framework ID.", ex));
return;
}

launchCoordinator.tell(message, selfActor);
reconciliationCoordinator.tell(message, selfActor);
taskRouter.tell(message, selfActor);
taskMonitor.tell(message, selfActor);
}

/**
Expand All @@ -413,7 +414,7 @@ public void reregistered(ReRegistered message) {
connectionMonitor.tell(message, selfActor);
launchCoordinator.tell(message, selfActor);
reconciliationCoordinator.tell(message, selfActor);
taskRouter.tell(message, selfActor);
taskMonitor.tell(message, selfActor);
}

/**
Expand All @@ -424,7 +425,7 @@ public void disconnected(Disconnected message) {
connectionMonitor.tell(message, selfActor);
launchCoordinator.tell(message, selfActor);
reconciliationCoordinator.tell(message, selfActor);
taskRouter.tell(message, selfActor);
taskMonitor.tell(message, selfActor);
}

/**
Expand Down Expand Up @@ -456,27 +457,26 @@ public void acceptOffers(AcceptOffers msg) {

// transition the persistent state of some tasks to Launched
for (Protos.Offer.Operation op : msg.operations()) {
if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
continue;
}
for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
assert (worker != null);
if (op.getType() == Protos.Offer.Operation.Type.LAUNCH) {
for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
assert (worker != null);

worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
workerStore.putWorker(worker);
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
workerStore.putWorker(worker);
workersInLaunch.put(extractResourceID(worker.taskID()), worker);

LOG.info("Launching Mesos task {} on host {}.",
worker.taskID().getValue(), worker.hostname().get());
LOG.info("Launching Mesos task {} on host {}.",
worker.taskID().getValue(), worker.hostname().get());

toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
}
}
}

// tell the task router about the new plans
// tell the task monitor about the new plans
for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
taskRouter.tell(update, selfActor);
taskMonitor.tell(update, selfActor);
}

// send the acceptance message to Mesos
Expand All @@ -492,7 +492,7 @@ public void acceptOffers(AcceptOffers msg) {
*/
@RpcMethod
public void statusUpdate(StatusUpdate message) {
taskRouter.tell(message, selfActor);
taskMonitor.tell(message, selfActor);
reconciliationCoordinator.tell(message, selfActor);
schedulerDriver.acknowledgeStatusUpdate(message.status());
}
Expand Down Expand Up @@ -541,8 +541,8 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
// failed worker, either at startup, or running
final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
assert(launched != null);
LOG.info("Worker {} failed with status: {}, reason: {}, message: {}. " +
"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.",
id, status.getState(), status.getReason(), status.getMessage());

// TODO : launch a replacement worker?
}
Expand Down Expand Up @@ -578,7 +578,7 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID,

// create the specific TM parameters from the resource profile and some defaults
MesosTaskManagerParameters params = new MesosTaskManagerParameters(
resourceProfile.getCpuCores() < 1 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
taskManagerParameters.containerType(),
taskManagerParameters.containerImageName(),
new ContaineredTaskManagerParameters(
Expand Down Expand Up @@ -616,6 +616,7 @@ static ResourceID extractResourceID(Protos.TaskID taskId) {

/**
* Extracts the Mesos task goal state from the worker information.
*
* @param worker the persistent worker information.
* @return goal state information for the {@Link TaskMonitor}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.mesos.runtime.clusterframework;

import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerGateway;
import org.apache.flink.mesos.scheduler.TaskMonitor;
Expand All @@ -29,9 +30,30 @@
*/
public interface MesosResourceManagerGateway extends ResourceManagerGateway, SchedulerGateway {

void acceptOffers(AcceptOffers msg);
/**
* Accept the given offers as advised by the launch coordinator.
*
* Note: This method is a callback for the {@link LaunchCoordinator}.
*
* @param offersToAccept Offers to accept from Mesos
*/
void acceptOffers(AcceptOffers offersToAccept);

void reconcile(ReconciliationCoordinator.Reconcile message);
/**
* Trigger reconciliation with the Mesos master.
*
* Note: This method is a callback for the {@link TaskMonitor}.
*
* @param reconciliationRequest Message containing the tasks which shall be reconciled
*/
void reconcile(ReconciliationCoordinator.Reconcile reconciliationRequest);

void taskTerminated(TaskMonitor.TaskTerminated message);
/**
* Notify that the given Mesos task has been terminated.
*
* Note: This method is a callback for the {@link TaskMonitor}.
*
* @param terminatedTask Message containing the terminated task
*/
void taskTerminated(TaskMonitor.TaskTerminated terminatedTask);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,11 @@ class Worker implements Serializable {

private Worker(Protos.TaskID taskID, ResourceProfile profile,
Option<Protos.SlaveID> slaveID, Option<String> hostname, WorkerState state) {
requireNonNull(taskID, "taskID");
requireNonNull(profile, "profile");
requireNonNull(slaveID, "slaveID");
requireNonNull(hostname, "hostname");
requireNonNull(state, "state");

this.taskID = taskID;
this.profile = profile;
this.slaveID = slaveID;
this.hostname = hostname;
this.state = state;
this.taskID = requireNonNull(taskID, "taskID");
this.profile = requireNonNull(profile, "profile");
this.slaveID = requireNonNull(slaveID, "slaveID");
this.hostname = requireNonNull(hostname, "hostname");
this.state = requireNonNull(state, "state");
}

/**
Expand Down Expand Up @@ -207,12 +201,13 @@ public boolean equals(Object o) {
return Objects.equals(taskID, worker.taskID) &&
Objects.equals(slaveID, worker.slaveID) &&
Objects.equals(hostname, worker.hostname) &&
Objects.equals(profile, worker.profile) &&
state == worker.state;
}

@Override
public int hashCode() {
return Objects.hash(taskID, slaveID, hostname, state);
return Objects.hash(taskID, slaveID, hostname, state, profile);
}

@Override
Expand Down
Loading

0 comments on commit 4bb488c

Please sign in to comment.