Skip to content

Commit

Permalink
Merge pull request apache#9841 from je-ik/BEAM-8439: [BEAM-8439] avoi…
Browse files Browse the repository at this point in the history
…d creation of empty bundles
  • Loading branch information
je-ik authored Oct 21, 2019
2 parents 3b491ba + 49def1e commit 7f8e4e4
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ public void processWatermark(Watermark mark) throws Exception {

@Override
public void processWatermark1(Watermark mark) throws Exception {
checkInvokeStartBundle();
// We do the check here because we are guaranteed to at least get the +Inf watermark on the
// main input when the job finishes.
if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
Expand Down Expand Up @@ -677,7 +676,6 @@ private void emitWatermark(long watermark) {

@Override
public void processWatermark2(Watermark mark) throws Exception {
checkInvokeStartBundle();

setCurrentSideInputWatermark(mark.getTimestamp());
if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
Expand All @@ -698,6 +696,7 @@ private void emitAllPushedBackData() throws Exception {
Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();

while (it.hasNext()) {
checkInvokeStartBundle();
WindowedValue<InputT> element = it.next();
// we need to set the correct key in case the operator is
// a (keyed) window operator
Expand Down Expand Up @@ -790,8 +789,7 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {

@Override
public void onEventTime(InternalTimer<ByteBuffer, TimerData> timer) throws Exception {
// We don't have to cal checkInvokeStartBundle() because it's already called in
// processWatermark*().
checkInvokeStartBundle();
fireTimer(timer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1274,15 +1274,15 @@ public void finishBundle(FinishBundleContext context) {
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));

// A final bundle will be created when sending the MAX watermark
// No bundle will be created when sending the MAX watermark
// (unless pushed back items are emitted)
newHarness.close();

assertThat(
stripStreamRecordFromWindowedValue(newHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));

// close() will also call dispose(), but call again to verify no new bundle
Expand All @@ -1294,7 +1294,6 @@ public void finishBundle(FinishBundleContext context) {
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,6 @@ public void testStageBundleClosed() throws Exception {
verify(stageBundleFactory).getProcessBundleDescriptor();
verify(stageBundleFactory).close();
verify(stageContext).close();
// DoFnOperator generates a final watermark, which triggers a new bundle..
verify(stageBundleFactory).getBundle(any(), any(), any());
verify(bundle).getInputReceivers();
verify(bundle).close();
verifyNoMoreInteractions(stageBundleFactory);

// close() will also call dispose(), but call again to verify no new bundle
Expand Down

0 comments on commit 7f8e4e4

Please sign in to comment.