From 753d19916666fb49d773e265660668037c074214 Mon Sep 17 00:00:00 2001 From: Matthias Ringwald Date: Tue, 17 Jul 2012 15:50:02 +0200 Subject: [PATCH] Added ITCase for Join with merge only as local strategy --- .../pactPrograms/MergeOnlyJoinITCase.java | 183 ++++++++++++++++++ .../mergeOnlyJoin/MergeOnlyJoin.java | 149 ++++++++++++++ 2 files changed, 332 insertions(+) create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java create mode 100644 pact/pact-tests/src/test/java/eu/stratosphere/pact/test/testPrograms/mergeOnlyJoin/MergeOnlyJoin.java diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java new file mode 100644 index 0000000000000..e703a0aad4af3 --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/pactPrograms/MergeOnlyJoinITCase.java @@ -0,0 +1,183 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.test.pactPrograms; + +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.pact.common.plan.Plan; +import eu.stratosphere.pact.compiler.PactCompiler; +import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator; +import eu.stratosphere.pact.compiler.plan.OptimizedPlan; +import eu.stratosphere.pact.test.testPrograms.mergeOnlyJoin.MergeOnlyJoin; +import eu.stratosphere.pact.test.util.TestBase; + +@RunWith(Parameterized.class) +public class MergeOnlyJoinITCase extends TestBase { + + private static final Log LOG = LogFactory.getLog(MergeOnlyJoinITCase.class); + + private String input1Path = null; + private String input2Path = null; + private String resultPath = null; + + private final String INPUT1 = "1|9|\n" + + "2|8\n" + + "3|7\n" + + "5|5\n" + + "6|4\n" + + "7|3\n" + + "4|6\n" + + "8|2\n" + + "2|1\n"; + + private final String INPUT2 = "2|2|\n" + + "2|6|\n" + + "2|1|\n" + + "4|1|\n" + + "5|1|\n" + + "2|1|\n"; + + + private final String EXPECTED_RESULT = "2|8|2\n" + + "2|8|6\n" + + "2|8|1\n" + + "2|8|1\n" + + "2|1|2\n" + + "2|1|6\n" + + "2|1|1\n" + + "2|1|1\n" + + "4|6|1\n" + + "5|5|1\n"; + + public MergeOnlyJoinITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + + input1Path = getFilesystemProvider().getTempDirPath() + "/input1"; + input2Path = getFilesystemProvider().getTempDirPath() + "/input2"; + resultPath = getFilesystemProvider().getTempDirPath() + "/result"; + + String[] splits = splitInputString(INPUT1, '\n', 4); + getFilesystemProvider().createDir(input1Path); + for (int i = 0; i < splits.length; i++) { + getFilesystemProvider().createFile(input1Path + "/part_" + i + ".txt", splits[i]); + LOG.debug("Input 1 Part " + (i + 1) + ":\n>" + splits[i] + "<"); + } + + splits = splitInputString(INPUT2, '\n', 4); + getFilesystemProvider().createDir(input2Path); + for (int i = 0; i < splits.length; i++) { + getFilesystemProvider().createFile(input2Path + "/part_" + i + ".txt", splits[i]); + LOG.debug("Input 2 Part " + (i + 1) + ":\n>" + splits[i] + "<"); + } + } + + @Override + protected JobGraph getJobGraph() throws Exception { + + MergeOnlyJoin mergeOnlyJoin = new MergeOnlyJoin(); + Plan plan = mergeOnlyJoin.getPlan( + config.getString("MergeOnlyJoinTest#NoSubtasks", "1"), + getFilesystemProvider().getURIPrefix()+input1Path, + getFilesystemProvider().getURIPrefix()+input2Path, + getFilesystemProvider().getURIPrefix()+resultPath, + config.getString("MergeOnlyJoinTest#NoSubtasksInput2", "1")); + + PactCompiler pc = new PactCompiler(); + OptimizedPlan op = pc.compile(plan); + + JobGraphGenerator jgg = new JobGraphGenerator(); + return jgg.compileJobGraph(op); + } + + @Override + protected void postSubmit() throws Exception { + + // Test results + compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); + + } + + @Override + public void stopCluster() throws Exception { + getFilesystemProvider().delete(input1Path, true); + getFilesystemProvider().delete(input2Path, true); + getFilesystemProvider().delete(resultPath, true); + super.stopCluster(); + } + + + @Parameters + public static Collection getConfigurations() { + + LinkedList tConfigs = new LinkedList(); + + Configuration config = new Configuration(); + config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3); + config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 3); + tConfigs.add(config); + + config = new Configuration(); + config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3); + config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 4); + tConfigs.add(config); + + config = new Configuration(); + config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3); + config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 2); + tConfigs.add(config); + + return toParameterList(tConfigs); + } + + private String[] splitInputString(String inputString, char splitChar, int noSplits) { + + String splitString = inputString.toString(); + String[] splits = new String[noSplits]; + int partitionSize = (splitString.length() / noSplits) - 2; + + // split data file and copy parts + for (int i = 0; i < noSplits - 1; i++) { + int cutPos = splitString.indexOf(splitChar, (partitionSize < splitString.length() ? partitionSize + : (splitString.length() - 1))); + if (cutPos != -1) { + splits[i] = splitString.substring(0, cutPos) + "\n"; + splitString = splitString.substring(cutPos + 1); + } + else { + splits[i] = ""; + } + + } + splits[noSplits - 1] = splitString; + + return splits; + + } + +} diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/testPrograms/mergeOnlyJoin/MergeOnlyJoin.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/testPrograms/mergeOnlyJoin/MergeOnlyJoin.java new file mode 100644 index 0000000000000..615de1bf90568 --- /dev/null +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/testPrograms/mergeOnlyJoin/MergeOnlyJoin.java @@ -0,0 +1,149 @@ +package eu.stratosphere.pact.test.testPrograms.mergeOnlyJoin; +/*********************************************************************************************************************** +* +* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +* +**********************************************************************************************************************/ + +import java.util.Iterator; + +import eu.stratosphere.pact.common.contract.FileDataSink; +import eu.stratosphere.pact.common.contract.FileDataSource; +import eu.stratosphere.pact.common.contract.MatchContract; +import eu.stratosphere.pact.common.contract.ReduceContract; +import eu.stratosphere.pact.common.io.RecordInputFormat; +import eu.stratosphere.pact.common.io.RecordOutputFormat; +import eu.stratosphere.pact.common.plan.Plan; +import eu.stratosphere.pact.common.plan.PlanAssembler; +import eu.stratosphere.pact.common.plan.PlanAssemblerDescription; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.common.stubs.MatchStub; +import eu.stratosphere.pact.common.stubs.ReduceStub; +import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsExcept; +import eu.stratosphere.pact.common.stubs.StubAnnotation.ConstantFieldsFirstExcept; +import eu.stratosphere.pact.common.stubs.StubAnnotation.OutCardBounds; +import eu.stratosphere.pact.common.type.PactRecord; +import eu.stratosphere.pact.common.type.base.PactInteger; +import eu.stratosphere.pact.common.type.base.parser.DecimalTextIntParser; + +public class MergeOnlyJoin implements PlanAssembler, PlanAssemblerDescription { + + @ConstantFieldsFirstExcept(fields={2}) + @OutCardBounds(upperBound=1, lowerBound=1) + public static class JoinInputs extends MatchStub + { + @Override + public void match(PactRecord input1, PactRecord input2, Collector out) + { + input1.setField(2, input2.getField(1, PactInteger.class)); + out.collect(input1); + } + } + + @ConstantFieldsExcept(fields={}) + public static class DummyReduce extends ReduceStub + { + + @Override + public void reduce(Iterator values, Collector out) + { + while (values.hasNext()) { + out.collect(values.next()); + } + } + + } + + /** + * {@inheritDoc} + */ + @Override + public Plan getPlan(final String... args) + { + // parse program parameters + int noSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); + String input1Path = (args.length > 1 ? args[1] : ""); + String input2Path = (args.length > 2 ? args[2] : ""); + String output = (args.length > 3 ? args[3] : ""); + int noSubtasksInput2 = (args.length > 4 ? Integer.parseInt(args[4]) : 1); + + // create DataSourceContract for Orders input + FileDataSource input1 = new FileDataSource(RecordInputFormat.class, input1Path, "Input 1"); + input1.setDegreeOfParallelism(noSubtasks); + + input1.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n"); + input1.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|"); + input1.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2); + + input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class); + input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0); + + input1.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class); + input1.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1); + + + ReduceContract aggInput1 = new ReduceContract(DummyReduce.class, PactInteger.class, 0, input1, "AggOrders"); + aggInput1.setDegreeOfParallelism(noSubtasks); + + + // create DataSourceContract for Orders input + FileDataSource input2 = new FileDataSource(RecordInputFormat.class, input2Path, "Input 2"); + input2.setDegreeOfParallelism(noSubtasksInput2); + + input2.setParameter(RecordInputFormat.RECORD_DELIMITER, "\n"); + input2.setParameter(RecordInputFormat.FIELD_DELIMITER_PARAMETER, "|"); + input2.setParameter(RecordInputFormat.NUM_FIELDS_PARAMETER, 2); + // order id + input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+0, DecimalTextIntParser.class); + input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+0, 0); + // ship prio + input2.getParameters().setClass(RecordInputFormat.FIELD_PARSER_PARAMETER_PREFIX+1, DecimalTextIntParser.class); + input2.setParameter(RecordInputFormat.TEXT_POSITION_PARAMETER_PREFIX+1, 1); + + ReduceContract aggInput2 = new ReduceContract(DummyReduce.class, PactInteger.class, 0, input2, "AggLines"); + aggInput2.setDegreeOfParallelism(noSubtasksInput2); + + // create MatchContract for joining Orders and LineItems + MatchContract joinLiO = new MatchContract(JoinInputs.class, PactInteger.class, 0, 0, aggInput1, aggInput2, "JoinLiO"); + joinLiO.setDegreeOfParallelism(noSubtasks); + // compiler hints + + + // create DataSinkContract for writing the result + FileDataSink result = new FileDataSink(RecordOutputFormat.class, output, joinLiO, "Output"); + result.setDegreeOfParallelism(noSubtasks); + result.getParameters().setString(RecordOutputFormat.RECORD_DELIMITER_PARAMETER, "\n"); + result.getParameters().setString(RecordOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); + result.getParameters().setBoolean(RecordOutputFormat.LENIENT_PARSING, true); + result.getParameters().setInteger(RecordOutputFormat.NUM_FIELDS_PARAMETER, 3); + result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, PactInteger.class); + result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0); + result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, PactInteger.class); + result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1); + result.getParameters().setClass(RecordOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, PactInteger.class); + result.getParameters().setInteger(RecordOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2); + + // assemble the PACT plan + Plan plan = new Plan(result, "Merge Only Join"); + plan.setDefaultParallelism(noSubtasks); + return plan; + } + + /** + * {@inheritDoc} + */ + @Override + public String getDescription() { + return "Parameters: [noSubTasks], [input], [input2], [output], [noSubTasksInput2]"; + } + +} \ No newline at end of file