Skip to content

Commit

Permalink
# 完善Flink 重启策略
Browse files Browse the repository at this point in the history
  • Loading branch information
lei-zuquan committed Sep 3, 2020
1 parent 6f51083 commit 8698e8f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
Expand All @@ -20,9 +22,27 @@ public class C05_RestartStrategiesDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 只有开启了checkpointing 才会有重启策略,
// 只有开启了checkpoint 才会有重启策略,
// 默认保存到内存中,即检查点保存在JobManager内存中,状态保存在TaskManager内存中
env.enableCheckpointing(5000); // 开启,检查点周期,单位毫秒;默认是-1,不开启
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 开启在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 允许在有更近 savepoint 时回退到 checkpoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);


// 默认的重启策略是固定延迟无限重启
//env.getConfig().setRestartStrategy(RestartStrategies.fallBackRestart());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 只有开启了checkpointing,重启策略才会生效;默认不开启重启策略
// 只有开启了checkpoint,重启策略才会生效;默认不开启重启策略
env.enableCheckpointing(5000); // 开启,检查点周期,单位毫秒;默认是-1,不开启

// 默认的重启策略是固定延迟无限重启
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 只有开启了checkpointing 才会有重启策略
// 只有开启了checkpoint 才会有重启策略
env.enableCheckpointing(5000); // 开启,检查点周期,单位毫秒;默认是-1,不开启

// 默认的重启策略是固定延迟无限重启
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static void main(String[] args) throws Exception {
// 开启CheckPointing,同时开启重启策略
env.enableCheckpointing(5000);
// 设置StateBackend
env.setStateBackend(new FsStateBackend("ile:\\\\lei_test_project\\idea_workspace\\FlinkTutorial\\check_point_dir"));
env.setStateBackend(new FsStateBackend("file:\\\\lei_test_project\\idea_workspace\\FlinkTutorial\\check_point_dir"));
// 取消任务checkPoint不删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkPoint的模式
Expand Down

0 comments on commit 8698e8f

Please sign in to comment.