Skip to content

Commit

Permalink
[FLINK-953] Remove fake tail from iterations
Browse files Browse the repository at this point in the history
This closes apache#124
  • Loading branch information
StephanEwen committed Sep 21, 2014
1 parent 91cfbc5 commit 7861849
Show file tree
Hide file tree
Showing 10 changed files with 5 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import org.apache.flink.runtime.iterative.io.FakeOutputTask;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
Expand Down Expand Up @@ -1179,21 +1178,11 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
tailConfig.setIsWorksetUpdate();

// No following termination criterion
if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {
if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {

rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);

tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setParallelism(headVertex.getParallelism());
this.auxVertices.add(fakeTail);

// connect the fake tail
fakeTail.connectNewDataSetAsInput(rootOfStepFunctionVertex, DistributionPattern.POINTWISE);
}


Expand Down Expand Up @@ -1222,15 +1211,6 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
// Hack
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);

AbstractJobVertex fakeTailTerminationCriterion = new AbstractJobVertex("Fake Tail for Termination Criterion");
fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setParallelism(headVertex.getParallelism());
this.auxVertices.add(fakeTailTerminationCriterion);

// connect the fake tail
fakeTailTerminationCriterion.connectNewDataSetAsInput(rootOfTerminationCriterionVertex, DistributionPattern.POINTWISE);

// tell the head that it needs to wait for the solution set updates
headConfig.setWaitForSolutionSetUpdate();
Expand Down Expand Up @@ -1345,16 +1325,6 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);

worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setParallelism(headVertex.getParallelism());
this.auxVertices.add(fakeTail);

// connect the fake tail
fakeTail.connectNewDataSetAsInput(nextWorksetVertex, DistributionPattern.POINTWISE);
}
}
{
Expand All @@ -1379,16 +1349,6 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);

solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setParallelism(headVertex.getParallelism());
this.auxVertices.add(fakeTail);

// connect the fake tail
fakeTail.connectNewDataSetAsInput(solutionDeltaVertex, DistributionPattern.POINTWISE);

// tell the head that it needs to wait for the solution set updates
headConfig.setWaitForSolutionSetUpdate();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1241,10 +1241,10 @@ public static void logAndThrowException(Exception ex, AbstractInvokable parent)
* @return The OutputCollector that data produced in this task is submitted to.
*/
public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<BufferWriter> eventualOutputs, int numOutputs)
throws Exception
throws Exception
{
if (numOutputs <= 0) {
throw new Exception("BUG: The task must have at least one output");
if (numOutputs == 0) {
return null;
}

// get the factory for the serializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ private static AbstractJobVertex createReducer(JobGraph jobGraph, int numSubTask
tailConfig.setSpillingThresholdInput(0, 0.9f);

// output
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(outputSerializer);

// the udf
Expand Down Expand Up @@ -284,8 +283,6 @@ private static JobGraph createJobGraph(String pointsPath, String centersPath, St

AbstractJobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);

AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);

AbstractJobVertex sync = createSync(jobGraph, numIterations, numSubTasks);

OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
Expand All @@ -304,8 +301,6 @@ private static JobGraph createJobGraph(String pointsPath, String centersPath, St
JobGraphUtils.connect(mapper, reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);

JobGraphUtils.connect(reducer, fakeTailOutput, ChannelType.NETWORK, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
Expand All @@ -319,13 +314,11 @@ private static JobGraph createJobGraph(String pointsPath, String centersPath, St
head.setSlotSharingGroup(sharingGroup);
mapper.setSlotSharingGroup(sharingGroup);
reducer.setSlotSharingGroup(sharingGroup);
fakeTailOutput.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);

mapper.setStrictlyCoLocatedWith(head);
reducer.setStrictlyCoLocatedWith(head);
fakeTailOutput.setStrictlyCoLocatedWith(reducer);

return jobGraph;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultP
return output;
}

private static AbstractJobVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
return JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
}

private static AbstractJobVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
Expand Down Expand Up @@ -391,7 +387,6 @@ public JobGraph createJobGraphUnifiedTails(
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());

OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);

// --------------- the tail (solution set join) ---------------
Expand All @@ -411,7 +406,6 @@ public JobGraph createJobGraphUnifiedTails(
tailConfig.setInputSerializer(serializer, 0);

// output
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(serializer);

// the driver
Expand All @@ -435,7 +429,6 @@ public JobGraph createJobGraphUnifiedTails(
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

Expand All @@ -447,11 +440,9 @@ public JobGraph createJobGraphUnifiedTails(
tail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
fakeTail.setSlotSharingGroup(sharingGroup);

intermediate.setStrictlyCoLocatedWith(head);
tail.setStrictlyCoLocatedWith(head);
fakeTail.setStrictlyCoLocatedWith(tail);

return jobGraph;
}
Expand Down Expand Up @@ -483,8 +474,6 @@ public JobGraph createJobGraphSeparateTails(

// output and auxiliaries
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
AbstractJobVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);

// ------------------ the intermediate (ss join) ----------------------
Expand Down Expand Up @@ -532,7 +521,6 @@ public JobGraph createJobGraphSeparateTails(
ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);

// output
ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
ssTailConfig.setOutputSerializer(serializer);

// the driver
Expand All @@ -555,7 +543,6 @@ public JobGraph createJobGraphSeparateTails(
wsTailConfig.setInputSerializer(serializer, 0);

// output
wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
wsTailConfig.setOutputSerializer(serializer);

// the driver
Expand Down Expand Up @@ -584,9 +571,6 @@ public JobGraph createJobGraphSeparateTails(

JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

SlotSharingGroup sharingGroup = new SlotSharingGroup();
Expand All @@ -599,15 +583,11 @@ public JobGraph createJobGraphSeparateTails(
ssTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
wsFakeTail.setSlotSharingGroup(sharingGroup);
ssFakeTail.setSlotSharingGroup(sharingGroup);

intermediate.setStrictlyCoLocatedWith(head);
ssJoinIntermediate.setStrictlyCoLocatedWith(head);
wsTail.setStrictlyCoLocatedWith(head);
ssTail.setStrictlyCoLocatedWith(head);
wsFakeTail.setStrictlyCoLocatedWith(wsTail);
ssFakeTail.setStrictlyCoLocatedWith(ssTail);

return jobGraph;
}
Expand Down Expand Up @@ -639,7 +619,6 @@ public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(

// output and auxiliaries
AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);

// ------------------ the intermediate (ws update) ----------------------
Expand Down Expand Up @@ -685,7 +664,6 @@ public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(
ssTailConfig.setInputSerializer(serializer, 0);

// output
ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
ssTailConfig.setOutputSerializer(serializer);

// the driver
Expand All @@ -712,8 +690,6 @@ public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(

JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(ssTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

SlotSharingGroup sharingGroup = new SlotSharingGroup();
Expand All @@ -725,12 +701,10 @@ public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(
ssTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
fakeTail.setSlotSharingGroup(sharingGroup);

intermediate.setStrictlyCoLocatedWith(head);
wsUpdateIntermediate.setStrictlyCoLocatedWith(head);
ssTail.setStrictlyCoLocatedWith(head);
fakeTail.setStrictlyCoLocatedWith(ssTail);

return jobGraph;
}
Expand Down Expand Up @@ -764,7 +738,6 @@ public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(

// output and auxiliaries
AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);

// ------------------ the intermediate (ss update) ----------------------
Expand Down Expand Up @@ -808,7 +781,6 @@ public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(
wsTailConfig.setInputSerializer(serializer, 0);

// output
wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
wsTailConfig.setOutputSerializer(serializer);

// the driver
Expand All @@ -834,8 +806,6 @@ public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(

JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(wsTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);


Expand All @@ -848,12 +818,10 @@ public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(
wsTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
fakeTail.setSlotSharingGroup(sharingGroup);

intermediate.setStrictlyCoLocatedWith(head);
ssJoinIntermediate.setStrictlyCoLocatedWith(head);
wsTail.setStrictlyCoLocatedWith(head);
fakeTail.setStrictlyCoLocatedWith(wsTail);

return jobGraph;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSub
chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
chainedMapperConfig.setInputSerializer(serializer, 0);

chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
chainedMapperConfig.setOutputSerializer(serializer);

chainedMapperConfig.setIsWorksetUpdate();
Expand All @@ -220,9 +219,6 @@ private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSub
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
}

// - fake tail -------------------------------------------------------------------------------------------------
AbstractJobVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);

// - sync ------------------------------------------------------------------------------------------------------
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
Expand All @@ -241,8 +237,6 @@ private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSub

JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);

// --------------------------------------------------------------------------------------------------------------
// 3. INSTANCE SHARING
// --------------------------------------------------------------------------------------------------------------
Expand All @@ -252,12 +246,10 @@ private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSub
input.setSlotSharingGroup(sharingGroup);
head.setSlotSharingGroup(sharingGroup);
tail.setSlotSharingGroup(sharingGroup);
fakeTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);

tail.setStrictlyCoLocatedWith(head);
fakeTail.setStrictlyCoLocatedWith(tail);

return jobGraph;
}
Expand Down
Loading

0 comments on commit 7861849

Please sign in to comment.