[hotfix] Small code cleanups for Persistent Intermediate Results
  - Make explicit that BlockingShuffleOutputFormat is only used for tagging, so its methods should never be invoked.
  - Add serialVersionUID for consistency with other formats
  - Move adjustment of JobGraph into separate helper method in JobGraphGenerator
  - Fix use of generics in JobGraphGeneratorTest
  - Make test more targeted, i.e., remove checks/assertions that are unrelated.
StephanEwen committed Jun 15, 2019
1 parent 35fceea commit 8558548
Showing 3 changed files with 53 additions and 55 deletions.
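
As context for the first cleanup above: BlockingShuffleOutputFormat is purely a tagging format that carries an IntermediateDataSetID through the DataSet API; the JobGraphGenerator later strips the tagged sink again and registers a BLOCKING_PERSISTENT result partition on the predecessor vertex instead. A minimal usage sketch (hypothetical example class; the pipeline mirrors the one in JobGraphGeneratorTest) could look like this:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.AbstractID;

    public class PersistentIntermediateResultExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Long>> ds = env.fromElements(new Tuple2<>(1L, 2L));

            // Id under which the blocking, persistent result partition is registered.
            AbstractID intermediateDataSetId = new AbstractID();

            // The format only tags the sink: the JobGraphGenerator removes this vertex again
            // and adds a BLOCKING_PERSISTENT result data set to the predecessor instead,
            // so none of the OutputFormat methods are ever invoked at runtime.
            ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetId))
                .setParallelism(1);

            // Normal output branch, as in the test changed below.
            ds.output(new DiscardingOutputFormat<>())
                .setParallelism(1);

            env.execute("job with a tagged persistent intermediate result");
        }
    }
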
BlockingShuffleOutputFormat.java
@@ -33,6 +33,8 @@
@Internal
public final class BlockingShuffleOutputFormat<T> implements OutputFormat<T> {

private static final long serialVersionUID = 1L;

private final AbstractID intermediateDataSetId;

private BlockingShuffleOutputFormat(AbstractID intermediateDataSetId) {
@@ -44,16 +46,24 @@ public static <T> BlockingShuffleOutputFormat<T> createOutputFormat(AbstractID i
}

@Override
public void configure(Configuration parameters) {}
public void configure(Configuration parameters) {
throw new UnsupportedOperationException();
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {}
public void open(int taskNumber, int numTasks) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void writeRecord(T record) throws IOException {}
public void writeRecord(T record) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {}
public void close() throws IOException {
throw new UnsupportedOperationException();
}

public AbstractID getIntermediateDataSetId() {
return intermediateDataSetId;
JobGraphGenerator.java
@@ -104,6 +104,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkState;

/**
* This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
* to a {@link org.apache.flink.runtime.jobgraph.JobGraph}. The translation is not strictly a one-to-one,
@@ -484,36 +486,10 @@ public void postVisit(PlanNode node) {
}

// if this is a blocking shuffle vertex, we add one IntermediateDataSetID to its predecessor and return
if (node instanceof SinkPlanNode) {
Object userCodeObject = node.getProgramOperator().getUserCodeWrapper().getUserCodeObject();
if (userCodeObject instanceof BlockingShuffleOutputFormat) {
Iterable<Channel> inputIterable = node.getInputs();
if (inputIterable == null || inputIterable.iterator() == null ||
!inputIterable.iterator().hasNext()) {
throw new IllegalStateException("SinkPlanNode must have a input.");
}
PlanNode precedentNode = inputIterable.iterator().next().getSource();
JobVertex precedentVertex;
if (vertices.containsKey(precedentNode)) {
precedentVertex = vertices.get(precedentNode);
} else {
precedentVertex = chainedTasks.get(precedentNode).getContainingVertex();
}
if (precedentVertex == null) {
throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
}
precedentVertex.createAndAddResultDataSet(
// use specified intermediateDataSetID
new IntermediateDataSetID(((BlockingShuffleOutputFormat) userCodeObject).getIntermediateDataSetId()),
ResultPartitionType.BLOCKING_PERSISTENT
);

// remove this node so the OutputFormatVertex will not shown in the final JobGraph.
vertices.remove(node);
return;
}
if (checkAndConfigurePersistentIntermediateResult(node)) {
return;
}

// check if we have an iteration. in that case, translate the step function now
if (node instanceof IterationPlanNode) {
// prevent nested iterations
@@ -1160,6 +1136,36 @@ private void assignLocalStrategyResources(Channel c, TaskConfig config, int inpu
}
}

private boolean checkAndConfigurePersistentIntermediateResult(PlanNode node) {
if (!(node instanceof SinkPlanNode)) {
return false;
}

final Object userCodeObject = node.getProgramOperator().getUserCodeWrapper().getUserCodeObject();
if (!(userCodeObject instanceof BlockingShuffleOutputFormat)) {
return false;
}

final Iterator<Channel> inputIterator = node.getInputs().iterator();
checkState(inputIterator.hasNext(), "SinkPlanNode must have an input.");

final PlanNode predecessorNode = inputIterator.next().getSource();
final JobVertex predecessorVertex = (vertices.containsKey(predecessorNode)) ?
vertices.get(predecessorNode) :
chainedTasks.get(predecessorNode).getContainingVertex();

checkState(predecessorVertex != null, "Bug: Chained task has not been assigned its containing vertex when connecting.");

predecessorVertex.createAndAddResultDataSet(
// use specified intermediateDataSetID
new IntermediateDataSetID(((BlockingShuffleOutputFormat) userCodeObject).getIntermediateDataSetId()),
ResultPartitionType.BLOCKING_PERSISTENT);

// remove this node so the OutputFormatVertex will not show up in the final JobGraph.
vertices.remove(node);
return true;
}

// ------------------------------------------------------------------------
// Connecting Vertices
// ------------------------------------------------------------------------
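
The new helper above also replaces the hand-rolled if/throw blocks with the checkState utility that the added import brings in. A small standalone sketch (hypothetical CheckStateExample class, not part of the commit) showing that the two forms are equivalent: checkState throws an IllegalStateException with the given message when its condition is false.

    import static org.apache.flink.util.Preconditions.checkState;

    import java.util.Collections;
    import java.util.Iterator;

    public class CheckStateExample {

        public static void main(String[] args) {
            Iterator<String> inputs = Collections.singletonList("someInput").iterator();

            // Hand-rolled precondition, as in the inline code that was removed:
            if (!inputs.hasNext()) {
                throw new IllegalStateException("SinkPlanNode must have an input.");
            }

            // Equivalent check via Flink's Preconditions utility, as used in the new helper;
            // it throws an IllegalStateException with the given message if the condition is false.
            checkState(inputs.hasNext(), "SinkPlanNode must have an input.");

            System.out.println("both checks passed for: " + inputs.next());
        }
    }
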
JobGraphGeneratorTest.java
@@ -23,13 +23,11 @@
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
@@ -38,14 +36,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.AbstractID;

import org.hamcrest.Matchers;
@@ -275,7 +270,7 @@ public void testGeneratingJobGraphWithUnconsumedResultPartition() {
DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L))
.setParallelism(1);

DataSet ds = input.map((MapFunction<Tuple2<Long, Long>, Object>) value -> new Tuple2<>(value.f0 + 1, value.f1))
DataSet<Tuple2<Long, Long>> ds = input.map(new IdentityMapper<>())
.setParallelism(3);

AbstractID intermediateDataSetID = new AbstractID();
@@ -285,35 +280,22 @@
.setParallelism(1);

// this is the normal output branch.
ds.output(new DiscardingOutputFormat())
ds.output(new DiscardingOutputFormat<>())
.setParallelism(1);

JobGraph jobGraph = compileJob(env);

Assert.assertEquals(3, jobGraph.getVerticesSortedTopologicallyFromSources().size());

JobVertex inputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
JobVertex mapVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
JobVertex outputVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(2);

Assert.assertThat(inputVertex, Matchers.instanceOf(InputFormatVertex.class));
Assert.assertThat(mapVertex, Matchers.instanceOf(JobVertex.class));
Assert.assertThat(outputVertex, Matchers.instanceOf(OutputFormatVertex.class));

TaskConfig cfg = new TaskConfig(outputVertex.getConfiguration());
UserCodeWrapper<OutputFormat<?>> wrapper = cfg.getStubWrapper(this.getClass().getClassLoader());
OutputFormat<?> outputFormat = wrapper.getUserCodeObject(OutputFormat.class, this.getClass().getClassLoader());

// the only OutputFormatVertex is DiscardingOutputFormat
Assert.assertThat(outputFormat, Matchers.instanceOf(DiscardingOutputFormat.class));

// there are 2 output result with one of them is ResultPartitionType.BLOCKING_PERSISTENT
Assert.assertEquals(2, mapVertex.getProducedDataSets().size());

Assert.assertTrue(mapVertex.getProducedDataSets().stream()
.anyMatch(dataSet -> dataSet.getId().equals(new IntermediateDataSetID(intermediateDataSetID)) &&
dataSet.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT));

}

private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
