Skip to content

Commit

Permalink
FIX: windmill uses kint64max usec as the timer max timestamp, keep th…
Browse files Browse the repository at this point in the history
…e Java side in

sync.

----Release Notes----

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=88582344
  • Loading branch information
peihe authored and davorbonaci committed Mar 13, 2015
1 parent d6d4a34 commit 4e0ecff
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.coders.InstantCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
Expand Down Expand Up @@ -194,7 +195,7 @@ public long add(WindowedValue<T> windowedElem) throws IOException {

} else if (groupValues) {
// Sort values by timestamp so that GroupAlsoByWindows can run efficiently.
if (windowedElem.getTimestamp().getMillis() == Long.MIN_VALUE) {
if (windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
// Empty secondary keys sort before all other secondary keys, so we
// can omit this common value here for efficiency.
secondaryKeyBytes = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.joda.time.Instant;

import java.util.concurrent.TimeUnit;

/**
* A {@code BoundedWindow} represents a finite grouping of elements, with an
* upper bound (larger timestamps represent more recent data) on the timestamps
Expand All @@ -30,6 +32,13 @@
* be treated as equal by {@code equals()} and {@code hashCode()}.
*/
public abstract class BoundedWindow {
// The min and max timestmaps that won't overflow when they are converted to
// usec.
public static final Instant TIMESTAMP_MIN_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
public static final Instant TIMESTAMP_MAX_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

/**
* Returns the upper bound of timestamps for values in this window.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class GlobalWindow extends BoundedWindow {

@Override
public Instant maxTimestamp() {
return new Instant(Long.MAX_VALUE);
return TIMESTAMP_MAX_VALUE;
}

private GlobalWindow() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ <T> WindowedValue<T> makeWindowedValue(
final Instant inputTimestamp = timestamp;

if (timestamp == null) {
timestamp = new Instant(Long.MIN_VALUE);
timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
}

if (windows == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static <V> WindowedValue<V> of(
*/
public static <V> WindowedValue<V> valueInGlobalWindow(V value) {
return new WindowedValue<>(value,
new Instant(Long.MIN_VALUE),
BoundedWindow.TIMESTAMP_MIN_VALUE,
Arrays.asList(GlobalWindow.INSTANCE));
}

Expand All @@ -80,7 +80,7 @@ public static <V> WindowedValue<V> valueInGlobalWindow(V value) {
*/
public static <V> WindowedValue<V> valueInEmptyWindows(V value) {
return new WindowedValue<V>(value,
new Instant(Long.MIN_VALUE),
BoundedWindow.TIMESTAMP_MIN_VALUE,
Collections.<BoundedWindow>emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
Expand Down Expand Up @@ -150,7 +151,7 @@ private void runTestReadFromShuffle(

WindowedValue<KV<Integer, Reiterable<String>>> windowedValue = iter.next();
// Verify value is in an empty windows.
assertEquals(Long.MIN_VALUE, windowedValue.getTimestamp().getMillis());
assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windowedValue.getTimestamp());
assertEquals(0, windowedValue.getWindows().size());

KV<Integer, Reiterable<String>> elem = windowedValue.getValue();
Expand Down

0 comments on commit 4e0ecff

Please sign in to comment.