Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#372 from tgroh/backport_746
Browse files Browse the repository at this point in the history
Use AutoValue for StepTransformResult
  • Loading branch information
lukecwik authored Aug 17, 2016
2 parents e162039 + d1a28f3 commit c70dc4c
Showing 1 changed file with 16 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;

import org.joda.time.Instant;
Expand All @@ -36,67 +34,30 @@
/**
* An immutable {@link InProcessTransformResult}.
*/
class StepTransformResult implements InProcessTransformResult {
private final AppliedPTransform<?, ?, ?> transform;
private final Iterable<? extends UncommittedBundle<?>> bundles;
private final Iterable<? extends WindowedValue<?>> unprocessedElements;
@Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
private final TimerUpdate timerUpdate;
@Nullable private final CounterSet counters;
private final Instant watermarkHold;

private StepTransformResult(
AppliedPTransform<?, ?, ?> transform,
Iterable<? extends UncommittedBundle<?>> outputBundles,
Iterable<? extends WindowedValue<?>> unprocessedElements,
CopyOnAccessInMemoryStateInternals<?> state,
TimerUpdate timerUpdate,
CounterSet counters,
Instant watermarkHold) {
this.transform = checkNotNull(transform);
this.bundles = checkNotNull(outputBundles);
this.unprocessedElements = checkNotNull(unprocessedElements);
this.state = state;
this.timerUpdate = checkNotNull(timerUpdate);
this.counters = counters;
this.watermarkHold = checkNotNull(watermarkHold);
}

@AutoValue
public abstract class StepTransformResult implements InProcessTransformResult {
@Override
public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
return bundles;
}
public abstract AppliedPTransform<?, ?, ?> getTransform();

@Override
public Iterable<? extends WindowedValue<?>> getUnprocessedElements() {
return unprocessedElements;
}
public abstract Iterable<? extends UncommittedBundle<?>> getOutputBundles();

@Override
public CounterSet getCounters() {
return counters;
}
public abstract Iterable<? extends WindowedValue<?>> getUnprocessedElements();

@Override
public AppliedPTransform<?, ?, ?> getTransform() {
return transform;
}
@Nullable
public abstract CounterSet getCounters();

@Override
public Instant getWatermarkHold() {
return watermarkHold;
}
public abstract Instant getWatermarkHold();

@Nullable
@Override
public CopyOnAccessInMemoryStateInternals<?> getState() {
return state;
}
public abstract CopyOnAccessInMemoryStateInternals<?> getState();

@Override
public TimerUpdate getTimerUpdate() {
return timerUpdate;
}
public abstract TimerUpdate getTimerUpdate();

public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
return new Builder(transform, watermarkHold);
Expand All @@ -106,13 +67,6 @@ public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(StepTransformResult.class)
.add("transform", transform)
.toString();
}

/**
* A builder for creating instances of {@link StepTransformResult}.
*/
Expand All @@ -134,14 +88,14 @@ private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
}

public StepTransformResult build() {
return new StepTransformResult(
return new AutoValue_StepTransformResult(
transform,
bundlesBuilder.build(),
unprocessedElementsBuilder.build(),
state,
timerUpdate,
counters,
watermarkHold);
watermarkHold,
state,
timerUpdate);
}

public Builder withCounters(CounterSet counters) {
Expand Down Expand Up @@ -177,3 +131,4 @@ public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
}
}
}

0 comments on commit c70dc4c

Please sign in to comment.