Skip to content

Commit

Permalink
[FLINK-35886][task] Use RelativeClock in WatermarksWithIdleness
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 9659a5c commit 934007c
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.clock.RelativeClock;

import java.io.Serializable;

Expand All @@ -39,7 +40,7 @@ public interface WatermarkGeneratorSupplier<T> extends Serializable {

/**
* Additional information available to {@link #createWatermarkGenerator(Context)}. This can be
* access to {@link org.apache.flink.metrics.MetricGroup MetricGroups}, for example.
* access to {@link MetricGroup MetricGroups}, for example.
*/
interface Context {

Expand All @@ -54,5 +55,14 @@ interface Context {
* @see MetricGroup
*/
MetricGroup getMetricGroup();

/**
* Returns a {@link RelativeClock} that hides periods when input was not active and {@link
* WatermarkGenerator} could not have been executed due to execution being blocked by the
* runtime. For example a backpressure or watermark alignment blocking the progress.
*
* @see RelativeClock
*/
RelativeClock getInputActivityClock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Co
public WatermarkGenerator<T> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context) {
return new WatermarksWithIdleness<>(
baseStrategy.createWatermarkGenerator(context), idlenessTimeout);
baseStrategy.createWatermarkGenerator(context),
idlenessTimeout,
context.getInputActivityClock());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import java.time.Duration;
Expand All @@ -42,19 +43,27 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {

private boolean isIdleNow = false;

/**
* This is not used anymore, but it's technically part of the {@link Public} API. Please use
* {@link #WatermarksWithIdleness(WatermarkGenerator, Duration, RelativeClock)} instead.
*/
@Deprecated
public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
this(watermarks, idleTimeout, SystemClock.getInstance());
}

/**
* Creates a new WatermarksWithIdleness generator to the given generator idleness detection with
* the given timeout.
*
* @param watermarks The original watermark generator.
* @param idleTimeout The timeout for the idleness detection.
* @param clock The clock that will be used to measure idleness period. It is expected that this
* clock will hide periods when this {@link WatermarkGenerator} has been blocked from making
* any progress despite availability of records on the input.
*/
public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
this(watermarks, idleTimeout, SystemClock.getInstance());
}

@VisibleForTesting
WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
public WatermarksWithIdleness(
WatermarkGenerator<T> watermarks, Duration idleTimeout, RelativeClock clock) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
Expand Down Expand Up @@ -88,7 +97,7 @@ public void onPeriodicEmit(WatermarkOutput output) {
static final class IdlenessTimer {

/** The clock used to measure elapsed time. */
private final Clock clock;
private final RelativeClock clock;

/** Counter to detect change. No problem if it overflows. */
private long counter;
Expand All @@ -105,7 +114,7 @@ static final class IdlenessTimer {
/** The duration before the output is marked as idle. */
private final long maxIdleTimeNanos;

IdlenessTimer(Clock clock, Duration idleTimeout) {
IdlenessTimer(RelativeClock clock, Duration idleTimeout) {
this.clock = clock;

long idleNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -171,6 +174,16 @@ static TimestampAssignerSupplier.Context assignerContext() {
}

static WatermarkGeneratorSupplier.Context generatorContext() {
return UnregisteredMetricsGroup::new;
return new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return new UnregisteredMetricsGroup();
}

@Override
public RelativeClock getInputActivityClock() {
return SystemClock.getInstance();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -60,7 +64,18 @@ public TestSourceFunction(WatermarkStrategy<RowData> watermarkStrategy) {
@Override
public void run(SourceContext<RowData> ctx) {
WatermarkGenerator<RowData> generator =
watermarkStrategy.createWatermarkGenerator(() -> null);
watermarkStrategy.createWatermarkGenerator(
new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return null;
}

@Override
public RelativeClock getInputActivityClock() {
return SystemClock.getInstance();
}
});
WatermarkOutput output = new TestWatermarkOutput(ctx);

int rowDataSize = DATA.get(0).size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -43,4 +45,9 @@ public TimestampsAndWatermarksContext(MetricGroup metricGroup) {
public MetricGroup getMetricGroup() {
return metricGroup;
}

@Override
public RelativeClock getInputActivityClock() {
return SystemClock.getInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -80,7 +84,18 @@ public void open() throws Exception {
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
watermarkGenerator =
emitProgressiveWatermarks
? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
? watermarkStrategy.createWatermarkGenerator(
new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return this.getMetricGroup();
}

@Override
public RelativeClock getInputActivityClock() {
return SystemClock.getInstance();
}
})
: new NoWatermarksGenerator<>();

wmOutput = new WatermarkEmitter(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.RichOutputFormat;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
Expand Down Expand Up @@ -60,6 +62,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -266,7 +270,18 @@ public void run(SourceContext<RowData> ctx) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
final DataInputView input = new DataInputViewStreamWrapper(bais);
WatermarkGenerator<RowData> generator =
watermarkStrategy.createWatermarkGenerator(() -> null);
watermarkStrategy.createWatermarkGenerator(
new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
return null;
}

@Override
public RelativeClock getInputActivityClock() {
return SystemClock.getInstance();
}
});
WatermarkOutput output = new TestValuesWatermarkOutput(ctx);
final Object lock = ctx.getCheckpointLock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import org.apache.flink.table.runtime.generated.WatermarkGenerator
import org.apache.flink.table.types.logical.{IntType, TimestampType}
import org.apache.flink.table.utils.CatalogManagerMocks
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
import org.apache.flink.util.clock.{RelativeClock, SystemClock}

import org.junit.jupiter.api.{Test, TestTemplate}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.TestTemplate
import org.junit.jupiter.api.extension.ExtendWith

import java.lang.{Integer => JInt, Long => JLong}
Expand Down Expand Up @@ -167,6 +168,8 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) {
val newReferences = generated.getReferences :+
new WatermarkGeneratorSupplier.Context {
override def getMetricGroup: MetricGroup = null

override def getInputActivityClock: RelativeClock = SystemClock.getInstance()
}
generated.newInstance(Thread.currentThread().getContextClassLoader, newReferences)
} else {
Expand Down

0 comments on commit 934007c

Please sign in to comment.