Skip to content

Commit

Permalink
Fix continuation of OrFinally trigger
Browse files Browse the repository at this point in the history
Since the continuation is specified to fire as-soon-as-reasonable, the simple translation was firing the or-finally too early.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=107423899
  • Loading branch information
dpmills authored and davorbonaci committed Nov 10, 2015
1 parent 1d92b7d commit c33aaaa
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ public Instant getWatermarkThatGuaranteesFiring(W window) {

@Override
public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
return new OrFinallyTrigger<W>(
continuationTriggers.get(ACTUAL),
(Trigger.OnceTrigger<W>) continuationTriggers.get(UNTIL));
// Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
// may not be a OnceTrigger.
return Repeatedly.forever(
new OrFinallyTrigger<W>(
continuationTriggers.get(ACTUAL),
(Trigger.OnceTrigger<W>) continuationTriggers.get(UNTIL)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ public void testContinuation() throws Exception {
Trigger<IntervalWindow> aOrFinallyB = triggerA.orFinally(triggerB);
Trigger<IntervalWindow> bOrFinallyA = triggerB.orFinally(triggerA);
assertEquals(
triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger()),
Repeatedly.forever(
triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
aOrFinallyB.getContinuationTrigger());
assertEquals(
triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger()),
Repeatedly.forever(
triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
bOrFinallyA.getContinuationTrigger());
}
}

0 comments on commit c33aaaa

Please sign in to comment.