From 4d9ea3524de90eab1290ee3e0eb858c99acb6825 Mon Sep 17 00:00:00 2001 From: Yun Gao Date: Wed, 8 Dec 2021 17:13:20 +0800 Subject: [PATCH] [FLINK-25105][checkpoint] Enables final checkpoint by default This closes #18068. --- .../fault-tolerance/checkpointing.md | 21 +++++++++++++------ docs/content/docs/internals/task_lifecycle.md | 6 +++++- ...execution_checkpointing_configuration.html | 2 +- .../sink/ElasticsearchSinkBaseITCase.java | 6 +----- .../connector/kafka/sink/KafkaSinkITCase.java | 9 ++------ .../ExecutionCheckpointingOptions.java | 2 +- ...amTaskChainedSourcesCheckpointingTest.java | 6 ------ .../tasks/MultipleInputStreamTaskTest.java | 6 ------ .../runtime/tasks/SourceStreamTaskTest.java | 11 +--------- .../tasks/StreamTaskFinalCheckpointsTest.java | 21 +------------------ .../runtime/tasks/StreamTaskTest.java | 21 ++----------------- .../lifecycle/BoundedSourceITCase.java | 3 +-- .../PartiallyFinishedSourcesITCase.java | 3 +-- .../test/checkpointing/RescalingITCase.java | 1 - 14 files changed, 31 insertions(+), 87 deletions(-) diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index 91bfed3dc1fe8..c98386ad9e8c9 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -83,7 +83,7 @@ Other parameters for checkpointing include: - *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure" >}}#unaligned-checkpoints) to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with one concurrent checkpoint. - - *checkpoints with finished tasks*: You can enable an experimental feature to continue performing checkpoints even if parts of the DAG have finished processing all of their records. Before doing so, please read through some [important considerations](#checkpointing-with-parts-of-the-graph-finished). + - *checkpoints with finished tasks*: By default Flink will continue performing checkpoints even if parts of the DAG have finished processing all of their records. Please refer to [important considerations](#checkpointing-with-parts-of-the-graph-finished) for details. {{< tabs "4b9c6a74-8a45-4ad2-9e80-52fe44a85991" >}} {{< tab "Java" >}} @@ -230,19 +230,19 @@ Flink currently only provides processing guarantees for jobs without iterations. Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure. -## Checkpointing with parts of the graph finished *(BETA)* +## Checkpointing with parts of the graph finished Starting from Flink 1.14 it is possible to continue performing checkpoints even if parts of the job -graph have finished processing all data, which might happen if it contains bounded sources. This -feature must be enabled via a feature flag: +graph have finished processing all data, which might happen if it contains bounded sources. This feature +is enabled by default since 1.15, and it could be disabled via a feature flag: ```java Configuration config = new Configuration(); -config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); +config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); ``` -Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is an +Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is an important consideration when implementing any custom operators or UDFs (User-Defined Functions). In order to support checkpointing with tasks that finished, we adjusted the [task lifecycle]({{}}) @@ -271,4 +271,13 @@ Any operator that is prepared to be rescaled should work well with tasks that pa Restoring from a checkpoint where only a subset of tasks finished is equivalent to restoring such a task with the number of new subtasks equal to the number of finished tasks. +### Waiting for the final checkpoint before task exit + +To ensure all the records could be committed for operators using the two-phase commit, +the tasks would wait for the final checkpoint completed successfully after all the operators finished. +It needs to be noted that this behavior would prolong the execution time of tasks. +If the checkpoint interval is long, the execution time would also be prolonged largely. +For the worst case, if the checkpoint interval is set to `Long.MAX_VALUE`, +the tasks would in fact be blocked forever since the final checkpoint would never happen. + {{< top >}} diff --git a/docs/content/docs/internals/task_lifecycle.md b/docs/content/docs/internals/task_lifecycle.md index 2af6732802e2c..68ca655ddecf1 100644 --- a/docs/content/docs/internals/task_lifecycle.md +++ b/docs/content/docs/internals/task_lifecycle.md @@ -122,6 +122,7 @@ The steps a task goes through when executed until completion without being inter open-operators run finish-operators + wait for the final checkponit completed (if enabled) close-operators task-specific-cleanup common-cleanup @@ -170,7 +171,10 @@ method, the task enters its shutdown process. Initially, the timer service stops fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently executing timers. Then the `finishAllOperators()` notifies the operators involved in the computation by calling the `finish()` method of each operator. Then, any buffered output data is flushed so that they can be processed -by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the +by the downstream tasks. Then if final checkpoint is enabled, the task would +[wait for the final checkpoint completed]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing#waiting-for-the-final-checkpoint-before-task-exit" >}}) +to ensure operators using two-phase committing have committed all the records. +Finally the task tries to clear all the resources held by the operators by calling the `close()` method of each one. When opening the different operators, we mentioned that the order is from the last to the first. Closing happens in the opposite manner, from first to last. diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html index a9fa7ca30116a..8cd8ead064392 100644 --- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html @@ -16,7 +16,7 @@
execution.checkpointing.checkpoints-after-tasks-finish.enabled
- false + true Boolean Feature toggle for enabling checkpointing even if some of tasks have finished. Before you enable it, please take a look at the important considerations diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java index 757bf3114b9ba..927f672db5bcf 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java @@ -21,11 +21,9 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -172,9 +170,7 @@ private void runTest( .setDeliveryGuarantee(deliveryGuarantee) .build(); - final Configuration config = new Configuration(); - config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); - final StreamExecutionEnvironment env = new LocalStreamEnvironment(config); + final StreamExecutionEnvironment env = new LocalStreamEnvironment(); env.enableCheckpointing(100L); if (!allowRestarts) { env.setRestartStrategy(RestartStrategies.noRestart()); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index b8345e6669d7f..f1c41ca114065 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -285,7 +285,6 @@ private void executeWithMapper( Configuration config, @Nullable String transactionalIdPrefix) throws Exception { - config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); final StreamExecutionEnvironment env = new LocalStreamEnvironment(config); env.enableCheckpointing(100L); env.setRestartStrategy(RestartStrategies.noRestart()); @@ -314,9 +313,7 @@ private void testRecoveryWithAssertion( int maxConcurrentCheckpoints, java.util.function.Consumer> recordsAssertion) throws Exception { - Configuration config = new Configuration(); - config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); - final StreamExecutionEnvironment env = new LocalStreamEnvironment(config); + final StreamExecutionEnvironment env = new LocalStreamEnvironment(); env.enableCheckpointing(300L); env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints); DataStreamSource source = env.fromSequence(1, 10); @@ -345,9 +342,7 @@ private void testRecoveryWithAssertion( private void writeRecordsToKafka( DeliveryGuarantee deliveryGuarantee, SharedReference expectedRecords) throws Exception { - Configuration config = new Configuration(); - config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); - final StreamExecutionEnvironment env = new LocalStreamEnvironment(config); + final StreamExecutionEnvironment env = new LocalStreamEnvironment(); env.enableCheckpointing(100L); final DataStream source = env.addSource( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 536788bd775c0..253a963b6fc85 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -238,7 +238,7 @@ public class ExecutionCheckpointingOptions { public static final ConfigOption ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH = ConfigOptions.key("execution.checkpointing.checkpoints-after-tasks-finish.enabled") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( Description.builder() .text( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java index 46bc3a164f4b3..c7a1fef1e4072 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.Input; @@ -401,11 +400,6 @@ private void testTriggerCheckpointWithFinishedChannelsAndSourceChain( config.setUnalignedCheckpointsEnabled( checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable()); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); }) .modifyExecutionConfig(applyObjectReuse(objectReuse)) .setCheckpointResponder(checkpointResponder) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 2d8866cb973f6..38fc9b6d8d17f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -70,7 +70,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -972,11 +971,6 @@ private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions chec config.setUnalignedCheckpointsEnabled( checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable()); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); }) .setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3)) .finishForSingletonOperatorChain(StringSerializer.INSTANCE) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 0a790898300b6..beb49b5c489bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -536,15 +535,7 @@ public void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder<>( SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); - }) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .setCheckpointResponder( new TestCheckpointResponder() { @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java index df74b006b5ca5..e01e0e269437d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -233,11 +232,6 @@ private StreamTaskMailboxTestHarness createTestHarness( config.setCheckpointingEnabled(true); config.setUnalignedCheckpointsEnabled( enableUnalignedCheckpoint); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); }) .setCheckpointResponder(checkpointResponder) .setupOperatorChain(new EmptyOperator()) @@ -462,11 +456,6 @@ public void acknowledgeCheckpoint( .modifyStreamConfig( config -> { config.setCheckpointingEnabled(true); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); }) .setCheckpointResponder(checkpointResponder) .setupOutputForSingletonOperatorChain( @@ -629,15 +618,7 @@ public void testReportOperatorsFinishedInCheckpoint() throws Exception { .addInput(BasicTypeInfo.STRING_TYPE_INFO, 1) .addAdditionalOutput(partitionWriters) .setCheckpointResponder(checkpointResponder) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); - }) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .setupOperatorChain(new StatefulOperator()) .finishForSingletonOperatorChain(StringSerializer.INSTANCE) .build()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d27c77948acac..ead5efc2244e5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -99,7 +99,6 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -1599,15 +1598,7 @@ public void testSkipRepeatCheckpointComplete() throws Exception { new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); - }) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .setupOutputForSingletonOperatorChain( new CheckpointCompleteRecordOperator()) .build()) { @@ -1633,15 +1624,7 @@ public void testIgnoreCompleteCheckpointBeforeStartup() throws Exception { OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3) .setTaskStateSnapshot(3, new TaskStateSnapshot()) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.getConfiguration() - .set( - ExecutionCheckpointingOptions - .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, - true); - }) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .setupOutputForSingletonOperatorChain( new CheckpointCompleteRecordOperator()) .build()) { diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java index d91379936ac8b..6b9549039abe9 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java @@ -36,7 +36,6 @@ import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.SIMPLE_GRAPH_BUILDER; import static org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator.checkDataFlow; import static org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator.checkOperatorsLifecycle; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH; /** * A test suite to check that the operator methods are called according to contract when sources are @@ -63,7 +62,7 @@ public void test() throws Exception { TestJobWithDescription testJob = graphBuilder.build( sharedObjects, - cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true), + cfg -> {}, env -> env.getCheckpointConfig() .setCheckpointStorage( diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java index 7a29238fae383..301acd9215990 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java @@ -54,7 +54,6 @@ import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.SIMPLE_GRAPH_BUILDER; import static org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator.checkDataFlow; import static org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator.checkOperatorsLifecycle; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH; /** * A test suite to check that the operator methods are called according to contract when sources are @@ -142,7 +141,7 @@ public void test() throws Exception { private TestJobWithDescription buildJob() throws Exception { return graphBuilder.build( sharedObjects, - cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true), + cfg -> {}, env -> { env.setRestartStrategy(fixedDelayRestart(1, 0)); // checkpoints can hang (because of not yet fixed bugs and triggering diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 15a8997f54a94..fbbfe267f9936 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -617,7 +617,6 @@ private static JobGraph createJobGraphWithOperatorState( StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.getConfig().setMaxParallelism(maxParallelism); - env.enableCheckpointing(Long.MAX_VALUE); env.setRestartStrategy(RestartStrategies.noRestart()); StateSourceBase.workStartedLatch = new CountDownLatch(parallelism);