diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java index e0d2f4ec8be3b..69a7eb6712657 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/DataStatistics.java @@ -25,8 +25,8 @@ * data processed in a job. Currently this method acts as an entry point only for obtaining cached * statistics. */ -public class DataStatistics -{ +public class DataStatistics { + private final Map baseStatisticsCache; // -------------------------------------------------------------------------------------------- diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/NonCachingDataStatistics.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/NonCachingDataStatistics.java new file mode 100644 index 0000000000000..6a45df90ed1a4 --- /dev/null +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/NonCachingDataStatistics.java @@ -0,0 +1,31 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.compiler; + +import eu.stratosphere.pact.common.io.statistics.BaseStatistics; + +/** + * A variant of the data statistics that never caches. + */ +public class NonCachingDataStatistics extends DataStatistics { + + public BaseStatistics getBaseStatistics(String inputIdentifier) { + return null; + } + + public void cacheBaseStatistics(BaseStatistics statistics, String identifyer) { + } +} diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java index 8d86990aa02e3..f3f1b27ddfcbb 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java @@ -575,8 +575,7 @@ public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type) throws * Thrown, if the plan is invalid or the optimizer encountered an inconsistent * situation during the compilation process. */ - public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException - { + public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException { if (LOG.isDebugEnabled()) { LOG.debug("Beginning compilation of PACT program '" + pactPlan.getJobName() + '\''); } @@ -991,7 +990,7 @@ public void postVisit(Contract c) { } // add an outgoing connection to the root of the step function - PactConnection rootConn = new PactConnection(rootOfStepFunction, null); + PactConnection rootConn = new PactConnection(rootOfStepFunction, null, -1); rootOfStepFunction.addOutgoingConnection(rootConn); iterNode.setNextPartialSolution(rootOfStepFunction, rootConn); @@ -1044,9 +1043,9 @@ else if (n instanceof WorksetIterationNode) { } // add an outgoing connection to the root of the step function - PactConnection worksetRootConn = new PactConnection(nextWorksetNode, null); + PactConnection worksetRootConn = new PactConnection(nextWorksetNode, null, -1); nextWorksetNode.addOutgoingConnection(worksetRootConn); - PactConnection solutionSetDeltaRootConn = new PactConnection(solutionSetDeltaNode, null); + PactConnection solutionSetDeltaRootConn = new PactConnection(solutionSetDeltaNode, null, -1); solutionSetDeltaNode.addOutgoingConnection(solutionSetDeltaRootConn); iterNode.setPartialSolution(solutionSetNode, worksetNode); diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java index a62868336dda2..764bcc5833d3c 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/CostEstimator.java @@ -78,12 +78,26 @@ public void costOperator(PlanNode n) { for (Iterator channels = n.getInputs(); channels.hasNext(); ) { final Channel channel = channels.next(); final Costs costs = new Costs(); + + // Plans that apply the same strategies, but at different points + // are equally expensive. For example, if a partitioning can be + // pushed below a Map function there is often no difference in plan + // costs between the pushed down version and the version that partitions + // after the Mapper. However, in those cases, we want the expensive + // strategy to appear later in the plan, as data reduction often occurs + // by large factors, while blowup is rare and typically by smaller fractions. + // We achieve this by adding a penalty to small penalty to the FORWARD strategy, + // weighted by the current plan depth (steps to the earliest data source). + // that way, later FORWARDS are more expensive than earlier forwards. + // Note that this only applies to the heuristic costs. switch (channel.getShipStrategy()) { case NONE: throw new CompilerException( "Cannot determine costs: Shipping strategy has not been set for an input."); case FORWARD: +// costs.addHeuristicNetworkCost(channel.getMaxDepth()); + break; case PARTITION_LOCAL_HASH: break; case PARTITION_RANDOM: diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/Costs.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/Costs.java index b04821c9029f5..b8b0b2d3065d2 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/Costs.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/Costs.java @@ -99,7 +99,7 @@ public long getNetworkCost() { * The network cost to set, in bytes to be transferred. */ public void setNetworkCost(long bytes) { - if (bytes >= -1) { + if (bytes == UNKNOWN || bytes >= 0) { this.networkCost = bytes; } else { throw new IllegalArgumentException(); @@ -113,7 +113,7 @@ public void setNetworkCost(long bytes) { * @param bytes The network cost to add, in bytes to be transferred. */ public void addNetworkCost(long bytes) { - this.networkCost = (this.networkCost < 0 || bytes < 0) ? -1 : this.networkCost + bytes; + this.networkCost = (this.networkCost < 0 || bytes < 0) ? UNKNOWN : this.networkCost + bytes; } /** @@ -131,7 +131,7 @@ public long getDiskCost() { * @param bytes The disk cost to set, in bytes to be written and read. */ public void setDiskCost(long bytes) { - if (bytes >= -1) { + if (bytes == UNKNOWN || bytes >= 0) { this.diskCost = bytes; } else { throw new IllegalArgumentException(); @@ -146,7 +146,7 @@ public void setDiskCost(long bytes) { */ public void addDiskCost(long bytes) { this.diskCost = - (this.diskCost < 0 || bytes < 0) ? -1 : this.diskCost + bytes; + (this.diskCost < 0 || bytes < 0) ? UNKNOWN : this.diskCost + bytes; } /** @@ -164,7 +164,7 @@ public long getCpuCost() { * @param cost The CPU Cost. */ public void setCpuCost(long cost) { - if (cost >= -1) { + if (cost == UNKNOWN || cost >= 0) { this.cpuCost = cost; } else { throw new IllegalArgumentException(); @@ -178,7 +178,7 @@ public void setCpuCost(long cost) { */ public void addCpuCost(long cost) { this.cpuCost = - (this.cpuCost < 0 || cost < 0) ? -1 : this.cpuCost + cost; + (this.cpuCost < 0 || cost < 0) ? UNKNOWN : this.cpuCost + cost; } // -------------------------------------------------------------------------------------------- @@ -306,20 +306,20 @@ public void addHeuristicCpuCost(long cost) { */ public void addCosts(Costs other) { // ---------- quantifiable costs ---------- - if (this.networkCost == -1 || other.networkCost == -1) { - this.networkCost = -1; + if (this.networkCost == UNKNOWN || other.networkCost == UNKNOWN) { + this.networkCost = UNKNOWN; } else { this.networkCost += other.networkCost; } - if (this.diskCost == -1 || other.diskCost == -1) { - this.diskCost = -1; + if (this.diskCost == UNKNOWN || other.diskCost == UNKNOWN) { + this.diskCost = UNKNOWN; } else { this.diskCost += other.diskCost; } - if (this.cpuCost == -1 || other.cpuCost == -1) { - this.cpuCost = -1; + if (this.cpuCost == UNKNOWN || other.cpuCost == UNKNOWN) { + this.cpuCost = UNKNOWN; } else { this.cpuCost += other.cpuCost; } @@ -337,19 +337,19 @@ public void addCosts(Costs other) { * @param other The costs to subtract. */ public void subtractCosts(Costs other) { - if (this.networkCost != -1 && other.networkCost != -1) { + if (this.networkCost != UNKNOWN && other.networkCost != UNKNOWN) { this.networkCost -= other.networkCost; if (this.networkCost < 0) { throw new IllegalArgumentException("Cannot subtract more cost then there is."); } } - if (this.diskCost != -1 && other.diskCost != -1) { + if (this.diskCost != UNKNOWN && other.diskCost != UNKNOWN) { this.diskCost -= other.diskCost; if (this.diskCost < 0) { throw new IllegalArgumentException("Cannot subtract more cost then there is."); } } - if (this.cpuCost != -1 && other.cpuCost != -1) { + if (this.cpuCost != UNKNOWN && other.cpuCost != UNKNOWN) { this.cpuCost -= other.cpuCost; if (this.cpuCost < 0) { throw new IllegalArgumentException("Cannot subtract more cost then there is."); @@ -392,8 +392,10 @@ public void multiplyWith(int factor) { @Override public int compareTo(Costs o) { // check the network cost. if we have actual costs on both, use them, otherwise use the heuristic costs. - if (this.networkCost > -1 && o.networkCost > -1) { - return this.networkCost < o.networkCost ? -1 : this.networkCost > o.networkCost ? 1 : 0; + if (this.networkCost != UNKNOWN && o.networkCost != UNKNOWN) { + if (this.networkCost != o.networkCost) { + return this.networkCost < o.networkCost ? -1 : 1; + } } else if (this.heuristicNetworkCost < o.heuristicNetworkCost) { return -1; } else if (this.heuristicNetworkCost > o.heuristicNetworkCost) { @@ -401,8 +403,10 @@ public int compareTo(Costs o) { } // next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs. - if (this.diskCost > -1 && o.diskCost > -1) { - return this.diskCost < o.diskCost ? -1 : this.diskCost > o.diskCost ? 1 : 0; + if (this.diskCost != UNKNOWN && o.diskCost != UNKNOWN) { + if (this.diskCost != o.diskCost) { + return this.diskCost < o.diskCost ? -1 : 1; + } } else if (this.heuristicDiskCost < o.heuristicDiskCost) { return -1; } else if (this.heuristicDiskCost > o.heuristicDiskCost) { @@ -410,7 +414,7 @@ public int compareTo(Costs o) { } // next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs. - if (this.cpuCost > -1 && o.cpuCost > -1) { + if (this.cpuCost != UNKNOWN && o.cpuCost != UNKNOWN) { return this.cpuCost < o.cpuCost ? -1 : this.cpuCost > o.cpuCost ? 1 : 0; } else if (this.heuristicCpuCost < o.heuristicCpuCost) { return -1; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java index 709c620e817c5..c1ace9552597b 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/costs/DefaultCostEstimator.java @@ -30,8 +30,8 @@ * of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still * choose to move tasks to different nodes, so that we do not know that no data is shipped. */ -public class DefaultCostEstimator extends CostEstimator -{ +public class DefaultCostEstimator extends CostEstimator { + /** * The case of the estimation for all relative costs. We heuristically pick a very large data volume, which * will favor strategies that are less expensive on large data volumes. This is robust and @@ -144,9 +144,9 @@ public void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSi public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs) { long bs = buildSideInput.getEstimatedOutputSize(); long ps = probeSideInput.getEstimatedOutputSize(); - // heuristic: half the table has to spill, times 2 I/O + if (bs > 0 && ps > 0) { - costs.addDiskCost(bs + ps); + costs.addDiskCost(2*bs + ps); } else { costs.setDiskCost(Costs.UNKNOWN); } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java index 5f120bce0c3ac..a5e64914b08bf 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/dataproperties/RequestedLocalProperties.java @@ -15,6 +15,8 @@ package eu.stratosphere.pact.compiler.dataproperties; +import java.util.Arrays; + import eu.stratosphere.pact.common.contract.Ordering; import eu.stratosphere.pact.common.util.FieldList; import eu.stratosphere.pact.common.util.FieldSet; @@ -192,7 +194,9 @@ public void parameterizeChannel(Channel channel) { if (this.ordering != null) { channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections()); } else if (this.groupedFields != null) { - channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields)); + boolean[] dirs = new boolean[this.groupedFields.size()]; + Arrays.fill(dirs, true); + channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs); } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java index e268322865300..bff315b975cc4 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/BinaryUnionNode.java @@ -42,8 +42,8 @@ public class BinaryUnionNode extends TwoInputNode { public BinaryUnionNode(OptimizerNode pred1, OptimizerNode pred2) { super(new UnionPlaceholderContract()); - this.input1 = new PactConnection(pred1, this); - this.input2 = new PactConnection(pred2, this); + this.input1 = new PactConnection(pred1, this, pred1.getMaxDepth() + 1); + this.input2 = new PactConnection(pred2, this, pred2.getMaxDepth() + 1); pred1.addOutgoingConnection(this.input1); pred2.addOutgoingConnection(this.input2); diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java index 2b9ff9a0078c9..32728bc913268 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/DataSinkNode.java @@ -142,10 +142,10 @@ public void setInputs(Map contractToNode) { final PactConnection conn; if (children.size() == 1) { pred = contractToNode.get(children.get(0)); - conn = new PactConnection(pred, this); + conn = new PactConnection(pred, this, pred.getMaxDepth() + 1); } else { pred = createdUnionCascade(children, contractToNode, null); - conn = new PactConnection(pred, this); + conn = new PactConnection(pred, this, pred.getMaxDepth() + 1); conn.setShipStrategy(ShipStrategyType.FORWARD); } // create the connection and add it diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java index f484777191f90..0671eaeaaf31a 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/MapNode.java @@ -25,8 +25,8 @@ /** * The optimizer's internal representation of a Map contract node. */ -public class MapNode extends SingleInputNode -{ +public class MapNode extends SingleInputNode { + /** * Creates a new MapNode for the given contract. * diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java index 4faadc6b162ef..a9df71c40170d 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/OptimizerNode.java @@ -405,6 +405,14 @@ public void identifyDynamicPath(int costWeight) { public int getCostWeight() { return this.costWeight; } + + public int getMaxDepth() { + int maxDepth = 0; + for (PactConnection conn : getIncomingConnections()) { + maxDepth = Math.max(maxDepth, conn.getMaxDepth()); + } + return maxDepth; + } /** * Gets the properties that are interesting for this node to produce. diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java index c8cb041617e3a..1f91e93a14f62 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/PactConnection.java @@ -30,8 +30,8 @@ * The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the * direction of the sources. */ -public class PactConnection implements EstimateProvider, DumpableConnection -{ +public class PactConnection implements EstimateProvider, DumpableConnection { + private final OptimizerNode source; // The source node of the connection private final OptimizerNode target; // The target node of the connection. @@ -41,6 +41,8 @@ public class PactConnection implements EstimateProvider, DumpableConnectionNONE. @@ -51,8 +53,8 @@ public class PactConnection implements EstimateProvider, DumpableConnection contractToNode) throws Compil PactConnection conn; if (children.size() == 1) { pred = contractToNode.get(children.get(0)); - conn = new PactConnection(pred, this); + conn = new PactConnection(pred, this, pred.getMaxDepth() + 1); if (preSet != null) { conn.setShipStrategy(preSet); } } else { pred = createdUnionCascade(children, contractToNode, preSet); - conn = new PactConnection(pred, this); + conn = new PactConnection(pred, this, pred.getMaxDepth() + 1); conn.setShipStrategy(ShipStrategyType.FORWARD); } // create the connection and add it diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SinkJoiner.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SinkJoiner.java index 99eac6cedf46c..7117e01304a00 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SinkJoiner.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/SinkJoiner.java @@ -35,8 +35,8 @@ public class SinkJoiner extends TwoInputNode public SinkJoiner(OptimizerNode input1, OptimizerNode input2) { super(new NoContract()); - PactConnection conn1 = new PactConnection(input1, this); - PactConnection conn2 = new PactConnection(input2, this); + PactConnection conn1 = new PactConnection(input1, this, -1); + PactConnection conn2 = new PactConnection(input2, this, -1); this.input1 = conn1; this.input2 = conn2; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java index e73d713174212..4385611b3d467 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/TwoInputNode.java @@ -241,13 +241,13 @@ public void setInputs(Map contractToNode) { PactConnection conn1; if (leftPreds.size() == 1) { pred1 = contractToNode.get(leftPreds.get(0)); - conn1 = new PactConnection(pred1, this); + conn1 = new PactConnection(pred1, this, pred1.getMaxDepth() + 1); if (preSet1 != null) { conn1.setShipStrategy(preSet1); } } else { pred1 = createdUnionCascade(leftPreds, contractToNode, preSet1); - conn1 = new PactConnection(pred1, this); + conn1 = new PactConnection(pred1, this, pred1.getMaxDepth() + 1); conn1.setShipStrategy(ShipStrategyType.FORWARD); } // create the connection and add it @@ -258,13 +258,13 @@ public void setInputs(Map contractToNode) { PactConnection conn2; if (rightPreds.size() == 1) { pred2 = contractToNode.get(rightPreds.get(0)); - conn2 = new PactConnection(pred2, this); + conn2 = new PactConnection(pred2, this, pred2.getMaxDepth() + 1); if (preSet2 != null) { conn2.setShipStrategy(preSet2); } } else { pred2 = createdUnionCascade(rightPreds, contractToNode, preSet1); - conn2 = new PactConnection(pred2, this); + conn2 = new PactConnection(pred2, this, pred2.getMaxDepth() + 1); conn2.setShipStrategy(ShipStrategyType.FORWARD); } // create the connection and add it diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/WorksetIterationNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/WorksetIterationNode.java index b38c6066f36e4..e324a40573905 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/WorksetIterationNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/WorksetIterationNode.java @@ -33,7 +33,6 @@ import eu.stratosphere.pact.compiler.dataproperties.RequestedGlobalProperties; import eu.stratosphere.pact.compiler.dataproperties.RequestedLocalProperties; import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual; -import eu.stratosphere.pact.compiler.operators.OperatorDescriptorDual.LocalPropertiesPair; import eu.stratosphere.pact.compiler.plan.candidate.Channel; import eu.stratosphere.pact.compiler.plan.candidate.DualInputPlanNode; import eu.stratosphere.pact.compiler.plan.candidate.PlanNode; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java index 1ae6cba75e3aa..e93e5e3666c55 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/Channel.java @@ -35,8 +35,8 @@ /** * */ -public class Channel implements EstimateProvider, Cloneable, DumpableConnection -{ +public class Channel implements EstimateProvider, Cloneable, DumpableConnection { + private PlanNode source; private PlanNode target; @@ -496,6 +496,11 @@ public void swapUnionNodes(UnionPlanNode newUnionNode) { } // -------------------------------------------------------------------------------------------- + + public int getMaxDepth() { + return this.source.getOptimizerNode().getMaxDepth() + 1; + } + // -------------------------------------------------------------------------------------------- /* (non-Javadoc) diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/DualInputPlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/DualInputPlanNode.java index d6da90f2b7426..fae01f576fba4 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/DualInputPlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/DualInputPlanNode.java @@ -37,8 +37,8 @@ /** * */ -public class DualInputPlanNode extends PlanNode -{ +public class DualInputPlanNode extends PlanNode { + protected final Channel input1; protected final Channel input2; @@ -56,21 +56,18 @@ public class DualInputPlanNode extends PlanNode // -------------------------------------------------------------------------------------------- - public DualInputPlanNode(OptimizerNode template, Channel input1, Channel input2, DriverStrategy localStrategy) - { - this(template, input1, input2, localStrategy, null, null); + public DualInputPlanNode(OptimizerNode template, Channel input1, Channel input2, DriverStrategy localStrategy) { + this(template, input1, input2, localStrategy, null, null, null); } public DualInputPlanNode(OptimizerNode template, Channel input1, Channel input2, - DriverStrategy localStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2) - { - this(template, input1, input2, localStrategy, driverKeyFields1, driverKeyFields2, null); + DriverStrategy localStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2) { + this(template, input1, input2, localStrategy, driverKeyFields1, driverKeyFields2, + SingleInputPlanNode.getTrueArray(driverKeyFields1.size())); } - public DualInputPlanNode(OptimizerNode template, Channel input1, Channel input2, - DriverStrategy localStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2, - boolean[] driverSortOrders) - { + public DualInputPlanNode(OptimizerNode template, Channel input1, Channel input2, DriverStrategy localStrategy, + FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders) { super(template, localStrategy); this.input1 = input1; this.input2 = input2; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java index 5487042a05545..15502f7d0f18e 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/PlanNode.java @@ -42,8 +42,8 @@ * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was * created for. */ -public abstract class PlanNode implements Visitable, DumpableNode -{ +public abstract class PlanNode implements Visitable, DumpableNode { + protected final OptimizerNode template; protected final List outChannels; diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java index a58a8787f19aa..c2bf7ee46ac4d 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/plan/candidate/SingleInputPlanNode.java @@ -47,13 +47,13 @@ public class SingleInputPlanNode extends PlanNode // -------------------------------------------------------------------------------------------- public SingleInputPlanNode(OptimizerNode template, Channel input, DriverStrategy driverStrategy) { - this(template, input, driverStrategy, null); + this(template, input, driverStrategy, null, null); } public SingleInputPlanNode(OptimizerNode template, Channel input, DriverStrategy driverStrategy, FieldList driverKeyFields) { - this(template, input, driverStrategy, driverKeyFields, null); + this(template, input, driverStrategy, driverKeyFields, getTrueArray(driverKeyFields.size())); } public SingleInputPlanNode(OptimizerNode template, Channel input, @@ -215,4 +215,14 @@ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { return res; } } + + // -------------------------------------------------------------------------------------------- + + protected static boolean[] getTrueArray(int length) { + final boolean[] a = new boolean[length]; + for (int i = 0; i < length; i++) { + a[i] = true; + } + return a; + } } diff --git a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/examples/TPCHQuery3CompilerTest.java b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/examples/TPCHQuery3CompilerTest.java index fd1255d1cee5b..fc94c6151b738 100644 --- a/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/examples/TPCHQuery3CompilerTest.java +++ b/pact/pact-compiler/src/test/java/eu/stratosphere/pact/compiler/examples/TPCHQuery3CompilerTest.java @@ -70,7 +70,7 @@ public void testQueryNoStatistics() { // verify the optimizer choices checkStandardStrategies(filteringMapper, join, combiner, reducer, sink); Assert.assertTrue(checkRepartitionShipStrategies(join, reducer, combiner)); - Assert.assertTrue(checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, true)); + Assert.assertTrue(checkHashJoinStrategies(join, reducer, true) || checkHashJoinStrategies(join, reducer, false)); } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -91,7 +91,7 @@ public void testQueryAnyValidPlan() { */ @Test public void testQueryWithStatsForBroadcastHash() { - testQueryGeneric(1024*1024*1024*1024L, 100*1024*1024*1024*1024L, true, false, true, false, false); + testQueryGeneric(1024l*1024*1024*1024, 100l*1024*1024*1024*1024, true, false, true, false, false); } /** @@ -99,7 +99,7 @@ public void testQueryWithStatsForBroadcastHash() { */ @Test public void testQueryWithStatsForRepartition() { - testQueryGeneric(100*1024*1024*1024*1024L, 100*1024*1024*1024*1024L, false, true, true, true, true); + testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, false, true, true, true, true); } // ------------------------------------------------------------------------