Skip to content

Commit

Permalink
Merge pull request apache#10992 from lukecwik/splittabledofn2
Browse files Browse the repository at this point in the history
[BEAM-2939] Provide user facing API for reporting the watermark in SplittableDoFns.
  • Loading branch information
lukecwik authored Mar 13, 2020
2 parents 1d059ec + feea2ba commit feefaca
Show file tree
Hide file tree
Showing 18 changed files with 1,706 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -416,6 +417,17 @@ public void updateWatermark(Instant watermark) {
// Ignore watermark updates
}

@Override
public Object watermarkEstimatorState() {
throw new UnsupportedOperationException(
"@WatermarkEstimatorState parameters are not supported.");
}

@Override
public WatermarkEstimator<?> watermarkEstimator() {
throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
}

// ----------- Unsupported methods --------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ public Result invokeProcessElement(

DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
@Override
public String getErrorContext() {
return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName();
}

@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(
DoFn<InputT, OutputT> doFn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
Expand Down Expand Up @@ -540,6 +541,17 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
}

@Override
public Object watermarkEstimatorState() {
throw new UnsupportedOperationException(
"@WatermarkEstimatorState parameters are not supported.");
}

@Override
public WatermarkEstimator<?> watermarkEstimator() {
throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
}

@Override
public State state(String stateId, boolean alwaysFetched) {
try {
Expand Down Expand Up @@ -744,6 +756,17 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
}

@Override
public Object watermarkEstimatorState() {
throw new UnsupportedOperationException(
"@WatermarkEstimatorState parameters are not supported.");
}

@Override
public WatermarkEstimator<?> watermarkEstimator() {
throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
}

@Override
public State state(String stateId, boolean alwaysFetched) {
try {
Expand Down
143 changes: 139 additions & 4 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
Expand Down Expand Up @@ -670,6 +674,14 @@ public interface MultiOutputReceiver {
* GetInitialRestriction}. This method is optional only if the restriction type returned by
* {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>It <i>may</i> define a {@link GetInitialWatermarkEstimatorState} method. If none is
* defined then the watermark estimator state is of type {@link Void}.
* <li>It <i>may</i> define a {@link GetWatermarkEstimatorStateCoder} method.
* <li>It <i>may</i> define a {@link NewWatermarkEstimator} method returning a subtype of {@code
* WatermarkEstimator<W>} where {@code W} is the watermark estimator state type returned by
* {@link GetInitialWatermarkEstimatorState}. This method is optional only if {@link
* GetInitialWatermarkEstimatorState} has not been defined or {@code W} implements {@link
* HasDefaultWatermarkEstimator}.
* <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or {@link
* UnboundedPerElement}, but not both at the same time. If it's not annotated with either of
* these, it's assumed to be {@link BoundedPerElement} if its {@link ProcessElement} method
Expand All @@ -692,6 +704,12 @@ public interface MultiOutputReceiver {
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is of the type {@link WatermarkEstimator}, then it will be passed
* the watermark estimator.
* <li>If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be
* passed a watermark estimator that can be updated manually. This parameter can only be
* supplied if the method annotated with {@link GetInitialWatermarkEstimatorState} returns a
* sub-type of {@link ManualWatermarkEstimator}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
Expand Down Expand Up @@ -720,7 +738,8 @@ public interface MultiOutputReceiver {

/**
* Parameter annotation for the input element for {@link ProcessElement}, {@link
* GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and {@link NewTracker}
* GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
* GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
* methods.
*/
@Documented
Expand All @@ -729,8 +748,9 @@ public interface MultiOutputReceiver {
public @interface Element {}

/**
* Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, and
* {@link NewTracker} methods. Must match the return type used on the method annotated with {@link
* Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, {@link
* GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
* methods. Must match the return type used on the method annotated with {@link
* GetInitialRestriction}.
*/
@Documented
Expand All @@ -740,7 +760,8 @@ public interface MultiOutputReceiver {

/**
* Parameter annotation for the input element timestamp for {@link ProcessElement}, {@link
* GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and {@link NewTracker}
* GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
* GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
* methods.
*/
@Documented
Expand Down Expand Up @@ -1070,6 +1091,120 @@ public interface MultiOutputReceiver {
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface NewTracker {}

/**
* Annotation for the method that maps an element and restriction to initial watermark estimator
* state for a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>Signature: {@code WatermarkEstimatorStateT getInitialWatermarkState(<arguments>);}
*
* <p>This method must satisfy the following constraints:
*
* <ul>
* <li>The return type {@code WatermarkEstimatorStateT} defines the watermark state type used
* within this splittable DoFn. All other methods that use a {@link
* WatermarkEstimatorState @WatermarkEstimatorState} parameter must use the same type that
* is used here. It is suggested to use as narrow of a return type definition as possible
* (for example prefer to use a square type over a shape type as a square is a type of a
* shape).
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
* passed the current element being processed; the argument must be of type {@code InputT}.
* Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are
* currently unsupported.
* <li>If one of its arguments is tagged with the {@link Restriction} annotation, then it will
* be passed the current restriction being processed; the argument must be of type {@code
* RestrictionT}.
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
* window is not accessed a runner may perform additional optimizations.
* <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
* about the current triggering pane.
* <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
* options for the current pipeline.
* </ul>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface GetInitialWatermarkEstimatorState {}

/**
* Annotation for the method that returns the coder to use for the watermark estimator state of a
* <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>If not defined, a coder will be inferred using standard coder inference rules and the
* pipeline's {@link Pipeline#getCoderRegistry coder registry}.
*
* <p>This method will be called only at pipeline construction time.
*
* <p>Signature: {@code Coder<WatermarkEstimatorStateT> getWatermarkEstimatorStateCoder();}
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface GetWatermarkEstimatorStateCoder {}

/**
* Annotation for the method that creates a new {@link WatermarkEstimator} for the watermark state
* of a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>Signature: {@code MyWatermarkEstimator newWatermarkEstimator(<optional arguments>);}
*
* <p>If the return type is a subtype of {@link TimestampObservingWatermarkEstimator} then the
* timestamp of each element output from this DoFn is provided to the watermark estimator.
*
* <p>This method must satisfy the following constraints:
*
* <ul>
* <li>The return type must be a subtype of {@code
* WatermarkEstimator<WatermarkEstimatorStateT>}. It is suggested to use as narrow of a
* return type definition as possible (for example prefer to use a square type over a shape
* type as a square is a type of a shape).
* <li>If one of its arguments is tagged with the {@link WatermarkEstimatorState} annotation,
* then it will be passed the current watermark estimator state; the argument must be of
* type {@code WatermarkEstimatorStateT}.
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
* passed the current element being processed; the argument must be of type {@code InputT}.
* Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are
* currently unsupported.
* <li>If one of its arguments is tagged with the {@link Restriction} annotation, then it will
* be passed the current restriction being processed; the argument must be of type {@code
* RestrictionT}.
* <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
* passed the timestamp of the current element being processed; the argument must be of type
* {@link Instant}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
* window of the current element. When applied by {@link ParDo} the subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
* window is not accessed a runner may perform additional optimizations.
* <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
* about the current triggering pane.
* <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
* options for the current pipeline.
* </ul>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface NewWatermarkEstimator {}

/**
* Parameter annotation for the watermark estimator state for the {@link NewWatermarkEstimator}
* method. Must match the return type on the method annotated with {@link
* GetInitialWatermarkEstimatorState}.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface WatermarkEstimatorState {}

/**
* Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
* specifying that the {@link DoFn} performs a bounded amount of work per input element, so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
Expand All @@ -55,7 +51,6 @@
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
Expand Down Expand Up @@ -217,7 +212,13 @@ public void processWindowedElement(InputT element, Instant timestamp, final Boun
createProcessContext(
ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING));
fnInvoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {

@Override
public String getErrorContext() {
return "DoFnTester";
}

@Override
public BoundedWindow window() {
return window;
Expand Down Expand Up @@ -257,16 +258,6 @@ public InputT element(DoFn<InputT, OutputT> doFn) {
return processContext.element();
}

@Override
public InputT sideInput(String sideInputTag) {
throw new UnsupportedOperationException("SideInputs are not supported by DoFnTester");
}

@Override
public InputT schemaElement(int index) {
throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
}

@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return processContext.timestamp();
Expand All @@ -289,21 +280,11 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedReceiver(processContext, null);
}

@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
}

@Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
}

@Override
public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("DoFnTester doesn't support timers yet.");
}

@Override
public Object restriction() {
throw new UnsupportedOperationException(
Expand All @@ -315,27 +296,6 @@ public Object restriction() {
throw new UnsupportedOperationException(
"Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
}

@Override
public org.apache.beam.sdk.state.State state(String stateId, boolean alwaysFetched) {
throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
}

@Override
public Timer timer(String timerId) {
throw new UnsupportedOperationException("DoFnTester doesn't support timers yet");
}

@Override
public TimerMap timerFamily(String tagId) {
throw new UnsupportedOperationException("DoFnTester doesn't support timerFamily yet");
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
"DoFnTester doesn't support bundleFinalizer yet");
}
});
} catch (UserCodeException e) {
unwrapUserCodeException(e);
Expand Down
Loading

0 comments on commit feefaca

Please sign in to comment.