Skip to content

Commit

Permalink
[FLINK-35031][runtime] LatencyMarker/RecordAttribute emitting under a…
Browse files Browse the repository at this point in the history
…sync execution model (apache#25503)
  • Loading branch information
fredia authored Oct 17, 2024
1 parent 0313711 commit ecb4115
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.asyncprocessing.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
Expand Down Expand Up @@ -46,6 +47,8 @@
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
Expand Down Expand Up @@ -324,6 +327,24 @@ public Object getCurrentKey() {
return currentProcessingContext.getKey();
}

// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------

@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
if (!isAsyncStateProcessingEnabled()) {
// If async state processing is disabled, fallback to the super class.
super.reportOrForwardLatencyMarker(marker);
return;
}
asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker));
}

// ------------------------------------------------------------------------
// Watermark handling
// ------------------------------------------------------------------------

@Override
public void processWatermark(Watermark mark) throws Exception {
if (!isAsyncStateProcessingEnabled()) {
Expand All @@ -345,6 +366,60 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Excep
() -> super.processWatermarkStatus(watermarkStatus));
}

@Override
protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
throws Exception {
if (!isAsyncStateProcessingEnabled()) {
super.processWatermarkStatus(watermarkStatus, index);
return;
}
asyncExecutionController.processNonRecord(
() -> {
boolean wasIdle = combinedWatermark.isIdle();
// index is 0-based
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
super.processWatermark(
new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
}
});
}

@Experimental
@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
if (!isAsyncStateProcessingEnabled()) {
super.processRecordAttributes(recordAttributes);
return;
}
asyncExecutionController.processNonRecord(
() -> super.processRecordAttributes(recordAttributes));
}

@Experimental
@Override
public void processRecordAttributes1(RecordAttributes recordAttributes) {
if (!isAsyncStateProcessingEnabled()) {
super.processRecordAttributes1(recordAttributes);
return;
}
asyncExecutionController.processNonRecord(
() -> super.processRecordAttributes1(recordAttributes));
}

@Experimental
@Override
public void processRecordAttributes2(RecordAttributes recordAttributes) {
if (!isAsyncStateProcessingEnabled()) {
super.processRecordAttributes2(recordAttributes);
return;
}
asyncExecutionController.processNonRecord(
() -> super.processRecordAttributes2(recordAttributes));
}

@VisibleForTesting
AsyncExecutionController<?> getAsyncExecutionController() {
return asyncExecutionController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -253,6 +255,22 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
(AsyncExecutionController<K>) asyncExecutionController);
}

// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
if (!isAsyncStateProcessingEnabled()) {
// If async state processing is disabled, fallback to the super class.
super.reportOrForwardLatencyMarker(marker);
return;
}
asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker));
}

// ------------------------------------------------------------------------
// Watermark handling
// ------------------------------------------------------------------------
@Override
public void processWatermark(Watermark mark) throws Exception {
if (!isAsyncStateProcessingEnabled()) {
Expand Down Expand Up @@ -282,6 +300,17 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
});
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes, int inputId)
throws Exception {
if (!isAsyncStateProcessingEnabled()) {
super.processRecordAttributes(recordAttributes, inputId);
return;
}
asyncExecutionController.processNonRecord(
() -> super.processRecordAttributes(recordAttributes, inputId));
}

@VisibleForTesting
public AsyncExecutionController<?> getAsyncExecutionController() {
return asyncExecutionController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public abstract class AbstractStreamOperator<OUT>

protected transient Output<StreamRecord<OUT>> output;

private transient IndexedCombinedWatermarkStatus combinedWatermark;
protected transient IndexedCombinedWatermarkStatus combinedWatermark;

/** The runtime context for UDFs. */
private transient StreamingRuntimeContext runtimeContext;
Expand Down Expand Up @@ -689,7 +689,7 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Excep
output.emitWatermarkStatus(watermarkStatus);
}

private void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
throws Exception {
boolean wasIdle = combinedWatermark.isIdle();
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ protected void reportWatermark(Watermark mark, int inputId) throws Exception {
public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
throws Exception {
boolean wasIdle = combinedWatermark.isIdle();
// inputId is 1-based
if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
Expand All @@ -33,7 +34,9 @@
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

Expand Down Expand Up @@ -244,6 +247,75 @@ public void onProcessingTime(InternalTimer timer) throws Exception {}
}
}

@Test
void testNonRecordProcess() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processLatencyMarker(
new LatencyMarker(1234, new OperatorID(), 0));
} catch (Exception e) {
}
});

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount())
.isGreaterThan(1);
assertThat(testOperator.getLatencyProcessed()).isEqualTo(0);

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.getLatencyProcessed()).isEqualTo(1);
}
}

@Test
void testWatermarkStatus() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processWatermarkStatus(new WatermarkStatus(0), 1);
} catch (Exception e) {
}
});

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount())
.isGreaterThan(1);
assertThat(testOperator.watermarkIndex).isEqualTo(-1);
assertThat(testOperator.watermarkStatus.isIdle()).isTrue();

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.watermarkStatus.isActive()).isTrue();
assertThat(testOperator.watermarkIndex).isEqualTo(1);
}
}

/** A simple testing operator. */
private static class TestOperator extends AbstractAsyncStateStreamOperator<String>
implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
Expand All @@ -255,8 +327,13 @@ private static class TestOperator extends AbstractAsyncStateStreamOperator<Strin

final AtomicInteger processed = new AtomicInteger(0);

final AtomicInteger latencyProcessed = new AtomicInteger(0);

final Object objectToWait = new Object();

private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
private int watermarkIndex = -1;

TestOperator(ElementOrder elementOrder) {
this.elementOrder = elementOrder;
}
Expand All @@ -279,6 +356,20 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
}
}

@Override
public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
super.processLatencyMarker(latencyMarker);
latencyProcessed.incrementAndGet();
}

@Override
protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
throws Exception {
super.processWatermarkStatus(watermarkStatus, index);
this.watermarkStatus = watermarkStatus;
this.watermarkIndex = index;
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {}

Expand All @@ -290,6 +381,10 @@ public int getProcessed() {
return processed.get();
}

public int getLatencyProcessed() {
return latencyProcessed.get();
}

public void proceed() {
synchronized (objectToWait) {
objectToWait.notify();
Expand Down
Loading

0 comments on commit ecb4115

Please sign in to comment.