Skip to content

Commit

Permalink
Updates to windowing javadoc
Browse files Browse the repository at this point in the history
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=107801906
  • Loading branch information
dpmills authored and davorbonaci committed Nov 30, 2015
1 parent 3385cbb commit 198eb1f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
* The default window into which all data is placed (via {@link GlobalWindows}).
*/
public class GlobalWindow extends BoundedWindow {
/**
* Singleton instance of {@link GlobalWindow}.
*/
public static final GlobalWindow INSTANCE = new GlobalWindow();

// Triggers use maxTimestamp to set timers' timestamp. Timers fires when
Expand All @@ -47,7 +50,7 @@ public Instant maxTimestamp() {
private GlobalWindow() {}

/**
* {@link Coder} for encoding and decoding {@code Window}s.
* {@link Coder} for encoding and decoding {@code GlobalWindow}s.
*/
public static class Coder extends AtomicCoder<GlobalWindow> {
public static final Coder INSTANCE = new Coder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Collections;

/**
* Default {@link WindowFn} where all data is in the same window.
* Default {@link WindowFn} that assigns all data to the same window.
*/
public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import java.util.List;

/**
* A {@link WindowFn} that merges overlapping {@link IntervalWindow}s.
* A utility function for merging overlapping {@link IntervalWindow}s.
*/
public class MergeOverlappingIntervalWindows {

/**
* Merge overlapping intervals.
* Merge overlapping {@link IntervalWindow}s.
*/
public static void mergeWindows(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception {
// Merge any overlapping windows into a single window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class NonMergingWindowFn<T, W extends BoundedWindow>
public final void mergeWindows(MergeContext c) { }

@Override
public boolean isNonMerging() {
public final boolean isNonMerging() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ public Timing getTiming() {
}

/**
* The zero-based index of this trigger firing that produced this pane. i.e.
* 0 for the first time the timer fires, 1 for the next time, etc.
* The zero-based index of this trigger firing that produced this pane.
*
* <p>This will return 0 for the first time the timer fires, 1 for the next time, etc.
*
* <p>A given (key, window, pane-index) is guaranteed to be unique in the
* output of a group-by-key operation.
Expand All @@ -182,8 +183,10 @@ public long getIndex() {
}

/**
* The zero-based index of this trigger firing among non-speculative panes, i.e.
* 0 for the first non-{@link Timing#EARLY} timer firing, 1 for the next one, etc.
* The zero-based index of this trigger firing among non-speculative panes.
*
* <p> This will return 0 for the first non-{@link Timing#EARLY} timer firing, 1 for the next one,
* etc.
*
* <p>Always -1 for speculative data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,12 @@
* <pre> {@code
* PCollection<String> items = ...;
* PCollection<String> windowed_items = items.apply(
* Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))
* .triggering(AfterEach.inOrder(
* AfterWatermark.pastEndOfWindow(),
* Repeatedly
* .forever(AfterProcessingTime
* .pastFirstElementInPane().plusDelay(Duration.standardMinutes(1)))
* .orFinally(AfterWatermark
* .pastEndOfWindow().plusDelay(Duration.standardDays(1)))));
* Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
* .triggering(
* AfterWatermark.pastEndOfWindow()
* .withLateFirings(AfterProcessingTime
* .pastFirstElementInPane().plusDelayOf(Duration.standardHours(1))))
* .withAllowedLateness(Duration.standardDays(1)));
* PCollection<KV<String, Long>> windowed_counts = windowed_items.apply(
* Count.<String>perElement());
* } </pre>
Expand All @@ -132,15 +130,17 @@
* <pre> {@code
* PCollection<String> windowed_items = items.apply(
* Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))
* .triggering(Repeatedly
* .forever(AfterProcessingTime
* .pastFirstElementInPane().plusDelay(Duration.standardMinutes(1)))
* .orFinally(AfterWatermark.pastEndOfWindow())));
* .triggering(
* .triggering(
* AfterWatermark.pastEndOfWindow()
* .withEarlyFirings(AfterProcessingTime
* .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
* .withAllowedLateness(Duration.ZERO));
* } </pre>
*
* <p>After a {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} the trigger is reset to
* the default trigger. If you want to produce early results from a pipeline consisting of multiple
* {@code GroupByKey}s, you must set a trigger before <i>each</i> {@code GroupByKey}.
* <p>After a {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} the trigger is set to
* a trigger that will preserve the intent of the upstream trigger. See
* {@link Trigger@getContinuationTrigger} for more information.
*
* <p>See {@link Trigger} for details on the available triggers.
*/
Expand Down Expand Up @@ -185,7 +185,7 @@ public static Unbound named(String name) {
*
* <p>The resulting {@code PTransform}'s types have been bound, with both the
* input and output being a {@code PCollection<T>}, inferred from the types of
* the argument {@code WindowFn<T, B>}. It is ready to be applied, or further
* the argument {@code WindowFn}. It is ready to be applied, or further
* properties can be set on it first.
*/
public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
Expand Down Expand Up @@ -360,7 +360,7 @@ public <T> Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavio

/**
* A {@code PTransform} that windows the elements of a {@code PCollection<T>},
* into finite windows according to a user-specified {@code WindowFn<T, B>}.
* into finite windows according to a user-specified {@code WindowFn}.
*
* @param <T> The type of elements this {@code Window} is applied to
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@
* predefined {@code WindowFn}s.
*
* <p>Users will generally want to use the predefined
* {@code WindowFn}s, but it is also possible to create new
* {@code WindowFn}s, but it is also possible to create new
* subclasses.
* TODO: Describe how to properly create {@code WindowFn}s.
*
* <p>To create a custom {@code WindowFn}, inherit from this class and override all required
* methods. If no merging is required, inherit from {@link NonMergingWindowFn}
* instead. If no merging is required and each element is assigned to a single window, inherit from
* {@code PartitioningWindowFn}. Inheriting from the most specific subclass will enable more
* optimizations in the runner.
*
* @param <T> type of elements being windowed
* @param <W> {@link BoundedWindow} subclass used to represent the
Expand Down

0 comments on commit 198eb1f

Please sign in to comment.