Skip to content

Commit

Permalink
Temp.
Browse files Browse the repository at this point in the history
  • Loading branch information
sewen committed Apr 8, 2013
1 parent 4c1a87c commit f0ff109
Show file tree
Hide file tree
Showing 21 changed files with 151 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* data processed in a job. Currently this method acts as an entry point only for obtaining cached
* statistics.
*/
public class DataStatistics
{
public class DataStatistics {
private final Map<String, BaseStatistics> baseStatisticsCache;

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.compiler;

import eu.stratosphere.pact.common.io.statistics.BaseStatistics;

/**
* A variant of the data statistics that never caches.
*/
public class NonCachingDataStatistics extends DataStatistics {

public BaseStatistics getBaseStatistics(String inputIdentifier) {
return null;
}

public void cacheBaseStatistics(BaseStatistics statistics, String identifyer) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,7 @@ public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type) throws
* Thrown, if the plan is invalid or the optimizer encountered an inconsistent
* situation during the compilation process.
*/
public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException
{
public OptimizedPlan compile(Plan pactPlan, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException {
if (LOG.isDebugEnabled()) {
LOG.debug("Beginning compilation of PACT program '" + pactPlan.getJobName() + '\'');
}
Expand Down Expand Up @@ -991,7 +990,7 @@ public void postVisit(Contract c) {
}

// add an outgoing connection to the root of the step function
PactConnection rootConn = new PactConnection(rootOfStepFunction, null);
PactConnection rootConn = new PactConnection(rootOfStepFunction, null, -1);
rootOfStepFunction.addOutgoingConnection(rootConn);

iterNode.setNextPartialSolution(rootOfStepFunction, rootConn);
Expand Down Expand Up @@ -1044,9 +1043,9 @@ else if (n instanceof WorksetIterationNode) {
}

// add an outgoing connection to the root of the step function
PactConnection worksetRootConn = new PactConnection(nextWorksetNode, null);
PactConnection worksetRootConn = new PactConnection(nextWorksetNode, null, -1);
nextWorksetNode.addOutgoingConnection(worksetRootConn);
PactConnection solutionSetDeltaRootConn = new PactConnection(solutionSetDeltaNode, null);
PactConnection solutionSetDeltaRootConn = new PactConnection(solutionSetDeltaNode, null, -1);
solutionSetDeltaNode.addOutgoingConnection(solutionSetDeltaRootConn);

iterNode.setPartialSolution(solutionSetNode, worksetNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,26 @@ public void costOperator(PlanNode n) {
for (Iterator<Channel> channels = n.getInputs(); channels.hasNext(); ) {
final Channel channel = channels.next();
final Costs costs = new Costs();

// Plans that apply the same strategies, but at different points
// are equally expensive. For example, if a partitioning can be
// pushed below a Map function there is often no difference in plan
// costs between the pushed down version and the version that partitions
// after the Mapper. However, in those cases, we want the expensive
// strategy to appear later in the plan, as data reduction often occurs
// by large factors, while blowup is rare and typically by smaller fractions.
// We achieve this by adding a penalty to small penalty to the FORWARD strategy,
// weighted by the current plan depth (steps to the earliest data source).
// that way, later FORWARDS are more expensive than earlier forwards.
// Note that this only applies to the heuristic costs.

switch (channel.getShipStrategy()) {
case NONE:
throw new CompilerException(
"Cannot determine costs: Shipping strategy has not been set for an input.");
case FORWARD:
// costs.addHeuristicNetworkCost(channel.getMaxDepth());
break;
case PARTITION_LOCAL_HASH:
break;
case PARTITION_RANDOM:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public long getNetworkCost() {
* The network cost to set, in bytes to be transferred.
*/
public void setNetworkCost(long bytes) {
if (bytes >= -1) {
if (bytes == UNKNOWN || bytes >= 0) {
this.networkCost = bytes;
} else {
throw new IllegalArgumentException();
Expand All @@ -113,7 +113,7 @@ public void setNetworkCost(long bytes) {
* @param bytes The network cost to add, in bytes to be transferred.
*/
public void addNetworkCost(long bytes) {
this.networkCost = (this.networkCost < 0 || bytes < 0) ? -1 : this.networkCost + bytes;
this.networkCost = (this.networkCost < 0 || bytes < 0) ? UNKNOWN : this.networkCost + bytes;
}

/**
Expand All @@ -131,7 +131,7 @@ public long getDiskCost() {
* @param bytes The disk cost to set, in bytes to be written and read.
*/
public void setDiskCost(long bytes) {
if (bytes >= -1) {
if (bytes == UNKNOWN || bytes >= 0) {
this.diskCost = bytes;
} else {
throw new IllegalArgumentException();
Expand All @@ -146,7 +146,7 @@ public void setDiskCost(long bytes) {
*/
public void addDiskCost(long bytes) {
this.diskCost =
(this.diskCost < 0 || bytes < 0) ? -1 : this.diskCost + bytes;
(this.diskCost < 0 || bytes < 0) ? UNKNOWN : this.diskCost + bytes;
}

/**
Expand All @@ -164,7 +164,7 @@ public long getCpuCost() {
* @param cost The CPU Cost.
*/
public void setCpuCost(long cost) {
if (cost >= -1) {
if (cost == UNKNOWN || cost >= 0) {
this.cpuCost = cost;
} else {
throw new IllegalArgumentException();
Expand All @@ -178,7 +178,7 @@ public void setCpuCost(long cost) {
*/
public void addCpuCost(long cost) {
this.cpuCost =
(this.cpuCost < 0 || cost < 0) ? -1 : this.cpuCost + cost;
(this.cpuCost < 0 || cost < 0) ? UNKNOWN : this.cpuCost + cost;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -306,20 +306,20 @@ public void addHeuristicCpuCost(long cost) {
*/
public void addCosts(Costs other) {
// ---------- quantifiable costs ----------
if (this.networkCost == -1 || other.networkCost == -1) {
this.networkCost = -1;
if (this.networkCost == UNKNOWN || other.networkCost == UNKNOWN) {
this.networkCost = UNKNOWN;
} else {
this.networkCost += other.networkCost;
}

if (this.diskCost == -1 || other.diskCost == -1) {
this.diskCost = -1;
if (this.diskCost == UNKNOWN || other.diskCost == UNKNOWN) {
this.diskCost = UNKNOWN;
} else {
this.diskCost += other.diskCost;
}

if (this.cpuCost == -1 || other.cpuCost == -1) {
this.cpuCost = -1;
if (this.cpuCost == UNKNOWN || other.cpuCost == UNKNOWN) {
this.cpuCost = UNKNOWN;
} else {
this.cpuCost += other.cpuCost;
}
Expand All @@ -337,19 +337,19 @@ public void addCosts(Costs other) {
* @param other The costs to subtract.
*/
public void subtractCosts(Costs other) {
if (this.networkCost != -1 && other.networkCost != -1) {
if (this.networkCost != UNKNOWN && other.networkCost != UNKNOWN) {
this.networkCost -= other.networkCost;
if (this.networkCost < 0) {
throw new IllegalArgumentException("Cannot subtract more cost then there is.");
}
}
if (this.diskCost != -1 && other.diskCost != -1) {
if (this.diskCost != UNKNOWN && other.diskCost != UNKNOWN) {
this.diskCost -= other.diskCost;
if (this.diskCost < 0) {
throw new IllegalArgumentException("Cannot subtract more cost then there is.");
}
}
if (this.cpuCost != -1 && other.cpuCost != -1) {
if (this.cpuCost != UNKNOWN && other.cpuCost != UNKNOWN) {
this.cpuCost -= other.cpuCost;
if (this.cpuCost < 0) {
throw new IllegalArgumentException("Cannot subtract more cost then there is.");
Expand Down Expand Up @@ -392,25 +392,29 @@ public void multiplyWith(int factor) {
@Override
public int compareTo(Costs o) {
// check the network cost. if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.networkCost > -1 && o.networkCost > -1) {
return this.networkCost < o.networkCost ? -1 : this.networkCost > o.networkCost ? 1 : 0;
if (this.networkCost != UNKNOWN && o.networkCost != UNKNOWN) {
if (this.networkCost != o.networkCost) {
return this.networkCost < o.networkCost ? -1 : 1;
}
} else if (this.heuristicNetworkCost < o.heuristicNetworkCost) {
return -1;
} else if (this.heuristicNetworkCost > o.heuristicNetworkCost) {
return 1;
}

// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.diskCost > -1 && o.diskCost > -1) {
return this.diskCost < o.diskCost ? -1 : this.diskCost > o.diskCost ? 1 : 0;
if (this.diskCost != UNKNOWN && o.diskCost != UNKNOWN) {
if (this.diskCost != o.diskCost) {
return this.diskCost < o.diskCost ? -1 : 1;
}
} else if (this.heuristicDiskCost < o.heuristicDiskCost) {
return -1;
} else if (this.heuristicDiskCost > o.heuristicDiskCost) {
return 1;
}

// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
if (this.cpuCost > -1 && o.cpuCost > -1) {
if (this.cpuCost != UNKNOWN && o.cpuCost != UNKNOWN) {
return this.cpuCost < o.cpuCost ? -1 : this.cpuCost > o.cpuCost ? 1 : 0;
} else if (this.heuristicCpuCost < o.heuristicCpuCost) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still
* choose to move tasks to different nodes, so that we do not know that no data is shipped.
*/
public class DefaultCostEstimator extends CostEstimator
{
public class DefaultCostEstimator extends CostEstimator {
/**
* The case of the estimation for all relative costs. We heuristically pick a very large data volume, which
* will favor strategies that are less expensive on large data volumes. This is robust and
Expand Down Expand Up @@ -144,9 +144,9 @@ public void addLocalSelfNestedLoopCost(EstimateProvider estimates, long bufferSi
public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs) {
long bs = buildSideInput.getEstimatedOutputSize();
long ps = probeSideInput.getEstimatedOutputSize();
// heuristic: half the table has to spill, times 2 I/O

if (bs > 0 && ps > 0) {
costs.addDiskCost(bs + ps);
costs.addDiskCost(2*bs + ps);
} else {
costs.setDiskCost(Costs.UNKNOWN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package eu.stratosphere.pact.compiler.dataproperties;

import java.util.Arrays;

import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
Expand Down Expand Up @@ -192,7 +194,9 @@ public void parameterizeChannel(Channel channel) {
if (this.ordering != null) {
channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
} else if (this.groupedFields != null) {
channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields));
boolean[] dirs = new boolean[this.groupedFields.size()];
Arrays.fill(dirs, true);
channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class BinaryUnionNode extends TwoInputNode {
public BinaryUnionNode(OptimizerNode pred1, OptimizerNode pred2) {
super(new UnionPlaceholderContract());

this.input1 = new PactConnection(pred1, this);
this.input2 = new PactConnection(pred2, this);
this.input1 = new PactConnection(pred1, this, pred1.getMaxDepth() + 1);
this.input2 = new PactConnection(pred2, this, pred2.getMaxDepth() + 1);

pred1.addOutgoingConnection(this.input1);
pred2.addOutgoingConnection(this.input2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ public void setInputs(Map<Contract, OptimizerNode> contractToNode) {
final PactConnection conn;
if (children.size() == 1) {
pred = contractToNode.get(children.get(0));
conn = new PactConnection(pred, this);
conn = new PactConnection(pred, this, pred.getMaxDepth() + 1);
} else {
pred = createdUnionCascade(children, contractToNode, null);
conn = new PactConnection(pred, this);
conn = new PactConnection(pred, this, pred.getMaxDepth() + 1);
conn.setShipStrategy(ShipStrategyType.FORWARD);
}
// create the connection and add it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
/**
* The optimizer's internal representation of a <i>Map</i> contract node.
*/
public class MapNode extends SingleInputNode
{
public class MapNode extends SingleInputNode {
/**
* Creates a new MapNode for the given contract.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,14 @@ public void identifyDynamicPath(int costWeight) {
public int getCostWeight() {
return this.costWeight;
}

public int getMaxDepth() {
int maxDepth = 0;
for (PactConnection conn : getIncomingConnections()) {
maxDepth = Math.max(maxDepth, conn.getMaxDepth());
}
return maxDepth;
}

/**
* Gets the properties that are interesting for this node to produce.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the
* direction of the sources.
*/
public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode>
{
public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
private final OptimizerNode source; // The source node of the connection

private final OptimizerNode target; // The target node of the connection.
Expand All @@ -41,6 +41,8 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
private ShipStrategyType shipStrategy; // The data distribution strategy, if preset

private TempMode materializationMode = TempMode.NONE;

private final int maxDepth;

/**
* Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>.
Expand All @@ -51,8 +53,8 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
* @param target
* The target node.
*/
public PactConnection(OptimizerNode source, OptimizerNode target) {
this(source, target, null);
public PactConnection(OptimizerNode source, OptimizerNode target, int maxDepth) {
this(source, target, null, maxDepth);
}

/**
Expand All @@ -65,14 +67,14 @@ public PactConnection(OptimizerNode source, OptimizerNode target) {
* @param shipStrategy
* The shipping strategy.
*/
public PactConnection(OptimizerNode source, OptimizerNode target, ShipStrategyType shipStrategy) {
public PactConnection(OptimizerNode source, OptimizerNode target, ShipStrategyType shipStrategy, int maxDepth) {
if (source == null) {
throw new NullPointerException("Source must not be null.");
}

this.source = source;
this.target = target;
this.shipStrategy = shipStrategy;
this.maxDepth = maxDepth;
}

/**
Expand Down Expand Up @@ -139,6 +141,10 @@ public void setInterestingProperties(InterestingProperties props) {
public void clearInterestingProperties() {
this.interestingProps = null;
}

public int getMaxDepth() {
return this.maxDepth;
}

// --------------------------------------------------------------------------------------------

Expand Down
Loading

0 comments on commit f0ff109

Please sign in to comment.