Extended compiler tests for K-Means.
Fixed Cost formulas to properly distinguish between sorted, hashed, and arbitrary materialization.
sewen committed Apr 13, 2013
1 parent 15feaff commit bbb00dd
Showing 5 changed files with 276 additions and 27 deletions.
@@ -55,7 +55,11 @@ public class DelimitedInputFormatSamplingTest {

private static final int TEST_DATA_1_LINES = TEST_DATA1.split("\n").length;

private static final int TEST_DATA_2_LINES = TEST_DATA2.split("\n").length;
private static final int TEST_DATA_1_LINEWIDTH = TEST_DATA1.split("\n")[0].length();

private static final int TEST_DATA_2_LINEWIDTH = TEST_DATA2.split("\n")[0].length();

private static final int TOTAL_SIZE = TEST_DATA1.length() + TEST_DATA2.length();

private static final int DEFAULT_NUM_SAMPLES = 4;

@@ -174,10 +178,18 @@ public void testSamplingDirectory() {
format.configure(conf);
BaseStatistics stats = format.getStatistics(null);

final int numLines = TEST_DATA_1_LINES + TEST_DATA_2_LINES;
final float avgWidth = ((float) (TEST_DATA1.length() + TEST_DATA2.length())) / numLines;
Assert.assertTrue("Wrong record count.", stats.getNumberOfRecords() < numLines + 2 & stats.getNumberOfRecords() > numLines - 2);
Assert.assertTrue("Wrong avg record size.", stats.getAverageRecordWidth() < avgWidth + 1 & stats.getAverageRecordWidth() > avgWidth - 1);
final int maxNumLines = (int) Math.ceil(TOTAL_SIZE / ((double) Math.min(TEST_DATA_1_LINEWIDTH, TEST_DATA_2_LINEWIDTH)));
final int minNumLines = (int) (TOTAL_SIZE / ((double) Math.max(TEST_DATA_1_LINEWIDTH, TEST_DATA_2_LINEWIDTH)));
final float maxAvgWidth = ((float) (TOTAL_SIZE)) / minNumLines;
final float minAvgWidth = ((float) (TOTAL_SIZE)) / maxNumLines;

if (!(stats.getNumberOfRecords() <= maxNumLines && stats.getNumberOfRecords() >= minNumLines)) {
System.err.println("Records: " + stats.getNumberOfRecords() + " out of (" + minNumLines + ", " + maxNumLines + ").");
Assert.fail("Wrong record count.");
}
if (!(stats.getAverageRecordWidth() <= maxAvgWidth && stats.getAverageRecordWidth() >= minAvgWidth)) {
Assert.fail("Wrong avg record size.");
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
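For intuition, the new assertions bound the sampled record count by dividing the total byte size by the extreme line widths; a minimal sketch with made-up numbers (not taken from the test data):

    // Hypothetical: 1000 bytes of input, shortest line 8 bytes, longest 20 bytes.
    long totalSize = 1000;
    int minLineWidth = 8;
    int maxLineWidth = 20;
    int maxNumLines = (int) Math.ceil(totalSize / (double) minLineWidth); // 125
    int minNumLines = (int) (totalSize / (double) maxLineWidth);          // 50
    // Any sampled estimate in [50, 125] records passes the test.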
@@ -38,6 +38,15 @@ public class DefaultCostEstimator extends CostEstimator {
*/
private static final long HEURISTIC_COST_BASE = 10000000000l;

// The numbers for the CPU effort are rather magic at the moment and should be seen as ordinal rather than absolute

private static final float MATERIALIZATION_CPU_FACTOR = 1;

private static final float HASHING_CPU_FACTOR = 4;

private static final float SORTING_CPU_FACTOR = 7;
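As a rough illustration of that ordinal intent (a sketch, not part of the commit): for the same number of bytes, the factors merely rank plain materialization below hashing below sorting.

    long s = 1_000_000; // some estimated byte count
    long materializeCpu = (long) (s * MATERIALIZATION_CPU_FACTOR); // 1,000,000
    long hashCpu        = (long) (s * HASHING_CPU_FACTOR);         // 4,000,000
    long sortCpu        = (long) (s * SORTING_CPU_FACTOR);         // 7,000,000
    // Only the ordering materializeCpu < hashCpu < sortCpu is meaningful.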


// --------------------------------------------------------------------------------------------
// Shipping Strategy Cost
// --------------------------------------------------------------------------------------------
@@ -118,10 +127,13 @@ public void addLocalSortCost(EstimateProvider estimates, long availableMemory, C
// we assume a two phase merge sort, so all in all 2 I/O operations per block
if (s <= 0) {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
costs.addCpuCost((long) (s * SORTING_CPU_FACTOR));
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * SORTING_CPU_FACTOR));
}
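The 2 * s disk term reflects the assumed two-phase merge sort: the input is written out once as sorted runs and read back once for merging. A sketch of the resulting estimate (illustrative size only):

    long s = 100L * 1024 * 1024;                     // hypothetical 100 MB input
    long diskCost = 2 * s;                           // write runs, then read them back
    long cpuCost  = (long) (s * SORTING_CPU_FACTOR); // s * 7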

@Override
@@ -147,10 +159,13 @@ public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider

if (bs > 0 && ps > 0) {
costs.addDiskCost(2*bs + ps);
costs.addCpuCost((long) ((2*bs + ps) * HASHING_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
}
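The 2*bs + ps disk term models a hybrid hash join that, in the spilling case, writes the build side out once and re-reads it, while the probe side is read once; a sketch with illustrative sizes:

    long bs = 64L * 1024 * 1024;   // hypothetical build side: 64 MB
    long ps = 256L * 1024 * 1024;  // hypothetical probe side: 256 MB
    long diskCost = 2 * bs + ps;   // spill + re-read build, read probe once
    long cpuCost  = (long) ((2 * bs + ps) * HASHING_CPU_FACTOR);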

@Override
@@ -163,12 +178,15 @@ public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProv
if (is > bufferSize) {
costs.addDiskCost(oc * is);
}
costs.addCpuCost((long) (oc * is * MATERIALIZATION_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}

// hack: assume 100k loops (should be expensive enough)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 100000 * MATERIALIZATION_CPU_FACTOR));
}
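For the streamed variant, the inner side is consumed once per outer record, so the estimate scales with oc * is; a sketch (illustrative numbers, assuming the inner side does not fit into the buffer):

    long oc = 1000;              // hypothetical outer record count
    long is = 10L * 1024 * 1024; // hypothetical inner side: 10 MB
    long diskCost = oc * is;     // re-read the inner side for every outer record
    long cpuCost  = (long) (oc * is * MATERIALIZATION_CPU_FACTOR);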

@Override
@@ -179,12 +197,15 @@ public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvide
if (is > 0 && os > 0) {
long loops = Math.max(os / blockSize, 1);
costs.addDiskCost(loops * is);
costs.addCpuCost((long) (loops * is * MATERIALIZATION_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}

// hack: assume 1k loops (much cheaper than the streamed variant!)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 1000 * MATERIALIZATION_CPU_FACTOR));
}
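The block variant re-reads the inner side only once per block of outer records, which is why its heuristic assumes 1k loops where the streamed variant assumes 100k; a sketch under the same illustrative assumptions:

    long os = 100L * 1024 * 1024;             // hypothetical outer side: 100 MB
    long blockSize = 1024 * 1024;             // hypothetical 1 MB block
    long loops = Math.max(os / blockSize, 1); // 100 passes over the inner side
    long is = 10L * 1024 * 1024;              // hypothetical inner side: 10 MB
    long diskCost = loops * is;
    long cpuCost  = (long) (loops * is * MATERIALIZATION_CPU_FACTOR);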

// --------------------------------------------------------------------------------------------
@@ -200,9 +221,12 @@ public void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Co
// we assume spilling and re-reading
if (s <= 0) {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
costs.setCpuCost((long) (s * MATERIALIZATION_CPU_FACTOR));
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * MATERIALIZATION_CPU_FACTOR));
}
}
@@ -19,6 +19,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -79,6 +80,13 @@ public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOEx
}
}

public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
dumpOptimizerPlanAsJSON(plan, pw);
return sw.toString();
}
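A plausible use of the new accessor (a hypothetical caller; the class name PlanJSONDumpGenerator is assumed to be the enclosing class of the diff above and is not confirmed by this excerpt):

    // Hypothetical helper showing the new accessor; names are assumptions.
    public static String planToJson(OptimizedPlan plan) {
        PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
        return dumper.getOptimizerPlanAsJSON(plan);
    }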

public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
Collection<SinkPlanNode> sinks = plan.getDataSinks();
if (sinks instanceof List) {