diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 605b8f5cfeb1a..6136f247cbcb9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -61,8 +61,6 @@ public void registerInputOutput() { @Override public void invoke() throws Exception { - this.isRunning = true; - boolean operatorOpen = false; if (LOG.isDebugEnabled()) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 1940c11f50c99..4b25577d26d96 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -45,8 +45,6 @@ public class SourceStreamTask extends StreamTask> { public void invoke() throws Exception { final SourceOutput> output = new SourceOutput>(outputHandler.getOutput(), checkpointLock); - this.isRunning = true; - boolean operatorOpen = false; if (LOG.isDebugEnabled()) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 1736e525b97d8..2911f44963b3c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -72,7 +72,6 @@ public void registerInputOutput() { @SuppressWarnings("unchecked") @Override public void invoke() throws Exception { - isRunning = true; if (LOG.isDebugEnabled()) { LOG.debug("Iteration source {} invoked", getName()); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index aabc95d0df956..88813d082990b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -66,7 +66,8 @@ public abstract class StreamTask> extends Abs protected boolean hasChainedOperators; - protected volatile boolean isRunning = false; + // needs to be initialized to true, so that early cancel() before invoke() behaves correctly + protected volatile boolean isRunning = true; protected List contexts; @@ -229,10 +230,12 @@ public void setInitialState(StateHandle stateHandle) throws Except @Override public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); + synchronized (checkpointLock) { if (isRunning) { try { - LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); + // We wrap the states of the chained operators in a list, marking non-stateful oeprators with null List, Map>> chainedStates = new ArrayList, Map>>(); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 99c053b124271..8cf5a405a885e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -85,8 +85,6 @@ public void registerInputOutput() { @Override public void invoke() throws Exception { - this.isRunning = true; - boolean operatorOpen = false; if (LOG.isDebugEnabled()) {