Skip to content

Commit

Permalink
Fixed possible race in recovery logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jul 17, 2012
1 parent ea9dd3b commit 17ff56c
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package eu.stratosphere.nephele.jobmanager.scheduler;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import eu.stratosphere.nephele.execution.ExecutionListener;
Expand All @@ -25,6 +26,7 @@
import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.scheduler.local.LocalScheduler;

Expand Down Expand Up @@ -89,9 +91,14 @@ public void executionStateChanged(final JobID jobID, final ExecutionVertexID ver

if (newExecutionState == ExecutionState.CANCELED || newExecutionState == ExecutionState.FINISHED) {

synchronized (this.executionVertex.getExecutionGraph()) {
synchronized (eg) {

if (this.scheduler.getVerticesToBeRestarted().remove(this.executionVertex.getID()) != null) {

if (eg.getJobStatus() == InternalJobStatus.FAILING) {
return;
}

this.executionVertex.updateExecutionState(ExecutionState.ASSIGNED, "Restart as part of recovery");

// Run through the deployment procedure
Expand Down Expand Up @@ -122,7 +129,21 @@ public void executionStateChanged(final JobID jobID, final ExecutionVertexID ver
}

} else {
// TODO: Cancel the entire job in this case

// Make sure the map with the vertices to be restarted is cleaned up properly
synchronized (eg) {

final Iterator<ExecutionVertex> it = this.scheduler.getVerticesToBeRestarted().values()
.iterator();

while (it.hasNext()) {
if (eg.equals(it.next().getExecutionGraph())) {
it.remove();
}
}
}

// Actual cancellation of job is performed by job manager
}
}
}
Expand Down

0 comments on commit 17ff56c

Please sign in to comment.