Skip to content

Commit

Permalink
[FLINK-7063] [checkpoint] Call super.cancel(...) before closing strea…
Browse files Browse the repository at this point in the history
…ms in AsyncStoppableTaskWithCallback
  • Loading branch information
StefanRRichter committed Jul 10, 2017
1 parent ccf1000 commit 74adb84
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@ public AsyncStoppableTaskWithCallback(StoppableCallbackCallable<V> callable) {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
stoppableCallbackCallable.stop();
return super.cancel(mayInterruptIfRunning);
final boolean cancel = super.cancel(mayInterruptIfRunning);
if (cancel) {
stoppableCallbackCallable.stop();
// this is where we report done() for the cancel case, after calling stop().
stoppableCallbackCallable.done(true);
}
return cancel;
}

@Override
protected void done() {
stoppableCallbackCallable.done(isCancelled());
// we suppress forwarding if we have not been canceled, because the cancel case will call to this method separately.
if (!isCancelled()) {
stoppableCallbackCallable.done(false);
}
}

public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V> callable) {
Expand Down

0 comments on commit 74adb84

Please sign in to comment.