Skip to content

Commit

Permalink
Merge pull request apache#4618: [BEAM-3627] Switch FullWindowedValueC…
Browse files Browse the repository at this point in the history
…oder to bypass validation
  • Loading branch information
kennknowles authored Feb 7, 2018
2 parents 1ac8a24 + ec69c8c commit 0e60ca4
Showing 1 changed file with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public abstract class WindowedValue<T> {
/** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */
public static <T> WindowedValue<T> of(
T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
checkNotNull(pane);
checkArgument(windows.size() > 0);
checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null");
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");

if (windows.size() == 1) {
return of(value, timestamp, windows.iterator().next(), pane);
Expand All @@ -64,10 +64,21 @@ public static <T> WindowedValue<T> of(
}
}

/** @deprecated for use only in compatibility with old broken code */
@Deprecated
static <T> WindowedValue<T> createWithoutValidation(
T value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
if (windows.size() == 1) {
return of(value, timestamp, windows.iterator().next(), pane);
} else {
return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
}
}

/** Returns a {@code WindowedValue} with the given value, timestamp, and window. */
public static <T> WindowedValue<T> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo pane) {
checkNotNull(pane);
checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null");

boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
Expand Down Expand Up @@ -530,7 +541,10 @@ public WindowedValue<T> decode(InputStream inStream, Context context)
Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream);
PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
T value = valueCoder.decode(inStream, context);
return WindowedValue.of(value, timestamp, windows, pane);

// Because there are some remaining (incorrect) uses of WindowedValue with no windows,
// we call this deprecated no-validation path when decoding
return WindowedValue.createWithoutValidation(value, timestamp, windows, pane);
}

@Override
Expand Down

0 comments on commit 0e60ca4

Please sign in to comment.