Skip to content

Commit

Permalink
[FLINK-25105][checkpoint] Enables final checkpoint by default
Browse files Browse the repository at this point in the history
This closes apache#18068.
  • Loading branch information
gaoyunhaii committed Jan 4, 2022
1 parent 9247bf2 commit 4d9ea35
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 87 deletions.
21 changes: 15 additions & 6 deletions docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Other parameters for checkpointing include:

- *unaligned checkpoints*: You can enable [unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure" >}}#unaligned-checkpoints) to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with one concurrent checkpoint.

- *checkpoints with finished tasks*: You can enable an experimental feature to continue performing checkpoints even if parts of the DAG have finished processing all of their records. Before doing so, please read through some [important considerations](#checkpointing-with-parts-of-the-graph-finished).
- *checkpoints with finished tasks*: By default Flink will continue performing checkpoints even if parts of the DAG have finished processing all of their records. Please refer to [important considerations](#checkpointing-with-parts-of-the-graph-finished) for details.

{{< tabs "4b9c6a74-8a45-4ad2-9e80-52fe44a85991" >}}
{{< tab "Java" >}}
Expand Down Expand Up @@ -230,19 +230,19 @@ Flink currently only provides processing guarantees for jobs without iterations.

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

## Checkpointing with parts of the graph finished *(BETA)*
## Checkpointing with parts of the graph finished

Starting from Flink 1.14 it is possible to continue performing checkpoints even if parts of the job
graph have finished processing all data, which might happen if it contains bounded sources. This
feature must be enabled via a feature flag:
graph have finished processing all data, which might happen if it contains bounded sources. This feature
is enabled by default since 1.15, and it could be disabled via a feature flag:

```java
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
```

Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is an
Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is
an important consideration when implementing any custom operators or UDFs (User-Defined Functions).

In order to support checkpointing with tasks that finished, we adjusted the [task lifecycle]({{<ref "docs/internals/task_lifecycle" >}})
Expand Down Expand Up @@ -271,4 +271,13 @@ Any operator that is prepared to be rescaled should work well with tasks that pa
Restoring from a checkpoint where only a subset of tasks finished is equivalent to restoring such a
task with the number of new subtasks equal to the number of finished tasks.

### Waiting for the final checkpoint before task exit

To ensure all the records could be committed for operators using the two-phase commit,
the tasks would wait for the final checkpoint completed successfully after all the operators finished.
It needs to be noted that this behavior would prolong the execution time of tasks.
If the checkpoint interval is long, the execution time would also be prolonged largely.
For the worst case, if the checkpoint interval is set to `Long.MAX_VALUE`,
the tasks would in fact be blocked forever since the final checkpoint would never happen.

{{< top >}}
6 changes: 5 additions & 1 deletion docs/content/docs/internals/task_lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ The steps a task goes through when executed until completion without being inter
open-operators
run
finish-operators
wait for the final checkponit completed (if enabled)
close-operators
task-specific-cleanup
common-cleanup
Expand Down Expand Up @@ -170,7 +171,10 @@ method, the task enters its shutdown process. Initially, the timer service stops
fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently
executing timers. Then the `finishAllOperators()` notifies the operators involved in the computation by
calling the `finish()` method of each operator. Then, any buffered output data is flushed so that they can be processed
by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the
by the downstream tasks. Then if final checkpoint is enabled, the task would
[wait for the final checkpoint completed]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing#waiting-for-the-final-checkpoint-before-task-exit" >}})
to ensure operators using two-phase committing have committed all the records.
Finally the task tries to clear all the resources held by the operators by calling the
`close()` method of each one. When opening the different operators, we mentioned that the order is from the
last to the first. Closing happens in the opposite manner, from first to last.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</tr>
<tr>
<td><h5>execution.checkpointing.checkpoints-after-tasks-finish.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Feature toggle for enabling checkpointing even if some of tasks have finished. Before you enable it, please take a look at <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta">the important considerations</a> </td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Expand Down Expand Up @@ -172,9 +170,7 @@ private void runTest(
.setDeliveryGuarantee(deliveryGuarantee)
.build();

final Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
env.enableCheckpointing(100L);
if (!allowRestarts) {
env.setRestartStrategy(RestartStrategies.noRestart());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ private void executeWithMapper(
Configuration config,
@Nullable String transactionalIdPrefix)
throws Exception {
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
env.enableCheckpointing(100L);
env.setRestartStrategy(RestartStrategies.noRestart());
Expand Down Expand Up @@ -314,9 +313,7 @@ private void testRecoveryWithAssertion(
int maxConcurrentCheckpoints,
java.util.function.Consumer<List<Long>> recordsAssertion)
throws Exception {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
env.enableCheckpointing(300L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);
DataStreamSource<Long> source = env.fromSequence(1, 10);
Expand Down Expand Up @@ -345,9 +342,7 @@ private void testRecoveryWithAssertion(
private void writeRecordsToKafka(
DeliveryGuarantee deliveryGuarantee, SharedReference<AtomicLong> expectedRecords)
throws Exception {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
final StreamExecutionEnvironment env = new LocalStreamEnvironment(config);
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
env.enableCheckpointing(100L);
final DataStream<Long> source =
env.addSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public class ExecutionCheckpointingOptions {
public static final ConfigOption<Boolean> ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH =
ConfigOptions.key("execution.checkpointing.checkpoints-after-tasks-finish.enabled")
.booleanType()
.defaultValue(false)
.defaultValue(true)
.withDescription(
Description.builder()
.text(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Input;
Expand Down Expand Up @@ -401,11 +400,6 @@ private void testTriggerCheckpointWithFinishedChannelsAndSourceChain(
config.setUnalignedCheckpointsEnabled(
checkpointOptions.isUnalignedCheckpoint()
|| checkpointOptions.isTimeoutable());
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.modifyExecutionConfig(applyObjectReuse(objectReuse))
.setCheckpointResponder(checkpointResponder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -972,11 +971,6 @@ private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions chec
config.setUnalignedCheckpointsEnabled(
checkpointOptions.isUnalignedCheckpoint()
|| checkpointOptions.isTimeoutable());
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3))
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
Expand Down Expand Up @@ -536,15 +535,7 @@ public void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception
try (StreamTaskMailboxTestHarness<String> testHarness =
new StreamTaskMailboxTestHarnessBuilder<>(
SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.modifyStreamConfig(
config -> {
config.setCheckpointingEnabled(true);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
.setCheckpointResponder(
new TestCheckpointResponder() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -233,11 +232,6 @@ private StreamTaskMailboxTestHarness<String> createTestHarness(
config.setCheckpointingEnabled(true);
config.setUnalignedCheckpointsEnabled(
enableUnalignedCheckpoint);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.setCheckpointResponder(checkpointResponder)
.setupOperatorChain(new EmptyOperator())
Expand Down Expand Up @@ -462,11 +456,6 @@ public void acknowledgeCheckpoint(
.modifyStreamConfig(
config -> {
config.setCheckpointingEnabled(true);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.setCheckpointResponder(checkpointResponder)
.setupOutputForSingletonOperatorChain(
Expand Down Expand Up @@ -629,15 +618,7 @@ public void testReportOperatorsFinishedInCheckpoint() throws Exception {
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 1)
.addAdditionalOutput(partitionWriters)
.setCheckpointResponder(checkpointResponder)
.modifyStreamConfig(
config -> {
config.setCheckpointingEnabled(true);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
.setupOperatorChain(new StatefulOperator())
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
.build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand Down Expand Up @@ -1599,15 +1598,7 @@ public void testSkipRepeatCheckpointComplete() throws Exception {
new StreamTaskMailboxTestHarnessBuilder<>(
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
.modifyStreamConfig(
config -> {
config.setCheckpointingEnabled(true);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
.setupOutputForSingletonOperatorChain(
new CheckpointCompleteRecordOperator())
.build()) {
Expand All @@ -1633,15 +1624,7 @@ public void testIgnoreCompleteCheckpointBeforeStartup() throws Exception {
OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
.setTaskStateSnapshot(3, new TaskStateSnapshot())
.modifyStreamConfig(
config -> {
config.setCheckpointingEnabled(true);
config.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
true);
})
.modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
.setupOutputForSingletonOperatorChain(
new CheckpointCompleteRecordOperator())
.build()) {
Expand Down
Loading

0 comments on commit 4d9ea35

Please sign in to comment.