Skip to content

Commit

Permalink
[FLINK-24444][runtime][tests] Wait until checkpoints stopped triggering
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk authored Jan 16, 2022
1 parent 99a101c commit f5429e8
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
Expand Down Expand Up @@ -105,7 +106,6 @@
* Tests for the integration of the {@link OperatorCoordinator} with the scheduler, to ensure the
* relevant actions are leading to the right method invocations on the coordinator.
*/
@SuppressWarnings("serial")
public class OperatorCoordinatorSchedulerTest extends TestLogger {

private final JobVertexID testVertexId = new JobVertexID();
Expand Down Expand Up @@ -774,10 +774,19 @@ private void failAndRestartTask(DefaultScheduler scheduler, int subtask) {
SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
}

private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason)
throws InterruptedException {
scheduler.handleGlobalFailure(reason);
SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);

// make sure the checkpoint is no longer triggering (this means that the operator event
// valve has been closed)
final CheckpointCoordinator checkpointCoordinator =
scheduler.getExecutionGraph().getCheckpointCoordinator();
while (checkpointCoordinator != null && checkpointCoordinator.isTriggering()) {
Thread.sleep(1);
}

// make sure we propagate all asynchronous and delayed actions
executor.triggerAll();
executor.triggerScheduledTasks();
Expand Down

0 comments on commit f5429e8

Please sign in to comment.