From f6694ca057ed01221968e4fdedec9dc4e3fc62e5 Mon Sep 17 00:00:00 2001 From: ifndef-SleePy Date: Mon, 30 Jan 2023 20:07:58 +0800 Subject: [PATCH] [FLINK-30798][runtime] Make OutputFormat support speculative execution through implementing SupportsConcurrentExecutionAttempts interface This closes #21785. --- .../LegacySinkTransformation.java | 12 +++ .../graph/StreamingJobGraphGeneratorTest.java | 65 ++++++++++++++++ .../SpeculativeSchedulerITCase.java | 77 +++++++++++++++++++ 3 files changed, 154 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java index 523bec12c3c71..29b2915160574 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java @@ -22,10 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -149,7 +152,16 @@ public boolean isSupportsConcurrentExecutionAttempts() { if (userFunction instanceof SupportsConcurrentExecutionAttempts) { return true; } + + if (userFunction instanceof OutputFormatSinkFunction) { + return ((OutputFormatSinkFunction) userFunction).getFormat() + instanceof SupportsConcurrentExecutionAttempts; + } } + } else if (operatorFactory instanceof OutputFormatOperatorFactory) { + final OutputFormat outputFormat = + ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat(); + return outputFormat instanceof SupportsConcurrentExecutionAttempts; } return false; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index ed8160bb7e3e2..53916f8120336 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -1840,6 +1840,45 @@ void testSinkFunctionSupportConcurrentExecutionAttempts() { new TestingSinkFunctionSupportConcurrentExecutionAttempts<>(), true); } + @Test + void testOutputFormatNotSupportConcurrentExecutionAttempts() { + testWhetherOutputFormatSupportsConcurrentExecutionAttempts( + new TestingOutputFormatNotSupportConcurrentExecutionAttempts<>(), false); + } + + @Test + void testOutputFormatSupportConcurrentExecutionAttempts() { + testWhetherOutputFormatSupportsConcurrentExecutionAttempts( + new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); + } + + private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts( + OutputFormat outputFormat, boolean isSupported) { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + + final DataStream source = env.fromElements(1, 2, 3).name("source"); + source.rebalance().writeUsingOutputFormat(outputFormat).name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2); + for (JobVertex jobVertex : jobGraph.getVertices()) { + if (jobVertex.getName().contains("source")) { + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue(); + } else if (jobVertex.getName().contains("sink")) { + if (isSupported) { + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue(); + } else { + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse(); + } + } else { + Assertions.fail("Unexpected job vertex " + jobVertex.getName()); + } + } + } + private static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts( SinkFunction function, boolean isSupported) { final StreamExecutionEnvironment env = @@ -1867,6 +1906,32 @@ private static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts( } } + private static class TestingOutputFormatNotSupportConcurrentExecutionAttempts + implements OutputFormat { + + @Override + public void configure(Configuration parameters) {} + + @Override + public void writeRecord(T record) throws IOException {} + + @Override + public void close() throws IOException {} + } + + private static class TestingOutputFormatSupportConcurrentExecutionAttempts + implements OutputFormat, SupportsConcurrentExecutionAttempts { + + @Override + public void configure(Configuration parameters) {} + + @Override + public void writeRecord(T record) throws IOException {} + + @Override + public void close() throws IOException {} + } + private static class TestingSinkFunctionNotSupportConcurrentExecutionAttempts implements SinkFunction { @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java index c1f9c936ee087..e352a0670cda7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java @@ -22,7 +22,9 @@ import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.io.FinalizeOnMaster; import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; @@ -200,6 +202,16 @@ public void testSpeculativeSlowSinkFunction() throws Exception { checkResults(); } + @Test + public void testSpeculativeOutputFormatSink() throws Exception { + executeJob(this::setupSlowOutputFormatSink); + waitUntilJobArchived(); + + checkResults(); + + assertThat(DummySpeculativeOutputFormat.foundSpeculativeAttempt).isTrue(); + } + private void checkResults() { final Map numberCountResultMap = numberCountResults.values().stream() @@ -364,6 +376,20 @@ private void setupSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) { .slotSharingGroup("sinkGroup"); } + private void setupSlowOutputFormatSink(StreamExecutionEnvironment env) { + final DataStream source = + env.fromSequence(0, NUMBERS_TO_PRODUCE - 1) + .setParallelism(parallelism) + .name("source") + .slotSharingGroup("group1"); + + source.rebalance() + .writeUsingOutputFormat(new DummySpeculativeOutputFormat()) + .setParallelism(parallelism) + .name("sink") + .slotSharingGroup("group3"); + } + private void addSink(DataStream dataStream) { dataStream .rebalance() @@ -644,6 +670,57 @@ public void finish() { } } + /** Outputs format which waits for the previous mapper. */ + private static class DummySpeculativeOutputFormat + implements OutputFormat, FinalizeOnMaster, SupportsConcurrentExecutionAttempts { + + private static final long serialVersionUID = 1L; + + private static volatile boolean foundSpeculativeAttempt; + + private int taskNumber; + + private boolean taskFailed; + + private final Map numberCountResult = new HashMap<>(); + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(InitializationContext context) throws IOException { + taskNumber = context.getTaskNumber(); + } + + @Override + public void writeRecord(Long value) throws IOException { + try { + numberCountResult.merge(value, 1L, Long::sum); + if (taskNumber == 0) { + maybeSleep(); + } + } catch (Throwable t) { + taskFailed = true; + } + } + + @Override + public void close() throws IOException { + if (!taskFailed) { + numberCountResults.put(taskNumber, numberCountResult); + } + } + + @Override + public void finalizeGlobal(FinalizationContext context) throws IOException { + for (int i = 0; i < context.getParallelism(); i++) { + if (context.getFinishedAttempt(i) != 0) { + foundSpeculativeAttempt = true; + } + } + } + } + private static void maybeSleep() { if (slowTaskCounter.getAndDecrement() > 0) { try {