Skip to content

Commit

Permalink
Added ITCase for Join with merge only as local strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthias Ringwald committed Jul 17, 2012
1 parent a937895 commit 753d199
Show file tree
Hide file tree
Showing 2 changed files with 332 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.test.pactPrograms;

import java.util.Collection;
import java.util.LinkedList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.mergeOnlyJoin.MergeOnlyJoin;
import eu.stratosphere.pact.test.util.TestBase;

@RunWith(Parameterized.class)
public class MergeOnlyJoinITCase extends TestBase {

private static final Log LOG = LogFactory.getLog(MergeOnlyJoinITCase.class);

private String input1Path = null;
private String input2Path = null;
private String resultPath = null;

private final String INPUT1 = "1|9|\n"
+ "2|8\n"
+ "3|7\n"
+ "5|5\n"
+ "6|4\n"
+ "7|3\n"
+ "4|6\n"
+ "8|2\n"
+ "2|1\n";

private final String INPUT2 = "2|2|\n"
+ "2|6|\n"
+ "2|1|\n"
+ "4|1|\n"
+ "5|1|\n"
+ "2|1|\n";


private final String EXPECTED_RESULT = "2|8|2\n"
+ "2|8|6\n"
+ "2|8|1\n"
+ "2|8|1\n"
+ "2|1|2\n"
+ "2|1|6\n"
+ "2|1|1\n"
+ "2|1|1\n"
+ "4|6|1\n"
+ "5|5|1\n";

public MergeOnlyJoinITCase(Configuration config) {
super(config);
}

@Override
protected void preSubmit() throws Exception {

input1Path = getFilesystemProvider().getTempDirPath() + "/input1";
input2Path = getFilesystemProvider().getTempDirPath() + "/input2";
resultPath = getFilesystemProvider().getTempDirPath() + "/result";

String[] splits = splitInputString(INPUT1, '\n', 4);
getFilesystemProvider().createDir(input1Path);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(input1Path + "/part_" + i + ".txt", splits[i]);
LOG.debug("Input 1 Part " + (i + 1) + ":\n>" + splits[i] + "<");
}

splits = splitInputString(INPUT2, '\n', 4);
getFilesystemProvider().createDir(input2Path);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(input2Path + "/part_" + i + ".txt", splits[i]);
LOG.debug("Input 2 Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
}

@Override
protected JobGraph getJobGraph() throws Exception {

MergeOnlyJoin mergeOnlyJoin = new MergeOnlyJoin();
Plan plan = mergeOnlyJoin.getPlan(
config.getString("MergeOnlyJoinTest#NoSubtasks", "1"),
getFilesystemProvider().getURIPrefix()+input1Path,
getFilesystemProvider().getURIPrefix()+input2Path,
getFilesystemProvider().getURIPrefix()+resultPath,
config.getString("MergeOnlyJoinTest#NoSubtasksInput2", "1"));

PactCompiler pc = new PactCompiler();
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator();
return jgg.compileJobGraph(op);
}

@Override
protected void postSubmit() throws Exception {

// Test results
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);

}

@Override
public void stopCluster() throws Exception {
getFilesystemProvider().delete(input1Path, true);
getFilesystemProvider().delete(input2Path, true);
getFilesystemProvider().delete(resultPath, true);
super.stopCluster();
}


@Parameters
public static Collection<Object[]> getConfigurations() {

LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();

Configuration config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 3);
tConfigs.add(config);

config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 4);
tConfigs.add(config);

config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 2);
tConfigs.add(config);

return toParameterList(tConfigs);
}

private String[] splitInputString(String inputString, char splitChar, int noSplits) {

String splitString = inputString.toString();
String[] splits = new String[noSplits];
int partitionSize = (splitString.length() / noSplits) - 2;

// split data file and copy parts
for (int i = 0; i < noSplits - 1; i++) {
int cutPos = splitString.indexOf(splitChar, (partitionSize < splitString.length() ? partitionSize
: (splitString.length() - 1)));
if (cutPos != -1) {
splits[i] = splitString.substring(0, cutPos) + "\n";
splitString = splitString.substring(cutPos + 1);
}
else {
splits[i] = "";
}

}
splits[noSplits - 1] = splitString;

return splits;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package eu.stratosphere.pact.test.testPrograms.mergeOnlyJoin;
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

import java.util.Iterator;

import eu.stratosphere.pact.common.contract.FileDataSink;
import eu.stratosphere.pact.common.contract.FileDataSource;
import eu.stratosphere.pact.common.contract.MatchContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.io.RecordInputFormat;
import eu.stratosphere.pact.common.io.RecordOutputFormat;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.PlanAssembler;
import eu.stratosphere.pact.common.plan.PlanAssemblerDescription;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.stubs.MatchStub;
import eu.stratosphere.pact.common.stubs.ReduceStub;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept;
import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsFirstExcept;
import eu.stratosphere.pact.common.stubs.StubAnnotation.OutCardBounds;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.parser.DecimalTextIntParser;

public class MergeOnlyJoin implements PlanAssembler, PlanAssemblerDescription {

@ConstantFieldsFirstExcept(fields={2})
@OutCardBounds(upperBound=1, lowerBound=1)
public static class JoinInputs extends MatchStub
{
@Override
public void match(PactRecord input1, PactRecord input2, Collector<PactRecord> out)
{
input1.setField(2, input2.getField(1, PactInteger.class));
out.collect(input1);
}
}

@ConstantFieldsExcept(fields={})
public static class DummyReduce extends ReduceStub
{

@Override
public void reduce(Iterator<PactRecord> values, Collector<PactRecord> out)
{
while (values.hasNext()) {
out.collect(values.next());
}
}

}

/**
* {@inheritDoc}
*/
@Override
public Plan getPlan(final String... args)
{
// parse program parameters
int noSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String input1Path = (args.length > 1 ? args[1] : "");
String input2Path = (args.length > 2 ? args[2] : "");
String output = (args.length > 3 ? args[3] : "");
int noSubtasksInput2 = (args.length > 4 ? Integer.parseInt(args[4]) : 1);

// create DataSourceContract for Orders input
FileDataSource input1 = new FileDataSource(RecordInputFormat.class, input1Path, "Input 1");
input1.setDegreeOfParallelism(noSubtasks);

input1.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
input1.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
input1.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);

input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);

input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);


ReduceContract aggInput1 = new ReduceContract(DummyReduce.class, PactInteger.class, 0, input1, "AggOrders");
aggInput1.setDegreeOfParallelism(noSubtasks);


// create DataSourceContract for Orders input
FileDataSource input2 = new FileDataSource(RecordInputFormat.class, input2Path, "Input 2");
input2.setDegreeOfParallelism(noSubtasksInput2);

input2.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n");
input2.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|");
input2.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2);
// order id
input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class);
input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0);
// ship prio
input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class);
input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1);

ReduceContract aggInput2 = new ReduceContract(DummyReduce.class, PactInteger.class, 0, input2, "AggLines");
aggInput2.setDegreeOfParallelism(noSubtasksInput2);

// create MatchContract for joining Orders and LineItems
MatchContract joinLiO = new MatchContract(JoinInputs.class, PactInteger.class, 0, 0, aggInput1, aggInput2, "JoinLiO");
joinLiO.setDegreeOfParallelism(noSubtasks);
// compiler hints


// create DataSinkContract for writing the result
FileDataSink result = new FileDataSink(RecordOutputFormat.class, output, joinLiO, "Output");
result.setDegreeOfParallelism(noSubtasks);
result.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
result.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
result.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true);
result.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3);
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class);
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class);
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class);
result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2);

// assemble the PACT plan
Plan plan = new Plan(result, "Merge Only Join");
plan.setDefaultParallelism(noSubtasks);
return plan;
}

/**
* {@inheritDoc}
*/
@Override
public String getDescription() {
return "Parameters: [noSubTasks], [input], [input2], [output], [noSubTasksInput2]";
}

}

0 comments on commit 753d199

Please sign in to comment.