Skip to content

Commit

Permalink
Rename Triggers to TriggerTranslation
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed May 23, 2017
1 parent c8b2119 commit 4fa38e2
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -163,7 +163,7 @@ private ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner(K key
windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
Triggers.toProto(windowingStrategy.getTrigger()))),
TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
stateInternalsFactory.stateInternalsForKey(key),
timerInternals,
new OutputWindowedValue<KV<K, Iterable<V>>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Utilities for working with {@link Triggers Triggers}. */
/** Utilities for working with {@link TriggerTranslation Triggers}. */
@Experimental(Experimental.Kind.TRIGGER)
public class Triggers implements Serializable {
public class TriggerTranslation implements Serializable {

@VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();

Expand Down Expand Up @@ -332,5 +332,5 @@ private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers)
}

// Do not instantiate
private Triggers() {}
private TriggerTranslation() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public static RunnerApi.WindowingStrategy toProto(
.setAccumulationMode(toProto(windowingStrategy.getMode()))
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
.setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
.setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
.setWindowFn(windowFnSpec)
.setWindowCoderId(
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
Expand Down Expand Up @@ -247,7 +247,7 @@ public static RunnerApi.WindowingStrategy toProto(
WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
Trigger trigger = Triggers.fromProto(proto.getTrigger());
Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
Duration allowedLateness = Duration.millis(proto.getAllowedLateness());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/** Tests for utilities in {@link Triggers}. */
/** Tests for utilities in {@link TriggerTranslation}. */
@RunWith(Parameterized.class)
public class TriggersTest {
public class TriggerTranslationTest {

@AutoValue
abstract static class ToProtoAndBackSpec {
abstract Trigger getTrigger();
}

private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
return new AutoValue_TriggerTranslationTest_ToProtoAndBackSpec(trigger);
}

@Parameters(name = "{index}: {0}")
Expand Down Expand Up @@ -104,7 +104,8 @@ public static Iterable<ToProtoAndBackSpec> data() {
@Test
public void testToProtoAndBack() throws Exception {
Trigger trigger = toProtoAndBackSpec.getTrigger();
Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger));
Trigger toProtoAndBackTrigger =
TriggerTranslation.fromProto(TriggerTranslation.toProto(trigger));

assertThat(toProtoAndBackTrigger, equalTo(trigger));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.runners.core;

import java.util.Collection;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -122,7 +122,7 @@ public void processElement(ProcessContext c) throws Exception {
windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
Triggers.toProto(windowingStrategy.getTrigger()))),
TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
stateInternals,
timerInternals,
outputWindowedValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return new ReduceFnTester<Integer, Iterable<Integer>, W>(
windowingStrategy,
TriggerStateMachines.stateMachineForTrigger(
Triggers.toProto(windowingStrategy.getTrigger())),
TriggerTranslation.toProto(windowingStrategy.getTrigger())),
SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
IterableCoder.of(VarIntCoder.of()),
PipelineOptionsFactory.create(),
Expand Down Expand Up @@ -179,7 +179,8 @@ ReduceFnTester<Integer, OutputT, W> combining(

return combining(
strategy,
TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
TriggerStateMachines.stateMachineForTrigger(
TriggerTranslation.toProto(strategy.getTrigger())),
combineFn,
outputCoder);
}
Expand Down Expand Up @@ -227,7 +228,8 @@ ReduceFnTester<Integer, OutputT, W> combining(

return combining(
strategy,
TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
TriggerStateMachines.stateMachineForTrigger(
TriggerTranslation.toProto(strategy.getTrigger())),
combineFn,
outputCoder,
options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
Expand Down Expand Up @@ -162,7 +162,7 @@ public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Ex
(CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
DirectTimerInternals timerInternals = stepContext.timerInternals();
RunnerApi.Trigger runnerApiTrigger =
Triggers.toProto(windowingStrategy.getTrigger());
TriggerTranslation.toProto(windowingStrategy.getTrigger());
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
new ReduceFnRunner<>(
key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
Expand Down Expand Up @@ -260,7 +260,7 @@ public JavaPairRDD<ByteArray, byte[]> call(
windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
Triggers.toProto(windowingStrategy.getTrigger()))),
TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
stateInternals,
timerInternals,
outputHolder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
Expand Down Expand Up @@ -92,7 +92,7 @@ public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
windowingStrategy,
ExecutableTriggerStateMachine.create(
TriggerStateMachines.stateMachineForTrigger(
Triggers.toProto(windowingStrategy.getTrigger()))),
TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
stateInternals,
timerInternals,
outputter,
Expand Down

0 comments on commit 4fa38e2

Please sign in to comment.