diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index fc40911509907..6a43ddfc125e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -112,9 +112,6 @@ public class CheckpointCoordinator { * need to be ascending across job managers. */ private final CheckpointIDCounter checkpointIdCounter; - /** Class loader used to deserialize the state handles (as they may be user-defined) */ - private final ClassLoader userClassLoader; - /** The base checkpoint interval. Actual trigger time may be affected by the * max concurrent checkpoints and minimum-pause values */ private final long baseInterval; @@ -167,7 +164,6 @@ public CheckpointCoordinator( ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, - ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, SavepointStore savepointStore, @@ -198,7 +194,6 @@ public CheckpointCoordinator( this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.savepointStore = checkNotNull(savepointStore); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); - this.userClassLoader = checkNotNull(userClassLoader); this.statsTracker = checkNotNull(statsTracker); this.timer = new Timer("Checkpoint Timer", true); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index 657dd609e2308..f5e361860e33a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -62,7 +62,7 @@ public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) { "Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + "."); this.jobVertexID = jobVertexID; - //preallocate lists of the required size, so that we can randomly set values to indexes + this.subtaskStates = new HashMap<>(parallelism); this.keyGroupsStateHandles = new HashMap<>(parallelism); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index c3cf2977cd3f3..7c3fa0b8c100f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -390,7 +390,6 @@ public void enableSnapshotCheckpointing( tasksToTrigger, tasksToWaitFor, tasksToCommitTo, - userClassLoader, checkpointIDCounter, checkpointStore, savepointStore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index 3c5157e9381f0..aa2840488e254 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -69,7 +69,7 @@ public static void bestEffortDiscardAllStateObjects( * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception. * * @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values. - * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration + * @throws IOException exception that is a collection of all suppressed exceptions that were caught during iteration */ public static void bestEffortCloseAllStateObjects( Iterable handlesToDiscard) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index bc61742169f0b..9adaa8632a938 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -42,10 +42,13 @@ import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; + import org.junit.Assert; import org.junit.Test; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; + import scala.concurrent.ExecutionContext; import scala.concurrent.Future; @@ -109,7 +112,6 @@ public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -162,7 +164,6 @@ public void testCheckpointAbortsIfTriggerTasksAreFinished() { new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -213,7 +214,6 @@ public void testCheckpointAbortsIfAckTasksAreNotExecuted() { new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -265,7 +265,6 @@ public void testTriggerAndDeclineCheckpointSimple() { new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -390,7 +389,6 @@ public void testTriggerAndDeclineCheckpointComplex() { new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -511,7 +509,6 @@ public void testTriggerAndConfirmSimpleCheckpoint() { new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -662,7 +659,6 @@ public void testMultipleConcurrentCheckpoints() { new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -798,7 +794,6 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() { new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10, cl), new HeapSavepointStore(), @@ -920,7 +915,6 @@ public void testCheckpointTimeoutIsolated() { new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -989,7 +983,6 @@ public void handleMessagesForNonExistingCheckpoints() { new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -1069,7 +1062,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -1161,7 +1153,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -1246,7 +1237,6 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -1384,7 +1374,6 @@ public void testSavepointsAreNotSubsumed() throws Exception { new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, - cl, counter, new StandaloneCompletedCheckpointStore(10, cl), new HeapSavepointStore(), @@ -1470,8 +1459,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { maxConcurrentAttempts, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter - (), new StandaloneCompletedCheckpointStore(2, cl), + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), new DisabledCheckpointStatsTracker()); @@ -1541,8 +1531,9 @@ public void testMaxConcurrentAttempsWithSubsumption() { maxConcurrentAttempts, // max two concurrent checkpoints new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter - (), new StandaloneCompletedCheckpointStore(2, cl), + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), new DisabledCheckpointStatsTracker()); @@ -1621,7 +1612,8 @@ public ExecutionState answer(InvocationOnMock invocation){ 2, // max two concurrent checkpoints new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, - new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(), + new ExecutionVertex[] { commitVertex }, + new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), new DisabledCheckpointStatsTracker()); @@ -1672,7 +1664,6 @@ public void testConcurrentSavepoints() throws Exception { new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, - cl, checkpointIDCounter, new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -1723,7 +1714,6 @@ public void testMinDelayBetweenSavepoints() throws Exception { new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), new HeapSavepointStore(), @@ -1772,7 +1762,8 @@ public void testRestoreLatestCheckpointedState() throws Exception { allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); - ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]); + ExecutionVertex[] arrayExecutionVertices = + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -1784,7 +1775,6 @@ public void testRestoreLatestCheckpointedState() throws Exception { arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -1796,8 +1786,8 @@ public void testRestoreLatestCheckpointedState() throws Exception { assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - List keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1); - List keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2); + List keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1); + List keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { ChainedStateHandle nonPartitionedState = generateStateForVertex(jobVertexID1, index); @@ -1876,7 +1866,7 @@ public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); - ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]); + ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -1888,7 +1878,6 @@ public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -1900,8 +1889,8 @@ public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - List keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1); - List keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2); + List keyGroupPartitions1 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1); + List keyGroupPartitions2 = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { ChainedStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); @@ -1991,7 +1980,8 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); - ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]); + ExecutionVertex[] arrayExecutionVertices = + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -2003,7 +1993,6 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -2015,8 +2004,10 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - List keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1); - List keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2); + List keyGroupPartitions1 = + CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1); + List keyGroupPartitions2 = + CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { ChainedStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); @@ -2110,7 +2101,8 @@ public void testRestoreLatestCheckpointedStateWithChangingParallelism() throws E allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); - ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[0]); + ExecutionVertex[] arrayExecutionVertices = + allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -2122,7 +2114,6 @@ public void testRestoreLatestCheckpointedStateWithChangingParallelism() throws E arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -2134,8 +2125,10 @@ public void testRestoreLatestCheckpointedStateWithChangingParallelism() throws E assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - List keyGroupPartitions1 = coord.createKeyGroupPartitions(maxParallelism1, parallelism1); - List keyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, parallelism2); + List keyGroupPartitions1 = + CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1); + List keyGroupPartitions2 = + CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { ChainedStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); @@ -2173,7 +2166,8 @@ public void testRestoreLatestCheckpointedStateWithChangingParallelism() throws E int newParallelism2 = 13; - List newKeyGroupPartitions2 = coord.createKeyGroupPartitions(maxParallelism2, newParallelism2); + List newKeyGroupPartitions2 = + CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, newParallelism2); final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex( jobVertexID1, @@ -2207,37 +2201,12 @@ public void testRestoreLatestCheckpointedStateWithChangingParallelism() throws E // Utilities // ------------------------------------------------------------------------ - static void sendAckMessageToCoordinator( - CheckpointCoordinator coord, - long checkpointId, JobID jid, - ExecutionJobVertex jobVertex, - JobVertexID jobVertexID, - List keyGroupPartitions) throws Exception { - - for (int index = 0; index < jobVertex.getParallelism(); index++) { - ChainedStateHandle state = generateStateForVertex(jobVertexID, index); - List keyGroupState = generateKeyGroupState( - jobVertexID, - keyGroupPartitions.get(index)); - - AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - state, - keyGroupState); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); - } - } - public static List generateKeyGroupState( JobVertexID jobVertexID, KeyGroupRange keyGroupPartition) throws IOException { - KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupPartition); List testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups()); - int runningGroupsOffset = 0; + // generate state for one keygroup for (int keyGroupIndex : keyGroupPartition) { Random random = new Random(jobVertexID.hashCode() + keyGroupIndex); @@ -2270,8 +2239,7 @@ public static List generateKeyGroupState(KeyGroupRange key //write all generated values in a single byte array, which is index by groupOffsetsInFinalByteArray byte[] allSerializedValuesConcatenated = new byte[runningGroupsOffset]; runningGroupsOffset = 0; - byte[] old = null; - for(byte[] serializedGroupValue : serializedGroupValues) { + for (byte[] serializedGroupValue : serializedGroupValues) { System.arraycopy( serializedGroupValue, 0, @@ -2279,7 +2247,6 @@ public static List generateKeyGroupState(KeyGroupRange key runningGroupsOffset, serializedGroupValue.length); runningGroupsOffset += serializedGroupValue.length; - old = serializedGroupValue; } ByteStreamStateHandle allSerializedStatesHandle = new ByteStreamStateHandle( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 541629233099b..a4896aad80b11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -32,12 +32,11 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; - import org.apache.flink.runtime.util.SerializableObject; + import org.junit.Test; import org.mockito.Mockito; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -101,7 +100,6 @@ public void testSetState() { new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -182,7 +180,6 @@ public void testStateOnlyPartiallyAvailable() { new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], - cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(), @@ -231,7 +228,7 @@ public void testNoCheckpointAvailable() { Integer.MAX_VALUE, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[] { mock(ExecutionVertex.class) }, - new ExecutionVertex[0], cl, + new ExecutionVertex[0], new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), new HeapSavepointStore(),