Skip to content

Commit

Permalink
[FLINK-35886][task] Hide backpressure and watermark alignment from id…
Browse files Browse the repository at this point in the history
…leness detection in SourceOperator
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 49eaba0 commit 02d89ca
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
Expand All @@ -52,6 +53,7 @@
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
Expand Down Expand Up @@ -197,6 +199,12 @@ private enum OperatingMode {

private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;

/**
* {@link PausableRelativeClock} tracking activity of the operator's main input. It's paused on
* backpressure. Note, each split output has its own independent {@link PausableRelativeClock}.
*/
private transient PausableRelativeClock mainInputActivityClock;

public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
Expand Down Expand Up @@ -322,6 +330,11 @@ public InternalSourceReaderMetricGroup getSourceMetricGroup() {

@Override
public void open() throws Exception {
mainInputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock());
TaskIOMetricGroup taskIOMetricGroup =
getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();
taskIOMetricGroup.registerBackPressureListener(mainInputActivityClock);

initReader();

// in the future, when this one is migrated to the "eager initialization" operator
Expand All @@ -332,11 +345,14 @@ public void open() throws Exception {
watermarkStrategy,
sourceMetricGroup,
getProcessingTimeService(),
getExecutionConfig().getAutoWatermarkInterval());
getExecutionConfig().getAutoWatermarkInterval(),
mainInputActivityClock,
getProcessingTimeService().getClock(),
taskIOMetricGroup);
} else {
eventTimeLogic =
TimestampsAndWatermarks.createNoOpEventTimeLogic(
watermarkStrategy, sourceMetricGroup);
watermarkStrategy, sourceMetricGroup, mainInputActivityClock);
}

// restore the state if necessary.
Expand Down Expand Up @@ -396,6 +412,12 @@ public CompletableFuture<Void> stop(StopMode mode) {

@Override
public void close() throws Exception {
getContainingTask()
.getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.unregisterBackPressureListener(mainInputActivityClock);

if (sourceReader != null) {
sourceReader.close();
}
Expand Down Expand Up @@ -676,6 +698,7 @@ private void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {
try {
sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume);
} catch (UnsupportedOperationException e) {
if (!allowUnalignedSourceSplits) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;

import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -68,6 +70,10 @@ public void emitImmediateWatermark(long wallClockTimestamp) {
// do nothing
}

/**
 * Intentionally does nothing: the body is empty, in line with the other no-op methods of this
 * class (such as {@code emitImmediateWatermark} above). Both arguments are ignored.
 *
 * @param splitsToPause IDs of the splits being paused (unused)
 * @param splitsToResume IDs of the splits being resumed (unused)
 */
@Override
public void pauseOrResumeSplits(
Collection<String> splitsToPause, Collection<String> splitsToResume) {}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.RelativeClock;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand All @@ -53,12 +60,18 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater

private final WatermarkGeneratorSupplier<T> watermarksFactory;

private final WatermarkGeneratorSupplier.Context watermarksContext;
private final TimestampsAndWatermarksContextProvider watermarksContextProvider;

private final ProcessingTimeService timeService;

private final long periodicWatermarkInterval;

private final RelativeClock mainInputActivityClock;

private final Clock clock;

private final TaskIOMetricGroup taskIOMetricGroup;

@Nullable private SplitLocalOutputs<T> currentPerSplitOutputs;

@Nullable private StreamingReaderOutput<T> currentMainOutput;
Expand All @@ -68,14 +81,20 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
public ProgressiveTimestampsAndWatermarks(
TimestampAssigner<T> timestampAssigner,
WatermarkGeneratorSupplier<T> watermarksFactory,
WatermarkGeneratorSupplier.Context watermarksContext,
TimestampsAndWatermarksContextProvider watermarksContextProvider,
ProcessingTimeService timeService,
Duration periodicWatermarkInterval) {
Duration periodicWatermarkInterval,
RelativeClock mainInputActivityClock,
Clock clock,
TaskIOMetricGroup taskIOMetricGroup) {

this.timestampAssigner = timestampAssigner;
this.watermarksFactory = watermarksFactory;
this.watermarksContext = watermarksContext;
this.watermarksContextProvider = watermarksContextProvider;
this.timeService = timeService;
this.mainInputActivityClock = mainInputActivityClock;
this.clock = clock;
this.taskIOMetricGroup = taskIOMetricGroup;

long periodicWatermarkIntervalMillis;
try {
Expand Down Expand Up @@ -106,7 +125,8 @@ public ReaderOutput<T> createMainOutput(
IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput);

final WatermarkGenerator<T> watermarkGenerator =
watermarksFactory.createWatermarkGenerator(watermarksContext);
watermarksFactory.createWatermarkGenerator(
watermarksContextProvider.create(mainInputActivityClock));

currentPerSplitOutputs =
new SplitLocalOutputs<>(
Expand All @@ -115,7 +135,9 @@ public ReaderOutput<T> createMainOutput(
watermarkUpdateListener,
timestampAssigner,
watermarksFactory,
watermarksContext);
watermarksContextProvider,
clock,
taskIOMetricGroup);

currentMainOutput =
new StreamingReaderOutput<>(
Expand Down Expand Up @@ -162,6 +184,12 @@ public void emitImmediateWatermark(@SuppressWarnings("unused") long wallClockTim
}
}

/**
 * Forwards a pause/resume request to the per-split outputs, which pause or un-pause the input
 * activity clock of each affected split.
 *
 * @param splitsToPause IDs of the splits to pause
 * @param splitsToResume IDs of the splits to resume
 */
@Override
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    // NOTE(review): currentPerSplitOutputs is declared @Nullable and is only assigned in
    // createMainOutput(); this call assumes the main output already exists - confirm callers.
    final SplitLocalOutputs<T> perSplitOutputs = currentPerSplitOutputs;
    perSplitOutputs.pauseOrResumeSplits(splitsToPause, splitsToResume);
}

// ------------------------------------------------------------------------

private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T>
Expand Down Expand Up @@ -203,25 +231,32 @@ private static final class SplitLocalOutputs<T> {

private final WatermarkOutputMultiplexer watermarkMultiplexer;
private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
private final Map<String, PausableRelativeClock> inputActivityClocks = new HashMap<>();
private final PushingAsyncDataInput.DataOutput<T> recordOutput;
private final TimestampAssigner<T> timestampAssigner;
private final WatermarkGeneratorSupplier<T> watermarksFactory;
private final WatermarkGeneratorSupplier.Context watermarkContext;
private final TimestampsAndWatermarksContextProvider watermarksContextProvider;
private final WatermarkUpdateListener watermarkUpdateListener;
private final Clock clock;
private final TaskIOMetricGroup taskIOMetricGroup;

private SplitLocalOutputs(
PushingAsyncDataInput.DataOutput<T> recordOutput,
WatermarkOutput watermarkOutput,
WatermarkUpdateListener watermarkUpdateListener,
TimestampAssigner<T> timestampAssigner,
WatermarkGeneratorSupplier<T> watermarksFactory,
WatermarkGeneratorSupplier.Context watermarkContext) {
TimestampsAndWatermarksContextProvider watermarksContextProvider,
Clock clock,
TaskIOMetricGroup taskIOMetricGroup) {

this.recordOutput = recordOutput;
this.timestampAssigner = timestampAssigner;
this.watermarksFactory = watermarksFactory;
this.watermarkContext = watermarkContext;
this.watermarksContextProvider = watermarksContextProvider;
this.watermarkUpdateListener = watermarkUpdateListener;
this.clock = clock;
this.taskIOMetricGroup = taskIOMetricGroup;

this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.localOutputs =
Expand All @@ -234,6 +269,7 @@ SourceOutput<T> createOutputForSplit(String splitId) {
return previous;
}

PausableRelativeClock inputActivityClock = createInputActivityClock(splitId);
watermarkMultiplexer.registerNewOutput(
splitId,
watermark ->
Expand All @@ -243,7 +279,8 @@ SourceOutput<T> createOutputForSplit(String splitId) {
final WatermarkOutput periodicOutput = watermarkMultiplexer.getDeferredOutput(splitId);

final WatermarkGenerator<T> watermarks =
watermarksFactory.createWatermarkGenerator(watermarkContext);
watermarksFactory.createWatermarkGenerator(
watermarksContextProvider.create(inputActivityClock));

final SourceOutputWithWatermarks<T> localOutput =
SourceOutputWithWatermarks.createWithSeparateOutputs(
Expand All @@ -257,9 +294,21 @@ SourceOutput<T> createOutputForSplit(String splitId) {
return localOutput;
}

/**
 * Creates the input activity clock dedicated to one split, remembers it under the split's ID
 * and registers it as a back pressure listener on the task IO metric group.
 *
 * <p>The returned clock is paused both on back pressure and when the split is paused due to
 * watermark alignment.
 *
 * @param splitId ID of the split the clock belongs to
 * @return the newly created clock
 */
private PausableRelativeClock createInputActivityClock(String splitId) {
    final PausableRelativeClock splitClock = new PausableRelativeClock(clock);
    inputActivityClocks.put(splitId, splitClock);
    taskIOMetricGroup.registerBackPressureListener(splitClock);
    return splitClock;
}

/**
 * Releases everything this class holds for the given split: the split-local output, its
 * registration in the watermark multiplexer, and its input activity clock (including the
 * clock's back pressure listener registration).
 *
 * <p>The method tolerates split IDs for which no output was ever created, as well as repeated
 * releases of the same split; in both cases there is simply nothing left to clean up.
 *
 * @param splitId ID of the split whose output is released
 */
void releaseOutputForSplit(String splitId) {
    localOutputs.remove(splitId);
    watermarkMultiplexer.unregisterOutput(splitId);

    // The clock only exists if createOutputForSplit() was called for this split. Outputs are
    // created lazily, so a split may be released without one; failing hard here would turn a
    // harmless release into a NullPointerException.
    final PausableRelativeClock inputActivityClock = inputActivityClocks.remove(splitId);
    if (inputActivityClock != null) {
        taskIOMetricGroup.unregisterBackPressureListener(inputActivityClock);
    }
}

void emitPeriodicWatermark() {
Expand All @@ -273,6 +322,16 @@ void emitPeriodicWatermark() {
}
watermarkMultiplexer.onPeriodicEmit();
}

/**
 * Pauses the input activity clocks of all splits in {@code splitsToPause}, then un-pauses
 * the clocks of all splits in {@code splitsToResume}.
 *
 * <p>Every given split ID is expected to have a clock registered; an unknown ID results in a
 * {@link NullPointerException}.
 *
 * @param splitsToPause IDs of the splits whose clocks are paused
 * @param splitsToResume IDs of the splits whose clocks are un-paused
 */
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    splitsToPause.forEach(pausedId -> inputActivityClocks.get(pausedId).pause());
    splitsToResume.forEach(resumedId -> inputActivityClocks.get(resumedId).unPause());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.RelativeClock;

import java.time.Duration;
import java.util.Collection;

/**
* Basic interface for the timestamp extraction and watermark generation logic for the {@link
Expand Down Expand Up @@ -83,6 +87,8 @@ ReaderOutput<T> createMainOutput(
/** Emit a watermark immediately. */
void emitImmediateWatermark(long wallClockTimestamp);

/**
 * Informs the event-time logic that the given splits are being paused or resumed, so that
 * implementations can pause or un-pause any per-split state they keep (for example the
 * per-split input activity clocks).
 *
 * @param splitsToPause IDs of the splits that are being paused
 * @param splitsToResume IDs of the splits that are being resumed
 */
void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);

// ------------------------------------------------------------------------
// factories
// ------------------------------------------------------------------------
Expand All @@ -91,27 +97,51 @@ static <E> TimestampsAndWatermarks<E> createProgressiveEventTimeLogic(
WatermarkStrategy<E> watermarkStrategy,
MetricGroup metrics,
ProcessingTimeService timeService,
long periodicWatermarkIntervalMillis) {
long periodicWatermarkIntervalMillis,
RelativeClock mainInputActivityClock,
Clock clock,
TaskIOMetricGroup taskIOMetricGroup) {

final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
final TimestampAssigner<E> timestampAssigner =
watermarkStrategy.createTimestampAssigner(context);
TimestampsAndWatermarksContextProvider contextProvider =
new TimestampsAndWatermarksContextProvider(metrics);
TimestampAssigner<E> timestampAssigner =
watermarkStrategy.createTimestampAssigner(
contextProvider.create(mainInputActivityClock));

return new ProgressiveTimestampsAndWatermarks<>(
timestampAssigner,
watermarkStrategy,
context,
contextProvider,
timeService,
Duration.ofMillis(periodicWatermarkIntervalMillis));
Duration.ofMillis(periodicWatermarkIntervalMillis),
mainInputActivityClock,
clock,
taskIOMetricGroup);
}

static <E> TimestampsAndWatermarks<E> createNoOpEventTimeLogic(
WatermarkStrategy<E> watermarkStrategy, MetricGroup metrics) {
WatermarkStrategy<E> watermarkStrategy,
MetricGroup metrics,
RelativeClock inputActivityClock) {

final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
final TimestampsAndWatermarksContext context =
new TimestampsAndWatermarksContext(metrics, inputActivityClock);
final TimestampAssigner<E> timestampAssigner =
watermarkStrategy.createTimestampAssigner(context);

return new NoOpTimestampsAndWatermarks<>(timestampAssigner);
}

/**
 * Factory for {@link TimestampsAndWatermarksContext} instances that all share the same {@link
 * MetricGroup} but are each bound to their own input activity clock.
 */
@Internal
class TimestampsAndWatermarksContextProvider {
    /** Metric group handed to every context created by this provider. */
    private final MetricGroup metricGroup;

    /**
     * @param metrics metric group passed on to all created contexts
     */
    public TimestampsAndWatermarksContextProvider(MetricGroup metrics) {
        this.metricGroup = metrics;
    }

    /**
     * Creates a new context for the given clock.
     *
     * @param inputActivityClock clock passed to the created context
     * @return a fresh context combining the shared metric group with the given clock
     */
    public TimestampsAndWatermarksContext create(RelativeClock inputActivityClock) {
        final TimestampsAndWatermarksContext context =
                new TimestampsAndWatermarksContext(metricGroup, inputActivityClock);
        return context;
    }
}
}
Loading

0 comments on commit 02d89ca

Please sign in to comment.