Skip to content

Commit

Permalink
onTimer/setTimer signature updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Rehman committed Jan 6, 2020
1 parent e2bb239 commit 15a7ef1
Show file tree
Hide file tree
Showing 32 changed files with 239 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,12 @@ public void fireTimer(Object key, Collection<TimerData> timerDataSet) {
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow();
pushbackDoFnRunner.onTimer(
timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
timerData.getTimerId(),
timerData.getTimerFamilyId(),
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
timerData.getDomain());
}
pushbackDoFnRunner.finishBundle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ public interface DoFnRunner<InputT, OutputT> {
* Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer in the
* given window.
*/
void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
void onTimer(
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain);

/**
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ public void finishBundle() {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ public interface PushbackSideInputDoFnRunner<InputT, OutputT> {
Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);

/** Calls the underlying {@link DoFn.OnTimer} method. */
void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
void onTimer(
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain);

/** Calls the underlying {@link DoFn.FinishBundle} method. */
void finishBundle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ public void processElement(WindowedValue<InputT> compressedElem) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {

// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timestamp of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ private boolean isReady(BoundedWindow mainInputWindow) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
underlying.onTimer(timerId, window, timestamp, timeDomain);
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
underlying.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ private boolean isLate(BoundedWindow window) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
Expand All @@ -134,7 +139,7 @@ public void onTimer(
window,
cleanupTimer.currentInputWatermarkTime());
} else {
doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() {
thrown.expectCause(is(fn.exceptionToThrow));

runner.onTimer(
ThrowingDoFn.TIMER_ID, GlobalWindow.INSTANCE, new Instant(0), TimeDomain.EVENT_TIME);
ThrowingDoFn.TIMER_ID,
ThrowingDoFn.TIMER_ID,
GlobalWindow.INSTANCE,
new Instant(0),
new Instant(0),
TimeDomain.EVENT_TIME);
}

/**
Expand Down Expand Up @@ -238,18 +243,22 @@ public void testOnTimerCalled() {
// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
runner.onTimer(
DoFnWithTimers.TIMER_ID,
DoFnWithTimers.TIMER_ID,
GlobalWindow.INSTANCE,
currentTime.plus(offset),
currentTime.plus(offset),
TimeDomain.EVENT_TIME);

assertThat(
fn.onTimerInvocations,
contains(
TimerData.of(
DoFnWithTimers.TIMER_ID,
DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
currentTime.plus(offset),
currentTime.plus(offset),
TimeDomain.EVENT_TIME)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,13 @@ public void testOnTimerCalled() {

// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
runner.onTimer(
timerId,
timerId,
window,
new Instant(timestamp),
new Instant(timestamp),
TimeDomain.EVENT_TIME);

assertThat(
underlying.firedTimers,
Expand Down Expand Up @@ -320,12 +326,19 @@ public void processElement(WindowedValue<InputT> elem) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
firedTimers.add(
TimerData.of(
timerId,
timerFamilyId,
StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
timestamp,
outputTimestamp,
timeDomain));
}

Expand Down Expand Up @@ -458,7 +471,13 @@ private static void advanceInputWatermark(
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
toTrigger.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
timer.getDomain());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ private static void advanceInputWatermark(
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
toTrigger.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
timer.getDomain());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ public void processElement(WindowedValue<InputT> element) {

public void onTimer(TimerData timer, BoundedWindow window) {
try {
fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
fnRunner.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
timer.getDomain());
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ public void processElement(final WindowedValue<InputT> elem) {
@Override
public void onTimer(
final String timerId,
final String timerFamilyId,
final BoundedWindow window,
final Instant timestamp,
final Instant outputTimestamp,
final TimeDomain timeDomain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
delegate.onTimer(timerId, window, timestamp, timeDomain);
delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,13 @@ private void fireTimer(TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, Outp
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
doFnRunner.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
timer.getDomain());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,12 @@ protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.cleanupPendingTimer(timer.getNamespace());
pushbackDoFnRunner.onTimer(
timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
timerData.getTimerId(),
timerData.getTimerFamilyId(),
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
timerData.getDomain());
}

private void setCurrentInputWatermark(long currentInputWatermark) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,12 @@ public void processElement(WindowedValue<InputT> element) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
Object timerKey = keyForTimer.get();
Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer");
Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,30 @@ public int hashCode() {
static final class Timer implements BufferedElement {

private final String timerId;
private final String timerFamilyId;
private final BoundedWindow window;
private final Instant timestamp;
private final Instant outputTimestamp;
private final TimeDomain timeDomain;

Timer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
Timer(
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
this.timerId = timerId;
this.window = window;
this.timestamp = timestamp;
this.timeDomain = timeDomain;
this.outputTimestamp = outputTimestamp;
this.timerFamilyId = timerFamilyId;
}

@Override
public void processWith(DoFnRunner doFnRunner) {
doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}

@Override
Expand Down Expand Up @@ -130,8 +140,10 @@ public void encode(BufferedElement value, OutputStream outStream) throws IOExcep
outStream.write(TIMER_MAGIC_BYTE);
Timer timer = (Timer) value;
STRING_CODER.encode(timer.timerId, outStream);
STRING_CODER.encode(timer.timerFamilyId, outStream);
windowCoder.encode(timer.window, outStream);
INSTANT_CODER.encode(timer.timestamp, outStream);
INSTANT_CODER.encode(timer.outputTimestamp, outStream);
outStream.write(timer.timeDomain.ordinal());
} else {
throw new IllegalStateException("Unexpected element " + value);
Expand All @@ -146,9 +158,11 @@ public BufferedElement decode(InputStream inStream) throws IOException {
return new Element(elementCoder.decode(inStream));
case TIMER_MAGIC_BYTE:
return new Timer(
STRING_CODER.decode(inStream),
STRING_CODER.decode(inStream),
windowCoder.decode(inStream),
INSTANT_CODER.decode(inStream),
INSTANT_CODER.decode(inStream),
TimeDomain.values()[inStream.read()]);
default:
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,15 @@ public void processElement(WindowedValue<InputT> elem) {

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
String timerFamilyId,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
currentBufferingElementsHandler.buffer(
new BufferedElements.Timer(timerId, window, timestamp, timeDomain));
new BufferedElements.Timer(
timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ public void testCoder() throws IOException {
WindowedValue.of("test", new Instant(2), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
BufferedElement timerElement =
new BufferedElements.Timer(
"timerId", GlobalWindow.INSTANCE, new Instant(1), TimeDomain.EVENT_TIME);
"timerId",
"timerId",
GlobalWindow.INSTANCE,
new Instant(1),
new Instant(1),
TimeDomain.EVENT_TIME);

testRoundTrip(ImmutableList.of(element), coder);
testRoundTrip(ImmutableList.of(timerElement), coder);
Expand Down
Loading

0 comments on commit 15a7ef1

Please sign in to comment.