Skip to content

Commit

Permalink
[BEAM-9265] @RequiresTimeSortedInput respects allowedLateness
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Feb 14, 2020
1 parent b91560c commit 8654f20
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> v
if (value.getTimestamp().isBefore(minStamp)) {
minStamp = value.getTimestamp();
minStampState.write(minStamp);
setupFlushTimerAndWatermarkHold(namespace, minStamp);
setupFlushTimerAndWatermarkHold(namespace, window, minStamp);
}
} else {
reportDroppedElement(value, window);
Expand Down Expand Up @@ -206,6 +206,9 @@ public void onTimer(
if (timerId.equals(SORT_FLUSH_TIMER)) {
onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime());
} else if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
if (requiresTimeSortedInput) {
onSortFlushTimer(window, BoundedWindow.TIMESTAMP_MAX_VALUE);
}
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
Expand Down Expand Up @@ -252,18 +255,36 @@ private void onSortFlushTimer(BoundedWindow window, Instant timestamp) {
keep.forEach(sortBuffer::add);
minStampState.write(newMinStamp);
if (newMinStamp.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
setupFlushTimerAndWatermarkHold(namespace, newMinStamp);
setupFlushTimerAndWatermarkHold(namespace, window, newMinStamp);
} else {
clearWatermarkHold(namespace);
}
}

private void setupFlushTimerAndWatermarkHold(StateNamespace namespace, Instant flush) {
/**
* Setup timer for flush time @{code flush}. The time is adjusted to respect allowed lateness and
* window garbage collection time. Setup watermark hold for the flush time.
*
* <p>Note that this is equivalent to {@link org.apache.beam.sdk.state.Timer#withOutputTimestamp}
* and should be reworked to use that feature once that is stable.
*/
private void setupFlushTimerAndWatermarkHold(
StateNamespace namespace, BoundedWindow window, Instant flush) {
Instant flushWithLateness = flush.plus(windowingStrategy.getAllowedLateness());
Instant windowGcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
if (flushWithLateness.isAfter(windowGcTime)) {
flushWithLateness = windowGcTime;
}
WatermarkHoldState watermark = stepContext.stateInternals().state(namespace, watermarkHold);
stepContext
.timerInternals()
.setTimer(
namespace, SORT_FLUSH_TIMER, SORT_FLUSH_TIMER, flush, flush, TimeDomain.EVENT_TIME);
namespace,
SORT_FLUSH_TIMER,
SORT_FLUSH_TIMER,
flushWithLateness,
flush,
TimeDomain.EVENT_TIME);
watermark.clear();
watermark.add(flush);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void testGarbageCollect(boolean ordered) throws Exception {

if (ordered) {
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(1, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
Expand All @@ -194,8 +194,9 @@ private void testGarbageCollect(boolean ordered) throws Exception {
KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1 + WINDOW_SIZE), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(
timerInternals, elementTime.plus(ALLOWED_LATENESS + 1 + WINDOW_SIZE), runner);
}

assertEquals(2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
Expand All @@ -204,13 +205,7 @@ private void testGarbageCollect(boolean ordered) throws Exception {
// the cleanup timer is set to window.maxTimestamp() + allowed lateness + 1
// to ensure that state is still available when a user timer for window.maxTimestamp() fires
advanceInputWatermark(
timerInternals,
WINDOW_1
.maxTimestamp()
.plus(ALLOWED_LATENESS)
.plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
.plus(1), // so the watermark is past the GC horizon, not on it
runner);
timerInternals, elementTime.plus(ALLOWED_LATENESS + 1 + WINDOW_SIZE), runner);

assertTrue(
stateInternals.isEmptyForTesting(
Expand Down Expand Up @@ -260,8 +255,8 @@ private void testOutput(
WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(3, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
Expand Down Expand Up @@ -297,8 +292,8 @@ private void testOutput(
WindowedValue.of(KV.of("hello", 3), elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(6, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
* Category tag for validation tests which utilize{@link DoFn.RequiresTimeSortedInput} in stateful
* {@link ParDo}.
*/
public @interface UsesRequiresTimeSortedInput {}
public interface UsesRequiresTimeSortedInput extends UsesTimersInParDo {}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
Expand Down Expand Up @@ -2409,6 +2410,39 @@ public void testRequiresTimeSortedInputWithTestStream() {
testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
}

@Test
@Category({
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesRequiresTimeSortedInput.class,
UsesStrictTimerOrdering.class,
UsesTestStream.class
})
public void testRequiresTimeSortedInputWithLateDataAndAllowedLateness() {
// generate list long enough to rule out random shuffle in sorted order
int numElements = 1000;
List<Long> eventStamps =
LongStream.range(0, numElements)
.mapToObj(i -> numElements - i)
.collect(Collectors.toList());
TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
for (Long stamp : eventStamps) {
input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
}
}
testTimeSortedInput(
numElements,
pipeline
.apply(input.advanceWatermarkToInfinity())
.apply(
Window.<Long>into(new GlobalWindows())
.withAllowedLateness(Duration.millis(5000))));
}

@Test
@Category({
ValidatesRunner.class,
Expand Down

0 comments on commit 8654f20

Please sign in to comment.