
[FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally #24246

Merged
2 commits merged into apache:master from the 34200/rescale-itcase branch on Feb 8, 2024

Conversation

1996fanrui
Member

What is the purpose of the change

see 1996fanrui@420fdf3#r138161972

Brief change log

Wait for 2 checkpoints to ensure that the latest checkpoint starts after waitForTwoOrMoreCheckpoint is called.
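
For context, a hedged sketch of the call-site change this PR makes in the affected tests (both helper names appear in the diffs quoted later in this conversation; jobID and cluster are assumed from the surrounding test code):

    // Before: waits until one more checkpoint has completed. A checkpoint that was
    // already in flight when this line runs can satisfy the wait, even though it may
    // not contain all of the records processed so far - this is the occasional failure.
    waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());

    // After: waits for a checkpoint that was triggered after this call, so the
    // checkpoint is guaranteed to cover everything processed before this point.
    waitForNewCheckpoint(jobID, cluster.getMiniCluster());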

@flinkbot
Collaborator

flinkbot commented Feb 1, 2024

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor
@XComp XComp left a comment

Good job, @1996fanrui. I bet that was not an easy one. I left a few comments. IMHO, we might want to focus on the tests that actually require this change and not touch other tests without a reason. WDYT?

* Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
* latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
*/
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
Contributor

Suggested change
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {

Can't we make the number of checkpoints to wait for configurable? That way, we can pass in 2 in the test implementation analogously to waitForCheckpoint. I also feel like we can remove some redundant code within the two methods. 🤔

Member Author
@1996fanrui 1996fanrui Feb 2, 2024

That way, we can pass in 2 in the test implementation analogously to waitForCheckpoint. I also feel like we can remove some redundant code within the two methods. 🤔

IIUC, the semantics of waitForCheckpoint and waitForOneMoreCheckpoint are different. (waitForOneMoreCheckpoint is renamed to waitForNewCheckpoint in this PR.)

  • waitForCheckpoint checks the total count of all completed checkpoints.
  • waitForOneMoreCheckpoint checks whether a new checkpoint has completed after it is called.
    • For example, if the job has 10 completed checkpoints before it is called,
    • waitForOneMoreCheckpoint waits until checkpoint-11 is completed.

BTW, I have refactored waitForNewCheckpoint. It checks the checkpoint trigger time instead of checkpointCount.

I think checking the trigger time is clearer than checkpointCount >= 2. Other developers might not know why we check for 2 checkpoints here, and checkpointCount >= 2 doesn't work when concurrent checkpoints are enabled.

WDYT?
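
To make the difference concrete, a small usage sketch (illustrative only; it assumes the helpers discussed in this thread are available at the call site and that the job already has 10 completed checkpoints):

    // Blocks until the job has completed at least 10 checkpoints in total,
    // regardless of when they were triggered.
    waitForCheckpoint(jobID, cluster.getMiniCluster(), 10);

    // Blocks until a checkpoint completes whose trigger timestamp is later than this
    // call, i.e. a checkpoint that was already in flight does not count. With 10
    // checkpoints already completed, this effectively waits for checkpoint-11 (or a
    // later one) to be both triggered and completed.
    waitForNewCheckpoint(jobID, cluster.getMiniCluster());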

Contributor

Good idea. Checking the trigger time is a better solution. I like that. 👍

Contributor

About the redundant code:

    /** Wait for (at least) the given number of successful checkpoints. */
    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
            throws Exception {
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot ->
                        checkpointStatsSnapshot != null
                                && checkpointStatsSnapshot
                                                .getCounts()
                                                .getNumberOfCompletedCheckpoints()
                                        >= numCheckpoints);
    }

    /**
     * Wait for a new completed checkpoint, the new checkpoint must be triggered after
     * waitForNewCheckpoint is called.
     */
    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
        final long startTime = System.currentTimeMillis();
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot -> {
                    if (checkpointStatsSnapshot != null) {
                        final CompletedCheckpointStats latestCompletedCheckpoint =
                                checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                        return latestCompletedCheckpoint != null
                                && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                    }

                    return false;
                });
    }

    private static void waitForCheckpoints(
            JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
            throws Exception {
        waitUntilCondition(
                () -> {
                    final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                    final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                    if (condition.test(snapshot)) {
                        return true;
                    } else if (graph.getState().isGloballyTerminalState()) {
                        checkState(
                                graph.getFailureInfo() != null,
                                "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                graph.getState());
                        throw graph.getFailureInfo().getException();
                    }
                    return false;
                });
    }

...just to clarify what I meant. Feel free to ignore that one.

Member Author

Thanks for your code; this refactoring makes sense to me. I have updated the PR.

@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Contributor

I guess the scenario can happen in this test as well because it's almost the same test implementation as in #testCheckpointRescalingKeyedState 🤔

Contributor

I'm wondering whether the redundant code could be removed here. But that's probably a bit out-of-scope for this issue.

Member Author

Do you mean that createJobGraphWithKeyedState and createJobGraphWithKeyedAndNonPartitionedOperatorState have redundant code? Or testCheckpointRescalingWithKeyedAndNonPartitionedState and testCheckpointRescalingKeyedState?

I checked them; they differ in quite a few details, such as:

  • The sources are different.
  • The parallelism and maxParallelism are fixed for the NonPartitionedOperator.

I will check later whether some common code can be extracted. If yes, I can submit a hotfix PR and cc you.

Contributor

I created a PR to show what I had in mind for the code redundancy reduction.

Member Author

It seems out of scope for this JIRA. Would you mind if we refactor it in a hotfix PR?

Your code is ready. After this PR is merged, you can submit an official PR, and I can help review. WDYT?

1996fanrui referenced this pull request in 1996fanrui/flink Feb 2, 2024
@1996fanrui 1996fanrui force-pushed the 34200/rescale-itcase branch from 7cf737d to 7265b6b on February 2, 2024 07:35
Member Author
@1996fanrui 1996fanrui left a comment

Thanks @XComp for the quick review!

we might want to focus on the tests that actually require this change and not touch other tests without a reason. WDYT?

I have commented here: #24246 (comment)

Contributor
@StefanRRichter StefanRRichter left a comment

@1996fanrui thanks for looking into this and fixing the test and test util. Changes lgtm!

Contributor
@XComp XComp left a comment

Looks good to me, too. See my comments below.

Comment on lines 266 to 271
// We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
// This test will fail if the job recovers from a checkpoint triggered before
// `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
// `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
// `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
// the checkpoint to be the count and sum of all data.
Contributor

Suggested change
// We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
// This test will fail if the job recovers from a checkpoint triggered before
// `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
// `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
// `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
// the checkpoint to be the count and sum of all data.
// We need to wait for a checkpoint to be completed that was triggered after all the data was processed. That ensures the entire data being flushed out of the Operator's network buffers to avoid reprocessing test data twice after the restore (see FLINK-34200).

Just as a proposal to keep it shorter. Referring back to the Jira issue for more context should be good enough. Additionally, adding markdown features might not add much value in JavaDoc. If you want to go for that, you might want to use the {@code } syntax.
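
For illustration, a rough sketch of the ordering the suggested comment describes (simplified; only the sequence matters here, the real code lives in AutoRescalingITCase):

    // 1. All test records have been processed by the mapper.
    SubtaskIndexFlatMapper.workCompletedLatch.await();

    // 2. Wait for a checkpoint that was triggered after step 1; only such a checkpoint
    //    is guaranteed to contain the complete counter/sum state (see FLINK-34200).
    waitForNewCheckpoint(jobID, cluster.getMiniCluster());

    // 3. The test then rescales the job, which restores from the latest completed
    //    checkpoint and therefore does not reprocess any of the test data.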

Member Author

That makes sense. I have updated.

@1996fanrui 1996fanrui force-pushed the 34200/rescale-itcase branch 2 times, most recently from 3185807 to 34012a4 on February 6, 2024 03:07

Contributor
@XComp XComp left a comment

It seems out of scope for this JIRA. Would you mind if we refactor it in a hotfix PR?

Your code is ready. After this PR is merged, you can submit an official PR, and I can help review. WDYT?

Fine with me. I have nothing else to add aside from the commit message.

@@ -163,6 +163,8 @@ public void setup() throws Exception {
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);

config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
// Disable the scaling cooldown to speed up the test
config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));
Contributor

The commit message needs to be changed to something like `[hotfix][tests] Disables cool down phase for faster test execution`.

@1996fanrui 1996fanrui force-pushed the 34200/rescale-itcase branch from 34012a4 to da5db2c on February 8, 2024 02:14
@1996fanrui
Member Author

Thanks @XComp @StefanRRichter for the review, merging~

@1996fanrui 1996fanrui merged commit 1b95b19 into apache:master Feb 8, 2024