From 02d89caede088d334daf13fb26103aaec6d3ac19 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 5 Aug 2024 15:26:29 +0200 Subject: [PATCH] [FLINK-35886][task] Hide backpressure and watermark alignment from idleness detection in SourceOperator --- .../api/operators/SourceOperator.java | 27 ++- .../source/NoOpTimestampsAndWatermarks.java | 6 + .../ProgressiveTimestampsAndWatermarks.java | 79 +++++++- .../source/TimestampsAndWatermarks.java | 46 ++++- .../TimestampsAndWatermarksContext.java | 8 +- ...ceOperatorSplitWatermarkAlignmentTest.java | 174 +++++++++++++++++- 6 files changed, 313 insertions(+), 27 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 972415c1ff0c9..d1257e2caf541 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; @@ -52,6 +53,7 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; +import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import 
org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.DataInputStatus; @@ -197,6 +199,12 @@ private enum OperatingMode { private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords; + /** + * {@link PausableRelativeClock} tracking activity of the operator's main input. It's paused on + * backpressure. Note, each split output has its own independent {@link PausableRelativeClock}. + */ + private transient PausableRelativeClock mainInputActivityClock; + public SourceOperator( FunctionWithException, Exception> readerFactory, @@ -322,6 +330,11 @@ public InternalSourceReaderMetricGroup getSourceMetricGroup() { @Override public void open() throws Exception { + mainInputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock()); + TaskIOMetricGroup taskIOMetricGroup = + getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup(); + taskIOMetricGroup.registerBackPressureListener(mainInputActivityClock); + initReader(); // in the future when we this one is migrated to the "eager initialization" operator @@ -332,11 +345,14 @@ public void open() throws Exception { watermarkStrategy, sourceMetricGroup, getProcessingTimeService(), - getExecutionConfig().getAutoWatermarkInterval()); + getExecutionConfig().getAutoWatermarkInterval(), + mainInputActivityClock, + getProcessingTimeService().getClock(), + taskIOMetricGroup); } else { eventTimeLogic = TimestampsAndWatermarks.createNoOpEventTimeLogic( - watermarkStrategy, sourceMetricGroup); + watermarkStrategy, sourceMetricGroup, mainInputActivityClock); } // restore the state if necessary. 
@@ -396,6 +412,12 @@ public CompletableFuture stop(StopMode mode) { @Override public void close() throws Exception { + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .unregisterBackPressureListener(mainInputActivityClock); + if (sourceReader != null) { sourceReader.close(); } @@ -676,6 +698,7 @@ private void pauseOrResumeSplits( Collection splitsToPause, Collection splitsToResume) { try { sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume); } catch (UnsupportedOperationException e) { if (!allowUnalignedSourceSplits) { throw e; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java index 3d21512a143fb..b2debd089b5c8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java @@ -27,6 +27,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException; +import java.util.Collection; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -68,6 +70,10 @@ public void emitImmediateWatermark(long wallClockTimestamp) { // do nothing } + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) {} + // ------------------------------------------------------------------------ /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java index 98507c096b945..ca0c5f47b9bb6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java @@ -27,16 +27,23 @@ import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.RelativeClock; import javax.annotation.Nullable; import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ScheduledFuture; +import static java.util.Objects.requireNonNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -53,12 +60,18 @@ public class ProgressiveTimestampsAndWatermarks implements TimestampsAndWater private final WatermarkGeneratorSupplier watermarksFactory; - private final WatermarkGeneratorSupplier.Context watermarksContext; + private final TimestampsAndWatermarksContextProvider watermarksContextProvider; private final ProcessingTimeService timeService; private final long periodicWatermarkInterval; + private final RelativeClock mainInputActivityClock; + + private final Clock clock; + + private final TaskIOMetricGroup taskIOMetricGroup; + @Nullable private SplitLocalOutputs currentPerSplitOutputs; @Nullable private StreamingReaderOutput 
currentMainOutput; @@ -68,14 +81,20 @@ public class ProgressiveTimestampsAndWatermarks implements TimestampsAndWater public ProgressiveTimestampsAndWatermarks( TimestampAssigner timestampAssigner, WatermarkGeneratorSupplier watermarksFactory, - WatermarkGeneratorSupplier.Context watermarksContext, + TimestampsAndWatermarksContextProvider watermarksContextProvider, ProcessingTimeService timeService, - Duration periodicWatermarkInterval) { + Duration periodicWatermarkInterval, + RelativeClock mainInputActivityClock, + Clock clock, + TaskIOMetricGroup taskIOMetricGroup) { this.timestampAssigner = timestampAssigner; this.watermarksFactory = watermarksFactory; - this.watermarksContext = watermarksContext; + this.watermarksContextProvider = watermarksContextProvider; this.timeService = timeService; + this.mainInputActivityClock = mainInputActivityClock; + this.clock = clock; + this.taskIOMetricGroup = taskIOMetricGroup; long periodicWatermarkIntervalMillis; try { @@ -106,7 +125,8 @@ public ReaderOutput createMainOutput( IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput); final WatermarkGenerator watermarkGenerator = - watermarksFactory.createWatermarkGenerator(watermarksContext); + watermarksFactory.createWatermarkGenerator( + watermarksContextProvider.create(mainInputActivityClock)); currentPerSplitOutputs = new SplitLocalOutputs<>( @@ -115,7 +135,9 @@ public ReaderOutput createMainOutput( watermarkUpdateListener, timestampAssigner, watermarksFactory, - watermarksContext); + watermarksContextProvider, + clock, + taskIOMetricGroup); currentMainOutput = new StreamingReaderOutput<>( @@ -162,6 +184,12 @@ public void emitImmediateWatermark(@SuppressWarnings("unused") long wallClockTim } } + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + currentPerSplitOutputs.pauseOrResumeSplits(splitsToPause, splitsToResume); + } + // ------------------------------------------------------------------------ private 
static final class StreamingReaderOutput extends SourceOutputWithWatermarks @@ -203,11 +231,14 @@ private static final class SplitLocalOutputs { private final WatermarkOutputMultiplexer watermarkMultiplexer; private final Map> localOutputs; + private final Map inputActivityClocks = new HashMap<>(); private final PushingAsyncDataInput.DataOutput recordOutput; private final TimestampAssigner timestampAssigner; private final WatermarkGeneratorSupplier watermarksFactory; - private final WatermarkGeneratorSupplier.Context watermarkContext; + private final TimestampsAndWatermarksContextProvider watermarksContextProvider; private final WatermarkUpdateListener watermarkUpdateListener; + private final Clock clock; + private final TaskIOMetricGroup taskIOMetricGroup; private SplitLocalOutputs( PushingAsyncDataInput.DataOutput recordOutput, @@ -215,13 +246,17 @@ private SplitLocalOutputs( WatermarkUpdateListener watermarkUpdateListener, TimestampAssigner timestampAssigner, WatermarkGeneratorSupplier watermarksFactory, - WatermarkGeneratorSupplier.Context watermarkContext) { + TimestampsAndWatermarksContextProvider watermarksContextProvider, + Clock clock, + TaskIOMetricGroup taskIOMetricGroup) { this.recordOutput = recordOutput; this.timestampAssigner = timestampAssigner; this.watermarksFactory = watermarksFactory; - this.watermarkContext = watermarkContext; + this.watermarksContextProvider = watermarksContextProvider; this.watermarkUpdateListener = watermarkUpdateListener; + this.clock = clock; + this.taskIOMetricGroup = taskIOMetricGroup; this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput); this.localOutputs = @@ -234,6 +269,7 @@ SourceOutput createOutputForSplit(String splitId) { return previous; } + PausableRelativeClock inputActivityClock = createInputActivityClock(splitId); watermarkMultiplexer.registerNewOutput( splitId, watermark -> @@ -243,7 +279,8 @@ SourceOutput createOutputForSplit(String splitId) { final WatermarkOutput periodicOutput = 
watermarkMultiplexer.getDeferredOutput(splitId); final WatermarkGenerator watermarks = - watermarksFactory.createWatermarkGenerator(watermarkContext); + watermarksFactory.createWatermarkGenerator( + watermarksContextProvider.create(inputActivityClock)); final SourceOutputWithWatermarks localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs( @@ -257,9 +294,21 @@ SourceOutput createOutputForSplit(String splitId) { return localOutput; } + private PausableRelativeClock createInputActivityClock(String splitId) { + // Dedicated inputActivityClock for a particular split. It will be paused both in case + // of back pressure and when split is paused due to watermark alignment. + PausableRelativeClock inputActivityClock = new PausableRelativeClock(clock); + inputActivityClocks.put(splitId, inputActivityClock); + taskIOMetricGroup.registerBackPressureListener(inputActivityClock); + return inputActivityClock; + } + void releaseOutputForSplit(String splitId) { localOutputs.remove(splitId); watermarkMultiplexer.unregisterOutput(splitId); + PausableRelativeClock inputActivityClock = + requireNonNull(inputActivityClocks.remove(splitId)); + taskIOMetricGroup.unregisterBackPressureListener(inputActivityClock); } void emitPeriodicWatermark() { @@ -273,6 +322,16 @@ void emitPeriodicWatermark() { } watermarkMultiplexer.onPeriodicEmit(); } + + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + for (String splitId : splitsToPause) { + inputActivityClocks.get(splitId).pause(); + } + for (String splitId : splitsToResume) { + inputActivityClocks.get(splitId).unPause(); + } + } } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java index 84bb745760110..bdbd3479da51e 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java @@ -24,10 +24,14 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.RelativeClock; import java.time.Duration; +import java.util.Collection; /** * Basic interface for the timestamp extraction and watermark generation logic for the {@link @@ -83,6 +87,8 @@ ReaderOutput createMainOutput( /** Emit a watermark immediately. */ void emitImmediateWatermark(long wallClockTimestamp); + void pauseOrResumeSplits(Collection splitsToPause, Collection splitsToResume); + // ------------------------------------------------------------------------ // factories // ------------------------------------------------------------------------ @@ -91,27 +97,51 @@ static TimestampsAndWatermarks createProgressiveEventTimeLogic( WatermarkStrategy watermarkStrategy, MetricGroup metrics, ProcessingTimeService timeService, - long periodicWatermarkIntervalMillis) { + long periodicWatermarkIntervalMillis, + RelativeClock mainInputActivityClock, + Clock clock, + TaskIOMetricGroup taskIOMetricGroup) { - final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics); - final TimestampAssigner timestampAssigner = - watermarkStrategy.createTimestampAssigner(context); + TimestampsAndWatermarksContextProvider contextProvider = + new TimestampsAndWatermarksContextProvider(metrics); + TimestampAssigner timestampAssigner = + 
watermarkStrategy.createTimestampAssigner( + contextProvider.create(mainInputActivityClock)); return new ProgressiveTimestampsAndWatermarks<>( timestampAssigner, watermarkStrategy, - context, + contextProvider, timeService, - Duration.ofMillis(periodicWatermarkIntervalMillis)); + Duration.ofMillis(periodicWatermarkIntervalMillis), + mainInputActivityClock, + clock, + taskIOMetricGroup); } static TimestampsAndWatermarks createNoOpEventTimeLogic( - WatermarkStrategy watermarkStrategy, MetricGroup metrics) { + WatermarkStrategy watermarkStrategy, + MetricGroup metrics, + RelativeClock inputActivityClock) { - final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics); + final TimestampsAndWatermarksContext context = + new TimestampsAndWatermarksContext(metrics, inputActivityClock); final TimestampAssigner timestampAssigner = watermarkStrategy.createTimestampAssigner(context); return new NoOpTimestampsAndWatermarks<>(timestampAssigner); } + + @Internal + class TimestampsAndWatermarksContextProvider { + private final MetricGroup metrics; + + public TimestampsAndWatermarksContextProvider(MetricGroup metrics) { + this.metrics = metrics; + } + + public TimestampsAndWatermarksContext create(RelativeClock inputActivityClock) { + return new TimestampsAndWatermarksContext(metrics, inputActivityClock); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java index 8ec6cc8c01fcf..6f543e0392c90 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import 
org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.clock.RelativeClock; -import org.apache.flink.util.clock.SystemClock; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -36,9 +35,12 @@ public final class TimestampsAndWatermarksContext implements TimestampAssignerSupplier.Context, WatermarkGeneratorSupplier.Context { private final MetricGroup metricGroup; + private final RelativeClock inputActivityClock; - public TimestampsAndWatermarksContext(MetricGroup metricGroup) { + public TimestampsAndWatermarksContext( + MetricGroup metricGroup, RelativeClock inputActivityClock) { this.metricGroup = checkNotNull(metricGroup); + this.inputActivityClock = inputActivityClock; } @Override @@ -48,6 +50,6 @@ public MetricGroup getMetricGroup() { @Override public RelativeClock getInputActivityClock() { - return SystemClock.getInstance(); + return inputActivityClock; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index eee59752e2dee..e776395d31ad5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -29,6 +29,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import 
org.apache.flink.runtime.source.event.AddSplitEvent; @@ -40,6 +41,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.streaming.util.MockStreamConfig; @@ -49,18 +51,17 @@ Licensed to the Apache Software Foundation (ASF) under one import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Optional; -import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; /** Unit test for split alignment in {@link SourceOperator}. */ -class SourceOperatorSplitWatermarkAlignmentTest { +class SourceOperatorSplitWatermarkAlignmentTest { @Test void testSplitWatermarkAlignment() throws Exception { - MockSourceReader sourceReader = new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); SourceOperator operator = new TestingSourceOperator<>( sourceReader, @@ -111,6 +112,144 @@ void testSplitWatermarkAlignment() throws Exception { assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1"); } + @Test + void testBackpressureAndIdleness() throws Exception { + long idleTimeout = 100; + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + SourceOperator operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + /** + * Intention behind this setup is that split0 emits a couple of records, while we keep + * 
advancing processing time and keep firing timers. Normally split1 would switch to idle + * first (it hasn't emitted any records), which would cause a watermark from split0 to be + * emitted and then WatermarkStatus.IDLE should be emitted after split0 also switches to + * idle. However, we assert that neither a watermark nor an idle status is emitted due to + * the back pressure status. + */ + MockSourceSplit split0 = new MockSourceSplit(0, 0, 10).addRecord(42).addRecord(44); + MockSourceSplit split1 = new MockSourceSplit(1, 10, 20); + operator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(split0, split1), new MockSourceSplitSerializer())); + + CollectingDataOutput dataOutput = new CollectingDataOutput<>(); + + // Output is initialised by the SourceOperator on the first emitNext invocation + operator.emitNext(dataOutput); + + TaskIOMetricGroup taskIOMetricGroup = + operator.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup(); + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart(); + + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + operator.emitNext(dataOutput); + } + assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE); + assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark()); + + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd(); + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart(); + + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + } + assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE); + assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark()); + + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd(); + + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + } + + assertThat(dataOutput.getEvents()).contains(WatermarkStatus.IDLE); + assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark()); + } + + @Test + void
testSplitWatermarkAlignmentAndIdleness() throws Exception { + long idleTimeout = 100; + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + SourceOperator operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + MockSourceSplit split0 = new MockSourceSplit(0, 0, 10); + MockSourceSplit split1 = new MockSourceSplit(1, 10, 20); + int maxAllowedWatermark = 4; + int maxEmittedWatermark = maxAllowedWatermark + 1; + // the intention is that only first record from split0 gets emitted, then split0 gets + // blocked and record (maxEmittedWatermark + 100) is never emitted from split0 + split0.addRecord(maxEmittedWatermark).addRecord(maxEmittedWatermark + 100); + split1.addRecord(3) + .addRecord(3) + .addRecord(3) + .addRecord(3) + .addRecord(3) + .addRecord(3) + .addRecord(3); + split1.addRecord(maxEmittedWatermark + 100); + + operator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(split0, split1), new MockSourceSplitSerializer())); + CollectingDataOutput dataOutput = new CollectingDataOutput<>(); + + operator.emitNext(dataOutput); // split0 emits first (and only) record (maxEmittedWatermark) + + operator.handleOperatorEvent( + new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks split0 + assertThat(sourceReader.getPausedSplits()).containsExactly("0"); + + while (operator.isAvailable()) { + // We are advancing a couple of times by (idleTimeout - 1) to make sure the active input + // never switches idle, while giving plenty of time for the blocked split0 to evaluate + // its idle state + processingTimeService.advance(idleTimeout - 1); + operator.emitNext(dataOutput); // split1 keeps emitting records + } + // in the end, all records are emitted from split1.
This shouldn't cause the watermark to + // get bumped above maxEmittedWatermark, as split0 shouldn't be idle and it is still + // blocked. + assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1"); + assertThat(dataOutput.getEvents()).doNotHave(new WatermarkAbove(maxEmittedWatermark)); + } + + private SourceOperator createAndOpenSourceOperatorWithIdleness( + MockSourceReader sourceReader, + TestProcessingTimeService processingTimeService, + long idleTimeout) + throws Exception { + + SourceOperator operator = + new TestingSourceOperator<>( + sourceReader, + WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator()) + .withTimestampAssigner((r, l) -> r) + .withWatermarkAlignment("group-1", Duration.ofMillis(1)) + .withIdleness(Duration.ofMillis(idleTimeout)), + processingTimeService, + new MockOperatorEventGateway(), + 1, + 5, + true); + Environment env = getTestingEnvironment(); + operator.setup( + new SourceOperatorStreamTask(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); + operator.open(); + return operator; + } + private Environment getTestingEnvironment() { return new StreamMockEnvironment( new Configuration(), @@ -139,4 +278,31 @@ public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxWatermark)); } } + + /** Condition checking if there is a watermark above a certain value among StreamElements.
*/ + public static class WatermarkAbove extends Condition { + public WatermarkAbove(int maxEmittedWatermark) { + super( + event -> { + if (!(event + instanceof org.apache.flink.streaming.api.watermark.Watermark)) { + return false; + } + org.apache.flink.streaming.api.watermark.Watermark w = + (org.apache.flink.streaming.api.watermark.Watermark) event; + return w.getTimestamp() > maxEmittedWatermark; + }, + "watermark value of greater than %d", + maxEmittedWatermark); + } + } + + /** Condition checking if there is any watermark among StreamElements. */ + public static class AnyWatermark extends Condition { + public AnyWatermark() { + super( + event -> event instanceof org.apache.flink.streaming.api.watermark.Watermark, + "any watermark"); + } + } }