Skip to content

Commit b497726

Browse files
committed
[FLINK-34511][Checkpoint] Remove force checkpointing flag
1 parent 5a63c56 commit b497726

File tree

7 files changed

+0
-184
lines changed

7 files changed

+0
-184
lines changed

flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java

-26
Original file line numberDiff line numberDiff line change
@@ -382,32 +382,6 @@ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
382382
CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, maxConcurrentCheckpoints);
383383
}
384384

385-
/**
386-
* Checks whether checkpointing is forced, despite currently non-checkpointable iteration
387-
* feedback.
388-
*
389-
* @return True, if checkpointing is forced, false otherwise.
390-
* @deprecated This will be removed once iterations properly participate in checkpointing.
391-
*/
392-
@Deprecated
393-
@PublicEvolving
394-
public boolean isForceCheckpointing() {
395-
return configuration.get(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING);
396-
}
397-
398-
/**
399-
* Checks whether checkpointing is forced, despite currently non-checkpointable iteration
400-
* feedback.
401-
*
402-
* @param forceCheckpointing The flag to force checkpointing.
403-
* @deprecated This will be removed once iterations properly participate in checkpointing.
404-
*/
405-
@Deprecated
406-
@PublicEvolving
407-
public void setForceCheckpointing(boolean forceCheckpointing) {
408-
configuration.set(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING, forceCheckpointing);
409-
}
410-
411385
/**
412386
* Checks whether unaligned checkpoints are forced, despite iteration feedback.
413387
*

flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java

-13
Original file line numberDiff line numberDiff line change
@@ -338,19 +338,6 @@ public class ExecutionCheckpointingOptions {
338338
"the important considerations"))
339339
.build());
340340

341-
/**
342-
* Access to this option is officially only supported via {@link
343-
* CheckpointConfig#setForceCheckpointing(boolean)}, but there is no good reason behind this.
344-
*
345-
* @deprecated This will be removed once iterations properly participate in checkpointing.
346-
*/
347-
@Internal @Deprecated @Documentation.ExcludeFromDocumentation
348-
public static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
349-
key("execution.checkpointing.force")
350-
.booleanType()
351-
.defaultValue(false)
352-
.withDescription("Flag to force checkpointing in iterative jobs.");
353-
354341
/**
355342
* Access to this option is officially only supported via {@link
356343
* CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is no good reason behind

flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

-41
Original file line numberDiff line numberDiff line change
@@ -559,35 +559,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi
559559
return this;
560560
}
561561

562-
/**
563-
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
564-
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
565-
* restarted from the latest completed checkpoint.
566-
*
567-
* <p>The job draws checkpoints periodically, in the given interval. The state will be stored in
568-
* the configured state backend.
569-
*
570-
* <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment.
571-
* If the "force" parameter is set to true, the system will execute the job nonetheless.
572-
*
573-
* @param interval Time interval between state checkpoints in millis.
574-
* @param mode The checkpointing mode, selecting between "exactly once" and "at least once"
575-
* guaranteed.
576-
* @param force If true checkpointing will be enabled for iterative jobs as well.
577-
* @deprecated Use {@link #enableCheckpointing(long, CheckpointingMode)} instead. Forcing
578-
* checkpoints will be removed in the future.
579-
*/
580-
@Deprecated
581-
@SuppressWarnings("deprecation")
582-
@PublicEvolving
583-
public StreamExecutionEnvironment enableCheckpointing(
584-
long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) {
585-
checkpointCfg.setCheckpointingMode(mode);
586-
checkpointCfg.setCheckpointInterval(interval);
587-
checkpointCfg.setForceCheckpointing(force);
588-
return this;
589-
}
590-
591562
/**
592563
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
593564
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
@@ -620,18 +591,6 @@ public long getCheckpointInterval() {
620591
return checkpointCfg.getCheckpointInterval();
621592
}
622593

623-
/**
624-
* Returns whether checkpointing is force-enabled.
625-
*
626-
* @deprecated Forcing checkpoints will be removed in future version.
627-
*/
628-
@Deprecated
629-
@SuppressWarnings("deprecation")
630-
@PublicEvolving
631-
public boolean isForceCheckpointing() {
632-
return checkpointCfg.isForceCheckpointing();
633-
}
634-
635594
/** Returns whether unaligned checkpoints are enabled. */
636595
@PublicEvolving
637596
public boolean isUnalignedCheckpointsEnabled() {

flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

-6
Original file line numberDiff line numberDiff line change
@@ -501,12 +501,6 @@ private void preValidate() {
501501

502502
if (checkpointConfig.isCheckpointingEnabled()) {
503503
// temporarily forbid checkpointing for iterative jobs
504-
if (streamGraph.isIterative() && !checkpointConfig.isForceCheckpointing()) {
505-
throw new UnsupportedOperationException(
506-
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
507-
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
508-
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
509-
}
510504
if (streamGraph.isIterative()
511505
&& checkpointConfig.isUnalignedCheckpointsEnabled()
512506
&& !checkpointConfig.isForceUnalignedCheckpoints()) {

flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala

-28
Original file line numberDiff line numberDiff line change
@@ -177,34 +177,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
177177
*/
178178
def getCheckpointConfig = javaEnv.getCheckpointConfig()
179179

180-
/**
181-
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
182-
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
183-
* restarted from the latest completed checkpoint.
184-
*
185-
* The job draws checkpoints periodically, in the given interval. The state will be stored in the
186-
* configured state backend.
187-
*
188-
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. If
189-
* the "force" parameter is set to true, the system will execute the job nonetheless.
190-
*
191-
* @param interval
192-
* Time interval between state checkpoints in millis.
193-
* @param mode
194-
* The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
195-
* @param force
196-
* If true checkpointing will be enabled for iterative jobs as well.
197-
*/
198-
@deprecated
199-
@PublicEvolving
200-
def enableCheckpointing(
201-
interval: Long,
202-
mode: org.apache.flink.streaming.api.CheckpointingMode,
203-
force: Boolean): StreamExecutionEnvironment = {
204-
javaEnv.enableCheckpointing(interval, mode, force)
205-
this
206-
}
207-
208180
/**
209181
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
210182
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java

-12
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi
162162
"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
163163
}
164164

165-
@Override
166-
public StreamExecutionEnvironment enableCheckpointing(
167-
long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) {
168-
throw new UnsupportedOperationException(
169-
"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
170-
}
171-
172165
@Override
173166
public StreamExecutionEnvironment enableCheckpointing() {
174167
throw new UnsupportedOperationException(
@@ -180,11 +173,6 @@ public long getCheckpointInterval() {
180173
return realExecEnv.getCheckpointInterval();
181174
}
182175

183-
@Override
184-
public boolean isForceCheckpointing() {
185-
return realExecEnv.isForceCheckpointing();
186-
}
187-
188176
@Override
189177
public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
190178
return realExecEnv.getCheckpointingMode();

flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java

-58
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.api.java.tuple.Tuple2;
2626
import org.apache.flink.runtime.jobgraph.JobGraph;
2727
import org.apache.flink.runtime.jobgraph.JobVertex;
28-
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
2928
import org.apache.flink.streaming.api.datastream.DataStream;
3029
import org.apache.flink.streaming.api.datastream.DataStreamSink;
3130
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -631,63 +630,6 @@ public void close() {
631630
}
632631
}
633632

634-
@SuppressWarnings("deprecation")
635-
@Test
636-
public void testWithCheckPointing() throws Exception {
637-
int numRetries = 5;
638-
int timeoutScale = 1;
639-
640-
for (int numRetry = 0; numRetry < numRetries; numRetry++) {
641-
try {
642-
StreamExecutionEnvironment env =
643-
StreamExecutionEnvironment.getExecutionEnvironment();
644-
645-
try {
646-
createIteration(env, timeoutScale);
647-
env.execute();
648-
649-
// this statement should never be reached
650-
fail();
651-
} catch (UnsupportedOperationException e) {
652-
// expected behaviour
653-
}
654-
655-
// Test force checkpointing
656-
657-
try {
658-
createIteration(env, timeoutScale);
659-
env.enableCheckpointing(
660-
CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
661-
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
662-
false);
663-
env.execute();
664-
665-
// this statement should never be reached
666-
fail();
667-
} catch (UnsupportedOperationException e) {
668-
// expected behaviour
669-
}
670-
671-
createIteration(env, timeoutScale);
672-
env.enableCheckpointing(
673-
CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME,
674-
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE,
675-
true);
676-
env.getStreamGraph().getJobGraph();
677-
678-
break; // success
679-
} catch (Throwable t) {
680-
LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
681-
682-
if (numRetry >= numRetries - 1) {
683-
throw t;
684-
} else {
685-
timeoutScale *= 2;
686-
}
687-
}
688-
}
689-
}
690-
691633
private void createIteration(StreamExecutionEnvironment env, int timeoutScale) {
692634
env.enableCheckpointing();
693635

0 commit comments

Comments
 (0)