Skip to content

Commit

Permalink
[FLINK-33202][runtime] SourceOperator send Watermark and RecordAttributes downstream when backlog status changed
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan authored and xintongsong committed Dec 19, 2023
1 parent 28c2d29 commit 89a8a16
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
Expand All @@ -56,6 +57,7 @@
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
Expand Down Expand Up @@ -569,6 +571,14 @@ public void handleOperatorEvent(OperatorEvent event) {
sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
} else if (event instanceof NoMoreSplitsEvent) {
sourceReader.notifyNoMoreSplits();
} else if (event instanceof IsProcessingBacklogEvent) {
if (eventTimeLogic != null) {
eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());
}
output.emitRecordAttributes(
new RecordAttributesBuilder(Collections.emptyList())
.setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog())
.build());
} else {
throw new IllegalStateException("Received unexpected operator event " + event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public void stopPeriodicWatermarkEmits() {
// no periodic watermarks
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation ignores the request and emits nothing.
 *
 * @param wallClockTimestamp unused by this implementation
 */
@Override
public void emitImmediateWatermark(long wallClockTimestamp) {
// Intentionally empty. NOTE(review): the sibling stopPeriodicWatermarkEmits() states there
// are "no periodic watermarks" here, so presumably there is nothing to emit on demand
// either - confirm against the enclosing class.
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void startPeriodicWatermarkEmits() {

periodicEmitHandle =
timeService.scheduleWithFixedDelay(
this::triggerPeriodicEmit,
this::emitImmediateWatermark,
periodicWatermarkInterval,
periodicWatermarkInterval);
}
Expand All @@ -152,7 +152,8 @@ public void stopPeriodicWatermarkEmits() {
}
}

void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
@Override
public void emitImmediateWatermark(@SuppressWarnings("unused") long wallClockTimestamp) {
if (currentPerSplitOutputs != null) {
currentPerSplitOutputs.emitPeriodicWatermark();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ ReaderOutput<T> createMainOutput(
/** Stops emitting periodic watermarks. */
void stopPeriodicWatermarkEmits();

/**
 * Emits a watermark right away rather than waiting for the next scheduled emission.
 *
 * <p>Implementations are free to treat this as a no-op when they have nothing to emit.
 *
 * @param wallClockTimestamp the wall-clock time of the request, in milliseconds;
 *     implementations may ignore it
 */
void emitImmediateWatermark(long wallClockTimestamp);

// ------------------------------------------------------------------------
// factories
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
Expand All @@ -26,12 +27,21 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.IsProcessingBacklogEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.util.CollectionUtil;

import org.junit.After;
Expand All @@ -40,11 +50,13 @@ Licensed to the Apache Software Foundation (ASF) under one

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -192,4 +204,74 @@ public void testNotifyCheckpointAborted() throws Exception {
operator.notifyCheckpointAborted(100L);
assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0));
}

/**
 * Checks that each {@link IsProcessingBacklogEvent} handled by the operator results in a
 * watermark followed by {@link RecordAttributes} carrying the event's backlog flag.
 */
@Test
public void testHandleBacklogEvent() throws Exception {
    // Capture every element the operator emits so the exact ordering can be asserted.
    final List<StreamElement> emitted = new ArrayList<>();

    // Each record's value is also used as its timestamp.
    final WatermarkStrategy<Integer> valueAsTimestamp =
            WatermarkStrategy.<Integer>forMonotonousTimestamps()
                    .withTimestampAssigner((value, previousTimestamp) -> value);

    context =
            new SourceOperatorTestContext(
                    false, valueAsTimestamp, new CollectorOutput<>(emitted));
    operator = context.getOperator();
    operator.initializeState(context.createStateContext());
    operator.open();

    // Hand the reader a single split that holds two records.
    final MockSourceSplit split = new MockSourceSplit(2);
    split.addRecord(1);
    split.addRecord(1001);
    operator.handleOperatorEvent(
            new AddSplitEvent<>(
                    Collections.singletonList(split), new MockSourceSplitSerializer()));

    final DataOutputToOutput<Integer> sink = new DataOutputToOutput<>(operator.output);

    // Phase 1: emit once, then report that backlog processing started.
    operator.emitNext(sink);
    operator.handleOperatorEvent(new IsProcessingBacklogEvent(true));

    // Phase 2: emit once more, then report that backlog processing ended.
    operator.emitNext(sink);
    operator.handleOperatorEvent(new IsProcessingBacklogEvent(false));

    assertThat(emitted)
            .containsExactly(
                    // phase 1
                    new StreamRecord<>(1, 1),
                    new Watermark(0),
                    new RecordAttributes(true),
                    // phase 2
                    new StreamRecord<>(1001, 1001),
                    new Watermark(1000),
                    new RecordAttributes(false));
}

/**
 * Adapter exposing an operator {@link Output} through the {@link
 * PushingAsyncDataInput.DataOutput} interface. Every call is forwarded unchanged to the
 * wrapped output.
 *
 * @param <T> type of the values carried by the stream records
 */
private static class DataOutputToOutput<T> implements PushingAsyncDataInput.DataOutput<T> {

    /** Receiver of all forwarded stream elements. */
    private final Output<StreamRecord<T>> delegate;

    DataOutputToOutput(Output<StreamRecord<T>> delegate) {
        this.delegate = delegate;
    }

    /** Forwards the record to the wrapped output's {@code collect}. */
    @Override
    public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
        delegate.collect(streamRecord);
    }

    /** Forwards the watermark to the wrapped output. */
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
        delegate.emitWatermark(watermark);
    }

    /** Forwards the watermark status to the wrapped output. */
    @Override
    public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        delegate.emitWatermarkStatus(watermarkStatus);
    }

    /** Forwards the latency marker to the wrapped output. */
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        delegate.emitLatencyMarker(latencyMarker);
    }

    /** Forwards the record attributes to the wrapped output. */
    @Override
    public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        delegate.emitRecordAttributes(recordAttributes);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
Expand Down Expand Up @@ -71,7 +72,14 @@ public SourceOperatorTestContext(boolean idle) throws Exception {

/**
 * Creates a test context whose operator writes to a {@code MockOutput} backed by a fresh,
 * empty {@code ArrayList}.
 *
 * <p>Delegates to the three-argument constructor, supplying that default output.
 *
 * @param idle passed through unchanged to the three-argument constructor
 * @param watermarkStrategy passed through unchanged to the three-argument constructor
 * @throws Exception if the delegated constructor throws
 */
public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy)
throws Exception {
// Callers that do not need to inspect the emitted elements get a throwaway output.
this(idle, watermarkStrategy, new MockOutput<>(new ArrayList<>()));
}

public SourceOperatorTestContext(
boolean idle,
WatermarkStrategy<Integer> watermarkStrategy,
Output<StreamRecord<Integer>> output)
throws Exception {
mockSourceReader = new MockSourceReader(idle, idle);
mockGateway = new MockOperatorEventGateway();
timeService = new TestProcessingTimeService();
Expand All @@ -88,7 +96,7 @@ public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> waterm
operator.setup(
new SourceOperatorStreamTask<Integer>(env),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()));
output);
operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend()));
}

Expand Down

0 comments on commit 89a8a16

Please sign in to comment.