Skip to content

Commit

Permalink
[hotfix] [streaming] Fix race in stream tasks when canceling tasks ea…
Browse files Browse the repository at this point in the history
…rly.
  • Loading branch information
StephanEwen committed Aug 2, 2015
1 parent af88aa0 commit 40eef52
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public void registerInputOutput() {

@Override
public void invoke() throws Exception {
this.isRunning = true;

boolean operatorOpen = false;

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
public void invoke() throws Exception {
final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);

this.isRunning = true;

boolean operatorOpen = false;

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public void registerInputOutput() {
@SuppressWarnings("unchecked")
@Override
public void invoke() throws Exception {
isRunning = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration source {} invoked", getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs

protected boolean hasChainedOperators;

protected volatile boolean isRunning = false;
// needs to be initialized to true, so that early cancel() before invoke() behaves correctly
protected volatile boolean isRunning = true;

protected List<StreamingRuntimeContext> contexts;

Expand Down Expand Up @@ -229,10 +230,12 @@ public void setInitialState(StateHandle<Serializable> stateHandle) throws Except
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {

LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());

synchronized (checkpointLock) {
if (isRunning) {
try {
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());


// We wrap the states of the chained operators in a list, marking non-stateful oeprators with null
List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ public void registerInputOutput() {

@Override
public void invoke() throws Exception {
this.isRunning = true;

boolean operatorOpen = false;

if (LOG.isDebugEnabled()) {
Expand Down

0 comments on commit 40eef52

Please sign in to comment.