Skip to content

Commit

Permalink
[FLINK-14640][runtime] Change Type of Field currentExecutions from Co…
Browse files Browse the repository at this point in the history
…ncurrentHashMap to HashMap

This closes apache#10112.
  • Loading branch information
yanghua authored and GJL committed Nov 12, 2019
1 parent 512b4f7 commit 4d97e2f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -202,7 +201,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults;

/** The currently executed tasks, for callbacks. */
private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
private final Map<ExecutionAttemptID, Execution> currentExecutions;

/** Listeners that receive messages when the entire job switches it status
* (such as from RUNNING to FINISHED). */
Expand Down Expand Up @@ -465,7 +464,7 @@ public ExecutionGraph(
this.tasks = new HashMap<>(16);
this.intermediateResults = new HashMap<>(16);
this.verticesInCreationOrder = new ArrayList<>(16);
this.currentExecutions = new ConcurrentHashMap<>(16);
this.currentExecutions = new HashMap<>(16);

this.jobStatusListeners = new CopyOnWriteArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ public void testRegistrationOfExecutionsFinishing() {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350).f1;
Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = setupExecution(v1, 7650, v2, 2350);
ExecutionGraph testExecutionGraph = graphExecutionsTuple.f0;
Collection<Execution> executions = new ArrayList<>(graphExecutionsTuple.f1.values());

for (Execution e : executions.values()) {
for (Execution e : executions) {
e.markFinished();
}

assertEquals(0, executions.size());
assertEquals(0, testExecutionGraph.getRegisteredExecutions().size());
}
catch (Exception e) {
e.printStackTrace();
Expand All @@ -270,13 +272,15 @@ public void testRegistrationOfExecutionsFailing() {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = setupExecution(v1, 7, v2, 6);
ExecutionGraph testExecutionGraph = graphExecutionsTuple.f0;
Collection<Execution> executions = new ArrayList<>(graphExecutionsTuple.f1.values());

for (Execution e : executions.values()) {
for (Execution e : executions) {
e.markFailed(null);
}

assertEquals(0, executions.size());
assertEquals(0, testExecutionGraph.getRegisteredExecutions().size());
}
catch (Exception e) {
e.printStackTrace();
Expand All @@ -294,13 +298,15 @@ public void testRegistrationOfExecutionsFailedExternally() {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = setupExecution(v1, 7, v2, 6);
ExecutionGraph testExecutionGraph = graphExecutionsTuple.f0;
Collection<Execution> executions = new ArrayList<>(graphExecutionsTuple.f1.values());

for (Execution e : executions.values()) {
for (Execution e : executions) {
e.fail(null);
}

assertEquals(0, executions.size());
assertEquals(0, testExecutionGraph.getRegisteredExecutions().size());
}
catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -397,14 +403,16 @@ public void testRegistrationOfExecutionsCanceled() {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37).f1;
Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphExecutionsTuple = setupExecution(v1, 19, v2, 37);
ExecutionGraph testExecutionGraph = graphExecutionsTuple.f0;
Collection<Execution> executions = new ArrayList<>(graphExecutionsTuple.f1.values());

for (Execution e : executions.values()) {
for (Execution e : executions) {
e.cancel();
e.completeCancelling();
}

assertEquals(0, executions.size());
assertEquals(0, testExecutionGraph.getRegisteredExecutions().size());
}
catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit 4d97e2f

Please sign in to comment.