Skip to content

Commit

Permalink
[FLINK-3439] Remove final Long.MAX_VALUE Watermark in StreamSource
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Feb 22, 2016
1 parent 9691d95 commit 80c0c65
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class IncrementalLearningSkeletonData {

public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;

import org.junit.After;
Expand Down Expand Up @@ -210,21 +211,22 @@ public void testSimplePatternEventTime() throws Exception {
Tuple2.of(new Event(2, "middle", 2.0), 1L),
Tuple2.of(new Event(3, "end", 3.0), 3L),
Tuple2.of(new Event(4, "end", 4.0), 10L),
Tuple2.of(new Event(5, "middle", 5.0), 7L)
).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<Event,Long>>() {

private long currentMaxTimestamp = -1;
Tuple2.of(new Event(5, "middle", 5.0), 7L),
// last element for high final watermark
Tuple2.of(new Event(5, "middle", 5.0), 100L)
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {

@Override
public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f1);
return element.f1;
}

@Override
public long getCurrentWatermark() {
return currentMaxTimestamp - 5;
public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
long extractedTimestamp) {
return lastElement.f1 - 5;
}

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {

@Override
Expand Down Expand Up @@ -295,21 +297,22 @@ public void testSimpleKeyedPatternEventTime() throws Exception {
Tuple2.of(new Event(2, "end", 2.0), 8L),
Tuple2.of(new Event(1, "middle", 5.0), 7L),
Tuple2.of(new Event(3, "middle", 6.0), 9L),
Tuple2.of(new Event(3, "end", 7.0), 7L)
).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<Event,Long>>() {

private long currentMaxTimestamp = -1L;
Tuple2.of(new Event(3, "end", 7.0), 7L),
// last element for high final watermark
Tuple2.of(new Event(3, "end", 7.0), 100L)
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {

@Override
public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
currentMaxTimestamp = Math.max(element.f1, currentMaxTimestamp);
return element.f1;
}

@Override
public long getCurrentWatermark() {
return currentMaxTimestamp - 5;
public long checkAndGetNextWatermark(Tuple2<Event, Long> lastElement,
long extractedTimestamp) {
return lastElement.f1 - 5;
}

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,7 @@ public void run(final Object lockingObject, final Output<StreamRecord<OUT>> coll

userFunction.run(ctx);

// This will mostly emit a final +Inf Watermark to make the Watermark logic work
// when some sources finish before others do
ctx.close();

if (executionConfig.areTimestampsEnabled()) {
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}

public void cancel() {
Expand Down Expand Up @@ -268,11 +260,6 @@ public Object getCheckpointLock() {
public void close() {
watermarkTimer.cancel(true);
scheduleExecutor.shutdownNow();
// emit one last +Inf watermark to make downstream watermark processing work
// when some sources close early
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -73,6 +72,9 @@ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.collect(Tuple2.of("a", 6));
ctx.collect(Tuple2.of("a", 7));
ctx.collect(Tuple2.of("a", 8));

// so we get a final high watermark
ctx.collect(Tuple2.of("a", 20));
}

@Override
Expand All @@ -92,6 +94,9 @@ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.collect(Tuple2.of("c", 6));
ctx.collect(Tuple2.of("c", 7));
ctx.collect(Tuple2.of("c", 8));

// so we get a final high watermark
ctx.collect(Tuple2.of("a", 20));
}

@Override
Expand Down Expand Up @@ -165,6 +170,9 @@ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Excep
ctx.collect(Tuple3.of("a", "i", 6));
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));

// so we get a final high watermark
ctx.collect(Tuple3.of("a", "k", 20));
}

@Override
Expand All @@ -184,6 +192,9 @@ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Excep

ctx.collect(Tuple3.of("a", "x", 6));
ctx.collect(Tuple3.of("a", "z", 8));

// so we get a final high watermark
ctx.collect(Tuple3.of("a", "z", 20));
}

@Override
Expand Down Expand Up @@ -259,6 +270,9 @@ public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Excep
ctx.collect(Tuple3.of("a", "i", 6));
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));

// so we get a final high watermark
ctx.collect(Tuple3.of("a", "k", 20));
}

@Override
Expand Down Expand Up @@ -328,19 +342,17 @@ public long checkAndGetNextWatermark(Tuple2<String, Integer> element, long extra
}
}

private static class Tuple3TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Integer>> {
private static class Tuple3TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Integer>> {

private long currentTimestamp;

@Override
public long extractTimestamp(Tuple3<String, String, Integer> element, long previousTimestamp) {
currentTimestamp = element.f2;
return element.f2;
}

@Override
public long getCurrentWatermark() {
return currentTimestamp - 1;
public long checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement,
long extractedTimestamp) {
return lastElement.f2 - 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
Expand Down Expand Up @@ -72,6 +72,9 @@ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.collect(Tuple2.of("a", 6));
ctx.collect(Tuple2.of("a", 7));
ctx.collect(Tuple2.of("a", 8));

// so that we get a high final watermark to process the previously sent elements
ctx.collect(Tuple2.of("a", 20));
}

@Override
Expand Down Expand Up @@ -135,6 +138,8 @@ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
ctx.collect(Tuple2.of("b", 5));
ctx.collect(Tuple2.of("a", 5));

// so that we get a high final watermark to process the previously sent elements
ctx.collect(Tuple2.of("a", 20));
}

@Override
Expand Down Expand Up @@ -172,19 +177,17 @@ public void invoke(Tuple2<String, Integer> value) throws Exception {
Assert.assertEquals(expectedResult, testResults);
}

private static class Tuple2TimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Integer>> {
private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {

private long currentTimestamp = -1;

@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
currentTimestamp = element.f1;
return element.f1;
}

@Override
public long getCurrentWatermark() {
return currentTimestamp - 1;
public long checkAndGetNextWatermark(Tuple2<String, Integer> lastElement,
long extractedTimestamp) {
return lastElement.f1 - 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -144,9 +145,8 @@ public void testWatermarkPropagation() throws Exception {

// verify that all the watermarks arrived at the final custom operator
for (int i = 0; i < PARALLELISM; i++) {
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks in order, because
// after that source2 emits Long.MAX_VALUE which could match with an arbitrary
// mark from source 1, for example, we could see 0,1,2,4,5,7,MAX
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
// other source stops emitting after that
for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
System.err.println("All Watermarks: ");
Expand All @@ -157,13 +157,7 @@ public void testWatermarkPropagation() throws Exception {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() - 1).equals(new Watermark(Long.MAX_VALUE))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
}
assertFalse(CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1).equals(new Watermark(Long.MAX_VALUE)));
}
}

Expand Down Expand Up @@ -286,9 +280,7 @@ public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
}
}
if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
}

/**
Expand Down Expand Up @@ -346,9 +338,7 @@ public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
}

/**
Expand Down Expand Up @@ -408,9 +398,7 @@ public long checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", 6))
ctx.collect(("a", 7))
ctx.collect(("a", 8))

// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", 20))
}

def cancel() {}
Expand All @@ -67,6 +70,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("c", 6))
ctx.collect(("c", 7))
ctx.collect(("c", 8))

// so that we get a high final watermark to process the previously sent elements
ctx.collect(("c", 20))
}

def cancel() {
Expand Down Expand Up @@ -117,6 +123,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", "i", 6))
ctx.collect(("a", "j", 7))
ctx.collect(("a", "k", 8))

// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "k", 20))
}

def cancel() {}
Expand All @@ -133,6 +142,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {

ctx.collect(("a", "x", 6))
ctx.collect(("a", "z", 8))

// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "z", 20))
}

def cancel() {}
Expand Down Expand Up @@ -193,6 +205,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", "i", 6))
ctx.collect(("a", "j", 7))
ctx.collect(("a", "k", 8))

// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "k", 20))
}

def cancel() {}
Expand Down
Loading

0 comments on commit 80c0c65

Please sign in to comment.