Skip to content

Commit

Permalink
SAMZA-1578: Fix watermark bug found by BEAM tests
Browse files Browse the repository at this point in the history
The problem is getOutputWatermark() does not return the real outputWatermark. This caused problem in user override watermark function.

Author: xiliu <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes apache#415 from xinyuiscool/SAMZA-1578
  • Loading branch information
xinyuiscool committed Jan 26, 2018
1 parent 9674836 commit 7e68e4b
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private final void onWatermark(long watermark, MessageCollector collector, TaskC
// use samza-provided watermark handling
// default is to propagate the input watermark
output = handleWatermark(currentWatermark, collector, coordinator);
outputWm = getOutputWatermark();
outputWm = currentWatermark;
}

if (!output.isEmpty()) {
Expand Down Expand Up @@ -402,14 +402,13 @@ final long getInputWatermark() {
}

/**
* Returns the output watermark, default is the same as input.
* Operators which keep track of watermark should override this to return the current watermark.
* Returns the output watermark,
* @return output watermark
*/
protected long getOutputWatermark() {
final long getOutputWatermark() {
if (usedInCurrentTask) {
// default as input
return this.currentWatermark;
return this.outputWatermark;
} else {
// always emit the max to indicate no input will be emitted afterwards
return Long.MAX_VALUE;
Expand Down

0 comments on commit 7e68e4b

Please sign in to comment.