Skip to content

Commit

Permalink
[FLINK-35886][task] Hide backpressure from idleness detection in WatermarkAssignerOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 02d89ca commit 6d100ab
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
Expand Down Expand Up @@ -70,6 +71,9 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>

private transient long lastIdleCheckProcessedElements = -1;

/** {@link PausableRelativeClock} that will be paused in case of backpressure. */
private transient PausableRelativeClock inputActivityClock;

/**
* Create a watermark assigner operator.
*
Expand All @@ -95,6 +99,12 @@ public WatermarkAssignerOperator(
@Override
public void open() throws Exception {
super.open();
inputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock());
getContainingTask()
.getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.registerBackPressureListener(inputActivityClock);

// watermark and timestamp should start from 0
this.currentWatermark = 0;
Expand Down Expand Up @@ -154,21 +164,23 @@ private void advanceWatermark() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
// timestamp and now can be off in case TM is heavily overloaded.
// now and inputActivityNow are using different clocks and can have very different values.
long now = getProcessingTimeService().getCurrentProcessingTime();
long inputActivityNow = inputActivityClock.relativeTimeMillis();

if (watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + watermarkInterval <= now) {
lastWatermarkPeriodicEmitTime = now;
advanceWatermark();
}

if (processedElements != lastIdleCheckProcessedElements) {
timeSinceLastIdleCheck = now;
timeSinceLastIdleCheck = inputActivityNow;
lastIdleCheckProcessedElements = processedElements;
}

if (isIdlenessEnabled()
&& currentStatus.equals(WatermarkStatus.ACTIVE)
&& timeSinceLastIdleCheck + idleTimeout <= now) {
&& timeSinceLastIdleCheck + idleTimeout <= inputActivityNow) {
// mark the channel as idle to ignore watermarks from this channel
emitWatermarkStatus(WatermarkStatus.IDLE);
}
Expand Down Expand Up @@ -213,6 +225,11 @@ public void finish() throws Exception {

/**
 * Tears down this operator.
 *
 * <p>Unregisters {@code inputActivityClock} from the task's IO metric group back-pressure
 * listeners (the counterpart of the registration performed in {@code open()}), then closes the
 * {@code watermarkGenerator} via {@link FunctionUtils#closeFunction}, and finally delegates to
 * {@code super.close()}.
 *
 * @throws Exception propagated from any of the teardown steps
 */
@Override
public void close() throws Exception {
// Undo the listener registration done in open() before releasing anything else.
// NOTE(review): there is no try/finally here, so an exception thrown by an earlier step
// skips the later ones (generator close / super.close()) — confirm this is acceptable.
getContainingTask()
.getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.unregisterBackPressureListener(inputActivityClock);
// Close the user-provided watermark generator, then let the base operator clean up.
FunctionUtils.closeFunction(watermarkGenerator);
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
Expand Down Expand Up @@ -139,6 +140,34 @@ private void testIdleTimeout(long idleTimeout, long watermarkInterval) throws Ex
assertThat(extractWatermarkStatuses(output)).doesNotContain(WatermarkStatus.IDLE);
}

/**
 * Checks that the operator does not report idleness while the task is marked as hard or soft
 * back-pressured, and that {@link WatermarkStatus#IDLE} is emitted once back pressure is over.
 */
@Test
public void testIdleTimeoutUnderBackpressure() throws Exception {
    final long idleTimeout = 100;
    // Processing time is advanced in increments of a tenth of the idle timeout.
    final long step = idleTimeout / 10;
    // Each phase spans ten idle timeouts of processing time.
    final long hardPhaseEnd = idleTimeout * 10;
    final long softPhaseEnd = idleTimeout * 20;
    final long releasedPhaseEnd = idleTimeout * 30;

    OneInputStreamOperatorTestHarness<RowData, RowData> harness =
            createTestHarness(0, WATERMARK_GENERATOR, idleTimeout);
    harness.getExecutionConfig().setAutoWatermarkInterval(idleTimeout);
    harness.open();

    TaskIOMetricGroup ioMetrics = harness.getEnvironment().getMetricGroup().getIOMetricGroup();

    // Phase 1: hard back pressure is active -> nothing may be emitted.
    ioMetrics.getHardBackPressuredTimePerSecond().markStart();
    stepProcessingTime(harness, 0, hardPhaseEnd, step);
    assertThat(harness.getOutput()).isEmpty();

    // Phase 2: hard back pressure ends, soft back pressure begins -> still nothing emitted.
    ioMetrics.getHardBackPressuredTimePerSecond().markEnd();
    ioMetrics.getSoftBackPressuredTimePerSecond().markStart();
    stepProcessingTime(harness, hardPhaseEnd, softPhaseEnd, step);
    assertThat(harness.getOutput()).isEmpty();

    // Phase 3: no back pressure at all -> the idle timeout elapses and IDLE is emitted once.
    ioMetrics.getSoftBackPressuredTimePerSecond().markEnd();
    stepProcessingTime(harness, softPhaseEnd, releasedPhaseEnd, step);
    assertThat(harness.getOutput()).containsExactly(WatermarkStatus.IDLE);
}

private void stepProcessingTime(
OneInputStreamOperatorTestHarness<?, ?> testHarness,
long fromInclusive,
Expand Down

0 comments on commit 6d100ab

Please sign in to comment.