[FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.
This fixes a race condition where HA data might have been accidentally cleaned up because the job transitioned to a terminal state.
dmvk committed Apr 21, 2023
1 parent f240a51 · commit c2ab806
Showing 1 changed file with 16 additions and 2 deletions.
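The race described in the commit message works as follows: shutting down the first MiniCluster disconnects its TaskManager; without a restart strategy the job fails, reaches a terminal state, and its HA data is cleaned up, leaving nothing for the second cluster to recover. The core of the fix is the restart-strategy block added in the diff below. A minimal sketch of that configuration (the helper class name is made up for illustration; everything else mirrors the diff):

import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;

/** Illustrative helper, not part of the commit: builds the hardened test configuration. */
final class RecoveryTestConfigSketch {

    static Configuration withEndlessRestarts() {
        final Configuration configuration = new Configuration();
        // Restart indefinitely so the TaskManager disconnect during the first
        // cluster shutdown never pushes the job into a terminal state, which
        // would trigger HA cleanup and defeat the recovery half of the test.
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
        return configuration;
    }
}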
UpdateJobResourceRequirementsRecoveryITCase.java
@@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
@@ -44,6 +45,7 @@
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.time.Duration;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;

@@ -61,15 +63,26 @@ class UpdateJobResourceRequirementsRecoveryITCase {
/** Tests that a rescaled job graph will be recovered with the latest parallelism. */
@Test
void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
final Configuration configuration = new Configuration();

final JobVertex jobVertex = new JobVertex("operator");
jobVertex.setParallelism(1);
jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
final JobID jobId = jobGraph.getJobID();

final Configuration configuration = new Configuration();
// We need to have a restart strategy set, to prevent the job from failing during the first
// cluster shutdown when TM disconnects.
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));

// The test is only supposed to pass with AdaptiveScheduler enabled.
configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

// High-Availability settings.
configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
configuration.set(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
@@ -103,7 +116,8 @@ void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
.setParallelismForJobVertex(jobVertex.getID(), 1, 2)
.build()))
.eventuallySucceeds();
miniCluster.closeAsyncWithoutCleaningHighAvailabilityData();
assertThatFuture(miniCluster.closeAsyncWithoutCleaningHighAvailabilityData())
.eventuallySucceeds();

LOG.info("Start second mini cluster to recover the persisted job.");

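The last hunk carries the second half of the hardening: the HA-preserving shutdown of the first cluster used to be fire-and-forget, so a failed or still-running shutdown went unnoticed before the second cluster was started. Wrapping the call in the FlinkAssertions helper makes the test block until the shutdown future completes and fail fast if it does not succeed. Side by side, with both versions taken from the diff above:

// Before: the returned future was ignored.
miniCluster.closeAsyncWithoutCleaningHighAvailabilityData();

// After: wait for the HA-preserving shutdown to actually complete.
assertThatFuture(miniCluster.closeAsyncWithoutCleaningHighAvailabilityData())
        .eventuallySucceeds();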
