Skip to content

Commit

Permalink
[FLINK-22493] Increase test stability in AdaptiveSchedulerITCase.
Browse files Browse the repository at this point in the history
This addresses the following problem in the testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() test.

Once all tasks are running, the test triggers a savepoint, which intentionally fails, because of a test exception in a Task's checkpointing method. The test then waits for the savepoint future to fail, and the scheduler to restart the tasks. Once they are running again, it performs a sanity check whether the savepoint directory has been properly removed. In the reported run, there was still the savepoint directory around.

The savepoint directory is removed via the PendingCheckpoint.discard() method. This method is executed using the i/o executor pool of the CheckpointCoordinator. There is no guarantee that this discard method has been executed when the job is running again (and the executor shuts down with the dispatcher, hence it is not bound to job restarts).
  • Loading branch information
rmetzger committed Apr 29, 2021
1 parent 16c2e46 commit 00584d3
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,32 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex

DummySource.awaitRunning();

// ensure failed savepoint files have been removed by now (this check is intentionally after
// the wait for the sources to be running again, due to instabilities observed)
File[] files = savepointDirectory.listFiles();
if (files.length > 0) {
fail(
"Found unexpected files: "
+ Arrays.stream(files)
.map(File::getAbsolutePath)
.collect(Collectors.joining(", ")));
}
// ensure failed savepoint files have been removed from the directory.
// We execute this in a retry loop with a timeout, because the savepoint deletion happens
// asynchronously and is not bound to the job lifecycle. See FLINK-22493 for more details.
CommonTestUtils.waitUntilCondition(
() -> isDirectoryEmpty(savepointDirectory),
Deadline.fromNow(Duration.ofSeconds(10)));

// trigger second savepoint
final String savepoint =
client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
}

private boolean isDirectoryEmpty(File directory) {
File[] files = directory.listFiles();
if (files.length > 0) {
log.warn(
"There are still unexpected files: {}",
Arrays.stream(files)
.map(File::getAbsolutePath)
.collect(Collectors.joining(", ")));
return false;
}
return true;
}

private static StreamExecutionEnvironment getEnvWithSource(
StopWithSavepointTestBehavior behavior) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down

0 comments on commit 00584d3

Please sign in to comment.