diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java index 0b0a1fd3ed8e1..4d149d39259b3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.clock.RelativeClock; import java.io.Serializable; @@ -39,7 +40,7 @@ public interface WatermarkGeneratorSupplier extends Serializable { /** * Additional information available to {@link #createWatermarkGenerator(Context)}. This can be - * access to {@link org.apache.flink.metrics.MetricGroup MetricGroups}, for example. + * access to {@link MetricGroup MetricGroups}, for example. */ interface Context { @@ -54,5 +55,14 @@ interface Context { * @see MetricGroup */ MetricGroup getMetricGroup(); + + /** + * Returns a {@link RelativeClock} that hides periods when input was not active and {@link + * WatermarkGenerator} could not have been executed due to execution being blocked by the + * runtime. For example, backpressure or watermark alignment may block the progress. 
+ * + * @see RelativeClock + */ + RelativeClock getInputActivityClock(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java index 660a789027720..7c0ae86195294 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java @@ -42,7 +42,9 @@ public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Co public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context) { return new WatermarksWithIdleness<>( - baseStrategy.createWatermarkGenerator(context), idlenessTimeout); + baseStrategy.createWatermarkGenerator(context), + idlenessTimeout, + context.getInputActivityClock()); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java index 3d61217ce049c..a4d5639a5e15a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.RelativeClock; import org.apache.flink.util.clock.SystemClock; import java.time.Duration; @@ -42,19 +43,27 @@ public class WatermarksWithIdleness implements WatermarkGenerator { private boolean isIdleNow = false; + /** + * This is not used anymore, but it's technically part of the {@link Public} API. Please use + * {@link #WatermarksWithIdleness(WatermarkGenerator, Duration, RelativeClock)} instead. 
+ */ + @Deprecated + public WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout) { + this(watermarks, idleTimeout, SystemClock.getInstance()); + } + /** * Creates a new WatermarksWithIdleness generator to the given generator idleness detection with * the given timeout. * * @param watermarks The original watermark generator. * @param idleTimeout The timeout for the idleness detection. + * @param clock The clock that will be used to measure idleness period. It is expected that this + * clock will hide periods when this {@link WatermarkGenerator} has been blocked from making + * any progress despite availability of records on the input. */ - public WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout) { - this(watermarks, idleTimeout, SystemClock.getInstance()); - } - - @VisibleForTesting - WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout, Clock clock) { + public WatermarksWithIdleness( + WatermarkGenerator watermarks, Duration idleTimeout, RelativeClock clock) { checkNotNull(idleTimeout, "idleTimeout"); checkArgument( !(idleTimeout.isZero() || idleTimeout.isNegative()), @@ -88,7 +97,7 @@ public void onPeriodicEmit(WatermarkOutput output) { static final class IdlenessTimer { /** The clock used to measure elapsed time. */ - private final Clock clock; + private final RelativeClock clock; /** Counter to detect change. No problem if it overflows. */ private long counter; @@ -105,7 +114,7 @@ static final class IdlenessTimer { /** The duration before the output is marked as idle. 
*/ private final long maxIdleTimeNanos; - IdlenessTimer(Clock clock, Duration idleTimeout) { + IdlenessTimer(RelativeClock clock, Duration idleTimeout) { this.clock = clock; long idleNanos; diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java index 2ce7b88d3f60d..2ddafea101711 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java @@ -20,7 +20,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.clock.RelativeClock; +import org.apache.flink.util.clock.SystemClock; import org.junit.jupiter.api.Test; @@ -171,6 +174,16 @@ static TimestampAssignerSupplier.Context assignerContext() { } static WatermarkGeneratorSupplier.Context generatorContext() { - return UnregisteredMetricsGroup::new; + return new WatermarkGeneratorSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override + public RelativeClock getInputActivityClock() { + return SystemClock.getInstance(); + } + }; } } diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java index 347db1589c8c4..3fe4dce03aa2d 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java +++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java @@ -20,13 +20,17 @@ import 
org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.util.clock.RelativeClock; +import org.apache.flink.util.clock.SystemClock; import java.util.ArrayList; import java.util.Arrays; @@ -60,7 +64,18 @@ public TestSourceFunction(WatermarkStrategy watermarkStrategy) { @Override public void run(SourceContext ctx) { WatermarkGenerator generator = - watermarkStrategy.createWatermarkGenerator(() -> null); + watermarkStrategy.createWatermarkGenerator( + new WatermarkGeneratorSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public RelativeClock getInputActivityClock() { + return SystemClock.getInstance(); + } + }); WatermarkOutput output = new TestWatermarkOutput(ctx); int rowDataSize = DATA.get(0).size(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java index 4685691385ddd..8ec6cc8c01fcf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; 
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.clock.RelativeClock; +import org.apache.flink.util.clock.SystemClock; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -43,4 +45,9 @@ public TimestampsAndWatermarksContext(MetricGroup metricGroup) { public MetricGroup getMetricGroup() { return metricGroup; } + + @Override + public RelativeClock getInputActivityClock() { + return SystemClock.getInstance(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java index 570cb6b90b3e6..02f72f3ba32b5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java @@ -21,14 +21,18 @@ import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.clock.RelativeClock; +import 
org.apache.flink.util.clock.SystemClock; import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -80,7 +84,21 @@ public void open() throws Exception { timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); watermarkGenerator = emitProgressiveWatermarks - ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup) + ? watermarkStrategy.createWatermarkGenerator( + new WatermarkGeneratorSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + // Qualified this: a bare this.getMetricGroup() would + // invoke this method itself and never terminate. + return TimestampsAndWatermarksOperator.this + .getMetricGroup(); + } + + @Override + public RelativeClock getInputActivityClock() { + return SystemClock.getInstance(); + } + }) : new NoWatermarksGenerator<>(); wmOutput = new WatermarkEmitter(output); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index 0f8d0dd70a9d0..cb12026b4a048 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.io.RichOutputFormat; @@ -31,6 +32,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import 
org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -60,6 +62,8 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.types.RowUtils; +import org.apache.flink.util.clock.RelativeClock; +import org.apache.flink.util.clock.SystemClock; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -266,7 +270,18 @@ public void run(SourceContext ctx) throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized); final DataInputView input = new DataInputViewStreamWrapper(bais); WatermarkGenerator generator = - watermarkStrategy.createWatermarkGenerator(() -> null); + watermarkStrategy.createWatermarkGenerator( + new WatermarkGeneratorSupplier.Context() { + @Override + public MetricGroup getMetricGroup() { + return null; + } + + @Override + public RelativeClock getInputActivityClock() { + return SystemClock.getInstance(); + } + }); WatermarkOutput output = new TestValuesWatermarkOutput(ctx); final Object lock = ctx.getCheckpointLock(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala index 98265f6729994..75ae617a4f281 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala @@ -31,9 +31,10 @@ import org.apache.flink.table.runtime.generated.WatermarkGenerator import org.apache.flink.table.types.logical.{IntType, TimestampType} import org.apache.flink.table.utils.CatalogManagerMocks 
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} +import org.apache.flink.util.clock.{RelativeClock, SystemClock} -import org.junit.jupiter.api.{Test, TestTemplate} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.TestTemplate import org.junit.jupiter.api.extension.ExtendWith import java.lang.{Integer => JInt, Long => JLong} @@ -167,6 +168,8 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) { val newReferences = generated.getReferences :+ new WatermarkGeneratorSupplier.Context { override def getMetricGroup: MetricGroup = null + + override def getInputActivityClock: RelativeClock = SystemClock.getInstance() } generated.newInstance(Thread.currentThread().getContextClassLoader, newReferences) } else {