Skip to content

Commit

Permalink
[FLINK-31527][tests] Stabilize ChangelogRescalingITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 22, 2023
1 parent 18869f6 commit 9ba3b1a
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,21 @@ public boolean request() {
private String checkpointAndCancel(JobID jobID) throws Exception {
waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
cluster.getClusterClient().cancel(jobID).get();
checkStatus(jobID);
waitForSuccessfulTermination(jobID);
return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
.<NoSuchElementException>orElseThrow(
() -> {
throw new NoSuchElementException("No checkpoint was created yet");
});
}

private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
private void waitForSuccessfulTermination(JobID jobID) throws Exception {
CommonTestUtils.waitUntilCondition(
() ->
cluster.getClusterClient()
.getJobStatus(jobID)
.get()
.isGloballyTerminalState());
if (cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState()) {
cluster.getClusterClient()
.requestJobResult(jobID)
Expand Down

0 comments on commit 9ba3b1a

Please sign in to comment.