Skip to content

Commit

Permalink
[FLINK-30798][runtime] Make OutputFormat support speculative execution through implementing SupportsConcurrentExecutionAttempts interface
Browse files Browse the repository at this point in the history

This closes apache#21785.
  • Loading branch information
ifndef-SleePy authored and zhuzhurk committed Jan 31, 2023
1 parent 642d28e commit f6694ca
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
Expand Down Expand Up @@ -149,7 +152,16 @@ public boolean isSupportsConcurrentExecutionAttempts() {
if (userFunction instanceof SupportsConcurrentExecutionAttempts) {
return true;
}

if (userFunction instanceof OutputFormatSinkFunction) {
return ((OutputFormatSinkFunction<?>) userFunction).getFormat()
instanceof SupportsConcurrentExecutionAttempts;
}
}
} else if (operatorFactory instanceof OutputFormatOperatorFactory) {
final OutputFormat<?> outputFormat =
((OutputFormatOperatorFactory<?, ?>) operatorFactory).getOutputFormat();
return outputFormat instanceof SupportsConcurrentExecutionAttempts;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,45 @@ void testSinkFunctionSupportConcurrentExecutionAttempts() {
new TestingSinkFunctionSupportConcurrentExecutionAttempts<>(), true);
}

/**
 * An {@link OutputFormat} that does not implement {@code SupportsConcurrentExecutionAttempts}
 * must yield a sink vertex that reports concurrent execution attempts as unsupported.
 */
@Test
void testOutputFormatNotSupportConcurrentExecutionAttempts() {
    final OutputFormat<Integer> plainFormat =
            new TestingOutputFormatNotSupportConcurrentExecutionAttempts<>();
    testWhetherOutputFormatSupportsConcurrentExecutionAttempts(plainFormat, false);
}

/**
 * An {@link OutputFormat} that implements {@code SupportsConcurrentExecutionAttempts} must
 * yield a sink vertex that reports concurrent execution attempts as supported.
 */
@Test
void testOutputFormatSupportConcurrentExecutionAttempts() {
    final OutputFormat<Integer> speculativeFormat =
            new TestingOutputFormatSupportConcurrentExecutionAttempts<>();
    testWhetherOutputFormatSupportsConcurrentExecutionAttempts(speculativeFormat, true);
}

/**
 * Builds a batch job of the form {@code source -> rebalance -> output-format sink}, translates
 * it into a {@link JobGraph} and verifies the concurrent-execution-attempts flag on each vertex.
 *
 * <p>The source vertex is always expected to support concurrent attempts; the sink vertex is
 * expected to match {@code isSupported}. Any vertex whose name contains neither "source" nor
 * "sink" fails the test.
 *
 * @param outputFormat the format handed to {@code writeUsingOutputFormat}
 * @param isSupported whether the resulting sink vertex is expected to support concurrent
 *     execution attempts
 */
private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
        OutputFormat<Integer> outputFormat, boolean isSupported) {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    final DataStream<Integer> numbers = env.fromElements(1, 2, 3).name("source");
    numbers.rebalance().writeUsingOutputFormat(outputFormat).name("sink");

    final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);

    for (JobVertex vertex : jobGraph.getVertices()) {
        final String vertexName = vertex.getName();

        if (vertexName.contains("source")) {
            assertThat(vertex.isSupportsConcurrentExecutionAttempts()).isTrue();
            continue;
        }

        if (!vertexName.contains("sink")) {
            Assertions.fail("Unexpected job vertex " + vertexName);
        }

        // Only the sink vertex reaches this point.
        if (isSupported) {
            assertThat(vertex.isSupportsConcurrentExecutionAttempts()).isTrue();
        } else {
            assertThat(vertex.isSupportsConcurrentExecutionAttempts()).isFalse();
        }
    }
}

private static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
SinkFunction<Integer> function, boolean isSupported) {
final StreamExecutionEnvironment env =
Expand Down Expand Up @@ -1867,6 +1906,32 @@ private static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
}
}

/**
 * No-op {@link OutputFormat} that deliberately does not implement
 * {@code SupportsConcurrentExecutionAttempts}.
 *
 * @param <T> type of the records accepted by this format
 */
private static class TestingOutputFormatNotSupportConcurrentExecutionAttempts<T>
        implements OutputFormat<T> {

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure
    }

    @Override
    public void writeRecord(T record) throws IOException {
        // records are discarded
    }

    @Override
    public void close() throws IOException {
        // nothing to release
    }
}

/**
 * No-op {@link OutputFormat} that additionally carries the
 * {@link SupportsConcurrentExecutionAttempts} marker.
 *
 * @param <T> type of the records accepted by this format
 */
private static class TestingOutputFormatSupportConcurrentExecutionAttempts<T>
        implements OutputFormat<T>, SupportsConcurrentExecutionAttempts {

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure
    }

    @Override
    public void writeRecord(T record) throws IOException {
        // records are discarded
    }

    @Override
    public void close() throws IOException {
        // nothing to release
    }
}

private static class TestingSinkFunctionNotSupportConcurrentExecutionAttempts<T>
implements SinkFunction<T> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
Expand Down Expand Up @@ -200,6 +202,16 @@ public void testSpeculativeSlowSinkFunction() throws Exception {
checkResults();
}

/**
 * Runs a job whose sink is a {@code DummySpeculativeOutputFormat} and verifies both the
 * produced results and that global finalization observed a finished attempt other than
 * attempt 0.
 *
 * @throws Exception if running the job or waiting for it to be archived fails
 */
@Test
public void testSpeculativeOutputFormatSink() throws Exception {
    // The flag is static and is only ever set to true by the output format, so it would keep
    // a value left behind by an earlier execution in the same JVM. Clear it first so that the
    // final assertion reflects this run only.
    DummySpeculativeOutputFormat.foundSpeculativeAttempt = false;

    executeJob(this::setupSlowOutputFormatSink);
    waitUntilJobArchived();

    checkResults();

    assertThat(DummySpeculativeOutputFormat.foundSpeculativeAttempt).isTrue();
}

private void checkResults() {
final Map<Long, Long> numberCountResultMap =
numberCountResults.values().stream()
Expand Down Expand Up @@ -364,6 +376,20 @@ private void setupSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) {
.slotSharingGroup("sinkGroup");
}

/**
 * Wires a sequence source (slot sharing group "group1") through a rebalance into a
 * {@code DummySpeculativeOutputFormat} sink (slot sharing group "group3"), both running with
 * the test's {@code parallelism}.
 *
 * @param env the environment the pipeline is added to
 */
private void setupSlowOutputFormatSink(StreamExecutionEnvironment env) {
    final OutputFormat<Long> slowFormat = new DummySpeculativeOutputFormat();

    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
            .setParallelism(parallelism)
            .name("source")
            .slotSharingGroup("group1")
            .rebalance()
            .writeUsingOutputFormat(slowFormat)
            .setParallelism(parallelism)
            .name("sink")
            .slotSharingGroup("group3");
}

private void addSink(DataStream<Long> dataStream) {
dataStream
.rebalance()
Expand Down Expand Up @@ -644,6 +670,57 @@ public void finish() {
}
}

/**
 * Test {@link OutputFormat} that counts how often each value is written and publishes those
 * counts into {@code numberCountResults} when closed without a recorded failure.
 *
 * <p>Subtask 0 additionally calls {@code maybeSleep()} after every record (presumably to turn
 * it into a slow task; see that helper). During global finalization the format records in
 * {@link #foundSpeculativeAttempt} whether any subtask finished with an attempt number other
 * than 0.
 */
private static class DummySpeculativeOutputFormat
        implements OutputFormat<Long>, FinalizeOnMaster, SupportsConcurrentExecutionAttempts {

    private static final long serialVersionUID = 1L;

    /** Becomes {@code true} once finalization sees a finished attempt number other than 0. */
    private static volatile boolean foundSpeculativeAttempt;

    /** Number of times each value has been written by this instance. */
    private final Map<Long, Long> countsPerValue = new HashMap<>();

    /** Task number taken from the initialization context in {@link #open}. */
    private int taskNumber;

    /** Set when writing a record threw; suppresses publishing the counts on close. */
    private boolean taskFailed;

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure
    }

    @Override
    public void open(InitializationContext context) throws IOException {
        taskNumber = context.getTaskNumber();
    }

    @Override
    public void writeRecord(Long value) throws IOException {
        try {
            countsPerValue.merge(value, 1L, Long::sum);
            if (taskNumber != 0) {
                return;
            }
            maybeSleep();
        } catch (Throwable t) {
            // Deliberately not rethrown: the failure is only remembered so that close()
            // skips publishing this instance's counts.
            taskFailed = true;
        }
    }

    @Override
    public void close() throws IOException {
        if (taskFailed) {
            return;
        }
        numberCountResults.put(taskNumber, countsPerValue);
    }

    @Override
    public void finalizeGlobal(FinalizationContext context) throws IOException {
        for (int subtaskIndex = 0; subtaskIndex < context.getParallelism(); subtaskIndex++) {
            final int finishedAttempt = context.getFinishedAttempt(subtaskIndex);
            if (finishedAttempt != 0) {
                foundSpeculativeAttempt = true;
            }
        }
    }
}

private static void maybeSleep() {
if (slowTaskCounter.getAndDecrement() > 0) {
try {
Expand Down

0 comments on commit f6694ca

Please sign in to comment.