Skip to content

Commit

Permalink
[FLINK-34516][checkpoint] Rename the new introduced API to get/setChe…
Browse files Browse the repository at this point in the history
…ckpointingConsistencyMode
  • Loading branch information
Zakelly authored and masteryhx committed Mar 22, 2024
1 parent a1507f2 commit 7794591
Show file tree
Hide file tree
Showing 14 changed files with 33 additions and 26 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/dev/table/data_stream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ env.setMaxParallelism(256);

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API

Expand Down Expand Up @@ -568,7 +568,7 @@ env.setMaxParallelism(256)

env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])

env.getCheckpointConfig.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE)

// then switch to Scala Table API

Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/dev/table/data_stream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ env.setMaxParallelism(256);

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API

Expand Down Expand Up @@ -566,7 +566,7 @@ env.setMaxParallelism(256)

env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])

env.getCheckpointConfig.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE)

// then switch to Scala Table API

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* whether the system draws checkpoints such that a recovery behaves as if the operators/functions
* see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn in a
* simpler fashion that typically encounters some duplicates upon recovery ({@link #AT_LEAST_ONCE})
*
* <p>Also called "CheckpointingConsistencyMode" everywhere in APIs.
*/
@Public
public enum CheckpointingMode {
Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/datastream/checkpoint_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_checkpointing_mode(self) -> CheckpointingMode:
:return: The :class:`CheckpointingMode`.
"""
return CheckpointingMode._from_j_checkpointing_mode(
self._j_checkpoint_config.getConsistencyMode())
self._j_checkpoint_config.getCheckpointingConsistencyMode())

def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'CheckpointConfig':
"""
Expand All @@ -90,7 +90,7 @@ def set_checkpointing_mode(self, checkpointing_mode: CheckpointingMode) -> 'Chec
:param checkpointing_mode: The :class:`CheckpointingMode`.
"""
self._j_checkpoint_config.setConsistencyMode(
self._j_checkpoint_config.setCheckpointingConsistencyMode(
CheckpointingMode._to_j_checkpointing_mode(checkpointing_mode))
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public boolean isCheckpointingEnabled() {
* Gets the checkpointing mode (exactly-once vs. at-least-once).
*
* @return The checkpointing mode.
* @deprecated Use {@link #getConsistencyMode} instead.
* @deprecated Use {@link #getCheckpointingConsistencyMode} instead.
*/
@Deprecated
public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
Expand All @@ -188,7 +188,7 @@ public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
* Sets the checkpointing mode (exactly-once vs. at-least-once).
*
* @param checkpointingMode The checkpointing mode.
* @deprecated Use {@link #setConsistencyMode} instead.
* @deprecated Use {@link #setCheckpointingConsistencyMode} instead.
*/
@Deprecated
public void setCheckpointingMode(
Expand All @@ -201,7 +201,7 @@ public void setCheckpointingMode(
*
* @return The checkpointing mode.
*/
public CheckpointingMode getConsistencyMode() {
public CheckpointingMode getCheckpointingConsistencyMode() {
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE);
}

Expand All @@ -210,7 +210,7 @@ public CheckpointingMode getConsistencyMode() {
*
* @param checkpointingMode The checkpointing mode.
*/
public void setConsistencyMode(CheckpointingMode checkpointingMode) {
public void setCheckpointingConsistencyMode(CheckpointingMode checkpointingMode) {
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE, checkpointingMode);
}
Expand Down Expand Up @@ -999,7 +999,7 @@ public InlineElement getDescription() {
public void configure(ReadableConfig configuration) {
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE)
.ifPresent(this::setConsistencyMode);
.ifPresent(this::setCheckpointingConsistencyMode);
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
.ifPresent(i -> this.setCheckpointInterval(i.toMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public StreamExecutionEnvironment enableCheckpointing(
* guaranteed.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
checkpointCfg.setConsistencyMode(mode);
checkpointCfg.setCheckpointingConsistencyMode(mode);
checkpointCfg.setCheckpointInterval(interval);
return this;
}
Expand Down Expand Up @@ -675,12 +675,12 @@ public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
/**
* Returns the checkpointing consistency mode (exactly-once vs. at-least-once).
*
* <p>Shorthand for {@code getCheckpointConfig().getConsistencyMode()}.
* <p>Shorthand for {@code getCheckpointConfig().getCheckpointingConsistencyMode()}.
*
* @return The checkpoint mode
*/
public CheckpointingMode getCheckpointingConsistencyMode() {
return checkpointCfg.getConsistencyMode();
return checkpointCfg.getCheckpointingConsistencyMode();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ private void tryConvertPartitionerForDynamicGraph(
}

private CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) {
CheckpointingMode checkpointingMode = checkpointConfig.getConsistencyMode();
CheckpointingMode checkpointingMode = checkpointConfig.getCheckpointingConsistencyMode();

checkArgument(
checkpointingMode == CheckpointingMode.EXACTLY_ONCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ private T nextResultFromFetcher() {
private AbstractCollectResultBuffer<T> createBuffer(
TypeSerializer<T> serializer, CheckpointConfig checkpointConfig) {
if (checkpointConfig.isCheckpointingEnabled()) {
if (checkpointConfig.getConsistencyMode() == CheckpointingMode.EXACTLY_ONCE) {
if (checkpointConfig.getCheckpointingConsistencyMode()
== CheckpointingMode.EXACTLY_ONCE) {
return new CheckpointedCollectResultBuffer<>(serializer);
} else {
return new UncheckpointedCollectResultBuffer<>(serializer, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ private static Stream<TestSpec<?>> specs() {
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(CheckpointConfig::setConsistencyMode)
.getterVia(CheckpointConfig::getConsistencyMode)
.viaSetter(CheckpointConfig::setCheckpointingConsistencyMode)
.getterVia(CheckpointConfig::getCheckpointingConsistencyMode)
.nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
Expand All @@ -62,13 +62,14 @@ private static Stream<TestSpec<?>> specs() {
org.apache.flink.streaming.api.CheckpointingMode
.valueOf(v.name()));
})
.getterVia(CheckpointConfig::getConsistencyMode)
.getterVia(CheckpointConfig::getCheckpointingConsistencyMode)
.nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(
(config, v) -> {
config.setConsistencyMode(CheckpointingMode.valueOf(v.name()));
config.setCheckpointingConsistencyMode(
CheckpointingMode.valueOf(v.name()));
})
.getterVia(CheckpointConfig::getCheckpointingMode)
.nonDefaultValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {

env.setParallelism(1)
env.enableCheckpointing(100)
env.getCheckpointConfig.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE)
}

def additionalProperties(): Array[String] = Array()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ private void restartFromSavepoint(

// Step 3: Build and execute Flink job
final StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
execEnv.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig()
.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
execEnv.enableCheckpointing(50);
execEnv.setRestartStrategy(RestartStrategies.noRestart());
DataStreamSource<T> source =
Expand Down Expand Up @@ -358,7 +359,9 @@ private void restartFromSavepoint(
final StreamExecutionEnvironment restartEnv =
testEnv.createExecutionEnvironment(restartEnvOptions);
restartEnv.enableCheckpointing(500);
restartEnv.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
restartEnv
.getCheckpointConfig()
.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

DataStreamSource<T> restartSource =
restartEnv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private static StreamExecutionEnvironment prepareEnv(
env.setParallelism(4);
env.setRestartStrategy(noRestart());
env.enableCheckpointing(200); // shouldn't matter
env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getConfig().setAutoWatermarkInterval(50);
envConsumer.accept(env);
return env;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public void testCheckpointRescalingPartitionedOperatorState(

private static void configureCheckpointing(CheckpointConfig config) {
config.setCheckpointInterval(100);
config.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
config.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
config.enableUnalignedCheckpoints(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private boolean executeIgnoreInFlightDataDuringRecovery() {
env.disableOperatorChaining();
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1);
env.setRestartStrategy(fixedDelayRestart(1, 0));

Expand Down

0 comments on commit 7794591

Please sign in to comment.