[FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally #24246
Conversation
Force-pushed from 9eec91b to 7cf737d
Good job, @1996fanrui. I bet that was not an easy one. I left a few comments. IMHO, we might want to focus on the tests that actually require that change and not touch other tests without a reason. WDYT?
```java
 * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
 * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
 */
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
```
Suggested change:

```diff
-public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {
```
Can't we make the number of checkpoints to wait for configurable? That way, we can pass in 2 in the test implementation, analogously to `waitForCheckpoint`. I also feel like we can remove some redundant code within the two methods. 🤔
> That way, we can pass in 2 in the test implementation analogously to waitForCheckpoint. I also feel like we can remove some redundant code within the two methods. 🤔
IIUC, the semantics of `waitForCheckpoint` and `waitForOneMoreCheckpoint` are different. (`waitForOneMoreCheckpoint` is renamed to `waitForNewCheckpoint` in this PR.)

- `waitForCheckpoint` checks the total count of all completed checkpoints.
- `waitForOneMoreCheckpoint` checks whether a new checkpoint completes after it is called. For example, if the job has 10 completed checkpoints before it is called, `waitForOneMoreCheckpoint` will wait until checkpoint-11 completes.
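As a rough illustration of that difference (a sketch only, not the actual Flink utility code; `getCompletedCheckpointCount` is an assumed helper, and `jobID`/`miniCluster` are the parameters from the methods discussed here):

```java
// Sketch: waitForCheckpoint-style waiting only checks the total number of completed
// checkpoints, while waitForOneMoreCheckpoint-style waiting first records the current
// count and then waits until at least one additional checkpoint has completed.
long baseline = getCompletedCheckpointCount(jobID, miniCluster); // assumed helper
waitUntilCondition(
        () -> getCompletedCheckpointCount(jobID, miniCluster) > baseline);
```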
BTW, I have refactored `waitForNewCheckpoint`: I check the checkpoint trigger time instead of the checkpoint count. I think checking the trigger time is clearer than `checkpointCount >= 2`; other developers might not know why we check for 2 checkpoints here, and `checkpointCount >= 2` doesn't work when concurrent checkpoints are enabled.

WDYT?
Good idea. Checking the trigger time is a better solution. I like that. 👍
About the redundant code:
```java
/** Wait for (at least) the given number of successful checkpoints. */
public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
        throws Exception {
    waitForCheckpoints(
            jobID,
            miniCluster,
            checkpointStatsSnapshot ->
                    checkpointStatsSnapshot != null
                            && checkpointStatsSnapshot
                                            .getCounts()
                                            .getNumberOfCompletedCheckpoints()
                                    >= numCheckpoints);
}

/**
 * Wait for a new completed checkpoint, the new checkpoint must be triggered after
 * waitForNewCheckpoint is called.
 */
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
    final long startTime = System.currentTimeMillis();
    waitForCheckpoints(
            jobID,
            miniCluster,
            checkpointStatsSnapshot -> {
                if (checkpointStatsSnapshot != null) {
                    final CompletedCheckpointStats latestCompletedCheckpoint =
                            checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                    return latestCompletedCheckpoint != null
                            && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                }
                return false;
            });
}

private static void waitForCheckpoints(
        JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
        throws Exception {
    waitUntilCondition(
            () -> {
                final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                if (condition.test(snapshot)) {
                    return true;
                } else if (graph.getState().isGloballyTerminalState()) {
                    checkState(
                            graph.getFailureInfo() != null,
                            "Job terminated (state=%s) before completing the requested checkpoint(s).",
                            graph.getState());
                    throw graph.getFailureInfo().getException();
                }
                return false;
            });
}
```
...just to clarify what I meant. Feel free to ignore that one.
Thanks for your code, this refactor makes sense to me. I have updated.
```diff
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exception {
         // clear the CollectionSink set for the restarted job
         CollectionSink.clearElementsSet();

-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
```
I guess the scenario can happen in this test as well because it's almost the same test implementation as in `#testCheckpointRescalingKeyedState`. 🤔
I'm wondering whether the redundant code could be removed here. But that's probably a bit out-of-scope for this issue.
Do you mean `createJobGraphWithKeyedState` and `createJobGraphWithKeyedAndNonPartitionedOperatorState` have redundant code? Or `testCheckpointRescalingWithKeyedAndNonPartitionedState` and `testCheckpointRescalingKeyedState`?

I checked them; they differ in quite a few details, such as:

- The source is different.
- The parallelism and maxParallelism are fixed for `NonPartitionedOperator`.

I will check later whether some common code can be extracted. If yes, I can submit a hotfix PR and cc you.
I created a PR to show what I had in mind for the code redundancy reduction
It seems out of scope for this JIRA. Would you mind if we refactor it in a hotfix PR? Your code is ready; after this PR, you can submit an official PR, and I can help review. WDYT?
Force-pushed from 7cf737d to 7265b6b
Thanks @XComp for the quick review!

> we might want to focus on the tests that actually require that change and not touch other tests without a reason. WDYT?

I have commented here: #24246 (comment)
@1996fanrui thanks for looking into this and fixing the test and test util. Changes lgtm!
Looks good to me, too. See my comments below.
```java
// We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
// This test will fail if the job recovers from a checkpoint triggered before
// `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
// `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
// `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
// the checkpoint to be the count and sum of all data.
```
Suggested change:

```diff
-// We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
-// This test will fail if the job recovers from a checkpoint triggered before
-// `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
-// `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
-// `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
-// the checkpoint to be the count and sum of all data.
+// We need to wait for a checkpoint to be completed that was triggered after all the data was processed. That ensures the entire data being flushed out of the Operator's network buffers to avoid reprocessing test data twice after the restore (see FLINK-34200).
```
Just a proposal to keep it shorter; referring back to the Jira issue for more context should be good enough. Additionally, adding markdown features might not add much value in Javadoc. If you want to go for that, you might want to use the `{@code }` syntax.
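For illustration, the comment rendered as Javadoc with the `{@code}` tag instead of markdown backticks (a sketch reusing the names from this discussion) might look like:

```java
/**
 * Waits for a completed checkpoint that was triggered after all test data was processed,
 * so that the restored {@code SubtaskIndexFlatMapper} state ({@code counter} and
 * {@code sum}) covers the complete input (see FLINK-34200).
 */
```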
That makes sense. I have updated.
Force-pushed from 3185807 to 34012a4
Hi @XComp, I have addressed your comments. You can see the latest change here [1].
> It seems out of scope for this JIRA. Would you mind if we refactor it in a hotfix PR? Your code is ready; after this PR, you can submit an official PR, and I can help review. WDYT?

Fine with me. I have nothing else to add aside from the commit message.
```diff
@@ -163,6 +163,8 @@ public void setup() throws Exception {
                 NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);

         config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        // Disable the scaling cooldown to speed up the test
+        config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));
```
The commit message needs to be changed to something like `[hotfix][tests] Disables cool down phase for faster test execution`.
Force-pushed from 34012a4 to da5db2c
Thanks @XComp @StefanRRichter for the review, merging~ |
What is the purpose of the change
see 1996fanrui@420fdf3#r138161972
Brief change log
Wait for two checkpoints to ensure that the latest checkpoint starts after `waitForTwoOrMoreCheckpoint` is called.
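As a condensed sketch of the waiting pattern the fix relies on (pieced together from the snippets discussed above; the rescale/restart step is elided):

```java
// Ensure the checkpoint used for the restore was triggered only after all test records
// were processed, so the restored counter/sum state covers the complete input.
SubtaskIndexFlatMapper.workCompletedLatch.await();     // all test data processed
waitForNewCheckpoint(jobID, cluster.getMiniCluster()); // checkpoint triggered afterwards
// ... rescale the job and verify the restored state ...
```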